This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d2ff112de0 Disable Parallel Parquet Writer by Default, Improve Writing Test Coverage (#8854)
d2ff112de0 is described below

commit d2ff112de073f63490f049425f197d6066ea1980
Author: Devin D'Angelo <devinjdang...@gmail.com>
AuthorDate: Tue Jan 16 09:05:19 2024 -0500

    Disable Parallel Parquet Writer by Default, Improve Writing Test Coverage (#8854)
    
    * disable parallel writer and add test
    
    * more tests
    
    * --complete sqllogic tests
    
    * make rows distinct and add make_array of struct
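
    Note: the commit only flips the default; the parallel writer remains available per session. A minimal sketch using DataFusion's standard `SET`/`SHOW` statements (not part of this commit):

    ```sql
    -- Opt back in to the parallel parquet writer for the current session
    SET datafusion.execution.parquet.allow_single_file_parallelism = true;

    -- Confirm the setting took effect
    SHOW datafusion.execution.parquet.allow_single_file_parallelism;
    ```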
---
 datafusion/common/src/config.rs                       |  2 +-
 datafusion/sqllogictest/test_files/copy.slt           | 19 +++++++++++++++++++
 .../sqllogictest/test_files/information_schema.slt    |  4 ++--
 .../sqllogictest/test_files/repartition_scan.slt      |  8 ++++----
 docs/source/user-guide/configs.md                     |  2 +-
 5 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 5c051a7dee..996a505dea 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -403,7 +403,7 @@ config_namespace! {
         /// parquet files by serializing them in parallel. Each column
         /// in each row group in each output file are serialized in parallel
         /// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
-        pub allow_single_file_parallelism: bool, default = true
+        pub allow_single_file_parallelism: bool, default = false
 
         /// By default parallel parquet writer is tuned for minimum
         /// memory usage in a streaming execution plan. You may see
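 
 Context for the surrounding options: once a user opts back in, the two tuning knobs mentioned above trade memory for throughput. A hedged sketch (the option names come from this file; the values are illustrative, not recommendations from this commit):
 
 ```sql
 -- Re-enable the parallel writer, then raise its parallelism limits.
 -- Higher values increase peak memory usage in exchange for write speed.
 SET datafusion.execution.parquet.allow_single_file_parallelism = true;
 SET datafusion.execution.parquet.maximum_parallel_row_group_writers = 2;
 SET datafusion.execution.parquet.maximum_buffered_record_batches_per_stream = 16;
 ```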
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index 89b2391788..9f5b7af415 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -64,6 +64,25 @@ select * from validate_parquet;
 1 Foo
 2 Bar
 
+query ??
+COPY 
+(values (struct ('foo', (struct ('foo', make_array(struct('a',1), struct('b',2))))), make_array(timestamp '2023-01-01 01:00:01',timestamp '2023-01-01 01:00:01')), 
+(struct('bar', (struct ('foo', make_array(struct('aa',10), struct('bb',20))))), make_array(timestamp '2024-01-01 01:00:01', timestamp '2024-01-01 01:00:01'))) 
+to 'test_files/scratch/copy/table_nested' (format parquet, single_file_output false);
+----
+2
+
+# validate multiple parquet file output
+statement ok
+CREATE EXTERNAL TABLE validate_parquet_nested STORED AS PARQUET LOCATION 'test_files/scratch/copy/table_nested/';
+
+query ??
+select * from validate_parquet_nested;
+----
+{c0: foo, c1: {c0: foo, c1: [{c0: a, c1: 1}, {c0: b, c1: 2}]}} [2023-01-01T01:00:01, 2023-01-01T01:00:01]
+{c0: bar, c1: {c0: foo, c1: [{c0: aa, c1: 10}, {c0: bb, c1: 20}]}} [2024-01-01T01:00:01, 2024-01-01T01:00:01]
+
+
 # Copy parquet with all supported statment overrides
 query IT
 COPY source_table
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index f8893bf7ae..44daa51416 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -154,7 +154,7 @@ datafusion.execution.listing_table_ignore_subdirectory true
 datafusion.execution.max_buffered_batches_per_output_file 2
 datafusion.execution.meta_fetch_concurrency 32
 datafusion.execution.minimum_parallel_output_files 4
-datafusion.execution.parquet.allow_single_file_parallelism true
+datafusion.execution.parquet.allow_single_file_parallelism false
 datafusion.execution.parquet.bloom_filter_enabled false
 datafusion.execution.parquet.bloom_filter_fpp NULL
 datafusion.execution.parquet.bloom_filter_ndv NULL
@@ -229,7 +229,7 @@ datafusion.execution.listing_table_ignore_subdirectory true Should sub directori
 datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
 datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
 datafusion.execution.minimum_parallel_output_files 4 Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached.
-datafusion.execution.parquet.allow_single_file_parallelism true Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
+datafusion.execution.parquet.allow_single_file_parallelism false Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
 datafusion.execution.parquet.bloom_filter_enabled false Sets if bloom filter is enabled for any column
 datafusion.execution.parquet.bloom_filter_fpp NULL Sets bloom filter false positive probability. If NULL, uses default parquet writer setting
 datafusion.execution.parquet.bloom_filter_ndv NULL Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting
diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt
index 4b8c8f2f08..5ee0da2d33 100644
--- a/datafusion/sqllogictest/test_files/repartition_scan.slt
+++ b/datafusion/sqllogictest/test_files/repartition_scan.slt
@@ -61,7 +61,7 @@ Filter: parquet_table.column1 != Int32(42)
 physical_plan
 CoalesceBatchesExec: target_batch_size=8192
 --FilterExec: column1@0 != 42
-----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..104], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:104..208], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:208..312], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:312..414]]}, project [...]
+----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..153], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:153..306], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:306..459], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:459..610]]}, project [...]
 
 # disable round robin repartitioning
 statement ok
@@ -77,7 +77,7 @@ Filter: parquet_table.column1 != Int32(42)
 physical_plan
 CoalesceBatchesExec: target_batch_size=8192
 --FilterExec: column1@0 != 42
-----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..104], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:104..208], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:208..312], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:312..414]]}, project [...]
+----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..153], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:153..306], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:306..459], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:459..610]]}, project [...]
 
 # enable round robin repartitioning again
 statement ok
@@ -102,7 +102,7 @@ SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
 --SortExec: expr=[column1@0 ASC NULLS LAST]
 ----CoalesceBatchesExec: target_batch_size=8192
 ------FilterExec: column1@0 != 42
---------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..205], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:205..405, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..5], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:5..210], [WORKSPACE_RO [...]
+--------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:303..601, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..5], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:5..308], [WORKSPACE_RO [...]
 
 
 ## Read the files as though they are ordered
@@ -138,7 +138,7 @@ physical_plan
 SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
 --CoalesceBatchesExec: target_batch_size=8192
 ----FilterExec: column1@0 != 42
-------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..207], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:207..414], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:202..405]]}, project [...]
+------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..300], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..305], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:305..610], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:300..601]]}, project [...]
 
 # Cleanup
 statement ok
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 7111ea1d0a..5e26e2b205 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -71,7 +71,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.parquet.bloom_filter_enabled                       | false                     | Sets if bloom filter is enabled for any column [...]
 | datafusion.execution.parquet.bloom_filter_fpp                           | NULL                      | Sets bloom filter false positive probability. If NULL, uses default parquet writer setting [...]
 | datafusion.execution.parquet.bloom_filter_ndv                           | NULL                      | Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting [...]
-| datafusion.execution.parquet.allow_single_file_parallelism              | true                      | Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. [...]
+| datafusion.execution.parquet.allow_single_file_parallelism              | false                     | Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. [...]
 | datafusion.execution.parquet.maximum_parallel_row_group_writers         | 1                         | By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writi [...]
 | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2                         | By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writi [...]
 | datafusion.execution.aggregate.scalar_update_factor                     | 10                        | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approa [...]
