metesynnada commented on code in PR #7791:
URL: https://github.com/apache/arrow-datafusion/pull/7791#discussion_r1356819482


##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -390,29 +391,101 @@ async fn serialize_rb_stream_to_object_store(
             ))
         }
     };
-    Ok((serializer, writer, row_count as u64))
+    Ok((writer, row_count as u64))
 }
 
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = Receiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible to monitor
+/// the demux task for errors and abort accordingly. The single_file_ouput 
parameter
+/// overrides all other settings to force only a single file to be written.
+/// partition_by parameter will additionally split the input based on the 
unique
+/// values of a specific column 
`<https://github.com/apache/arrow-datafusion/issues/7744>``
+pub(crate) fn start_demuxer_task(
+    mut input: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    _partition_by: Option<&str>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
+    let exec_options = &context.session_config().options().execution;
+
+    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
+    let max_parallel_files = exec_options.max_parallel_ouput_files;
+    let max_buffered_recordbatches = 
exec_options.max_buffered_batches_per_output_file;
+
+    let (tx, rx) = tokio::sync::mpsc::channel(max_parallel_files);
+    let task: JoinHandle<std::result::Result<(), DataFusionError>> =
+        tokio::spawn(async move {
+            let mut total_rows_current_file = 0;
+            let mut part_idx = 0;
+            let write_id = rand::distributions::Alphanumeric
+                .sample_string(&mut rand::thread_rng(), 16);
+            let file_path = if !single_file_output {

Review Comment:
   `file_path` can be formatted inside a helper method such as `generate_file_path`.



##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -529,5 +560,187 @@ pub(crate) async fn stateless_serialize_and_write_files(
         }
     }
 
-    Ok(row_count)
+    tx.send(row_count).await.map_err(|_| {
+        DataFusionError::Internal(
+            "Error encountered while sending row count back to file 
sink!".into(),
+        )
+    })?;
+    Ok(())
+}
+
+/// Orchestrates multipart put of a dynamic number of output files from a 
single input stream
+/// for any statelessly serialized file type. That is, any file type for which 
each [RecordBatch]
+/// can be serialized independently of all other [RecordBatch]s.
+pub(crate) async fn stateless_multipart_put(
+    data: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    file_extension: String,
+    get_serializer: Box<dyn Fn() -> Box<dyn BatchSerializer> + Send>,
+    config: &FileSinkConfig,
+    compression: FileCompressionType,
+) -> Result<u64> {
+    let object_store = context
+        .runtime_env()
+        .object_store(&config.object_store_url)?;
+
+    let single_file_output = config.single_file_output;
+    let base_output_path = &config.table_paths[0];
+    let unbounded_input = config.unbounded_input;
+
+    let (demux_task, mut file_stream_rx) = start_demuxer_task(
+        data,
+        context,
+        None,
+        base_output_path.clone(),
+        file_extension,
+        single_file_output,
+    );
+
+    let rb_buffer_size = &context
+        .session_config()
+        .options()
+        .execution
+        .max_buffered_batches_per_output_file;
+
+    let (tx_file_bundle, rx_file_bundle) = 
tokio::sync::mpsc::channel(rb_buffer_size / 2);
+    let (tx_row_cnt, mut rx_row_cnt) = tokio::sync::mpsc::channel(1);

Review Comment:
   This can be a `tokio::sync::oneshot::channel()`, since the row count is sent only once.



##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -390,29 +391,101 @@ async fn serialize_rb_stream_to_object_store(
             ))
         }
     };
-    Ok((serializer, writer, row_count as u64))
+    Ok((writer, row_count as u64))
 }
 
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = Receiver<(Path, RecordBatchReceiver)>;
+
+/// Splits a single [SendableRecordBatchStream] into a dynamically determined
+/// number of partitions at execution time. The partitions are determined by
+/// factors known only at execution time, such as total number of rows and
+/// partition column values. The demuxer task communicates to the caller
+/// by sending channels over a channel. The inner channels send RecordBatches
+/// which should be contained within the same output file. The outer channel
+/// is used to send a dynamic number of inner channels, representing a dynamic
+/// number of total output files. The caller is also responsible to monitor
+/// the demux task for errors and abort accordingly. The single_file_ouput 
parameter
+/// overrides all other settings to force only a single file to be written.
+/// partition_by parameter will additionally split the input based on the 
unique
+/// values of a specific column 
`<https://github.com/apache/arrow-datafusion/issues/7744>``
+pub(crate) fn start_demuxer_task(

Review Comment:
   You can improve code readability by dividing the logic into methods. For the 
`start_demuxer_task` function, it might be helpful to divide it into smaller parts:
   
   ```rust
   /// Spawns the demux task and hands back its join handle together with the
   /// receiving end of the outer channel.
   ///
   /// Each item received on the returned channel is a `(Path, Receiver)` pair
   /// announced by `demux_stream`. The handle resolves to the `Result`
   /// returned by `demux_stream`.
   ///
   /// Three execution options are read from `context`: the soft row limit per
   /// output file, the outer channel capacity (`max_parallel_ouput_files`) and
   /// the per-file batch buffer size. `_partition_by` is accepted but not used
   /// by this body.
   pub(crate) fn start_demuxer_task(
       input: SendableRecordBatchStream,
       context: &Arc<TaskContext>,
       _partition_by: Option<&str>,
       base_output_path: ListingTableUrl,
       file_extension: String,
       single_file_output: bool,
   ) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
       let exec_options = &context.session_config().options().execution;
   
       // Copy the settings out so the spawned task does not borrow `context`.
       let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
       let max_parallel_files = exec_options.max_parallel_ouput_files;
       let max_buffered_batches = 
exec_options.max_buffered_batches_per_output_file;
   
       // Outer channel: carries one (path, batch receiver) pair per output file.
       // NOTE(review): `mpsc` is presumably `tokio::sync::mpsc` - confirm the import.
       let (tx, rx) = mpsc::channel(max_parallel_files);
   
       // `tx` is moved into the task.
       let task = tokio::spawn(async move {
           demux_stream(
               input,
               base_output_path,
               file_extension,
               single_file_output,
               max_rows_per_file,
               max_buffered_batches,
               tx,
           )
               .await
       });
       (task, rx)
   }
   
   /// Builds the object-store location for one output file.
   ///
   /// In single-file mode the prefix of `base_output_path` is returned as-is.
   /// Otherwise a child named `{write_id}_{part_idx}.{file_extension}` is
   /// appended to that prefix.
   fn generate_file_path(
       base_output_path: &ListingTableUrl,
       write_id: &str,
       part_idx: usize,
       file_extension: &str,
       single_file_output: bool,
   ) -> Path {
       let prefix = base_output_path.prefix();
       if single_file_output {
           // Single-file mode: the prefix itself is the output location.
           return prefix.to_owned();
       }
       let file_name = format!("{}_{}.{}", write_id, part_idx, file_extension);
       prefix.child(file_name)
   }
   
   /// Opens the channel for one more output file and announces it on `tx`.
   ///
   /// The output location comes from `generate_file_path`. A bounded channel
   /// of capacity `max_buffered_batches / 2` is created, its receiving half is
   /// sent on `tx` paired with the location, and its sending half is returned
   /// so the caller can push `RecordBatch`es for that file.
   ///
   /// # Errors
   /// Returns `DataFusionError::Execution` if the send on `tx` fails.
   async fn create_new_file_stream(
       base_output_path: &ListingTableUrl,
       write_id: &str,
       part_idx: usize,
       file_extension: &str,
       single_file_output: bool,
       max_buffered_batches: usize,
       tx: &mut Sender<(Path, Receiver<RecordBatch>)>,
   ) -> Result<Sender<RecordBatch>> {
       let file_path = generate_file_path(
           base_output_path,
           write_id,
           part_idx,
           file_extension,
           single_file_output,
       );
       // NOTE(review): integer division gives a capacity of 0 when
       // `max_buffered_batches` < 2; if `mpsc` is tokio's bounded channel, a
       // zero capacity panics - confirm the configured minimum.
       let (tx_file, rx_file) = mpsc::channel(max_buffered_batches / 2);
       // NOTE(review): the message below mentions a RecordBatch, but what is
       // being sent here is the (path, receiver) pair for a new file.
       tx.send((file_path, rx_file)).await.map_err(|_| {
           DataFusionError::Execution(
               "Error sending RecordBatch to file stream!".into(),
           )
       })?;
       Ok(tx_file)
   }
   
   /// Reads `input` to completion, forwarding each batch to the channel of the
   /// current output file and rolling over to a new file once the running row
   /// count reaches `max_rows_per_file`.
   ///
   /// Rollover is disabled when `single_file_output` is true. Files are
   /// numbered from 0 by `part_idx`, and all share one random 16-character
   /// alphanumeric `write_id` generated per call.
   ///
   /// # Errors
   /// Returns any error yielded by `input`, or `DataFusionError::Execution`
   /// if sending a batch or announcing a new file fails.
   async fn demux_stream(
       mut input: SendableRecordBatchStream,
       base_output_path: ListingTableUrl,
       file_extension: String,
       single_file_output: bool,
       max_rows_per_file: usize,
       max_buffered_batches: usize,
       mut tx: Sender<(Path, Receiver<RecordBatch>)>,
   ) -> Result<()> {
       let mut total_rows_current_file = 0;
       let mut part_idx = 0;
       let write_id = rand::distributions::Alphanumeric
           .sample_string(&mut rand::thread_rng(), 16);
   
       // The first file is announced before any batch has been read.
       let mut tx_file = create_new_file_stream(
           &base_output_path,
           &write_id,
           part_idx,
           &file_extension,
           single_file_output,
           max_buffered_batches,
           &mut tx,
       )
           .await?;
       part_idx += 1;
   
       // Loop until the stream ends; an error item is returned through `?`.
       while let Some(rb) = input.next().await.transpose()? {
           total_rows_current_file += rb.num_rows();
           tx_file.send(rb).await.map_err(|_| {
               DataFusionError::Execution(
                   "Error sending RecordBatch to file stream!".into(),
               )
           })?;
   
           // The limit is checked after the send, so a file can exceed
           // `max_rows_per_file` by up to one batch.
           // NOTE(review): if the input ends right after this rollover, a file
           // with no batches has already been announced - confirm the consumer
           // tolerates an empty trailing file.
           if total_rows_current_file >= max_rows_per_file && 
!single_file_output {
               total_rows_current_file = 0;
               // Assigning replaces the previous file's sender.
               tx_file = create_new_file_stream(
                   &base_output_path,
                   &write_id,
                   part_idx,
                   &file_extension,
                   single_file_output,
                   max_buffered_batches,
                   &mut tx,
               )
                   .await?;
               part_idx += 1;
           }
       }
       Ok(())
   }
   ```



##########
datafusion/physical-plan/src/insert.rs:
##########
@@ -209,13 +197,15 @@ impl ExecutionPlan for FileSinkExec {
     }
 
     fn benefits_from_input_partitioning(&self) -> Vec<bool> {
-        // Incoming number of partitions is taken to be the
-        // number of files the query is required to write out.
-        // The optimizer should not change this number.
-        // Parrallelism is handled within the appropriate DataSink
+        // DataSink is responsible for dynamically partitioning its
+        // own input at execution time
         vec![false]
     }
 
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        vec![Distribution::SinglePartition; self.children().len()]

Review Comment:
   Adding a docstring here could be beneficial for maintainers.



##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -529,5 +560,187 @@ pub(crate) async fn stateless_serialize_and_write_files(
         }
     }
 
