This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 276d8c5770 Add RowConverter::append (#4479) (#4541)
276d8c5770 is described below
commit 276d8c5770a3734faf5ed32c2391783d53305de7
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Jul 18 16:50:09 2023 -0400
Add RowConverter::append (#4479) (#4541)
* Add RowConverter::append (#4479)
* Add overwrite test
---
arrow-row/src/lib.rs | 147 +++++++++++++++++++++++++++++++++++++--------------
1 file changed, 107 insertions(+), 40 deletions(-)
diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index e8c5ff708d..31942cb7e8 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -680,6 +680,52 @@ impl RowConverter {
///
/// Panics if the schema of `columns` does not match that provided to
[`RowConverter::new`]
pub fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result<Rows,
ArrowError> {
+ let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
+ let mut rows = self.empty_rows(num_rows, 0);
+ self.append(&mut rows, columns)?;
+ Ok(rows)
+ }
+
+ /// Convert [`ArrayRef`] columns appending to an existing [`Rows`]
+ ///
+ /// See [`Row`] for information on when [`Row`] can be compared
+ ///
+ /// # Panics
+ ///
+ /// Panics if
+ /// * The schema of `columns` does not match that provided to
[`RowConverter::new`]
+ /// * The provided [`Rows`] were not created by this [`RowConverter`]
+ ///
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use std::collections::HashSet;
+ /// # use arrow_array::cast::AsArray;
+ /// # use arrow_array::StringArray;
+ /// # use arrow_row::{Row, RowConverter, SortField};
+ /// # use arrow_schema::DataType;
+ /// #
+ /// let mut converter =
RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
+ /// let a1 = StringArray::from(vec!["hello", "world"]);
+ /// let a2 = StringArray::from(vec!["a", "a", "hello"]);
+ ///
+ /// let mut rows = converter.empty_rows(5, 128);
+ /// converter.append(&mut rows, &[Arc::new(a1)]).unwrap();
+ /// converter.append(&mut rows, &[Arc::new(a2)]).unwrap();
+ ///
+ /// let back = converter.convert_rows(&rows).unwrap();
+ /// let values: Vec<_> =
back[0].as_string::<i32>().iter().map(Option::unwrap).collect();
+ /// assert_eq!(&values, &["hello", "world", "a", "a", "hello"]);
+ /// ```
+ pub fn append(
+ &mut self,
+ rows: &mut Rows,
+ columns: &[ArrayRef],
+ ) -> Result<(), ArrowError> {
+ assert!(
+ Arc::ptr_eq(&rows.config.fields, &self.fields),
+ "rows were not produced by this RowConverter"
+ );
+
if columns.len() != self.fields.len() {
return Err(ArrowError::InvalidArgumentError(format!(
"Incorrect number of arrays provided to RowConverter, expected
{} got {}",
@@ -704,12 +750,35 @@ impl RowConverter {
})
.collect::<Result<Vec<_>, _>>()?;
- let config = RowConfig {
- fields: Arc::clone(&self.fields),
- // Don't need to validate UTF-8 as came from arrow array
- validate_utf8: false,
- };
- let mut rows = new_empty_rows(columns, &encoders, config);
+ let write_offset = rows.num_rows();
+ let lengths = row_lengths(columns, &encoders);
+
+ // We initialize the offsets shifted down by one row index.
+ //
+ // As rows are appended, the offsets will be incremented to match
+ //
+ // For example, consider appending 3 rows of length 3, 4, and 6 respectively
+ // to an empty [`Rows`] (whose offsets initially contain just `0`).
+ // The offsets would be initialized to `0, 0, 3, 7`
+ //
+ // Writing the first row entirely would yield `0, 3, 3, 7`
+ // The second, `0, 3, 7, 7`
+ // The third, `0, 3, 7, 13`
+ //
+ // This would be the final offsets for reading
+ //
+ // In this way `offsets` tracks the write position of each row during encoding,
+ // and once encoding completes it holds the final offsets of the written rows
+ rows.offsets.reserve(lengths.len());
+ let mut cur_offset = rows.offsets[write_offset];
+ for l in lengths {
+ rows.offsets.push(cur_offset);
+ cur_offset = cur_offset.checked_add(l).expect("overflow");
+ }
+
+ // Note this will not zero out any trailing data in `rows.buffer`,
+ // e.g. resulting from a call to `Rows::clear`, relying instead on the
+ // encoders not assuming a zero-initialized buffer
+ rows.buffer.resize(cur_offset, 0);
for ((column, field), encoder) in
columns.iter().zip(self.fields.iter()).zip(encoders)
@@ -717,7 +786,7 @@ impl RowConverter {
// We encode a column at a time to minimise dispatch overheads
encode_column(
&mut rows.buffer,
- &mut rows.offsets,
+ &mut rows.offsets[write_offset..],
column.as_ref(),
field.options,
&encoder,
@@ -731,7 +800,7 @@ impl RowConverter {
.for_each(|w| assert!(w[0] <= w[1], "offsets should be
monotonic"));
}
- Ok(rows)
+ Ok(())
}
/// Convert [`Rows`] columns into [`ArrayRef`]
@@ -899,6 +968,7 @@ impl Rows {
self.offsets.push(self.buffer.len())
}
+ /// Returns the row at index `row`
pub fn row(&self, row: usize) -> Row<'_> {
let end = self.offsets[row + 1];
let start = self.offsets[row];
@@ -908,10 +978,17 @@ impl Rows {
}
}
+ /// Sets the length of this [`Rows`] to 0
+ ///
+ /// Both the offsets and the row data buffer are reset (the buffer keeps its
+ /// allocated capacity for reuse). Resetting the buffer as well as the offsets
+ /// is required: new rows are positioned relative to the current buffer length,
+ /// and resizing the buffer would otherwise preserve stale bytes from the
+ /// cleared rows instead of zero-filling the space for newly appended rows
+ pub fn clear(&mut self) {
+ self.offsets.truncate(1);
+ self.buffer.clear();
+ }
+
+ /// Returns the number of [`Row`] in this [`Rows`]
pub fn num_rows(&self) -> usize {
self.offsets.len() - 1
}
+ /// Returns an iterator over the [`Row`] in this [`Rows`]
pub fn iter(&self) -> RowsIter<'_> {
self.into_iter()
}
@@ -1116,7 +1193,7 @@ fn null_sentinel(options: SortOptions) -> u8 {
}
/// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`]
-fn new_empty_rows(cols: &[ArrayRef], encoders: &[Encoder], config: RowConfig)
-> Rows {
+fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> {
use fixed::FixedLengthEncoding;
let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
@@ -1203,37 +1280,7 @@ fn new_empty_rows(cols: &[ArrayRef], encoders:
&[Encoder], config: RowConfig) ->
}
}
- let mut offsets = Vec::with_capacity(num_rows + 1);
- offsets.push(0);
-
- // We initialize the offsets shifted down by one row index.
- //
- // As the rows are appended to the offsets will be incremented to match
- //
- // For example, consider the case of 3 rows of length 3, 4, and 6
respectively.
- // The offsets would be initialized to `0, 0, 3, 7`
- //
- // Writing the first row entirely would yield `0, 3, 3, 7`
- // The second, `0, 3, 7, 7`
- // The third, `0, 3, 7, 13`
- //
- // This would be the final offsets for reading
- //
- // In this way offsets tracks the position during writing whilst
eventually serving
- // as identifying the offsets of the written rows
- let mut cur_offset = 0_usize;
- for l in lengths {
- offsets.push(cur_offset);
- cur_offset = cur_offset.checked_add(l).expect("overflow");
- }
-
- let buffer = vec![0_u8; cur_offset];
-
- Rows {
- buffer,
- offsets,
- config,
- }
+ lengths
}
/// Encodes a column to the provided [`Rows`] incrementing the offsets as it
progresses
@@ -2375,4 +2422,24 @@ mod tests {
}
}
}
+
+ #[test]
+ fn test_clear() {
+ let mut converter =
+ RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
+ let mut rows = converter.empty_rows(3, 128);
+
+ let arrays = [
+ Int32Array::from(vec![None, Some(2), Some(4)]),
+ Int32Array::from(vec![Some(2), None, Some(4)]),
+ ];
+
+ for array in arrays {
+ rows.clear();
+ let array = Arc::new(array) as ArrayRef;
+ converter.append(&mut rows, &[array.clone()]).unwrap();
+ let back = converter.convert_rows(&rows).unwrap();
+ assert_eq!(&back[0], &array);
+ }
+ }
}