tustvold commented on code in PR #7655:
URL: https://github.com/apache/arrow-datafusion/pull/7655#discussion_r1369983554


##########
datafusion/common/src/config.rs:
##########
@@ -271,7 +271,7 @@ config_namespace! {
         /// for each output file being worked. Higher values can potentially
         /// give faster write performance at the cost of higher peak
         /// memory consumption
-        pub max_buffered_batches_per_output_file: usize, default = 2
+        pub max_buffered_batches_per_output_file: usize, default = 10

Review Comment:
   This seems very high



##########
datafusion/common/src/config.rs:
##########
@@ -377,12 +377,24 @@ config_namespace! {
         pub bloom_filter_ndv: Option<u64>, default = None
 
         /// Controls whether DataFusion will attempt to speed up writing
-        /// large parquet files by first writing multiple smaller files
-        /// and then stitching them together into a single large file.
-        /// This will result in faster write speeds, but higher memory usage.
-        /// Also currently unsupported are bloom filters and column indexes
-        /// when single_file_parallelism is enabled.
-        pub allow_single_file_parallelism: bool, default = false
+        /// parquet files by serializing them in parallel. Each column
+        /// in each row group in each output file are serialized in parallel
+        /// leveraging a maximum possible core count of 
n_files*n_row_groups*n_columns.
+        pub allow_single_file_parallelism: bool, default = true
+
+        /// If allow_single_file_parallelism=true, this setting allows
+        /// applying backpressure to prevent working on too many row groups in
+        /// parallel in case of limited memory or slow I/O speed causing
+        /// OOM errors. Lowering this number limits memory growth at the cost
+        /// of potentially slower write speeds.
+        pub maximum_parallel_row_group_writers: usize, default = 16
+
+        /// If allow_single_file_parallelism=true, this setting allows
+        /// applying backpressure to prevent too many RecordBatches building
+        /// up in memory in case the parallel writers cannot consume them fast
+        /// enough. Lowering this number limits memory growth at the cost
+        /// of potentially lower write speeds.
+        pub maximum_buffered_record_batches_per_stream: usize, default = 200

Review Comment:
   This seems extraordinarily high; in other places we buffer up to 1



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -823,119 +814,226 @@ impl DataSink for ParquetSink {
     }
 }
 
-/// This is the return type when joining subtasks which are serializing 
parquet files
-/// into memory buffers. The first part of the tuple is the parquet bytes and 
the
-/// second is how many rows were written into the file.
-type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
+/// Consumes a stream of [ArrowLeafColumn] via a channel and serializes them 
using an [ArrowColumnWriter]
+/// Once the channel is exhausted, returns the ArrowColumnWriter.
+async fn column_serializer_task(
+    mut rx: Receiver<ArrowLeafColumn>,
+    mut writer: ArrowColumnWriter,
+) -> Result<ArrowColumnWriter> {
+    while let Some(col) = rx.recv().await {
+        writer.write(&col)?;
+    }
+    Ok(writer)
+}
 
-/// Parallelizes the serialization of a single parquet file, by first 
serializing N
-/// independent RecordBatch streams in parallel to parquet files in memory. 
Another
-/// task then stitches these independent files back together and streams this 
large
-/// single parquet file to an ObjectStore in multiple parts.
-async fn output_single_parquet_file_parallelized(
-    mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + 
Unpin>>,
-    mut data: Vec<SendableRecordBatchStream>,
-    output_schema: Arc<Schema>,
-    parquet_props: &WriterProperties,
-) -> Result<usize> {
-    let mut row_count = 0;
-    // TODO decrease parallelism / buffering:
-    // https://github.com/apache/arrow-datafusion/issues/7591
-    let parallelism = data.len();
-    let mut join_handles: Vec<JoinHandle<ParquetFileSerializedResult>> =
-        Vec::with_capacity(parallelism);
-    for _ in 0..parallelism {
-        let buffer: Vec<u8> = Vec::new();
-        let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
-            buffer,
-            output_schema.clone(),
-            Some(parquet_props.clone()),
-        )?;
-        let mut data_stream = data.remove(0);
-        join_handles.push(tokio::spawn(async move {
-            let mut inner_row_count = 0;
-            while let Some(batch) = data_stream.next().await.transpose()? {
-                inner_row_count += batch.num_rows();
-                writer.write(&batch)?;
-            }
-            let out = writer.into_inner()?;
-            Ok((out, inner_row_count))
-        }))
+type ColumnJoinHandle = JoinHandle<Result<ArrowColumnWriter>>;
+type ColSender = Sender<ArrowLeafColumn>;
+/// Spawns a parallel serialization task for each column
+/// Returns join handles for each columns serialization task along with a send 
channel
+/// to send arrow arrays to each serialization task.
+fn spawn_column_parallel_row_group_writer(
+    schema: Arc<Schema>,
+    parquet_props: Arc<WriterProperties>,
+    max_buffer_size: usize,
+) -> Result<(Vec<ColumnJoinHandle>, Vec<ColSender>)> {
+    let schema_desc = arrow_to_parquet_schema(&schema)?;
+    let col_writers = get_column_writers(&schema_desc, &parquet_props, 
&schema)?;
+    let num_columns = col_writers.len();
+
+    let mut col_writer_handles = Vec::with_capacity(num_columns);
+    let mut col_array_channels = Vec::with_capacity(num_columns);
+    for writer in col_writers.into_iter() {
+        // Buffer size of this channel limits the number of arrays queued up 
for column level serialization
+        let (send_array, recieve_array) =
+            mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
+        col_array_channels.push(send_array);
+        col_writer_handles
+            .push(tokio::spawn(column_serializer_task(recieve_array, writer)))
+    }
+
+    Ok((col_writer_handles, col_array_channels))
+}
+
+/// Settings related to writing parquet files in parallel
+#[derive(Clone)]
+struct ParallelParquetWriterOptions {
+    max_parallel_row_groups: usize,
+    max_buffered_record_batches_per_stream: usize,
+}
+
+/// This is the return type of calling [ArrowColumnWriter].close() on each 
column
+/// i.e. the Vec of encoded columns which can be appended to a row group
+type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, usize)>;
+
+/// Sends the ArrowArrays in passed [RecordBatch] through the channels to 
their respective
+/// parallel column serializers.
+async fn send_arrays_to_col_writers(
+    col_array_channels: &[ColSender],
+    rb: &RecordBatch,
+    schema: Arc<Schema>,
+) -> Result<()> {
+    for (tx, array, field) in col_array_channels
+        .iter()
+        .zip(rb.columns())
+        .zip(schema.fields())
+        .map(|((a, b), c)| (a, b, c))
+    {
+        for c in compute_leaves(field, array)? {
+            tx.send(c).await.map_err(|_| {
+                DataFusionError::Internal("Unable to send array to 
writer!".into())
+            })?;
+        }
     }
 
-    let mut writer = None;
-    let endpoints: (UnboundedSender<Vec<u8>>, UnboundedReceiver<Vec<u8>>) =
-        tokio::sync::mpsc::unbounded_channel();
-    let (tx, mut rx) = endpoints;
-    let writer_join_handle: JoinHandle<
-        Result<
-            AbortableWrite<Box<dyn tokio::io::AsyncWrite + std::marker::Send + 
Unpin>>,
-            DataFusionError,
-        >,
-    > = tokio::task::spawn(async move {
-        while let Some(data) = rx.recv().await {
-            // TODO write incrementally
-            // https://github.com/apache/arrow-datafusion/issues/7591
-            object_store_writer.write_all(data.as_slice()).await?;
+    Ok(())
+}
+
+/// Spawns a tokio task which joins the parallel column writer tasks,
+/// and finalizes the row group.
+fn spawn_rg_join_and_finalize_task(
+    column_writer_handles: Vec<JoinHandle<Result<ArrowColumnWriter>>>,

Review Comment:
   Should we be using a JoinSet here so that if a task fails it aborts them all?



##########
datafusion/common/src/config.rs:
##########
@@ -377,12 +377,24 @@ config_namespace! {
         pub bloom_filter_ndv: Option<u64>, default = None
 
         /// Controls whether DataFusion will attempt to speed up writing
-        /// large parquet files by first writing multiple smaller files
-        /// and then stitching them together into a single large file.
-        /// This will result in faster write speeds, but higher memory usage.
-        /// Also currently unsupported are bloom filters and column indexes
-        /// when single_file_parallelism is enabled.
-        pub allow_single_file_parallelism: bool, default = false
+        /// parquet files by serializing them in parallel. Each column
+        /// in each row group in each output file are serialized in parallel
+        /// leveraging a maximum possible core count of 
n_files*n_row_groups*n_columns.
+        pub allow_single_file_parallelism: bool, default = true
+
+        /// If allow_single_file_parallelism=true, this setting allows
+        /// applying backpressure to prevent working on too many row groups in
+        /// parallel in case of limited memory or slow I/O speed causing
+        /// OOM errors. Lowering this number limits memory growth at the cost
+        /// of potentially slower write speeds.
+        pub maximum_parallel_row_group_writers: usize, default = 16

Review Comment:
   Am I correct in thinking this is still bounded by the input parallelism? Is 
it worth noting this?



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -823,119 +814,226 @@ impl DataSink for ParquetSink {
     }
 }
 
-/// This is the return type when joining subtasks which are serializing 
parquet files
-/// into memory buffers. The first part of the tuple is the parquet bytes and 
the
-/// second is how many rows were written into the file.
-type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
+/// Consumes a stream of [ArrowLeafColumn] via a channel and serializes them 
using an [ArrowColumnWriter]
+/// Once the channel is exhausted, returns the ArrowColumnWriter.
+async fn column_serializer_task(
+    mut rx: Receiver<ArrowLeafColumn>,
+    mut writer: ArrowColumnWriter,
+) -> Result<ArrowColumnWriter> {
+    while let Some(col) = rx.recv().await {
+        writer.write(&col)?;
+    }
+    Ok(writer)
+}
 
-/// Parallelizes the serialization of a single parquet file, by first 
serializing N
-/// independent RecordBatch streams in parallel to parquet files in memory. 
Another
-/// task then stitches these independent files back together and streams this 
large
-/// single parquet file to an ObjectStore in multiple parts.
-async fn output_single_parquet_file_parallelized(
-    mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + 
Unpin>>,
-    mut data: Vec<SendableRecordBatchStream>,
-    output_schema: Arc<Schema>,
-    parquet_props: &WriterProperties,
-) -> Result<usize> {
-    let mut row_count = 0;
-    // TODO decrease parallelism / buffering:
-    // https://github.com/apache/arrow-datafusion/issues/7591
-    let parallelism = data.len();
-    let mut join_handles: Vec<JoinHandle<ParquetFileSerializedResult>> =
-        Vec::with_capacity(parallelism);
-    for _ in 0..parallelism {
-        let buffer: Vec<u8> = Vec::new();
-        let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
-            buffer,
-            output_schema.clone(),
-            Some(parquet_props.clone()),
-        )?;
-        let mut data_stream = data.remove(0);
-        join_handles.push(tokio::spawn(async move {
-            let mut inner_row_count = 0;
-            while let Some(batch) = data_stream.next().await.transpose()? {
-                inner_row_count += batch.num_rows();
-                writer.write(&batch)?;
-            }
-            let out = writer.into_inner()?;
-            Ok((out, inner_row_count))
-        }))
+type ColumnJoinHandle = JoinHandle<Result<ArrowColumnWriter>>;
+type ColSender = Sender<ArrowLeafColumn>;
+/// Spawns a parallel serialization task for each column
+/// Returns join handles for each columns serialization task along with a send 
channel
+/// to send arrow arrays to each serialization task.
+fn spawn_column_parallel_row_group_writer(
+    schema: Arc<Schema>,
+    parquet_props: Arc<WriterProperties>,
+    max_buffer_size: usize,
+) -> Result<(Vec<ColumnJoinHandle>, Vec<ColSender>)> {
+    let schema_desc = arrow_to_parquet_schema(&schema)?;
+    let col_writers = get_column_writers(&schema_desc, &parquet_props, 
&schema)?;
+    let num_columns = col_writers.len();
+
+    let mut col_writer_handles = Vec::with_capacity(num_columns);
+    let mut col_array_channels = Vec::with_capacity(num_columns);
+    for writer in col_writers.into_iter() {
+        // Buffer size of this channel limits the number of arrays queued up 
for column level serialization
+        let (send_array, recieve_array) =
+            mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
+        col_array_channels.push(send_array);
+        col_writer_handles
+            .push(tokio::spawn(column_serializer_task(recieve_array, writer)))
+    }
+
+    Ok((col_writer_handles, col_array_channels))
+}
+
+/// Settings related to writing parquet files in parallel
+#[derive(Clone)]
+struct ParallelParquetWriterOptions {
+    max_parallel_row_groups: usize,
+    max_buffered_record_batches_per_stream: usize,
+}
+
+/// This is the return type of calling [ArrowColumnWriter].close() on each 
column
+/// i.e. the Vec of encoded columns which can be appended to a row group
+type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, usize)>;
+
+/// Sends the ArrowArrays in passed [RecordBatch] through the channels to 
their respective
+/// parallel column serializers.
+async fn send_arrays_to_col_writers(
+    col_array_channels: &[ColSender],
+    rb: &RecordBatch,
+    schema: Arc<Schema>,
+) -> Result<()> {
+    for (tx, array, field) in col_array_channels
+        .iter()
+        .zip(rb.columns())
+        .zip(schema.fields())
+        .map(|((a, b), c)| (a, b, c))
+    {
+        for c in compute_leaves(field, array)? {
+            tx.send(c).await.map_err(|_| {
+                DataFusionError::Internal("Unable to send array to 
writer!".into())
+            })?;
+        }
     }
 
-    let mut writer = None;
-    let endpoints: (UnboundedSender<Vec<u8>>, UnboundedReceiver<Vec<u8>>) =
-        tokio::sync::mpsc::unbounded_channel();
-    let (tx, mut rx) = endpoints;
-    let writer_join_handle: JoinHandle<
-        Result<
-            AbortableWrite<Box<dyn tokio::io::AsyncWrite + std::marker::Send + 
Unpin>>,
-            DataFusionError,
-        >,
-    > = tokio::task::spawn(async move {
-        while let Some(data) = rx.recv().await {
-            // TODO write incrementally
-            // https://github.com/apache/arrow-datafusion/issues/7591
-            object_store_writer.write_all(data.as_slice()).await?;
+    Ok(())
+}
+
+/// Spawns a tokio task which joins the parallel column writer tasks,
+/// and finalizes the row group.
+fn spawn_rg_join_and_finalize_task(
+    column_writer_handles: Vec<JoinHandle<Result<ArrowColumnWriter>>>,
+    rg_rows: usize,
+) -> JoinHandle<RBStreamSerializeResult> {
+    tokio::spawn(async move {
+        let num_cols = column_writer_handles.len();
+        let mut finalized_rg = Vec::with_capacity(num_cols);
+        for handle in column_writer_handles.into_iter() {
+            match handle.await {
+                Ok(r) => {
+                    let w = r?;
+                    finalized_rg.push(w.close()?);
+                }
+                Err(e) => {
+                    if e.is_panic() {
+                        std::panic::resume_unwind(e.into_panic())
+                    } else {
+                        unreachable!()
+                    }
+                }
+            }
+        }
+
+        Ok((finalized_rg, rg_rows))
+    })
+}
+
+/// This task coordinates the serialization of a parquet file in parallel.
+/// As the query produces RecordBatches, these are written to a RowGroup
+/// via parallel [ArrowColumnWriter] tasks. Once the desired max rows per
+/// row group is reached, the parallel tasks are joined on another separate 
task
+/// and sent to a concatenation task. This task immediately continues to work
+/// on the next row group in parallel. So, parquet serialization is 
parallelized
+/// accross both columns and row_groups, with a theoretical max number of 
parallel tasks
+/// given by n_columns * num_row_groups.
+fn spawn_parquet_parallel_serialization_task(
+    mut data: Receiver<RecordBatch>,
+    serialize_tx: Sender<JoinHandle<RBStreamSerializeResult>>,
+    schema: Arc<Schema>,
+    writer_props: Arc<WriterProperties>,
+    parallel_options: ParallelParquetWriterOptions,
+) -> JoinHandle<Result<(), DataFusionError>> {
+    tokio::spawn(async move {
+        // This is divided by 2 because we move RecordBatches between two 
channels so the effective
+        // buffer limit is the sum of the size of each buffer, i.e. when both 
buffers are full.
+        let max_buffer_rb = 
parallel_options.max_buffered_record_batches_per_stream / 2;
+        let max_row_group_rows = writer_props.max_row_group_size();
+        let (mut column_writer_handles, mut col_array_channels) =
+            spawn_column_parallel_row_group_writer(
+                schema.clone(),
+                writer_props.clone(),
+                max_buffer_rb,
+            )?;
+        let mut current_rg_rows = 0;
+
+        while let Some(rb) = data.recv().await {
+            if current_rg_rows + rb.num_rows() < max_row_group_rows {
+                send_arrays_to_col_writers(&col_array_channels, &rb, 
schema.clone())
+                    .await?;
+                current_rg_rows += rb.num_rows();
+            } else {
+                let rows_left = max_row_group_rows - current_rg_rows;
+                let a = rb.slice(0, rows_left);
+                send_arrays_to_col_writers(&col_array_channels, &a, 
schema.clone())
+                    .await?;
+
+                // Signal the parallel column writers that the RowGroup is 
done, join and finalize RowGroup
+                // on a separate task, so that we can immediately start on the 
next RG before waiting
+                // for the current one to finish.
+                drop(col_array_channels);
+                let finalize_rg_task = spawn_rg_join_and_finalize_task(
+                    column_writer_handles,
+                    max_row_group_rows,
+                );
+
+                serialize_tx.send(finalize_rg_task).await.map_err(|_| {
+                    DataFusionError::Internal(
+                        "Unable to send closed RG to concat task!".into(),
+                    )
+                })?;
+
+                let b = rb.slice(rows_left, rb.num_rows() - rows_left);
+                (column_writer_handles, col_array_channels) =
+                    spawn_column_parallel_row_group_writer(
+                        schema.clone(),
+                        writer_props.clone(),
+                        max_buffer_rb,
+                    )?;
+                send_arrays_to_col_writers(&col_array_channels, &b, 
schema.clone())
+                    .await?;
+                current_rg_rows = b.num_rows();
+            }
         }
-        Ok(object_store_writer)
-    });
+
+        // Handle leftover rows as final rowgroup, which may be smaller than 
max_row_group_rows
+        drop(col_array_channels);
+        let finalize_rg_task =

Review Comment:
   Should this check that current_rg_rows != 0?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to