alamb commented on code in PR #1719:
URL: https://github.com/apache/arrow-rs/pull/1719#discussion_r879302492
##########
parquet/src/file/writer.rs:
##########
@@ -92,102 +111,90 @@ pub trait FileWriter {
/// All columns should be written sequentially; the main workflow is:
/// - Request the next column using `next_column` method - this will return `None` if no
/// more columns are available to write.
-/// - Once done writing a column, close column writer with `close_column` method - this
-/// will finalise column chunk metadata and update row group metrics.
+/// - Once done writing a column, close column writer with `close`
Review Comment:
What is the reason to remove the rationale ('this will finalize column chunk
metadata ...')?
##########
parquet/src/util/cursor.rs:
##########
@@ -133,68 +131,6 @@ impl Seek for SliceableCursor {
}
}
-/// Use this type to write Parquet to memory rather than a file.
-#[derive(Debug, Default, Clone)]
Review Comment:
In terms of easing the transition of this change, what would you think about
marking this struct "deprecated" and keeping the code backwards compatible for
a few releases (e.g. `impl Write for InMemoryWriteableCursor`)?
That might make the API change easier to manage and give users some time to
remove it.
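Below is a minimal std-only sketch of such a shim. It keeps the old type name, marks it `#[deprecated]`, and implements `std::io::Write` by appending to a plain `Vec<u8>`. The `buffer` field and the `data()` accessor are hypothetical. The sketch does not try to reproduce the original type's internals.

```rust
use std::io::{self, Write};

/// Hypothetical shim: the old type survives for a few releases, marked
/// deprecated, and simply collects written bytes in memory.
#[deprecated(note = "write to any `std::io::Write` (e.g. `Vec<u8>`) instead")]
#[derive(Debug, Default, Clone)]
pub struct InMemoryWriteableCursor {
    buffer: Vec<u8>,
}

#[allow(deprecated)]
impl InMemoryWriteableCursor {
    /// Hypothetical accessor: returns a copy of the bytes written so far.
    pub fn data(&self) -> Vec<u8> {
        self.buffer.clone()
    }
}

#[allow(deprecated)]
impl Write for InMemoryWriteableCursor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Accept every byte; an in-memory buffer never short-writes.
        self.buffer.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        // Nothing is buffered beyond the Vec itself.
        Ok(())
    }
}
```

Existing callers keep compiling but see a deprecation warning that points them at the replacement.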
##########
parquet/src/file/mod.rs:
##########
@@ -48,12 +48,14 @@
//! let props = Arc::new(WriterProperties::builder().build());
//! let file = fs::File::create(&path).unwrap();
//! let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
-//! let mut row_group_writer = writer.next_row_group().unwrap();
-//! while let Some(mut col_writer) = row_group_writer.next_column().unwrap() {
-//! // ... write values to a column writer
-//! row_group_writer.close_column(col_writer).unwrap();
+//! {
Review Comment:
Also, it seems to me like we could change this example to actually run (not
sure why it is marked `no_run`; it just needs to pick a temp file as the target
rather than `/path/to/sample.parquet`)
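Here is a std-only sketch of picking a runnable target. It builds a unique path under `std::env::temp_dir()`, writes to it, and cleans up afterwards. The helpers `temp_target` and `write_example` are hypothetical. A real doc example would pass the `File` to `SerializedFileWriter::new` where the comment indicates. The `tempfile` crate would be an alternative if it is already a dev-dependency.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Hypothetical helper: a per-process path in the OS temp directory, so the
/// doc example can run instead of pointing at `/path/to/sample.parquet`.
fn temp_target(name: &str) -> PathBuf {
    std::env::temp_dir().join(format!("{}-{}.parquet", name, std::process::id()))
}

/// Hypothetical stand-in for the doc example body: create the file, write to
/// it, and report how many bytes ended up on disk.
fn write_example(path: &Path) -> io::Result<u64> {
    let mut file = fs::File::create(path)?;
    // A real example would hand `file` to the Parquet writer here; this
    // sketch only writes the 4-byte Parquet magic.
    file.write_all(b"PAR1")?;
    file.flush()?;
    Ok(fs::metadata(path)?.len())
}
```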
##########
parquet/src/file/writer.rs:
##########
@@ -270,69 +273,45 @@ impl<W: 'static + ParquetWriter> FileWriter for SerializedFileWriter<W> {
/// A serialized implementation for Parquet [`RowGroupWriter`].
/// Coordinates writing of a row group with column writers.
/// See documentation on row group writer for more information.
-pub struct SerializedRowGroupWriter<W: ParquetWriter> {
+pub struct SerializedRowGroupWriter<'a, W: Write> {
descr: SchemaDescPtr,
props: WriterPropertiesPtr,
- buf: W,
+ buf: &'a mut TrackedWrite<W>,
total_rows_written: Option<u64>,
total_bytes_written: u64,
column_index: usize,
- previous_writer_closed: bool,
row_group_metadata: Option<RowGroupMetaDataPtr>,
column_chunks: Vec<ColumnChunkMetaData>,
+ on_close: Option<OnCloseRowGroup<'a>>,
}
-impl<W: 'static + ParquetWriter> SerializedRowGroupWriter<W> {
+impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
+ /// Creates a new `SerializedRowGroupWriter` with:
+ ///
+ /// - `schema_descr` - the schema to write
+ /// - `properties` - writer properties
+ /// - `buf` - the buffer to write data to
+ /// - `on_close` - an optional callback that will be invoked on [`Self::close`]
pub fn new(
schema_descr: SchemaDescPtr,
properties: WriterPropertiesPtr,
- buf: &W,
+ buf: &'a mut TrackedWrite<W>,
+ on_close: Option<OnCloseRowGroup<'a>>,
) -> Self {
let num_columns = schema_descr.num_columns();
Self {
+ buf,
+ on_close,
+ total_rows_written: None,
descr: schema_descr,
props: properties,
- buf: buf.try_clone().unwrap(),
- total_rows_written: None,
- total_bytes_written: 0,
column_index: 0,
- previous_writer_closed: true,
row_group_metadata: None,
column_chunks: Vec::with_capacity(num_columns),
+ total_bytes_written: 0,
}
}
- /// Checks and finalises current column writer.
- fn finalise_column_writer(&mut self, writer: ColumnWriter) -> Result<()> {
Review Comment:
This code appears to have been inlined into `next_column` below
##########
parquet/src/util/io.rs:
##########
@@ -153,47 +153,6 @@ impl<R: ParquetReader> Length for FileSource<R> {
self.end - self.start
}
}
-
-/// Struct that represents `File` output stream with position tracking.
-/// Used as a sink in file writer.
Review Comment:
Likewise, I recommend considering leaving this structure in (and marking it
as "deprecated") for a release or two to give people a chance to update their
code over time rather than all at once.
##########
parquet/benches/arrow_writer.rs:
##########
@@ -278,8 +276,8 @@ fn _create_nested_bench_batch(
#[inline]
fn write_batch(batch: &RecordBatch) -> Result<()> {
// Write batch to an in-memory writer
- let cursor = InMemoryWriteableCursor::default();
- let mut writer = ArrowWriter::try_new(cursor, batch.schema(), None)?;
+ let buffer = vec![];
Review Comment:
Here is a nice example of the new API in action: any type that implements
`std::io::Write` can be used as the sink.
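The benchmark now starts from a plain `vec![]` instead of `InMemoryWriteableCursor`. The std-only sketch below shows why that is enough. The `write_payload` helper is hypothetical and takes any `W: Write`. Both an owned `Vec<u8>` and a borrowed `&mut Vec<u8>` qualify, the latter through the standard blanket `impl Write for &mut W`.

```rust
use std::io::{self, Write};

/// Hypothetical helper: generic over any `std::io::Write` sink, so callers
/// can pass a `Vec<u8>`, a `&mut Vec<u8>`, a `File`, and so on.
fn write_payload<W: Write>(mut sink: W, payload: &[u8]) -> io::Result<W> {
    sink.write_all(payload)?;
    sink.flush()?;
    // Hand the sink back so an owned buffer can be inspected afterwards.
    Ok(sink)
}
```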
##########
parquet/src/file/writer.rs:
##########
@@ -214,7 +221,7 @@ impl<W: ParquetWriter> SerializedFileWriter<W> {
}
#[inline]
- fn assert_closed(&self) -> Result<()> {
+ fn assert_not_closed(&self) -> Result<()> {
Review Comment:
👍
##########
parquet/src/file/writer.rs:
##########
@@ -196,13 +203,13 @@ impl<W: ParquetWriter> SerializedFileWriter<W> {
};
// Write file metadata
- let start_pos = self.buf.seek(SeekFrom::Current(0))?;
+ let start_pos = self.buf.bytes_written();
Review Comment:
❤️
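Replacing `seek(SeekFrom::Current(0))` with `bytes_written()` means the writer only needs `Write`, not `Seek`. The sketch below is a minimal byte-counting wrapper. It is assumed to behave like the PR's `TrackedWrite` in spirit. The name `CountingWriter` and its methods are hypothetical and are not that type's actual API.

```rust
use std::io::{self, Write};

/// Hypothetical byte-counting wrapper: tracks the current output position
/// without requiring the inner sink to implement `Seek`.
struct CountingWriter<W: Write> {
    inner: W,
    bytes_written: usize,
}

impl<W: Write> CountingWriter<W> {
    fn new(inner: W) -> Self {
        Self { inner, bytes_written: 0 }
    }

    /// Total bytes accepted by the inner writer so far.
    fn bytes_written(&self) -> usize {
        self.bytes_written
    }

    fn into_inner(self) -> W {
        self.inner
    }
}

impl<W: Write> Write for CountingWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Count only what the inner writer actually accepted.
        let n = self.inner.write(buf)?;
        self.bytes_written += n;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}
```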
##########
parquet/src/file/writer.rs:
##########
@@ -270,69 +273,45 @@ impl<W: 'static + ParquetWriter> FileWriter for SerializedFileWriter<W> {
/// A serialized implementation for Parquet [`RowGroupWriter`].
/// Coordinates writing of a row group with column writers.
/// See documentation on row group writer for more information.
-pub struct SerializedRowGroupWriter<W: ParquetWriter> {
+pub struct SerializedRowGroupWriter<'a, W: Write> {
descr: SchemaDescPtr,
props: WriterPropertiesPtr,
- buf: W,
+ buf: &'a mut TrackedWrite<W>,
total_rows_written: Option<u64>,
total_bytes_written: u64,
column_index: usize,
- previous_writer_closed: bool,
row_group_metadata: Option<RowGroupMetaDataPtr>,
column_chunks: Vec<ColumnChunkMetaData>,
+ on_close: Option<OnCloseRowGroup<'a>>,
}
-impl<W: 'static + ParquetWriter> SerializedRowGroupWriter<W> {
+impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
+ /// Creates a new `SerializedRowGroupWriter` with:
+ ///
+ /// - `schema_descr` - the schema to write
+ /// - `properties` - writer properties
+ /// - `buf` - the buffer to write data to
+ /// - `on_close` - an optional callback that will be invoked on [`Self::close`]
pub fn new(
schema_descr: SchemaDescPtr,
properties: WriterPropertiesPtr,
- buf: &W,
+ buf: &'a mut TrackedWrite<W>,
+ on_close: Option<OnCloseRowGroup<'a>>,
Review Comment:
Why is this an `Option`? Shouldn't it always be required?
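One possible reason for the `Option` is that `new` is public, so a row group writer may be constructed standalone with no parent to notify. The sketch below illustrates that trade-off. It assumes the callback is a boxed `FnOnce` that borrows for `'a`. `OnClose`, `RowGroupMeta`, and `RowGroupSketch` are hypothetical and are not the PR's definitions. `take()` makes repeated `close` calls skip the callback, in line with the documented "no-op on subsequent calls" behaviour.

```rust
use std::io;

/// Hypothetical stand-in for row group metadata.
#[derive(Debug, Clone, PartialEq)]
struct RowGroupMeta {
    num_rows: u64,
}

/// Assumed shape of the close callback: a boxed `FnOnce` borrowing for `'a`.
type OnClose<'a> = Box<dyn FnOnce(RowGroupMeta) -> io::Result<()> + 'a>;

/// Hypothetical row group writer holding an optional close callback.
struct RowGroupSketch<'a> {
    num_rows: u64,
    on_close: Option<OnClose<'a>>,
}

impl RowGroupSketch<'_> {
    fn close(&mut self) -> io::Result<RowGroupMeta> {
        let meta = RowGroupMeta { num_rows: self.num_rows };
        // `take()` leaves `None` behind, so the callback fires at most once.
        if let Some(on_close) = self.on_close.take() {
            on_close(meta.clone())?;
        }
        Ok(meta)
    }
}
```

If the callback were required, the type would be `OnClose<'a>` directly, and standalone construction would need a dummy closure such as `Box::new(|_| Ok(()))`.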
##########
parquet/src/file/mod.rs:
##########
@@ -48,12 +48,14 @@
//! let props = Arc::new(WriterProperties::builder().build());
//! let file = fs::File::create(&path).unwrap();
//! let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
-//! let mut row_group_writer = writer.next_row_group().unwrap();
-//! while let Some(mut col_writer) = row_group_writer.next_column().unwrap() {
-//! // ... write values to a column writer
-//! row_group_writer.close_column(col_writer).unwrap();
+//! {
Review Comment:
Which trait indirection are you referring to?
##########
parquet/src/file/writer.rs:
##########
@@ -92,102 +111,90 @@ pub trait FileWriter {
/// All columns should be written sequentially; the main workflow is:
/// - Request the next column using `next_column` method - this will return `None` if no
/// more columns are available to write.
-/// - Once done writing a column, close column writer with `close_column` method - this
-/// will finalise column chunk metadata and update row group metrics.
+/// - Once done writing a column, close column writer with `close`
/// - Once all columns have been written, close row group writer with `close` method -
/// it will return row group metadata and is no-op on already closed row group.
pub trait RowGroupWriter {
/// Returns the next column writer, if available; otherwise returns `None`.
/// In case of any IO error or Thrift error, or if row group writer has already been
/// closed returns `Err`.
- ///
- /// To request the next column writer, the previous one must be finalised and closed
- /// using `close_column`.
- fn next_column(&mut self) -> Result<Option<ColumnWriter>>;
-
- /// Closes column writer that was created using `next_column` method.
- /// This should be called before requesting the next column writer.
- fn close_column(&mut self, column_writer: ColumnWriter) -> Result<()>;
+ fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>>;
/// Closes this row group writer and returns row group metadata.
/// After calling this method row group writer must not be used.
///
- /// It is recommended to call this method before requesting another row group, but it
- /// will be closed automatically before returning a new row group.
- ///
/// Can be called multiple times. In subsequent calls will result in no-op and return
/// already created row group metadata.
fn close(&mut self) -> Result<RowGroupMetaDataPtr>;
}
+/// Callback invoked on closing a column chunk, arguments are:
Review Comment:
Also importantly, by keeping a mutable reference it lets the compiler
prevent concurrent writes to the same writer, as reported in
https://github.com/apache/arrow-rs/issues/1717
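A std-only sketch of why the borrow helps follows. The child writer holds `&'a mut` into its parent, so the borrow checker rejects a second live row group writer at compile time. All names are hypothetical. Unlike the trait in the diff, whose `close` takes `&mut self`, this sketch's `close` consumes `self`.

```rust
use std::io::{self, Write};

/// Hypothetical file writer owning its sink.
struct FileSketch<W: Write> {
    sink: W,
    row_groups: usize,
}

/// Hypothetical row group writer: mutably borrows the parent's state for `'a`,
/// so at most one can be alive at a time.
struct RowGroupWriterSketch<'a, W: Write> {
    sink: &'a mut W,
    row_groups: &'a mut usize,
}

impl<W: Write> FileSketch<W> {
    fn new(sink: W) -> Self {
        Self { sink, row_groups: 0 }
    }

    fn next_row_group(&mut self) -> RowGroupWriterSketch<'_, W> {
        RowGroupWriterSketch { sink: &mut self.sink, row_groups: &mut self.row_groups }
    }
}

impl<W: Write> RowGroupWriterSketch<'_, W> {
    fn write(&mut self, bytes: &[u8]) -> io::Result<()> {
        self.sink.write_all(bytes)
    }

    /// Records the finished row group; the borrow on the parent ends here.
    fn close(self) -> io::Result<()> {
        let Self { sink, row_groups } = self;
        *row_groups += 1;
        sink.flush()
    }
}

// Rejected by the borrow checker (E0499), because `a` still holds
// `&mut file` when `b` is requested:
//
//     let mut a = file.next_row_group();
//     let mut b = file.next_row_group();
//     a.write(b"x")?;
```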
##########
parquet/src/file/writer.rs:
##########
@@ -65,15 +91,8 @@ pub trait FileWriter {
///
/// There is no limit on a number of row groups in a file; however, row groups have
/// to be written sequentially. Every time the next row group is requested, the
- /// previous row group must be finalised and closed using `close_row_group` method.
- fn next_row_group(&mut self) -> Result<Box<dyn RowGroupWriter>>;
-
- /// Finalises and closes row group that was created using `next_row_group` method.
- /// After calling this method, the next row group is available for writes.
- fn close_row_group(
- &mut self,
- row_group_writer: Box<dyn RowGroupWriter>,
- ) -> Result<()>;
+ /// previous row group must be finalised and closed using `RowGroupWriter::close` method.
+ fn next_row_group(&mut self) -> Result<Box<dyn RowGroupWriter + '_>>;
Review Comment:
It might help here to leave a link back to the docs in
`parquet/src/file/mod.rs` to bring readers to an example
##########
parquet_derive_test/src/lib.rs:
##########
@@ -131,9 +131,13 @@ mod tests {
let mut writer =
SerializedFileWriter::new(file, generated_schema, props).unwrap();
- let mut row_group = writer.next_row_group().unwrap();
- drs.as_slice().write_to_row_group(&mut row_group).unwrap();
- writer.close_row_group(row_group).unwrap();
+ {
+ let mut row_group = writer.next_row_group().unwrap();
+ drs.as_slice()
+ .write_to_row_group(row_group.as_mut())
+ .unwrap();
+ row_group.close().unwrap();
Review Comment:
I do like this pattern better (close the row group writer rather than
passing it back to the parquet file writer to close)