This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9fde5c4282 Allow Setting Minimum Parallelism with RowCount Based Demuxer (#7841)
9fde5c4282 is described below

commit 9fde5c4282fd9f0e3332fb40998bf1562c17fcda
Author: Devin D'Angelo <devinjdang...@gmail.com>
AuthorDate: Sat Oct 21 16:23:56 2023 -0400

    Allow Setting Minimum Parallelism with RowCount Based Demuxer (#7841)
    
    * minimum_parallel_files_added
    
    * update docs
    
    * Apply suggestions from code review
    
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
    
    * generalize tests
    
    * update docs
    
    ---------
    
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
---
 datafusion/common/src/config.rs                    |  6 ++
 .../core/src/datasource/file_format/write/demux.rs | 65 +++++++++++++++-------
 datafusion/core/src/datasource/listing/table.rs    | 58 +++++++++++++------
 .../sqllogictest/test_files/information_schema.slt |  2 +
 docs/source/user-guide/configs.md                  |  1 +
 5 files changed, 97 insertions(+), 35 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index f40324ae3a..6aefa4e05d 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -255,6 +255,12 @@ config_namespace! {
         /// Number of files to read in parallel when inferring schema and statistics
         pub meta_fetch_concurrency: usize, default = 32
 
+        /// Guarantees a minimum number of output files being written in
+        /// parallel. RecordBatches are distributed in round-robin fashion
+        /// across the parallel writers. Each writer is closed, and a new
+        /// file opened, once soft_max_rows_per_output_file is reached.
+        pub minimum_parallel_output_files: usize, default = 4
+
         /// Target number of rows in output files when writing multiple.
         /// This is a soft max, so it can be exceeded slightly. There also
         /// will be one file smaller than the limit if the total
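
For context on how the two knobs compose: record batches fan out round robin across at least `minimum_parallel_output_files` writers, and each writer rolls over to a new file once it passes the soft row cap. Below is a hedged sketch of setting both options on a session; the `set_usize` builder and `SessionContext::new_with_config` are assumed from the public API of recent DataFusion releases, so verify the names against your version.

    use datafusion::prelude::*;

    fn ctx_with_parallel_writes() -> SessionContext {
        let config = SessionConfig::new()
            // keep at least 8 output files open, fed round robin
            .set_usize("datafusion.execution.minimum_parallel_output_files", 8)
            // roll each writer over to a fresh file after ~1M rows (soft max)
            .set_usize("datafusion.execution.soft_max_rows_per_output_file", 1_000_000);
        SessionContext::new_with_config(config)
    }
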
diff --git a/datafusion/core/src/datasource/file_format/write/demux.rs b/datafusion/core/src/datasource/file_format/write/demux.rs
index 2c44c0922c..67dd1f9406 100644
--- a/datafusion/core/src/datasource/file_format/write/demux.rs
+++ b/datafusion/core/src/datasource/file_format/write/demux.rs
@@ -122,33 +122,49 @@ async fn row_count_demuxer(
     single_file_output: bool,
 ) -> Result<()> {
     let exec_options = &context.session_config().options().execution;
+
     let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
     let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;
-    let mut total_rows_current_file = 0;
+    let minimum_parallel_files = exec_options.minimum_parallel_output_files;
     let mut part_idx = 0;
     let write_id =
        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
 
-    let mut tx_file = create_new_file_stream(
-        &base_output_path,
-        &write_id,
-        part_idx,
-        &file_extension,
-        single_file_output,
-        max_buffered_batches,
-        &mut tx,
-    )?;
-    part_idx += 1;
+    let mut open_file_streams = Vec::with_capacity(minimum_parallel_files);
+
+    let mut next_send_stream = 0;
+    let mut row_counts = Vec::with_capacity(minimum_parallel_files);
+
+    // single_file_output overrides both of the following settings
+    let minimum_parallel_files = if single_file_output {
+        1
+    } else {
+        minimum_parallel_files
+    };
+
+    let max_rows_per_file = if single_file_output {
+        usize::MAX
+    } else {
+        max_rows_per_file
+    };
 
     while let Some(rb) = input.next().await.transpose()? {
-        total_rows_current_file += rb.num_rows();
-        tx_file.send(rb).await.map_err(|_| {
-            DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
-        })?;
-
-        if total_rows_current_file >= max_rows_per_file && !single_file_output {
-            total_rows_current_file = 0;
-            tx_file = create_new_file_stream(
+        // ensure we have at least minimum_parallel_files open
+        if open_file_streams.len() < minimum_parallel_files {
+            open_file_streams.push(create_new_file_stream(
+                &base_output_path,
+                &write_id,
+                part_idx,
+                &file_extension,
+                single_file_output,
+                max_buffered_batches,
+                &mut tx,
+            )?);
+            row_counts.push(0);
+            part_idx += 1;
+        } else if row_counts[next_send_stream] >= max_rows_per_file {
+            row_counts[next_send_stream] = 0;
+            open_file_streams[next_send_stream] = create_new_file_stream(
                 &base_output_path,
                 &write_id,
                 part_idx,
@@ -159,6 +175,17 @@ async fn row_count_demuxer(
             )?;
             part_idx += 1;
         }
+        row_counts[next_send_stream] += rb.num_rows();
+        open_file_streams[next_send_stream]
+            .send(rb)
+            .await
+            .map_err(|_| {
+                DataFusionError::Execution(
+                    "Error sending RecordBatch to file stream!".into(),
+                )
+            })?;
+
+        next_send_stream = (next_send_stream + 1) % minimum_parallel_files;
     }
     Ok(())
 }
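
Stripped of the channel plumbing, the new demuxer reduces to lazy opening plus a round-robin cursor with per-slot rollover. The following is a minimal self-contained sketch of that logic, with plain row counters standing in for the channel-backed file streams; all names are illustrative, not DataFusion APIs.

    // Distribute batches round robin across at least `min_parallel_files`
    // writers; a writer rolls over to a new file once it passes the soft cap.
    // Returns the number of rows each produced file ended up with.
    fn demux_round_robin(
        batch_row_counts: &[usize], // rows in each incoming RecordBatch
        min_parallel_files: usize,  // minimum writers kept open
        max_rows_per_file: usize,   // soft cap before a writer rolls over
    ) -> Vec<usize> {
        let mut files: Vec<usize> = Vec::new(); // rows written per output file
        let mut active: Vec<usize> = Vec::new(); // file index per writer slot
        let mut next = 0; // round-robin cursor

        for &rows in batch_row_counts {
            if active.len() < min_parallel_files {
                // open writers lazily, one per incoming batch, up to the minimum
                files.push(0);
                active.push(files.len() - 1);
            } else if files[active[next]] >= max_rows_per_file {
                // soft max reached: this slot rolls over to a brand-new file
                files.push(0);
                active[next] = files.len() - 1;
            }
            files[active[next]] += rows;
            next = (next + 1) % min_parallel_files;
        }
        files
    }

    fn main() {
        // 8 batches of 10 rows, minimum 4 writers, soft cap of 20 rows per
        // file: each writer fills to exactly 20 rows, so 4 files result.
        assert_eq!(demux_round_robin(&[10; 8], 4, 20), vec![20, 20, 20, 20]);
    }
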
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index ae6aa317ce..485ab0a902 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1608,15 +1608,16 @@ mod tests {
     #[tokio::test]
     async fn test_insert_into_append_new_json_files() -> Result<()> {
         let mut config_map: HashMap<String, String> = HashMap::new();
-        config_map.insert("datafusion.execution.batch_size".into(), 
"1".into());
+        config_map.insert("datafusion.execution.batch_size".into(), 
"10".into());
         config_map.insert(
             "datafusion.execution.soft_max_rows_per_output_file".into(),
-            "1".into(),
+            "10".into(),
         );
         helper_test_append_new_files_to_table(
             FileType::JSON,
             FileCompressionType::UNCOMPRESSED,
             Some(config_map),
+            2,
         )
         .await?;
         Ok(())
@@ -1636,32 +1637,52 @@ mod tests {
     #[tokio::test]
     async fn test_insert_into_append_new_csv_files() -> Result<()> {
         let mut config_map: HashMap<String, String> = HashMap::new();
-        config_map.insert("datafusion.execution.batch_size".into(), 
"1".into());
+        config_map.insert("datafusion.execution.batch_size".into(), 
"10".into());
         config_map.insert(
             "datafusion.execution.soft_max_rows_per_output_file".into(),
-            "1".into(),
+            "10".into(),
         );
         helper_test_append_new_files_to_table(
             FileType::CSV,
             FileCompressionType::UNCOMPRESSED,
             Some(config_map),
+            2,
         )
         .await?;
         Ok(())
     }
 
     #[tokio::test]
-    async fn test_insert_into_append_new_parquet_files_defaults() -> Result<()> {
+    async fn test_insert_into_append_2_new_parquet_files_defaults() -> Result<()> {
         let mut config_map: HashMap<String, String> = HashMap::new();
-        config_map.insert("datafusion.execution.batch_size".into(), 
"1".into());
+        config_map.insert("datafusion.execution.batch_size".into(), 
"10".into());
+        config_map.insert(
+            "datafusion.execution.soft_max_rows_per_output_file".into(),
+            "10".into(),
+        );
+        helper_test_append_new_files_to_table(
+            FileType::PARQUET,
+            FileCompressionType::UNCOMPRESSED,
+            Some(config_map),
+            2,
+        )
+        .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_insert_into_append_1_new_parquet_files_defaults() -> Result<()> {
+        let mut config_map: HashMap<String, String> = HashMap::new();
+        config_map.insert("datafusion.execution.batch_size".into(), "20".into());
         config_map.insert(
             "datafusion.execution.soft_max_rows_per_output_file".into(),
-            "1".into(),
+            "20".into(),
         );
         helper_test_append_new_files_to_table(
             FileType::PARQUET,
             FileCompressionType::UNCOMPRESSED,
             Some(config_map),
+            1,
         )
         .await?;
         Ok(())
@@ -1788,10 +1809,10 @@ mod tests {
     #[tokio::test]
    async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
         let mut config_map: HashMap<String, String> = HashMap::new();
-        config_map.insert("datafusion.execution.batch_size".into(), 
"1".into());
+        config_map.insert("datafusion.execution.batch_size".into(), 
"10".into());
         config_map.insert(
             "datafusion.execution.soft_max_rows_per_output_file".into(),
-            "1".into(),
+            "10".into(),
         );
         config_map.insert(
             "datafusion.execution.parquet.compression".into(),
@@ -1858,6 +1879,7 @@ mod tests {
             FileType::PARQUET,
             FileCompressionType::UNCOMPRESSED,
             Some(config_map),
+            2,
         )
         .await?;
         Ok(())
@@ -1875,6 +1897,7 @@ mod tests {
             FileType::PARQUET,
             FileCompressionType::UNCOMPRESSED,
             Some(config_map),
+            2,
         )
         .await
         .expect_err("Example should fail!");
@@ -2092,6 +2115,7 @@ mod tests {
         file_type: FileType,
         file_compression_type: FileCompressionType,
         session_config_map: Option<HashMap<String, String>>,
+        expected_n_files_per_insert: usize,
     ) -> Result<()> {
         // Create the initial context, schema, and batch.
         let session_ctx = match session_config_map {
@@ -2118,7 +2142,9 @@ mod tests {
         // Create a new batch of data to insert into the table
         let batch = RecordBatch::try_new(
             schema.clone(),
-            vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))],
+            vec![Arc::new(arrow_array::Int32Array::from(vec![
+                1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
+            ]))],
         )?;
 
         // Register appropriate table depending on file_type we want to test
@@ -2214,7 +2240,7 @@ mod tests {
             "+-------+",
             "| count |",
             "+-------+",
-            "| 6     |",
+            "| 20    |",
             "+-------+",
         ];
 
@@ -2231,7 +2257,7 @@ mod tests {
             "+-------+",
             "| count |",
             "+-------+",
-            "| 6     |",
+            "| 20    |",
             "+-------+",
         ];
 
@@ -2240,7 +2266,7 @@ mod tests {
 
        // Assert that `target_partition_number` many files were added to the table.
         let num_files = tmp_dir.path().read_dir()?.count();
-        assert_eq!(num_files, 3);
+        assert_eq!(num_files, expected_n_files_per_insert);
 
         // Create a physical plan from the insert plan
         let plan = session_ctx
@@ -2255,7 +2281,7 @@ mod tests {
             "+-------+",
             "| count |",
             "+-------+",
-            "| 6     |",
+            "| 20    |",
             "+-------+",
         ];
 
@@ -2274,7 +2300,7 @@ mod tests {
             "+-------+",
             "| count |",
             "+-------+",
-            "| 12    |",
+            "| 40    |",
             "+-------+",
         ];
 
@@ -2283,7 +2309,7 @@ mod tests {
 
        // Assert that another `target_partition_number` many files were added to the table.
         let num_files = tmp_dir.path().read_dir()?.count();
-        assert_eq!(num_files, 6);
+        assert_eq!(num_files, expected_n_files_per_insert * 2);
 
        // Return Ok if the function ran to completion without error
         Ok(())
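
A note on the expected file counts wired into the tests above: writer streams open lazily, one per incoming RecordBatch, up to the minimum. With batch_size = 10 and 20 input rows only two batches arrive, so only two streams open and each fills to its soft max of 10 rows, hence expected_n_files_per_insert = 2; with batch_size = 20 the single batch yields a single file.
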
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index e6a69c213b..4a2b6220fd 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -152,6 +152,7 @@ datafusion.execution.coalesce_batches true
 datafusion.execution.collect_statistics false
 datafusion.execution.max_buffered_batches_per_output_file 2
 datafusion.execution.meta_fetch_concurrency 32
+datafusion.execution.minimum_parallel_output_files 4
 datafusion.execution.parquet.allow_single_file_parallelism false
 datafusion.execution.parquet.bloom_filter_enabled false
 datafusion.execution.parquet.bloom_filter_fpp NULL
@@ -221,6 +222,7 @@ datafusion.execution.coalesce_batches true When set to true, record batches will
 datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files
 datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
 datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
+datafusion.execution.minimum_parallel_output_files 4 Guarantees a minimum number of output files being written in parallel. RecordBatches are distributed in round-robin fashion across the parallel writers. Each writer is closed, and a new file opened, once soft_max_rows_per_output_file is reached.
 datafusion.execution.parquet.allow_single_file_parallelism false Controls whether DataFusion will attempt to speed up writing large parquet files by first writing multiple smaller files and then stitching them together into a single large file. This will result in faster write speeds, but higher memory usage. Also currently unsupported are bloom filters and column indexes when single_file_parallelism is enabled.
 datafusion.execution.parquet.bloom_filter_enabled false Sets if bloom filter is enabled for any column
 datafusion.execution.parquet.bloom_filter_fpp NULL Sets bloom filter false positive probability. If NULL, uses default parquet writer setting
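
Because these are ordinary session settings, the new option can also be changed at runtime with a SQL SET statement, which is what the sqllogictest file exercises. A hedged sketch, assuming `SessionContext::sql` executes the SET statement eagerly as it does for other statements:

    use datafusion::error::Result;
    use datafusion::prelude::*;

    async fn bump_parallel_writers(ctx: &SessionContext) -> Result<()> {
        // SET updates the session's ConfigOptions for subsequent queries
        let _ = ctx
            .sql("SET datafusion.execution.minimum_parallel_output_files = 8")
            .await?;
        Ok(())
    }
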
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index f331b2de44..3476118ca6 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -77,6 +77,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManag [...]
 | datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. [...]
 | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics [...]
+| datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum number of output files being written in parallel. RecordBatches are distributed in round-robin fashion across the parallel writers. Each writer is closed, and a new file opened, once soft_max_rows_per_output_file is reached. [...]
 | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max [...]
 | datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption [...]
 | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores [...]
