tustvold commented on code in PR #4280:
URL: https://github.com/apache/arrow-rs/pull/4280#discussion_r1205524983
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -152,43 +151,69 @@ impl<W: Write> ArrowWriter<W> {
self.writer.flushed_row_groups()
}
- /// Enqueues the provided `RecordBatch` to be written
+ /// Returns the length in bytes of the current in progress row group
+ pub fn in_progress_size(&self) -> usize {
+ match &self.in_progress {
+ Some(in_progress) => in_progress
+ .writers
+ .iter()
+ .map(|(x, _)| x.lock().unwrap().length)
+ .sum(),
+ None => 0,
+ }
+ }
+
+ /// Encodes the provided [`RecordBatch`]
///
- /// If following this there are more than `max_row_group_size` rows buffered,
- /// this will flush out one or more row groups with `max_row_group_size` rows,
- /// and drop any fully written `RecordBatch`
+ /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
+ /// rows, the contents of `batch` will be distributed across multiple row groups such that all
+ /// but the final row group in the file contain [`WriterProperties::max_row_group_size`] rows
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
- // validate batch schema against writer's supplied schema
- let batch_schema = batch.schema();
- if !(Arc::ptr_eq(&self.arrow_schema, &batch_schema)
- || self.arrow_schema.contains(&batch_schema))
- {
- return Err(ParquetError::ArrowError(
- "Record batch schema does not match writer schema".to_string(),
- ));
+ if batch.num_rows() == 0 {
+ return Ok(());
}
- for (buffer, column) in self.buffer.iter_mut().zip(batch.columns()) {
- buffer.push_back(column.clone())
+ // If would exceed max_row_group_size, split batch
+ if self.buffered_rows + batch.num_rows() > self.max_row_group_size {
+ let to_write = self.max_row_group_size - self.buffered_rows;
+ let a = batch.slice(0, to_write);
+ let b = batch.slice(to_write, batch.num_rows() - to_write);
+ self.write(&a)?;
+ return self.write(&b);
}
self.buffered_rows += batch.num_rows();
- self.flush_completed()?;
+ let in_progress = match &mut self.in_progress {
+ Some(in_progress) => in_progress,
+ x => x.insert(ArrowRowGroupWriter::new(
+ self.writer.schema_descr(),
+ self.writer.properties(),
+ &self.arrow_schema,
+ )?),
+ };
- Ok(())
- }
+ in_progress.write(batch)?;
- /// Flushes buffered data until there are less than `max_row_group_size` rows buffered
- fn flush_completed(&mut self) -> Result<()> {
- while self.buffered_rows >= self.max_row_group_size {
- self.flush_rows(self.max_row_group_size)?;
+ if self.buffered_rows >= self.max_row_group_size {
+ self.flush()?
}
Ok(())
}
/// Flushes all buffered rows into a new row group
pub fn flush(&mut self) -> Result<()> {
- self.flush_rows(self.buffered_rows)
+ let in_progress = match self.in_progress.take() {
+ Some(in_progress) => in_progress,
+ None => return Ok(()),
+ };
+
+ self.buffered_rows = 0;
+ let mut row_group_writer = self.writer.next_row_group()?;
+ for (chunk, close) in in_progress.close()? {
+ row_group_writer.append_column(&chunk, close)?;
Review Comment:
This is the new API from #4269
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]