charlesdong1991 commented on code in PR #544:
URL: https://github.com/apache/fluss-rust/pull/544#discussion_r3232309987


##########
crates/fluss/src/row/column.rs:
##########
@@ -433,6 +627,40 @@ impl InternalRow for ColumnarRow {
         write_arrow_values_to_fluss_array(&*values, &element_fluss_type, &mut 
writer)?;
         writer.complete()
     }
+
+    fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> {
+        let cache_idx = self
+            .row_column_indices
+            .iter()
+            .position(|&i| i == pos)
+            .ok_or_else(|| IllegalArgument {
+                message: format!("get_row called on non-ROW column at position 
{pos}"),
+            })?;
+        let column = self.record_batch.column(pos);
+        // Children of a null parent may carry stale bytes; caller must
+        // check is_null_at first rather than rely on what we'd read.
+        if column.is_null(self.row_id) {

Review Comment:
   I wonder: should the user check `is_null_at` before calling `get_row`?



##########
crates/fluss/src/row/column.rs:
##########
@@ -35,25 +41,71 @@ use std::sync::Arc;
 pub struct ColumnarRow {
     record_batch: Arc<RecordBatch>,
     row_id: usize,
+    fluss_row_type: Option<Arc<RowType>>,
+    row_column_indices: Arc<[usize]>,
+    row_caches: Box<[std::sync::OnceLock<GenericRow<'static>>]>,
+}
+
+pub(crate) fn fluss_row_column_indices(row_type: &RowType) -> Arc<[usize]> {
+    row_type
+        .fields()
+        .iter()
+        .enumerate()
+        .filter_map(|(i, f)| matches!(f.data_type, 
DataType::Row(_)).then_some(i))
+        .collect()
+}
+
+pub(crate) fn arrow_row_column_indices(batch: &RecordBatch) -> Arc<[usize]> {
+    batch
+        .columns()
+        .iter()
+        .enumerate()
+        .filter_map(|(i, c)| matches!(c.data_type(), 
ArrowDataType::Struct(_)).then_some(i))
+        .collect()
+}
+
+fn make_row_caches(indices: &[usize]) -> 
Box<[std::sync::OnceLock<GenericRow<'static>>]> {
+    indices.iter().map(|_| std::sync::OnceLock::new()).collect()
 }
 
 impl ColumnarRow {
-    pub fn new(batch: Arc<RecordBatch>) -> Self {
-        ColumnarRow {
-            record_batch: batch,
-            row_id: 0,
-        }
+    pub fn new(
+        batch: Arc<RecordBatch>,
+        row_id: usize,
+        fluss_row_type: Option<Arc<RowType>>,
+    ) -> Self {
+        let row_column_indices = match &fluss_row_type {
+            Some(rt) => fluss_row_column_indices(rt),
+            None => arrow_row_column_indices(&batch),
+        };
+        Self::with_indices(batch, row_id, fluss_row_type, row_column_indices)
     }
 
-    pub fn new_with_row_id(bach: Arc<RecordBatch>, row_id: usize) -> Self {
+    pub(crate) fn with_indices(
+        batch: Arc<RecordBatch>,
+        row_id: usize,
+        fluss_row_type: Option<Arc<RowType>>,
+        row_column_indices: Arc<[usize]>,
+    ) -> Self {
+        let row_caches = make_row_caches(&row_column_indices);
         ColumnarRow {
-            record_batch: bach,
+            record_batch: batch,
             row_id,
+            fluss_row_type,
+            row_column_indices,
+            row_caches,
         }
     }
 
+    pub fn fluss_row_type(&self) -> Option<&Arc<RowType>> {
+        self.fluss_row_type.as_ref()
+    }
+
     pub fn set_row_id(&mut self, row_id: usize) {
-        self.row_id = row_id
+        self.row_id = row_id;
+        for lock in self.row_caches.iter_mut() {
+            *lock = std::sync::OnceLock::new();

Review Comment:
   You mentioned in the PR description that `Per-row Vec<OnceLock> allocation 
killed`. Looking at this, I wonder whether it is still the case that it will 
allocate a `OnceLock` on every row iteration?



##########
crates/fluss/src/row/field_getter.rs:
##########
@@ -183,8 +187,9 @@ impl InnerFieldGetter {
             InnerFieldGetter::TimestampLtz { pos, precision } => {
                 Datum::TimestampLtz(row.get_timestamp_ltz(*pos, *precision)?)
             }
-            // TODO: add Map and Row field getter support once their binary 
forms are implemented.
+            // TODO: add Map field getter support once its binary form is 
implemented.
             InnerFieldGetter::Array { pos } => 
Datum::Array(row.get_array(*pos)?),
+            InnerFieldGetter::Row { pos } => 
Datum::Row(Box::new(row.get_row(*pos)?.clone())),

Review Comment:
   I wonder how performant this will be, since it seems to clone the whole 
generic row. Maybe we can flag this for awareness on hot scan paths here and 
revisit it later if needed, because the PR is already huge.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to