devinjdangelo commented on code in PR #7791: URL: https://github.com/apache/arrow-datafusion/pull/7791#discussion_r1356911479
########## datafusion/core/src/datasource/file_format/write.rs: ########## @@ -529,5 +560,187 @@ pub(crate) async fn stateless_serialize_and_write_files( } } - Ok(row_count) + tx.send(row_count).await.map_err(|_| { + DataFusionError::Internal( + "Error encountered while sending row count back to file sink!".into(), + ) + })?; + Ok(()) +} + +/// Orchestrates multipart put of a dynamic number of output files from a single input stream +/// for any statelessly serialized file type. That is, any file type for which each [RecordBatch] +/// can be serialized independently of all other [RecordBatch]s. +pub(crate) async fn stateless_multipart_put( + data: SendableRecordBatchStream, + context: &Arc<TaskContext>, + file_extension: String, + get_serializer: Box<dyn Fn() -> Box<dyn BatchSerializer> + Send>, + config: &FileSinkConfig, + compression: FileCompressionType, +) -> Result<u64> { + let object_store = context + .runtime_env() + .object_store(&config.object_store_url)?; + + let single_file_output = config.single_file_output; + let base_output_path = &config.table_paths[0]; + let unbounded_input = config.unbounded_input; + + let (demux_task, mut file_stream_rx) = start_demuxer_task( + data, + context, + None, + base_output_path.clone(), + file_extension, + single_file_output, + ); + + let rb_buffer_size = &context + .session_config() + .options() + .execution + .max_buffered_batches_per_output_file; + + let (tx_file_bundle, rx_file_bundle) = tokio::sync::mpsc::channel(rb_buffer_size / 2); + let (tx_row_cnt, mut rx_row_cnt) = tokio::sync::mpsc::channel(1); + let write_coordinater_task = tokio::spawn(async move { + stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input) + .await + }); + while let Some((output_location, rb_stream)) = file_stream_rx.recv().await { + let serializer = get_serializer(); + let object_meta = ObjectMeta { + location: output_location, + last_modified: chrono::offset::Utc::now(), + size: 0, + e_tag: None, + }; + let writer = 
create_writer( + FileWriterMode::PutMultipart, + compression, + object_meta.into(), + object_store.clone(), + ) + .await?; + + tx_file_bundle + .send((rb_stream, serializer, writer)) + .await + .map_err(|_| { + DataFusionError::Internal( + "Writer receive file bundle channel closed unexpectedly!".into(), + ) + })?; + } + + // Signal to the write coordinater that no more files are coming + drop(tx_file_bundle); + + let total_count = if let Some(cnt) = rx_row_cnt.recv().await { + cnt + } else { + return Err(DataFusionError::Internal( + "Did not receive final row count from write cooridnator task!".into(), + )); + }; + + match try_join!(write_coordinater_task, demux_task) { + Ok((r1, r2)) => { + r1?; + r2?; + } + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } else { + unreachable!(); + } + } + } + + Ok(total_count) +} + +/// Orchestrates append_all for any statelessly serialized file type. Appends to all files provided +/// in a round robin fashion. +pub(crate) async fn stateless_append_all( + mut data: SendableRecordBatchStream, + object_store: Arc<dyn ObjectStore>, + file_groups: &Vec<PartitionedFile>, + unbounded_input: bool, + compression: FileCompressionType, + get_serializer: Box<dyn Fn(usize) -> Box<dyn BatchSerializer> + Send>, +) -> Result<u64> { + let (tx_file_bundle, rx_file_bundle) = tokio::sync::mpsc::channel(file_groups.len()); + let mut send_channels = vec![]; + for file_group in file_groups { + let serializer = get_serializer(file_group.object_meta.size); + + let file = file_group.clone(); + let writer = create_writer( + FileWriterMode::Append, + compression, + file.object_meta.clone().into(), + object_store.clone(), + ) + .await?; + + let (tx, rx) = tokio::sync::mpsc::channel(9000); Review Comment: This was actually supposed to be a specific configured value. Thanks for flagging; I fixed it. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org