devinjdangelo commented on code in PR #7791:
URL: https://github.com/apache/arrow-datafusion/pull/7791#discussion_r1356910805


##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -529,5 +560,187 @@ pub(crate) async fn stateless_serialize_and_write_files(
         }
     }
 
-    Ok(row_count)
+    tx.send(row_count).await.map_err(|_| {
+        DataFusionError::Internal(
+            "Error encountered while sending row count back to file 
sink!".into(),
+        )
+    })?;
+    Ok(())
+}
+
+/// Orchestrates multipart put of a dynamic number of output files from a single input stream
+/// for any statelessly serialized file type. That is, any file type for which each [RecordBatch]
+/// can be serialized independently of all other [RecordBatch]s.
+pub(crate) async fn stateless_multipart_put(
+    data: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    file_extension: String,
+    get_serializer: Box<dyn Fn() -> Box<dyn BatchSerializer> + Send>,
+    config: &FileSinkConfig,
+    compression: FileCompressionType,
+) -> Result<u64> {
+    let object_store = context
+        .runtime_env()
+        .object_store(&config.object_store_url)?;
+
+    let single_file_output = config.single_file_output;
+    let base_output_path = &config.table_paths[0];
+    let unbounded_input = config.unbounded_input;
+
+    let (demux_task, mut file_stream_rx) = start_demuxer_task(
+        data,
+        context,
+        None,
+        base_output_path.clone(),
+        file_extension,
+        single_file_output,
+    );
+
+    let rb_buffer_size = &context
+        .session_config()
+        .options()
+        .execution
+        .max_buffered_batches_per_output_file;
+
+    let (tx_file_bundle, rx_file_bundle) = 
tokio::sync::mpsc::channel(rb_buffer_size / 2);
+    let (tx_row_cnt, mut rx_row_cnt) = tokio::sync::mpsc::channel(1);

Review Comment:
   Nice, that's perfect. I made this change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to