fresh-borzoni commented on code in PR #544:
URL: https://github.com/apache/fluss-rust/pull/544#discussion_r3233889068


##########
crates/fluss/src/row/column.rs:
##########
@@ -433,6 +627,40 @@ impl InternalRow for ColumnarRow {
         write_arrow_values_to_fluss_array(&*values, &element_fluss_type, &mut writer)?;
         writer.complete()
     }
+
+    fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> {
+        let cache_idx = self
+            .row_column_indices
+            .iter()
+            .position(|&i| i == pos)
+            .ok_or_else(|| IllegalArgument {
+                message: format!("get_row called on non-ROW column at position {pos}"),
+            })?;
+        let column = self.record_batch.column(pos);
+        // Children of a null parent may carry stale bytes; caller must
+        // check is_null_at first rather than rely on what we'd read.
+        if column.is_null(self.row_id) {

Review Comment:
   This will disappear once https://github.com/apache/fluss-rust/issues/543 is resolved.



##########
crates/fluss/src/row/column.rs:
##########
@@ -35,25 +41,71 @@ use std::sync::Arc;
 pub struct ColumnarRow {
     record_batch: Arc<RecordBatch>,
     row_id: usize,
+    fluss_row_type: Option<Arc<RowType>>,
+    row_column_indices: Arc<[usize]>,
+    row_caches: Box<[std::sync::OnceLock<GenericRow<'static>>]>,
+}
+
+pub(crate) fn fluss_row_column_indices(row_type: &RowType) -> Arc<[usize]> {
+    row_type
+        .fields()
+        .iter()
+        .enumerate()
+        .filter_map(|(i, f)| matches!(f.data_type, DataType::Row(_)).then_some(i))
+        .collect()
+}
+
+pub(crate) fn arrow_row_column_indices(batch: &RecordBatch) -> Arc<[usize]> {
+    batch
+        .columns()
+        .iter()
+        .enumerate()
+        .filter_map(|(i, c)| matches!(c.data_type(), ArrowDataType::Struct(_)).then_some(i))
+        .collect()
+}
+
+fn make_row_caches(indices: &[usize]) -> Box<[std::sync::OnceLock<GenericRow<'static>>]> {
+    indices.iter().map(|_| std::sync::OnceLock::new()).collect()
 }
 
 impl ColumnarRow {
-    pub fn new(batch: Arc<RecordBatch>) -> Self {
-        ColumnarRow {
-            record_batch: batch,
-            row_id: 0,
-        }
+    pub fn new(
+        batch: Arc<RecordBatch>,
+        row_id: usize,
+        fluss_row_type: Option<Arc<RowType>>,
+    ) -> Self {
+        let row_column_indices = match &fluss_row_type {
+            Some(rt) => fluss_row_column_indices(rt),
+            None => arrow_row_column_indices(&batch),
+        };
+        Self::with_indices(batch, row_id, fluss_row_type, row_column_indices)
     }
 
-    pub fn new_with_row_id(bach: Arc<RecordBatch>, row_id: usize) -> Self {
+    pub(crate) fn with_indices(
+        batch: Arc<RecordBatch>,
+        row_id: usize,
+        fluss_row_type: Option<Arc<RowType>>,
+        row_column_indices: Arc<[usize]>,
+    ) -> Self {
+        let row_caches = make_row_caches(&row_column_indices);
         ColumnarRow {
-            record_batch: bach,
+            record_batch: batch,
             row_id,
+            fluss_row_type,
+            row_column_indices,
+            row_caches,
         }
     }
 
+    pub fn fluss_row_type(&self) -> Option<&Arc<RowType>> {
+        self.fluss_row_type.as_ref()
+    }
+
     pub fn set_row_id(&mut self, row_id: usize) {
-        self.row_id = row_id
+        self.row_id = row_id;
+        for lock in self.row_caches.iter_mut() {
+            *lock = std::sync::OnceLock::new();

Review Comment:
   Ditto: this will also disappear once https://github.com/apache/fluss-rust/issues/543 is resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to