This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 38d5f75de4 Fix handling of nested leaf columns in parallel parquet writer (#8923)
38d5f75de4 is described below
commit 38d5f75de45ae3a7e1602456da4f86e127ed319f
Author: Devin D'Angelo <[email protected]>
AuthorDate: Mon Jan 22 13:47:08 2024 -0500
Fix handling of nested leaf columns in parallel parquet writer (#8923)
* fix handling of nested columns
* lint
* add suggested tests
---
datafusion/common/src/config.rs | 2 +-
.../core/src/datasource/file_format/parquet.rs | 21 +++++----
datafusion/sqllogictest/test_files/copy.slt | 54 +++++++++++++++++++++-
.../sqllogictest/test_files/information_schema.slt | 4 +-
.../sqllogictest/test_files/repartition_scan.slt | 8 ++--
docs/source/user-guide/configs.md | 2 +-
6 files changed, 71 insertions(+), 20 deletions(-)
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index eb516f97a4..0d773ddb2b 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -408,7 +408,7 @@ config_namespace! {
/// parquet files by serializing them in parallel. Each column
/// in each row group in each output file are serialized in parallel
/// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
- pub allow_single_file_parallelism: bool, default = false
+ pub allow_single_file_parallelism: bool, default = true
/// By default parallel parquet writer is tuned for minimum
/// memory usage in a streaming execution plan. You may see
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs
b/datafusion/core/src/datasource/file_format/parquet.rs
index 9729bfa163..fdf6277a5e 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -885,16 +885,17 @@ async fn send_arrays_to_col_writers(
rb: &RecordBatch,
schema: Arc<Schema>,
) -> Result<()> {
- for (tx, array, field) in col_array_channels
- .iter()
- .zip(rb.columns())
- .zip(schema.fields())
- .map(|((a, b), c)| (a, b, c))
- {
+ // Each leaf column has its own channel, increment next_channel for each leaf column sent.
+ let mut next_channel = 0;
+ for (array, field) in rb.columns().iter().zip(schema.fields()) {
for c in compute_leaves(field, array)? {
- tx.send(c).await.map_err(|_| {
- DataFusionError::Internal("Unable to send array to writer!".into())
- })?;
+ col_array_channels[next_channel]
+ .send(c)
+ .await
+ .map_err(|_| {
+ DataFusionError::Internal("Unable to send array to writer!".into())
+ })?;
+ next_channel += 1;
}
}
@@ -902,7 +903,7 @@ async fn send_arrays_to_col_writers(
}
/// Spawns a tokio task which joins the parallel column writer tasks,
-/// and finalizes the row group.
+/// and finalizes the row group
fn spawn_rg_join_and_finalize_task(
column_writer_handles: Vec<JoinHandle<Result<ArrowColumnWriter>>>,
rg_rows: usize,
diff --git a/datafusion/sqllogictest/test_files/copy.slt
b/datafusion/sqllogictest/test_files/copy.slt
index 9f5b7af415..c9b3bdfa33 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -64,6 +64,24 @@ select * from validate_parquet;
1 Foo
2 Bar
+query ?
+copy (values (struct(timestamp '2021-01-01 01:00:01', 1)), (struct(timestamp '2022-01-01 01:00:01', 2)),
+(struct(timestamp '2023-01-03 01:00:01', 3)), (struct(timestamp '2024-01-01 01:00:01', 4)))
+to 'test_files/scratch/copy/table_nested2' (format parquet, single_file_output false);
+----
+4
+
+statement ok
+CREATE EXTERNAL TABLE validate_parquet_nested2 STORED AS PARQUET LOCATION
'test_files/scratch/copy/table_nested2/';
+
+query ?
+select * from validate_parquet_nested2;
+----
+{c0: 2021-01-01T01:00:01, c1: 1}
+{c0: 2022-01-01T01:00:01, c1: 2}
+{c0: 2023-01-03T01:00:01, c1: 3}
+{c0: 2024-01-01T01:00:01, c1: 4}
+
query ??
COPY
(values (struct ('foo', (struct ('foo', make_array(struct('a',1),
struct('b',2))))), make_array(timestamp '2023-01-01 01:00:01',timestamp
'2023-01-01 01:00:01')),
@@ -72,9 +90,9 @@ to 'test_files/scratch/copy/table_nested' (format parquet,
single_file_output fa
----
2
-# validate multiple parquet file output
statement ok
-CREATE EXTERNAL TABLE validate_parquet_nested STORED AS PARQUET LOCATION
'test_files/scratch/copy/table_nested/';
+CREATE EXTERNAL TABLE validate_parquet_nested STORED AS PARQUET
+LOCATION 'test_files/scratch/copy/table_nested/';
query ??
select * from validate_parquet_nested;
@@ -82,6 +100,38 @@ select * from validate_parquet_nested;
{c0: foo, c1: {c0: foo, c1: [{c0: a, c1: 1}, {c0: b, c1: 2}]}}
[2023-01-01T01:00:01, 2023-01-01T01:00:01]
{c0: bar, c1: {c0: foo, c1: [{c0: aa, c1: 10}, {c0: bb, c1: 20}]}}
[2024-01-01T01:00:01, 2024-01-01T01:00:01]
+query ?
+copy (values ([struct('foo', 1), struct('bar', 2)]))
+to 'test_files/scratch/copy/array_of_struct/'
+(format parquet, single_file_output false);
+----
+1
+
+statement ok
+CREATE EXTERNAL TABLE validate_array_of_struct
+STORED AS PARQUET LOCATION 'test_files/scratch/copy/array_of_struct/';
+
+query ?
+select * from validate_array_of_struct;
+----
+[{c0: foo, c1: 1}, {c0: bar, c1: 2}]
+
+query ?
+copy (values (struct('foo', [1,2,3], struct('bar', [2,3,4]))))
+to 'test_files/scratch/copy/struct_with_array/'
+(format parquet, single_file_output false);
+----
+1
+
+statement ok
+CREATE EXTERNAL TABLE validate_struct_with_array
+STORED AS PARQUET LOCATION 'test_files/scratch/copy/struct_with_array/';
+
+query ?
+select * from validate_struct_with_array;
+----
+{c0: foo, c1: [1, 2, 3], c2: {c0: bar, c1: [2, 3, 4]}}
+
# Copy parquet with all supported statment overrides
query IT
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt
b/datafusion/sqllogictest/test_files/information_schema.slt
index 768292d3d4..43899f7567 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -155,7 +155,7 @@ datafusion.execution.listing_table_ignore_subdirectory true
datafusion.execution.max_buffered_batches_per_output_file 2
datafusion.execution.meta_fetch_concurrency 32
datafusion.execution.minimum_parallel_output_files 4
-datafusion.execution.parquet.allow_single_file_parallelism false
+datafusion.execution.parquet.allow_single_file_parallelism true
datafusion.execution.parquet.bloom_filter_enabled false
datafusion.execution.parquet.bloom_filter_fpp NULL
datafusion.execution.parquet.bloom_filter_ndv NULL
@@ -232,7 +232,7 @@ datafusion.execution.listing_table_ignore_subdirectory true
Should sub directori
datafusion.execution.max_buffered_batches_per_output_file 2 This is the
maximum number of RecordBatches buffered for each output file being worked.
Higher values can potentially give faster write performance at the cost of
higher peak memory consumption
datafusion.execution.meta_fetch_concurrency 32 Number of files to read in
parallel when inferring schema and statistics
datafusion.execution.minimum_parallel_output_files 4 Guarantees a minimum
level of output files running in parallel. RecordBatches will be distributed in
round robin fashion to each parallel writer. Each writer is closed and a new
file opened once soft_max_rows_per_output_file is reached.
-datafusion.execution.parquet.allow_single_file_parallelism false Controls
whether DataFusion will attempt to speed up writing parquet files by
serializing them in parallel. Each column in each row group in each output file
are serialized in parallel leveraging a maximum possible core count of
n_files*n_row_groups*n_columns.
+datafusion.execution.parquet.allow_single_file_parallelism true Controls
whether DataFusion will attempt to speed up writing parquet files by
serializing them in parallel. Each column in each row group in each output file
are serialized in parallel leveraging a maximum possible core count of
n_files*n_row_groups*n_columns.
datafusion.execution.parquet.bloom_filter_enabled false Sets if bloom filter
is enabled for any column
datafusion.execution.parquet.bloom_filter_fpp NULL Sets bloom filter false
positive probability. If NULL, uses default parquet writer setting
datafusion.execution.parquet.bloom_filter_ndv NULL Sets bloom filter number of
distinct values. If NULL, uses default parquet writer setting
diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt
b/datafusion/sqllogictest/test_files/repartition_scan.slt
index 5ee0da2d33..4b8c8f2f08 100644
--- a/datafusion/sqllogictest/test_files/repartition_scan.slt
+++ b/datafusion/sqllogictest/test_files/repartition_scan.slt
@@ -61,7 +61,7 @@ Filter: parquet_table.column1 != Int32(42)
physical_plan
CoalesceBatchesExec: target_batch_size=8192
--FilterExec: column1@0 != 42
-----ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..153],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:153..306],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:306..459],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:459..610]]},
project [...]
+----ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..104],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:104..208],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:208..312],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:312..414]]},
project [...]
# disable round robin repartitioning
statement ok
@@ -77,7 +77,7 @@ Filter: parquet_table.column1 != Int32(42)
physical_plan
CoalesceBatchesExec: target_batch_size=8192
--FilterExec: column1@0 != 42
-----ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..153],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:153..306],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:306..459],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:459..610]]},
project [...]
+----ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..104],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:104..208],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:208..312],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:312..414]]},
project [...]
# enable round robin repartitioning again
statement ok
@@ -102,7 +102,7 @@ SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
--SortExec: expr=[column1@0 ASC NULLS LAST]
----CoalesceBatchesExec: target_batch_size=8192
------FilterExec: column1@0 != 42
---------ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..303],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:303..601,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..5],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:5..308],
[WORKSPACE_RO [...]
+--------ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..205],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:205..405,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..5],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:5..210],
[WORKSPACE_RO [...]
## Read the files as though they are ordered
@@ -138,7 +138,7 @@ physical_plan
SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
--CoalesceBatchesExec: target_batch_size=8192
----FilterExec: column1@0 != 42
-------ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..300],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..305],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:305..610],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:300..601]]},
project [...]
+------ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..202],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..207],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:207..414],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:202..405]]},
project [...]
# Cleanup
statement ok
diff --git a/docs/source/user-guide/configs.md
b/docs/source/user-guide/configs.md
index 9d914aaaf1..8b039102d4 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -71,7 +71,7 @@ Environment variables are read during `SessionConfig`
initialisation so they mus
| datafusion.execution.parquet.bloom_filter_enabled |
false | Sets if bloom filter is enabled for any column
[...]
| datafusion.execution.parquet.bloom_filter_fpp |
NULL | Sets bloom filter false positive probability. If
NULL, uses default parquet writer setting
[...]
| datafusion.execution.parquet.bloom_filter_ndv |
NULL | Sets bloom filter number of distinct values. If
NULL, uses default parquet writer setting
[...]
-| datafusion.execution.parquet.allow_single_file_parallelism |
false | Controls whether DataFusion will attempt to speed
up writing parquet files by serializing them in parallel. Each column in each
row group in each output file are serialized in parallel leveraging a maximum
possible core count of n_files*n_row_groups*n_columns.
[...]
+| datafusion.execution.parquet.allow_single_file_parallelism |
true | Controls whether DataFusion will attempt to speed
up writing parquet files by serializing them in parallel. Each column in each
row group in each output file are serialized in parallel leveraging a maximum
possible core count of n_files*n_row_groups*n_columns.
[...]
| datafusion.execution.parquet.maximum_parallel_row_group_writers | 1
| By default parallel parquet writer is tuned for
minimum memory usage in a streaming execution plan. You may see a performance
benefit when writing large parquet files by increasing
maximum_parallel_row_group_writers and
maximum_buffered_record_batches_per_stream if your system has idle cores and
can tolerate additional memory usage. Boosting these values is likely
worthwhile when writi [...]
| datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2
| By default parallel parquet writer is tuned for
minimum memory usage in a streaming execution plan. You may see a performance
benefit when writing large parquet files by increasing
maximum_parallel_row_group_writers and
maximum_buffered_record_batches_per_stream if your system has idle cores and
can tolerate additional memory usage. Boosting these values is likely
worthwhile when writi [...]
| datafusion.execution.aggregate.scalar_update_factor | 10
| Specifies the threshold for using `ScalarValue`s to
update accumulators during high-cardinality aggregations for each input batch.
The aggregation is considered high-cardinality if the number of affected groups
is greater than or equal to `batch_size / scalar_update_factor`. In such cases,
`ScalarValue`s are utilized for updating accumulators, rather than the default
batch-slice approa [...]