metesynnada commented on code in PR #7791: URL: https://github.com/apache/arrow-datafusion/pull/7791#discussion_r1356858826
##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -390,29 +391,101 @@ async fn serialize_rb_stream_to_object_store(
             ))
         }
     };
-    Ok((serializer, writer, row_count as u64))
+    Ok((writer, row_count as u64))
 }
 
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = Receiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible to monitor
+/// the demux task for errors and abort accordingly. The single_file_ouput parameter
+/// overrides all other settings to force only a single file to be written.
+/// partition_by parameter will additionally split the input based on the unique
+/// values of a specific column `<https://github.com/apache/arrow-datafusion/issues/7744>``
+pub(crate) fn start_demuxer_task(

Review Comment:
You can improve code readability by dividing the logic into methods.
For the `start_demuxer_task` function, it might be helpful to divide it into smaller parts:

```rust
pub(crate) fn start_demuxer_task(
    input: SendableRecordBatchStream,
    context: &Arc<TaskContext>,
    _partition_by: Option<&str>,
    base_output_path: ListingTableUrl,
    file_extension: String,
    single_file_output: bool,
) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
    let exec_options = &context.session_config().options().execution;

    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
    let max_parallel_files = exec_options.max_parallel_ouput_files;
    let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;

    // Outer channel: one (path, receiver) pair per output file.
    let (tx, rx) = mpsc::channel(max_parallel_files);

    let task = tokio::spawn(async move {
        demux_stream(
            input,
            base_output_path,
            file_extension,
            single_file_output,
            max_rows_per_file,
            max_buffered_batches,
            tx,
        )
        .await
    });
    (task, rx)
}

fn generate_file_path(
    base_output_path: &ListingTableUrl,
    write_id: &str,
    part_idx: usize,
    file_extension: &str,
    single_file_output: bool,
) -> Path {
    if !single_file_output {
        base_output_path
            .prefix()
            .child(format!("{}_{}.{}", write_id, part_idx, file_extension))
    } else {
        base_output_path.prefix().to_owned()
    }
}

async fn create_new_file_stream(
    base_output_path: &ListingTableUrl,
    write_id: &str,
    part_idx: usize,
    file_extension: &str,
    single_file_output: bool,
    max_buffered_batches: usize,
    tx: &mut Sender<(Path, Receiver<RecordBatch>)>,
) -> Result<Sender<RecordBatch>> {
    let file_path = generate_file_path(
        base_output_path,
        write_id,
        part_idx,
        file_extension,
        single_file_output,
    );
    // Inner channel: carries the RecordBatches that belong to this file.
    let (tx_file, rx_file) = mpsc::channel(max_buffered_batches / 2);
    tx.send((file_path, rx_file)).await.map_err(|_| {
        DataFusionError::Execution(
            "Error sending RecordBatch to file stream!".into(),
        )
    })?;
    Ok(tx_file)
}

async fn demux_stream(
    mut input: SendableRecordBatchStream,
    base_output_path: ListingTableUrl,
    file_extension: String,
    single_file_output: bool,
    max_rows_per_file: usize,
    max_buffered_batches: usize,
    mut tx: Sender<(Path, Receiver<RecordBatch>)>,
) -> Result<()> {
    let mut total_rows_current_file = 0;
    let mut part_idx = 0;
    let write_id = rand::distributions::Alphanumeric
        .sample_string(&mut rand::thread_rng(), 16);

    let mut tx_file = create_new_file_stream(
        &base_output_path,
        &write_id,
        part_idx,
        &file_extension,
        single_file_output,
        max_buffered_batches,
        &mut tx,
    )
    .await?;
    part_idx += 1;

    while let Some(rb) = input.next().await.transpose()? {
        total_rows_current_file += rb.num_rows();
        tx_file.send(rb).await.map_err(|_| {
            DataFusionError::Execution(
                "Error sending RecordBatch to file stream!".into(),
            )
        })?;

        // Roll over to a new file once the soft row limit is reached.
        if total_rows_current_file >= max_rows_per_file && !single_file_output {
            total_rows_current_file = 0;
            tx_file = create_new_file_stream(
                &base_output_path,
                &write_id,
                part_idx,
                &file_extension,
                single_file_output,
                max_buffered_batches,
                &mut tx,
            )
            .await?;
            part_idx += 1;
        }
    }
    Ok(())
}
```

Btw, this code is tested.
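
To make the channel-of-channels rollover easier to reason about in isolation, here is a minimal sketch that assumes nothing from DataFusion or tokio. It swaps in `std::sync::mpsc` and a thread for the async channels, and uses a `Vec<u32>` as a stand-in for `RecordBatch`. All names in it (`Batch`, `file_name`, `open_file`, `demux`, `run_demo`, and the fixed `"demo"` write id) are hypothetical and exist only for illustration.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Stand-in for a RecordBatch: one element per row (assumption for this sketch).
type Batch = Vec<u32>;

/// Hypothetical counterpart of `generate_file_path`, returning a plain String.
fn file_name(write_id: &str, part_idx: usize, ext: &str) -> String {
    format!("{}_{}.{}", write_id, part_idx, ext)
}

/// Opens a new inner channel and announces it on the outer channel.
fn open_file(tx: &Sender<(String, Receiver<Batch>)>, part_idx: usize) -> Sender<Batch> {
    let (tx_file, rx_file) = channel();
    tx.send((file_name("demo", part_idx, "csv"), rx_file))
        .expect("outer receiver dropped");
    tx_file
}

/// Forwards each batch to the current file and rolls over to a new file
/// once the current one has received at least `max_rows_per_file` rows.
fn demux(input: Vec<Batch>, max_rows_per_file: usize, tx: Sender<(String, Receiver<Batch>)>) {
    let mut part_idx = 0;
    let mut rows_in_file = 0;
    let mut tx_file = open_file(&tx, part_idx);
    part_idx += 1;

    for batch in input {
        rows_in_file += batch.len();
        tx_file.send(batch).expect("file receiver dropped");

        if rows_in_file >= max_rows_per_file {
            rows_in_file = 0;
            // Reassigning drops the old sender, which closes the previous file.
            tx_file = open_file(&tx, part_idx);
            part_idx += 1;
        }
    }
}

/// Runs the demuxer on a thread and returns (file name, row count) per file.
fn run_demo(input: Vec<Batch>, max_rows_per_file: usize) -> Vec<(String, usize)> {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || demux(input, max_rows_per_file, tx));

    let mut out = Vec::new();
    for (name, rx_file) in rx {
        // Drain the inner channel until its sender is dropped.
        let rows: usize = rx_file.iter().map(|b| b.len()).sum();
        out.push((name, rows));
    }
    handle.join().expect("demux thread panicked");
    out
}
```

Calling `run_demo(vec![vec![1, 2], vec![3, 4], vec![5]], 3)` in this sketch yields two files, with 4 rows and 1 row, which shows why the limit is a soft one: a batch is never split. It also shows that the rollover is eager: a single 3-row batch with a limit of 3 opens a second, empty file. It may be worth checking whether the same can happen in the suggested `demux_stream` when the input ends right after a rollover.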