alamb commented on code in PR #7632:
URL: https://github.com/apache/arrow-datafusion/pull/7632#discussion_r1336307445
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -828,113 +833,84 @@ impl DataSink for ParquetSink {
}
}
-/// This is the return type when joining subtasks which are serializing
parquet files
-/// into memory buffers. The first part of the tuple is the parquet bytes and
the
-/// second is how many rows were written into the file.
-type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
+/// This is the return type of ArrowRowGroupWriter.close(), i.e. the Vec of
+/// encoded columns which can be appended to a SerializedRowGroupWriter
+type RBStreamSerializeResult =
+ Result<(Vec<(ArrowColumnChunk, ColumnCloseResult)>, usize)>;
/// Parallelizes the serialization of a single parquet file, by first
serializing N
/// independent RecordBatch streams in parallel to parquet files in memory.
Another
/// task then stitches these independent files back together and streams this
large
/// single parquet file to an ObjectStore in multiple parts.
async fn output_single_parquet_file_parallelized(
mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send +
Unpin>>,
- mut data: Vec<SendableRecordBatchStream>,
+ data: Vec<SendableRecordBatchStream>,
output_schema: Arc<Schema>,
parquet_props: &WriterProperties,
) -> Result<usize> {
let mut row_count = 0;
- let parallelism = data.len();
- let mut join_handles: Vec<JoinHandle<ParquetFileSerializedResult>> =
- Vec::with_capacity(parallelism);
- for _ in 0..parallelism {
- let buffer: Vec<u8> = Vec::new();
- let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
- buffer,
- output_schema.clone(),
- Some(parquet_props.clone()),
- )?;
- let mut data_stream = data.remove(0);
- join_handles.push(tokio::spawn(async move {
- let mut inner_row_count = 0;
- while let Some(batch) = data_stream.next().await.transpose()? {
- inner_row_count += batch.num_rows();
- writer.write(&batch)?;
+ let (serialize_tx, mut serialize_rx) =
+ mpsc::channel::<JoinHandle<RBStreamSerializeResult>>(100);
+
+ // Create some Arc<> copies that we can move into launch_serialization
task and still access later
+ let arc_props = Arc::new(parquet_props.clone());
+ let arc_props_clone = arc_props.clone();
+ let schema_clone = output_schema.clone();
+ let launch_serialization_task: JoinHandle<Result<(), DataFusionError>> =
Review Comment:
It probably doesn't matter but if something goes wrong and
`output_single_parquet_file_parallelized` returns an error, this code may well
still launch tasks and try to buffer / serialize the streams.
I think this could be avoided if we put all the handles into a `JoinSet` so
when they were dropped all the tasks would be canceled:
https://docs.rs/tokio/1.32.0/tokio/task/struct.JoinSet.html
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -828,113 +833,84 @@ impl DataSink for ParquetSink {
}
}
-/// This is the return type when joining subtasks which are serializing
parquet files
-/// into memory buffers. The first part of the tuple is the parquet bytes and
the
-/// second is how many rows were written into the file.
-type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
+/// This is the return type of ArrowRowGroupWriter.close(), i.e. the Vec of
+/// encoded columns which can be appended to a SerializedRowGroupWriter
+type RBStreamSerializeResult =
+ Result<(Vec<(ArrowColumnChunk, ColumnCloseResult)>, usize)>;
/// Parallelizes the serialization of a single parquet file, by first
serializing N
/// independent RecordBatch streams in parallel to parquet files in memory.
Another
/// task then stitches these independent files back together and streams this
large
/// single parquet file to an ObjectStore in multiple parts.
async fn output_single_parquet_file_parallelized(
mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send +
Unpin>>,
- mut data: Vec<SendableRecordBatchStream>,
+ data: Vec<SendableRecordBatchStream>,
output_schema: Arc<Schema>,
parquet_props: &WriterProperties,
) -> Result<usize> {
let mut row_count = 0;
- let parallelism = data.len();
- let mut join_handles: Vec<JoinHandle<ParquetFileSerializedResult>> =
- Vec::with_capacity(parallelism);
- for _ in 0..parallelism {
- let buffer: Vec<u8> = Vec::new();
- let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
- buffer,
- output_schema.clone(),
- Some(parquet_props.clone()),
- )?;
- let mut data_stream = data.remove(0);
- join_handles.push(tokio::spawn(async move {
- let mut inner_row_count = 0;
- while let Some(batch) = data_stream.next().await.transpose()? {
- inner_row_count += batch.num_rows();
- writer.write(&batch)?;
+ let (serialize_tx, mut serialize_rx) =
+ mpsc::channel::<JoinHandle<RBStreamSerializeResult>>(100);
+
+ // Create some Arc<> copies that we can move into launch_serialization
task and still access later
+ let arc_props = Arc::new(parquet_props.clone());
+ let arc_props_clone = arc_props.clone();
+ let schema_clone = output_schema.clone();
+ let launch_serialization_task: JoinHandle<Result<(), DataFusionError>> =
+ tokio::spawn(async move {
+ for mut stream in data {
+ let schema_desc = arrow_to_parquet_schema(&schema_clone)?;
+ let mut writer = ArrowRowGroupWriter::new(
+ &schema_desc,
+ &arc_props_clone,
+ &schema_clone,
+ )?;
+ serialize_tx
+ .send(tokio::spawn(async move {
+ let mut inner_row_count = 0;
+ while let Some(rb) = stream.next().await.transpose()? {
+ inner_row_count += rb.num_rows();
+ writer.write(&rb)?;
+ }
+ Ok((writer.close()?, inner_row_count))
+ }))
+ .await
+ .map_err(|_| {
Review Comment:
I think the only way the send will fail is if the receiver was dropped --
either due to early plan cancel or some other error
Thus it might make sense to ignore the error and return (with a comment
about rationale) rather than returning an error
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -828,113 +833,84 @@ impl DataSink for ParquetSink {
}
}
-/// This is the return type when joining subtasks which are serializing
parquet files
-/// into memory buffers. The first part of the tuple is the parquet bytes and
the
-/// second is how many rows were written into the file.
-type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
+/// This is the return type of ArrowRowGroupWriter.close(), i.e. the Vec of
+/// encoded columns which can be appended to a SerializedRowGroupWriter
+type RBStreamSerializeResult =
+ Result<(Vec<(ArrowColumnChunk, ColumnCloseResult)>, usize)>;
/// Parallelizes the serialization of a single parquet file, by first
serializing N
/// independent RecordBatch streams in parallel to parquet files in memory.
Another
/// task then stitches these independent files back together and streams this
large
/// single parquet file to an ObjectStore in multiple parts.
async fn output_single_parquet_file_parallelized(
mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send +
Unpin>>,
- mut data: Vec<SendableRecordBatchStream>,
+ data: Vec<SendableRecordBatchStream>,
output_schema: Arc<Schema>,
parquet_props: &WriterProperties,
) -> Result<usize> {
let mut row_count = 0;
- let parallelism = data.len();
- let mut join_handles: Vec<JoinHandle<ParquetFileSerializedResult>> =
- Vec::with_capacity(parallelism);
- for _ in 0..parallelism {
- let buffer: Vec<u8> = Vec::new();
- let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
- buffer,
- output_schema.clone(),
- Some(parquet_props.clone()),
- )?;
- let mut data_stream = data.remove(0);
- join_handles.push(tokio::spawn(async move {
- let mut inner_row_count = 0;
- while let Some(batch) = data_stream.next().await.transpose()? {
- inner_row_count += batch.num_rows();
- writer.write(&batch)?;
+ let (serialize_tx, mut serialize_rx) =
Review Comment:
my reading of this suggests it allows up to 100 row groups to be created in
parallel, which likely results in more buffering than necessary.
Rather than formulating this as a `mspc::channel` it would be really neat to
see it formulated as a `Stream<(ArrowColumnChunk, ColumnCloseResult)>`.
then, in combination with
[`StreamExt::buffered()`](https://docs.rs/futures/0.3.28/futures/stream/trait.StreamExt.html#method.buffered)
we could control the parallelism at the row group level
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]