leekeiabstraction commented on code in PR #474:
URL: https://github.com/apache/fluss-rust/pull/474#discussion_r3067974511


##########
crates/fluss/src/row/column_writer.rs:
##########
@@ -339,41 +365,65 @@ impl ColumnWriter {
     /// directly to the concrete Arrow builder.
     #[inline]
     pub fn write_field(&mut self, row: &dyn InternalRow) -> Result<()> {
-        if self.nullable && row.is_null_at(self.pos)? {
+        self.write_field_at(row, self.pos)
+    }
+
+    /// Read one value from `row` at position `pos` and append it
+    /// directly to the concrete Arrow builder.
+    #[inline]
+    pub fn write_field_at(&mut self, row: &dyn InternalRow, pos: usize) -> 
Result<()> {
+        if self.nullable && row.is_null_at(pos)? {
             self.append_null();
             return Ok(());
         }
-        self.write_non_null(row)
+        self.write_non_null_at(row, pos)
     }
 
     /// Finish the builder, producing the final Arrow array.
     pub fn finish(&mut self) -> ArrayRef {
-        self.as_builder_mut().finish()
+        match &mut self.inner {
+            TypedWriter::List {
+                element_writer,
+                offsets,
+                validity,
+            } => {
+                let values = element_writer.finish();
+                finish_list_array(values, offsets, validity)

Review Comment:
   I think we need to reset the offsets and validity vectors here. If we later optimise the column writer to be reusable, a subsequent finish() would otherwise operate on the dirty offsets and validity left over from the previous batch. Something like the following?
   
   ```rust
   let values = element_writer.finish();
   let taken_offsets = std::mem::replace(offsets, vec![0]);
   let taken_validity = std::mem::take(validity);
   finish_list_array(values, &taken_offsets, &taken_validity)
   ```



##########
crates/fluss/src/row/column_writer.rs:
##########
@@ -550,15 +600,53 @@ impl ColumnWriter {
                 )?);
                 Ok(())
             }
+            TypedWriter::List {
+                element_writer,
+                offsets,
+                validity,
+            } => {
+                let array = row.get_array(pos)?;
+                for i in 0..array.size() {
+                    element_writer.write_field_at(&array, i)?;
+                }
+                let last = *offsets.last().unwrap();
+                offsets.push(
+                    last + i32::try_from(array.size()).map_err(|_| 
RowConvertError {
+                        message: format!("Array size {} exceeds i32 range", 
array.size()),
+                    })?,
+                );
+                validity.push(true);
+                Ok(())
+            }
         }
     }
 }
 
+fn finish_list_array(values: ArrayRef, offsets: &[i32], validity: &[bool]) -> 
ArrayRef {
+    use arrow::array::ListArray;
+    use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
+    use arrow::datatypes::{Field, FieldRef};
+    use std::sync::Arc;
+
+    let offsets_buffer = 
OffsetBuffer::new(ScalarBuffer::from(offsets.to_vec()));
+    let null_buffer = NullBuffer::from(validity.to_vec());
+    let field = Arc::new(Field::new("item", values.data_type().clone(), true));

Review Comment:
   It seems like we're hardcoding the list element field to be nullable here. Will this cause a schema mismatch when the user defines the element type to be non-nullable and uses the batch builder?
   
   If this is an actual issue, we should add test coverage as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to