devinjdangelo commented on code in PR #7655:
URL: https://github.com/apache/arrow-datafusion/pull/7655#discussion_r1370946223
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -823,119 +814,226 @@ impl DataSink for ParquetSink {
}
}
-/// This is the return type when joining subtasks which are serializing parquet files
-/// into memory buffers. The first part of the tuple is the parquet bytes and the
-/// second is how many rows were written into the file.
-type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
+/// Consumes a stream of [ArrowLeafColumn] via a channel and serializes them using an [ArrowColumnWriter].
+/// Once the channel is exhausted, returns the [ArrowColumnWriter].
+async fn column_serializer_task(
+    mut rx: Receiver<ArrowLeafColumn>,
+    mut writer: ArrowColumnWriter,
+) -> Result<ArrowColumnWriter> {
+    while let Some(col) = rx.recv().await {
+        writer.write(&col)?;
+    }
+    Ok(writer)
+}
-/// Parallelizes the serialization of a single parquet file, by first serializing N
-/// independent RecordBatch streams in parallel to parquet files in memory. Another
-/// task then stitches these independent files back together and streams this large
-/// single parquet file to an ObjectStore in multiple parts.
-async fn output_single_parquet_file_parallelized(
-    mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
-    mut data: Vec<SendableRecordBatchStream>,
-    output_schema: Arc<Schema>,
-    parquet_props: &WriterProperties,
-) -> Result<usize> {
-    let mut row_count = 0;
-    // TODO decrease parallelism / buffering:
-    // https://github.com/apache/arrow-datafusion/issues/7591
-    let parallelism = data.len();
-    let mut join_handles: Vec<JoinHandle<ParquetFileSerializedResult>> =
-        Vec::with_capacity(parallelism);
-    for _ in 0..parallelism {
-        let buffer: Vec<u8> = Vec::new();
-        let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
-            buffer,
-            output_schema.clone(),
-            Some(parquet_props.clone()),
-        )?;
-        let mut data_stream = data.remove(0);
-        join_handles.push(tokio::spawn(async move {
-            let mut inner_row_count = 0;
-            while let Some(batch) = data_stream.next().await.transpose()? {
-                inner_row_count += batch.num_rows();
-                writer.write(&batch)?;
-            }
-            let out = writer.into_inner()?;
-            Ok((out, inner_row_count))
-        }))
+type ColumnJoinHandle = JoinHandle<Result<ArrowColumnWriter>>;
+type ColSender = Sender<ArrowLeafColumn>;
+/// Spawns a parallel serialization task for each column.
+/// Returns join handles for each column's serialization task along with a send channel
+/// to send arrow arrays to each serialization task.
+fn spawn_column_parallel_row_group_writer(
+    schema: Arc<Schema>,
+    parquet_props: Arc<WriterProperties>,
+    max_buffer_size: usize,
+) -> Result<(Vec<ColumnJoinHandle>, Vec<ColSender>)> {
+    let schema_desc = arrow_to_parquet_schema(&schema)?;
+    let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema)?;
+    let num_columns = col_writers.len();
+
+    let mut col_writer_handles = Vec::with_capacity(num_columns);
+    let mut col_array_channels = Vec::with_capacity(num_columns);
+    for writer in col_writers.into_iter() {
+        // Buffer size of this channel limits the number of arrays queued up for column level serialization
+        let (send_array, receive_array) =
+            mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
+        col_array_channels.push(send_array);
+        col_writer_handles
+            .push(tokio::spawn(column_serializer_task(receive_array, writer)))
+    }
+
+    Ok((col_writer_handles, col_array_channels))
+}
+
+/// Settings related to writing parquet files in parallel
+#[derive(Clone)]
+struct ParallelParquetWriterOptions {
+    max_parallel_row_groups: usize,
+    max_buffered_record_batches_per_stream: usize,
+}
+
+/// This is the return type of calling [ArrowColumnWriter].close() on each column
+/// i.e. the Vec of encoded columns which can be appended to a row group
+type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, usize)>;
+
+/// Sends the ArrowArrays in passed [RecordBatch] through the channels to their respective
+/// parallel column serializers.
+async fn send_arrays_to_col_writers(
+    col_array_channels: &[ColSender],
+    rb: &RecordBatch,
+    schema: Arc<Schema>,
+) -> Result<()> {
+    for (tx, array, field) in col_array_channels
+        .iter()
+        .zip(rb.columns())
+        .zip(schema.fields())
+        .map(|((a, b), c)| (a, b, c))
+    {
+        for c in compute_leaves(field, array)? {
+            tx.send(c).await.map_err(|_| {
+                DataFusionError::Internal("Unable to send array to writer!".into())
+            })?;
+        }
    }
-    let mut writer = None;
-    let endpoints: (UnboundedSender<Vec<u8>>, UnboundedReceiver<Vec<u8>>) =
-        tokio::sync::mpsc::unbounded_channel();
-    let (tx, mut rx) = endpoints;
-    let writer_join_handle: JoinHandle<
-        Result<
-            AbortableWrite<Box<dyn tokio::io::AsyncWrite + std::marker::Send + Unpin>>,
-            DataFusionError,
-        >,
-    > = tokio::task::spawn(async move {
-        while let Some(data) = rx.recv().await {
-            // TODO write incrementally
-            // https://github.com/apache/arrow-datafusion/issues/7591
-            object_store_writer.write_all(data.as_slice()).await?;
+    Ok(())
+}
+
+/// Spawns a tokio task which joins the parallel column writer tasks,
+/// and finalizes the row group.
+fn spawn_rg_join_and_finalize_task(
+    column_writer_handles: Vec<JoinHandle<Result<ArrowColumnWriter>>>,
+    rg_rows: usize,
+) -> JoinHandle<RBStreamSerializeResult> {
+    tokio::spawn(async move {
+        let num_cols = column_writer_handles.len();
+        let mut finalized_rg = Vec::with_capacity(num_cols);
+        for handle in column_writer_handles.into_iter() {
+            match handle.await {
+                Ok(r) => {
+                    let w = r?;
+                    finalized_rg.push(w.close()?);
+                }
+                Err(e) => {
+                    if e.is_panic() {
+                        std::panic::resume_unwind(e.into_panic())
+                    } else {
+                        unreachable!()
+                    }
+                }
+            }
+        }
+
+        Ok((finalized_rg, rg_rows))
+    })
+}
+
+/// This task coordinates the serialization of a parquet file in parallel.
+/// As the query produces RecordBatches, these are written to a RowGroup
+/// via parallel [ArrowColumnWriter] tasks. Once the desired max rows per
+/// row group is reached, the parallel tasks are joined on another separate task
+/// and sent to a concatenation task. This task immediately continues to work
+/// on the next row group in parallel. So, parquet serialization is parallelized
+/// across both columns and row groups, with a theoretical max number of parallel
+/// tasks given by n_columns * num_row_groups.
+fn spawn_parquet_parallel_serialization_task(
+    mut data: Receiver<RecordBatch>,
+    serialize_tx: Sender<JoinHandle<RBStreamSerializeResult>>,
+    schema: Arc<Schema>,
+    writer_props: Arc<WriterProperties>,
+    parallel_options: ParallelParquetWriterOptions,
+) -> JoinHandle<Result<(), DataFusionError>> {
+    tokio::spawn(async move {
+        // This is divided by 2 because we move RecordBatches between two channels so the effective
+        // buffer limit is the sum of the size of each buffer, i.e. when both buffers are full.
+        let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream / 2;
+        let max_row_group_rows = writer_props.max_row_group_size();
+        let (mut column_writer_handles, mut col_array_channels) =
+            spawn_column_parallel_row_group_writer(
+                schema.clone(),
+                writer_props.clone(),
+                max_buffer_rb,
+            )?;
+        let mut current_rg_rows = 0;
+
+        while let Some(rb) = data.recv().await {
+            if current_rg_rows + rb.num_rows() < max_row_group_rows {
+                send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone())
+                    .await?;
+                current_rg_rows += rb.num_rows();
+            } else {
+                let rows_left = max_row_group_rows - current_rg_rows;
+                let a = rb.slice(0, rows_left);
+                send_arrays_to_col_writers(&col_array_channels, &a, schema.clone())
+                    .await?;
+
+                // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
+                // on a separate task, so that we can immediately start on the next RG before waiting
+                // for the current one to finish.
+                drop(col_array_channels);
+                let finalize_rg_task = spawn_rg_join_and_finalize_task(
+                    column_writer_handles,
+                    max_row_group_rows,
+                );
+
+                serialize_tx.send(finalize_rg_task).await.map_err(|_| {
+                    DataFusionError::Internal(
+                        "Unable to send closed RG to concat task!".into(),
+                    )
+                })?;
+
+                let b = rb.slice(rows_left, rb.num_rows() - rows_left);
+                (column_writer_handles, col_array_channels) =
+                    spawn_column_parallel_row_group_writer(
+                        schema.clone(),
+                        writer_props.clone(),
+                        max_buffer_rb,
+                    )?;
+                send_arrays_to_col_writers(&col_array_channels, &b, schema.clone())
+                    .await?;
+                current_rg_rows = b.num_rows();
+            }
        }
-        Ok(object_store_writer)
-    });
+
+        // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows
+        drop(col_array_channels);
+        let finalize_rg_task =
Review Comment:
Yes good call, I updated this.