-    Ok(row_count)
+    tx.send(row_count).await.map_err(|_| {
+        DataFusionError::Internal(
+            "Error encountered while sending row count back to file 
sink!".into(),
+        )
+    })?;
+    Ok(())
+}
+
+/// Orchestrates multipart put of a dynamic number of output files from a 
single input stream
+/// for any statelessly serialized file type. That is, any file type for which 
each [RecordBatch]
+/// can be serialized independently of all other [RecordBatch]s.
+pub(crate) async fn stateless_multipart_put(
+    data: SendableRecordBatchStream,
+    context: &Arc<TaskContext>,
+    file_extension: String,
+    get_serializer: Box<dyn Fn() -> Box<dyn BatchSerializer> + Send>,
+    config: &FileSinkConfig,
+    compression: FileCompressionType,
+) -> Result<u64> {
+    let object_store = context
+        .runtime_env()
+        .object_store(&config.object_store_url)?;
+
+    let single_file_output = config.single_file_output;
+    let base_output_path = &config.table_paths[0];
+    let unbounded_input = config.unbounded_input;
+
+    let (demux_task, mut file_stream_rx) = start_demuxer_task(
+        data,
+        context,
+        None,
+        base_output_path.clone(),
+        file_extension,
+        single_file_output,
+    );
+
+    let rb_buffer_size = &context
+        .session_config()
+        .options()
+        .execution
+        .max_buffered_batches_per_output_file;
+
+    let (tx_file_bundle, rx_file_bundle) = 
tokio::sync::mpsc::channel(rb_buffer_size / 2);
+    let (tx_row_cnt, mut rx_row_cnt) = tokio::sync::mpsc::channel(1);
+    let write_coordinater_task = tokio::spawn(async move {
+        stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, 
unbounded_input)
+            .await
+    });
+    while let Some((output_location, rb_stream)) = file_stream_rx.recv().await 
{
+        let serializer = get_serializer();
+        let object_meta = ObjectMeta {
+            location: output_location,
+            last_modified: chrono::offset::Utc::now(),
+            size: 0,
+            e_tag: None,
+        };
+        let writer = create_writer(
+            FileWriterMode::PutMultipart,
+            compression,
+            object_meta.into(),
+            object_store.clone(),
+        )
+        .await?;
+
+        tx_file_bundle
+            .send((rb_stream, serializer, writer))
+            .await
+            .map_err(|_| {
+                DataFusionError::Internal(
+                    "Writer receive file bundle channel closed 
unexpectedly!".into(),
+                )
+            })?;
+    }
+
+    // Signal to the write coordinater that no more files are coming
+    drop(tx_file_bundle);
+
+    let total_count = if let Some(cnt) = rx_row_cnt.recv().await {
+        cnt
+    } else {
+        return Err(DataFusionError::Internal(
+            "Did not receive final row count from write cooridnator 
task!".into(),
+        ));
+    };
+
+    match try_join!(write_coordinater_task, demux_task) {
+        Ok((r1, r2)) => {
+            r1?;
+            r2?;
+        }
+        Err(e) => {
+            if e.is_panic() {
+                std::panic::resume_unwind(e.into_panic());
+            } else {
+                unreachable!();
+            }
+        }
+    }
+
+    Ok(total_count)
+}
+
+/// Orchestrates append_all for any statelessly serialized file type. Appends 
to all files provided
+/// in a round robin fashion.
+pub(crate) async fn stateless_append_all(
+    mut data: SendableRecordBatchStream,
+    object_store: Arc<dyn ObjectStore>,
+    file_groups: &Vec<PartitionedFile>,
+    unbounded_input: bool,
+    compression: FileCompressionType,
+    get_serializer: Box<dyn Fn(usize) -> Box<dyn BatchSerializer> + Send>,
+) -> Result<u64> {
+    let (tx_file_bundle, rx_file_bundle) = 
tokio::sync::mpsc::channel(file_groups.len());
+    let mut send_channels = vec![];
+    for file_group in file_groups {
+        let serializer = get_serializer(file_group.object_meta.size);
+
+        let file = file_group.clone();
+        let writer = create_writer(
+            FileWriterMode::Append,
+            compression,
+            file.object_meta.clone().into(),
+            object_store.clone(),
+        )
+        .await?;
+
+        let (tx, rx) = tokio::sync::mpsc::channel(9000);

Review Comment:
   This can be an unbounded channel.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to