devinjdangelo commented on code in PR #7655:
URL: https://github.com/apache/arrow-datafusion/pull/7655#discussion_r1370945972
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -823,119 +814,226 @@ impl DataSink for ParquetSink {
}
}
-/// This is the return type when joining subtasks which are serializing
parquet files
-/// into memory buffers. The first part of the tuple is the parquet bytes and
the
-/// second is how many rows were written into the file.
-type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
+/// Consumes a stream of [ArrowLeafColumn] via a channel and serializes them
using an [ArrowColumnWriter]
+/// Once the channel is exhausted, returns the ArrowColumnWriter.
+async fn column_serializer_task(
+ mut rx: Receiver<ArrowLeafColumn>,
+ mut writer: ArrowColumnWriter,
+) -> Result<ArrowColumnWriter> {
+ while let Some(col) = rx.recv().await {
+ writer.write(&col)?;
+ }
+ Ok(writer)
+}
-/// Parallelizes the serialization of a single parquet file, by first
serializing N
-/// independent RecordBatch streams in parallel to parquet files in memory.
Another
-/// task then stitches these independent files back together and streams this
large
-/// single parquet file to an ObjectStore in multiple parts.
-async fn output_single_parquet_file_parallelized(
- mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send +
Unpin>>,
- mut data: Vec<SendableRecordBatchStream>,
- output_schema: Arc<Schema>,
- parquet_props: &WriterProperties,
-) -> Result<usize> {
- let mut row_count = 0;
- // TODO decrease parallelism / buffering:
- // https://github.com/apache/arrow-datafusion/issues/7591
- let parallelism = data.len();
- let mut join_handles: Vec<JoinHandle<ParquetFileSerializedResult>> =
- Vec::with_capacity(parallelism);
- for _ in 0..parallelism {
- let buffer: Vec<u8> = Vec::new();
- let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
- buffer,
- output_schema.clone(),
- Some(parquet_props.clone()),
- )?;
- let mut data_stream = data.remove(0);
- join_handles.push(tokio::spawn(async move {
- let mut inner_row_count = 0;
- while let Some(batch) = data_stream.next().await.transpose()? {
- inner_row_count += batch.num_rows();
- writer.write(&batch)?;
- }
- let out = writer.into_inner()?;
- Ok((out, inner_row_count))
- }))
+type ColumnJoinHandle = JoinHandle<Result<ArrowColumnWriter>>;
+type ColSender = Sender<ArrowLeafColumn>;
+/// Spawns a parallel serialization task for each column
+/// Returns join handles for each columns serialization task along with a send
channel
+/// to send arrow arrays to each serialization task.
+fn spawn_column_parallel_row_group_writer(
+ schema: Arc<Schema>,
+ parquet_props: Arc<WriterProperties>,
+ max_buffer_size: usize,
+) -> Result<(Vec<ColumnJoinHandle>, Vec<ColSender>)> {
+ let schema_desc = arrow_to_parquet_schema(&schema)?;
+ let col_writers = get_column_writers(&schema_desc, &parquet_props,
&schema)?;
+ let num_columns = col_writers.len();
+
+ let mut col_writer_handles = Vec::with_capacity(num_columns);
+ let mut col_array_channels = Vec::with_capacity(num_columns);
+ for writer in col_writers.into_iter() {
+ // Buffer size of this channel limits the number of arrays queued up
for column level serialization
+ let (send_array, recieve_array) =
+ mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
+ col_array_channels.push(send_array);
+ col_writer_handles
+ .push(tokio::spawn(column_serializer_task(recieve_array, writer)))
+ }
+
+ Ok((col_writer_handles, col_array_channels))
+}
+
+/// Settings related to writing parquet files in parallel
+#[derive(Clone)]
+struct ParallelParquetWriterOptions {
+ max_parallel_row_groups: usize,
+ max_buffered_record_batches_per_stream: usize,
+}
+
+/// This is the return type of calling [ArrowColumnWriter].close() on each
column
+/// i.e. the Vec of encoded columns which can be appended to a row group
+type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, usize)>;
+
+/// Sends the ArrowArrays in passed [RecordBatch] through the channels to
their respective
+/// parallel column serializers.
+async fn send_arrays_to_col_writers(
+ col_array_channels: &[ColSender],
+ rb: &RecordBatch,
+ schema: Arc<Schema>,
+) -> Result<()> {
+ for (tx, array, field) in col_array_channels
+ .iter()
+ .zip(rb.columns())
+ .zip(schema.fields())
+ .map(|((a, b), c)| (a, b, c))
+ {
+ for c in compute_leaves(field, array)? {
+ tx.send(c).await.map_err(|_| {
+ DataFusionError::Internal("Unable to send array to
writer!".into())
+ })?;
+ }
}
- let mut writer = None;
- let endpoints: (UnboundedSender<Vec<u8>>, UnboundedReceiver<Vec<u8>>) =
- tokio::sync::mpsc::unbounded_channel();
- let (tx, mut rx) = endpoints;
- let writer_join_handle: JoinHandle<
- Result<
- AbortableWrite<Box<dyn tokio::io::AsyncWrite + std::marker::Send +
Unpin>>,
- DataFusionError,
- >,
- > = tokio::task::spawn(async move {
- while let Some(data) = rx.recv().await {
- // TODO write incrementally
- // https://github.com/apache/arrow-datafusion/issues/7591
- object_store_writer.write_all(data.as_slice()).await?;
+ Ok(())
+}
+
+/// Spawns a tokio task which joins the parallel column writer tasks,
+/// and finalizes the row group.
+fn spawn_rg_join_and_finalize_task(
+ column_writer_handles: Vec<JoinHandle<Result<ArrowColumnWriter>>>,
Review Comment:
I think that the order in which they are joined/appended matters for the
`SerializedRowGroupWriter`, but I would have to double-check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]