devinjdangelo commented on code in PR #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801#discussion_r1355878988


##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -390,29 +396,239 @@ async fn serialize_rb_stream_to_object_store(
             ))
         }
     };
-    Ok((serializer, writer, row_count as u64))
+    Ok((writer, row_count as u64))
 }
 
+type RecordBatchReceiver = Receiver<RecordBatch>;
+type DemuxedStreamReceiver = Receiver<(Path, RecordBatchReceiver)>;
+
+/// Dynamically partitions input stream to achieve desired maximum rows per 
file
+async fn row_count_demuxer(
+    tx: Sender<(Path, Receiver<RecordBatch>)>,
+    mut input: SendableRecordBatchStream,
+    context: Arc<TaskContext>,
+    base_output_path: ListingTableUrl,
+    file_extension: String,
+    single_file_output: bool,
+) -> Result<()> {
+    let exec_options = &context.session_config().options().execution;
+    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
+    let max_buffered_recordbatches = 
exec_options.max_buffered_batches_per_output_file;
+
+    let mut total_rows_current_file = 0;
+    let mut part_idx = 0;
+    let write_id =
+        rand::distributions::Alphanumeric.sample_string(&mut 
rand::thread_rng(), 16);
+    let file_path = if !single_file_output {
+        base_output_path
+            .prefix()
+            .child(format!("{}_{}.{}", write_id, part_idx, file_extension))
+    } else {
+        base_output_path.prefix().to_owned()
+    };
+
+    let (mut tx_file, mut rx_file) =
+        tokio::sync::mpsc::channel(max_buffered_recordbatches / 2);
+    tx.send((file_path, rx_file)).await.map_err(|_| {
+        DataFusionError::Execution("Error sending new file stream!".into())
+    })?;
+    part_idx += 1;
+    while let Some(rb) = input.next().await.transpose()? {
+        total_rows_current_file += rb.num_rows();
+        tx_file.send(rb).await.map_err(|_| {
+            DataFusionError::Execution("Error sending RecordBatch to file 
stream!".into())
+        })?;
+
+        // Once we have sent enough data to previous file stream, spawn a new 
one
+        if total_rows_current_file >= max_rows_per_file && !single_file_output 
{
+            total_rows_current_file = 0;
+
+            let file_path = base_output_path
+                .prefix()
+                .child(format!("{}_{}.{}", write_id, part_idx, 
file_extension));
+            (tx_file, rx_file) = 
tokio::sync::mpsc::channel(max_buffered_recordbatches);
+            tx.send((file_path, rx_file)).await.map_err(|_| {
+                DataFusionError::Execution("Error sending new file 
stream!".into())
+            })?;
+
+            part_idx += 1;
+        }
+    }
+    Ok(())
+}
+
+/// Splits an input stream based on the distinct values of a set of columns
+/// Assumes standard hive style partition paths such as
+/// /col1=val1/col2=val2/outputfile.parquet
+async fn hive_style_partitions_demuxer(

Review Comment:
   This is the key new code in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to