alamb commented on a change in pull request #1214:
URL: https://github.com/apache/arrow-rs/pull/1214#discussion_r790278638
##########
File path: parquet/src/arrow/arrow_writer.rs
##########
@@ -75,54 +87,109 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
Ok(Self {
writer: file_writer,
+ buffer: vec![Default::default(); arrow_schema.fields().len()],
+ buffered_rows: 0,
arrow_schema,
max_row_group_size,
})
}
- /// Write a RecordBatch to writer
+ /// Enqueues the provided `RecordBatch` to be written
///
- /// The writer will slice the `batch` into `max_row_group_size`,
- /// but if a batch has left-over rows less than the row group size,
- /// the last row group will have fewer records.
- /// This is currently a limitation because we close the row group
- /// instead of keeping it open for the next batch.
+ /// If following this there are more than `max_row_group_size` rows buffered,
+ /// this will flush out one or more row groups with `max_row_group_size` rows,
+ /// and drop any fully written `RecordBatch`
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
// validate batch schema against writer's supplied schema
if self.arrow_schema != batch.schema() {
return Err(ParquetError::ArrowError(
"Record batch schema does not match writer schema".to_string(),
));
}
- // Track the number of rows being written in the batch.
- // We currently do not have a way of slicing nested arrays, thus we
- // track this manually.
- let num_rows = batch.num_rows();
- let batches = (num_rows + self.max_row_group_size - 1) / self.max_row_group_size;
- let min_batch = num_rows.min(self.max_row_group_size);
- for batch_index in 0..batches {
- // Determine the offset and length of arrays
- let offset = batch_index * min_batch;
- let length = (num_rows - offset).min(self.max_row_group_size);
-
- // Compute the definition and repetition levels of the batch
- let batch_level = LevelInfo::new(offset, length);
- let mut row_group_writer = self.writer.next_row_group()?;
- for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
- let mut levels = batch_level.calculate_array_levels(array, field);
- // Reverse levels as we pop() them when writing arrays
- levels.reverse();
- write_leaves(&mut row_group_writer, array, &mut levels)?;
+
+ for (buffer, batch) in self.buffer.iter_mut().zip(batch.columns()) {
+ buffer.push_back(batch.clone())
+ }
Review comment:
```suggestion
for (buffer, column) in self.buffer.iter_mut().zip(batch.columns()) {
buffer.push_back(column.clone())
}
```
##########
File path: parquet/src/arrow/arrow_writer.rs
##########
@@ -75,54 +87,109 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
Ok(Self {
writer: file_writer,
+ buffer: vec![Default::default(); arrow_schema.fields().len()],
+ buffered_rows: 0,
arrow_schema,
max_row_group_size,
})
}
- /// Write a RecordBatch to writer
+ /// Enqueues the provided `RecordBatch` to be written
///
- /// The writer will slice the `batch` into `max_row_group_size`,
- /// but if a batch has left-over rows less than the row group size,
- /// the last row group will have fewer records.
- /// This is currently a limitation because we close the row group
- /// instead of keeping it open for the next batch.
+ /// If following this there are more than `max_row_group_size` rows buffered,
+ /// this will flush out one or more row groups with `max_row_group_size` rows,
+ /// and drop any fully written `RecordBatch`
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
// validate batch schema against writer's supplied schema
if self.arrow_schema != batch.schema() {
return Err(ParquetError::ArrowError(
"Record batch schema does not match writer schema".to_string(),
));
}
- // Track the number of rows being written in the batch.
- // We currently do not have a way of slicing nested arrays, thus we
- // track this manually.
- let num_rows = batch.num_rows();
- let batches = (num_rows + self.max_row_group_size - 1) / self.max_row_group_size;
- let min_batch = num_rows.min(self.max_row_group_size);
- for batch_index in 0..batches {
- // Determine the offset and length of arrays
- let offset = batch_index * min_batch;
- let length = (num_rows - offset).min(self.max_row_group_size);
-
- // Compute the definition and repetition levels of the batch
- let batch_level = LevelInfo::new(offset, length);
- let mut row_group_writer = self.writer.next_row_group()?;
- for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
- let mut levels = batch_level.calculate_array_levels(array, field);
- // Reverse levels as we pop() them when writing arrays
- levels.reverse();
- write_leaves(&mut row_group_writer, array, &mut levels)?;
+
+ for (buffer, batch) in self.buffer.iter_mut().zip(batch.columns()) {
+ buffer.push_back(batch.clone())
+ }
+
+ self.buffered_rows += batch.num_rows();
+
+ self.flush_completed()?;
+
+ Ok(())
+ }
+
+ /// Flushes buffered data until there are less than `max_row_group_size` rows buffered
+ fn flush_completed(&mut self) -> Result<()> {
+ while self.buffered_rows >= self.max_row_group_size {
+ self.flush_row_group(self.max_row_group_size)?;
+ }
+ Ok(())
+ }
+
+ /// Flushes `num_rows` from the buffer into a new row group
+ fn flush_row_group(&mut self, num_rows: usize) -> Result<()> {
+ if num_rows == 0 {
+ return Ok(());
+ }
+
+ assert!(
+ num_rows <= self.buffered_rows,
+ "cannot flush {} rows only have {}",
+ num_rows,
+ self.buffered_rows
+ );
+
+ assert!(
+ num_rows <= self.max_row_group_size,
+ "cannot flush {} rows would exceed max row group size of {}",
+ num_rows,
+ self.max_row_group_size
+ );
+
+ let batch_level = LevelInfo::new(0, num_rows);
+ let mut row_group_writer = self.writer.next_row_group()?;
+
+ for (col_buffer, field) in self.buffer.iter_mut().zip(self.arrow_schema.fields())
+ {
+ // Collect the number of arrays to append
+ let mut remaining = num_rows;
+ let mut arrays = Vec::with_capacity(col_buffer.len());
+ while remaining != 0 {
+ match col_buffer.pop_front() {
+ Some(next) if next.len() > remaining => {
+ col_buffer
+ .push_front(next.slice(remaining, next.len() - remaining));
+ arrays.push(next.slice(0, remaining));
+ remaining = 0;
+ }
+ Some(next) => {
+ remaining -= next.len();
+ arrays.push(next);
+ }
+ _ => break,
+ }
}
- self.writer.close_row_group(row_group_writer)?;
+ // Workaround write logic expecting a single array
Review comment:
Is it possible (maybe as a follow-on PR) to special-case this ("if there are no nested structs, avoid the `take`")?
Or, if there are nested lists/structs, fall back to writing smaller row groups?
##########
File path: parquet/src/arrow/arrow_writer.rs
##########
@@ -1723,4 +1790,47 @@ mod tests {
one_column_roundtrip(array, true, Some(10));
}
+
+ #[test]
+ fn test_aggregates_records() {
+ let arrays = [
+ Int32Array::from((0..100).collect::<Vec<_>>()),
+ Int32Array::from((0..50).collect::<Vec<_>>()),
+ Int32Array::from((200..500).collect::<Vec<_>>()),
+ ];
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "int",
+ ArrowDataType::Int32,
+ false,
+ )]));
+
+ let file = tempfile::tempfile().unwrap();
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(200)
+ .build();
+
+ let mut writer =
+ ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props))
+ .unwrap();
+
+ for array in arrays {
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
+ writer.write(&batch).unwrap();
+ }
+
+ writer.close().unwrap();
+
+ let reader = SerializedFileReader::new(file).unwrap();
+ let row_group_sizes: Vec<_> = reader
+ .metadata()
+ .row_groups()
+ .iter()
+ .map(|x| x.num_rows())
+ .collect();
+
+ assert_eq!(row_group_sizes, vec![200, 200, 50]);
Review comment:
Would it be possible to also read the data back and verify that it matches what was written (not just the row group sizes)?
##########
File path: parquet/src/arrow/levels.rs
##########
@@ -711,12 +711,11 @@ impl LevelInfo {
((0..=(len as i64)).collect(), array_mask)
}
DataType::List(_) | DataType::Map(_, _) => {
- let data = array.data();
- let offsets = unsafe { data.buffers()[0].typed_data::<i32>() };
+ let offsets = unsafe { array.data().buffers()[0].typed_data::<i32>() };
let offsets = offsets
.to_vec()
.into_iter()
- .skip(offset)
+ .skip(array.offset() + offset)
Review comment:
Filed https://github.com/apache/arrow-rs/issues/1226 to track this (so that we can document it in the release notes).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]