This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 276d8c5770 Add RowConverter::append (#4479) (#4541)
276d8c5770 is described below

commit 276d8c5770a3734faf5ed32c2391783d53305de7
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Jul 18 16:50:09 2023 -0400

    Add RowConverter::append (#4479) (#4541)
    
    * Add RowConverter::append (#4479)
    
    * Add overwrite test
---
 arrow-row/src/lib.rs | 147 +++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 107 insertions(+), 40 deletions(-)

diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index e8c5ff708d..31942cb7e8 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -680,6 +680,52 @@ impl RowConverter {
     ///
     /// Panics if the schema of `columns` does not match that provided to 
[`RowConverter::new`]
     pub fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result<Rows, 
ArrowError> {
+        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
+        let mut rows = self.empty_rows(num_rows, 0);
+        self.append(&mut rows, columns)?;
+        Ok(rows)
+    }
+
+    /// Convert [`ArrayRef`] columns appending to an existing [`Rows`]
+    ///
+    /// See [`Row`] for information on when [`Row`] can be compared
+    ///
+    /// # Panics
+    ///
+    /// Panics if
+    /// * The schema of `columns` does not match that provided to 
[`RowConverter::new`]
+    /// * The provided [`Rows`] were not created by this [`RowConverter`]
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use std::collections::HashSet;
+    /// # use arrow_array::cast::AsArray;
+    /// # use arrow_array::StringArray;
+    /// # use arrow_row::{Row, RowConverter, SortField};
+    /// # use arrow_schema::DataType;
+    /// #
+    /// let mut converter = 
RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
+    /// let a1 = StringArray::from(vec!["hello", "world"]);
+    /// let a2 = StringArray::from(vec!["a", "a", "hello"]);
+    ///
+    /// let mut rows = converter.empty_rows(5, 128);
+    /// converter.append(&mut rows, &[Arc::new(a1)]).unwrap();
+    /// converter.append(&mut rows, &[Arc::new(a2)]).unwrap();
+    ///
+    /// let back = converter.convert_rows(&rows).unwrap();
+    /// let values: Vec<_> = 
back[0].as_string::<i32>().iter().map(Option::unwrap).collect();
+    /// assert_eq!(&values, &["hello", "world", "a", "a", "hello"]);
+    /// ```
+    pub fn append(
+        &mut self,
+        rows: &mut Rows,
+        columns: &[ArrayRef],
+    ) -> Result<(), ArrowError> {
+        assert!(
+            Arc::ptr_eq(&rows.config.fields, &self.fields),
+            "rows were not produced by this RowConverter"
+        );
+
         if columns.len() != self.fields.len() {
             return Err(ArrowError::InvalidArgumentError(format!(
                 "Incorrect number of arrays provided to RowConverter, expected 
{} got {}",
@@ -704,12 +750,35 @@ impl RowConverter {
             })
             .collect::<Result<Vec<_>, _>>()?;
 
-        let config = RowConfig {
-            fields: Arc::clone(&self.fields),
-            // Don't need to validate UTF-8 as came from arrow array
-            validate_utf8: false,
-        };
-        let mut rows = new_empty_rows(columns, &encoders, config);
+        let write_offset = rows.num_rows();
+        let lengths = row_lengths(columns, &encoders);
+
+        // We initialize the offsets shifted down by one row index.
+        //
+        // As the rows are appended to, the offsets will be incremented to match
+        //
+        // For example, consider the case of 3 rows of length 3, 4, and 6 
respectively.
+        // The offsets would be initialized to `0, 0, 3, 7`
+        //
+        // Writing the first row entirely would yield `0, 3, 3, 7`
+        // The second, `0, 3, 7, 7`
+        // The third, `0, 3, 7, 13`
+        //
+        // This would be the final offsets for reading
+        //
+        // In this way offsets tracks the position during writing whilst
+        // eventually serving to identify the offsets of the written rows
+        rows.offsets.reserve(lengths.len());
+        let mut cur_offset = rows.offsets[write_offset];
+        for l in lengths {
+            rows.offsets.push(cur_offset);
+            cur_offset = cur_offset.checked_add(l).expect("overflow");
+        }
+
+        // Note this will not zero out any trailing data in `rows.buffer`,
+        // e.g. resulting from a call to `Rows::clear`, relying instead on the
+        // encoders not assuming a zero-initialized buffer
+        rows.buffer.resize(cur_offset, 0);
 
         for ((column, field), encoder) in
             columns.iter().zip(self.fields.iter()).zip(encoders)
@@ -717,7 +786,7 @@ impl RowConverter {
             // We encode a column at a time to minimise dispatch overheads
             encode_column(
                 &mut rows.buffer,
-                &mut rows.offsets,
+                &mut rows.offsets[write_offset..],
                 column.as_ref(),
                 field.options,
                 &encoder,
@@ -731,7 +800,7 @@ impl RowConverter {
                 .for_each(|w| assert!(w[0] <= w[1], "offsets should be 
monotonic"));
         }
 
-        Ok(rows)
+        Ok(())
     }
 
     /// Convert [`Rows`] columns into [`ArrayRef`]
@@ -899,6 +968,7 @@ impl Rows {
         self.offsets.push(self.buffer.len())
     }
 
+    /// Returns the row at index `row`
     pub fn row(&self, row: usize) -> Row<'_> {
         let end = self.offsets[row + 1];
         let start = self.offsets[row];
@@ -908,10 +978,17 @@ impl Rows {
         }
     }
 
+    /// Sets the length of this [`Rows`] to 0
+    pub fn clear(&mut self) {
+        self.offsets.truncate(1);
+    }
+
+    /// Returns the number of [`Row`] in this [`Rows`]
     pub fn num_rows(&self) -> usize {
         self.offsets.len() - 1
     }
 
+    /// Returns an iterator over the [`Row`] in this [`Rows`]
     pub fn iter(&self) -> RowsIter<'_> {
         self.into_iter()
     }
@@ -1116,7 +1193,7 @@ fn null_sentinel(options: SortOptions) -> u8 {
 }
 
 /// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`]
-fn new_empty_rows(cols: &[ArrayRef], encoders: &[Encoder], config: RowConfig) 
-> Rows {
+fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> {
     use fixed::FixedLengthEncoding;
 
     let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
@@ -1203,37 +1280,7 @@ fn new_empty_rows(cols: &[ArrayRef], encoders: 
&[Encoder], config: RowConfig) ->
         }
     }
 
-    let mut offsets = Vec::with_capacity(num_rows + 1);
-    offsets.push(0);
-
-    // We initialize the offsets shifted down by one row index.
-    //
-    // As the rows are appended to the offsets will be incremented to match
-    //
-    // For example, consider the case of 3 rows of length 3, 4, and 6 
respectively.
-    // The offsets would be initialized to `0, 0, 3, 7`
-    //
-    // Writing the first row entirely would yield `0, 3, 3, 7`
-    // The second, `0, 3, 7, 7`
-    // The third, `0, 3, 7, 13`
-    //
-    // This would be the final offsets for reading
-    //
-    // In this way offsets tracks the position during writing whilst 
eventually serving
-    // as identifying the offsets of the written rows
-    let mut cur_offset = 0_usize;
-    for l in lengths {
-        offsets.push(cur_offset);
-        cur_offset = cur_offset.checked_add(l).expect("overflow");
-    }
-
-    let buffer = vec![0_u8; cur_offset];
-
-    Rows {
-        buffer,
-        offsets,
-        config,
-    }
+    lengths
 }
 
 /// Encodes a column to the provided [`Rows`] incrementing the offsets as it 
progresses
@@ -2375,4 +2422,24 @@ mod tests {
             }
         }
     }
+
+    #[test]
+    fn test_clear() {
+        let mut converter =
+            RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
+        let mut rows = converter.empty_rows(3, 128);
+
+        let arrays = [
+            Int32Array::from(vec![None, Some(2), Some(4)]),
+            Int32Array::from(vec![Some(2), None, Some(4)]),
+        ];
+
+        for array in arrays {
+            rows.clear();
+            let array = Arc::new(array) as ArrayRef;
+            converter.append(&mut rows, &[array.clone()]).unwrap();
+            let back = converter.convert_rows(&rows).unwrap();
+            assert_eq!(&back[0], &array);
+        }
+    }
 }

Reply via email to