NGA-TRAN commented on code in PR #19124:
URL: https://github.com/apache/datafusion/pull/19124#discussion_r2599951073
##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -620,11 +636,15 @@ impl ListingTable {
ctx: &'a dyn Session,
filters: &'a [Expr],
limit: Option<usize>,
- ) -> datafusion_common::Result<(Vec<FileGroup>, Statistics)> {
+ ) -> datafusion_common::Result<ListFilesResult> {
Review Comment:
Nice refactor & good structure
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -176,6 +176,11 @@ pub struct FileScanConfig {
/// would be incorrect if there are filters being applied, thus this
should be accessed
/// via [`FileScanConfig::statistics`].
pub(crate) statistics: Statistics,
+ /// When true, file_groups are organized by partition column values
+ /// and output_partitioning will return Hash partitioning on partition
columns.
+ /// This allows the optimizer to skip hash repartitioning for aggregates
and joins
+ /// on partition columns.
+ pub partitioned_by_file_group: bool,
Review Comment:
Can you add a comment here saying that if the number of `target_partitions` is less than the number of partitions, we will group them further in round-robin fashion? This ensures it does not get misused.
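
For illustration, a possible wording, sketched as an extension of the field's existing doc comment (exact phrasing is up to the author):

```rust
/// When true, file_groups are organized by partition column values
/// and output_partitioning will return Hash partitioning on partition columns.
/// This allows the optimizer to skip hash repartitioning for aggregates
/// and joins on partition columns.
///
/// Note: if `target_partitions` is smaller than the number of distinct
/// partition values, the per-value groups are merged further in a round-robin
/// fashion, so a single file group may then contain several partition values.
pub partitioned_by_file_group: bool,
```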
##########
datafusion/datasource/src/file_groups.rs:
##########
@@ -1160,4 +1212,42 @@ mod test {
assert_partitioned_files(repartitioned.clone(),
repartitioned_preserving_sort);
repartitioned
}
+
+ #[test]
+ fn test_group_by_partition_values() {
+ // Edge cases: empty and zero target
+ assert!(FileGroup::default().group_by_partition_values(4).is_empty());
+ assert!(FileGroup::new(vec![pfile("a", 100)])
+ .group_by_partition_values(0)
+ .is_empty());
+
+ // Helper to create file with partition value
+ let pfile_with_pv = |path: &str, pv: &str| {
+ let mut f = pfile(path, 10);
+ f.partition_values = vec![ScalarValue::from(pv)];
+ f
+ };
+
+ // Case 1: fewer partitions than target
+ let fg = FileGroup::new(vec![
+ pfile_with_pv("a", "p1"),
+ pfile_with_pv("b", "p1"),
+ pfile_with_pv("c", "p2"),
Review Comment:
Does this mean 2 partitions, p1 and p2, where p1 has 2 files and p2 has 1? Adding this in the comments will help.
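
For illustration, the commented version of that case could read like this (a sketch reusing the `pfile_with_pv` helper from the test above):

```rust
// Case 1: fewer distinct partition values (2: "p1", "p2") than the target (4).
// "p1" has two files ("a", "b") and "p2" has one file ("c"), so we expect
// exactly 2 groups: one with 2 files and one with 1 file.
let fg = FileGroup::new(vec![
    pfile_with_pv("a", "p1"),
    pfile_with_pv("b", "p1"),
    pfile_with_pv("c", "p2"),
]);
let groups = fg.group_by_partition_values(4);
assert_eq!(groups.len(), 2);
assert_eq!(groups[0].len(), 2); // both "p1" files stay together
assert_eq!(groups[1].len(), 1); // the lone "p2" file
```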
##########
datafusion/sqllogictest/test_files/preserve_file_partitioning.slt:
##########
@@ -0,0 +1,604 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+# Tests for preserve_file_partitions optimization
+#
+# Data Model:
+# - Fact table: Hive-partitioned by f_dkey, sorted by f_dkey, timestamp
+# Schema: timestamp TIMESTAMP, value FLOAT64, partition column: f_dkey STRING
+#
+# - Dimension table: Single file for CollectLeft joins
+# Schema: d_dkey STRING, env STRING, service STRING, host STRING
+#
+# Key benefits demonstrated:
+# - Eliminates RepartitionExec for aggregates/joins/windows on partition
columns
+# - Eliminates SortExec when data is already sorted by partition + order
columns
+# - Uses SinglePartitioned aggregation mode
+##########
+
+##########
+# SETUP: Configuration and Data Generation
+##########
+
+statement ok
+set datafusion.execution.target_partitions = 3;
+
+# Create fact table partitioned by f_dkey
+# Each partition has data sorted by timestamp
+# Partition: f_dkey=A
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 95.5),
+ (TIMESTAMP '2023-01-01T09:00:10', 102.3),
+ (TIMESTAMP '2023-01-01T09:00:20', 98.7),
+ (TIMESTAMP '2023-01-01T09:12:20', 105.1),
+ (TIMESTAMP '2023-01-01T09:12:30', 100.0),
+ (TIMESTAMP '2023-01-01T09:12:40', 150.0),
+ (TIMESTAMP '2023-01-01T09:12:50', 120.8)
+))
+TO 'test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet'
+STORED AS PARQUET;
+----
+7
+
+# Partition: f_dkey=B
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 75.2),
+ (TIMESTAMP '2023-01-01T09:00:10', 82.4),
+ (TIMESTAMP '2023-01-01T09:00:20', 78.9),
+ (TIMESTAMP '2023-01-01T09:00:30', 85.6),
+ (TIMESTAMP '2023-01-01T09:12:30', 80.0),
+ (TIMESTAMP '2023-01-01T09:12:40', 120.0),
+ (TIMESTAMP '2023-01-01T09:12:50', 92.3)
+))
+TO 'test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet'
+STORED AS PARQUET;
+----
+7
+
+# Partition: f_dkey=C
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 300.5),
+ (TIMESTAMP '2023-01-01T09:00:10', 285.7),
+ (TIMESTAMP '2023-01-01T09:00:20', 310.2),
+ (TIMESTAMP '2023-01-01T09:00:30', 295.8),
+ (TIMESTAMP '2023-01-01T09:00:40', 300.0),
+ (TIMESTAMP '2023-01-01T09:12:40', 250.0),
+ (TIMESTAMP '2023-01-01T09:12:50', 275.4)
+))
+TO 'test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet'
+STORED AS PARQUET;
+----
+7
+
+# Create dimension table (single file for CollectLeft joins)
+query I
+COPY (SELECT column1 as d_dkey, column2 as env, column3 as service, column4 as
host FROM (VALUES
+ ('A', 'dev', 'log', 'ma'),
+ ('B', 'prod', 'log', 'ma'),
+ ('C', 'prod', 'log', 'vim'),
+ ('D', 'prod', 'trace', 'vim')
+))
+TO 'test_files/scratch/preserve_file_partitioning/dimension/data.parquet'
+STORED AS PARQUET;
+----
+4
+
+# Create high-cardinality fact table (5 partitions > 3 target_partitions)
+# For testing partition merging with consistent hashing
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 100.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=A/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 200.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=B/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 300.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=C/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 400.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=D/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 500.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=E/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+##########
+# TABLE DECLARATIONS
+##########
+
+# Fact table without ordering (for basic aggregate tests)
+statement ok
+CREATE EXTERNAL TABLE fact_table (timestamp TIMESTAMP, value DOUBLE)
+STORED AS PARQUET
+PARTITIONED BY (f_dkey STRING)
+LOCATION 'test_files/scratch/preserve_file_partitioning/fact/';
+
+# Fact table with ordering (for sort elimination tests)
+statement ok
+CREATE EXTERNAL TABLE fact_table_ordered (timestamp TIMESTAMP, value DOUBLE)
+STORED AS PARQUET
+PARTITIONED BY (f_dkey STRING)
+WITH ORDER (f_dkey ASC, timestamp ASC)
+LOCATION 'test_files/scratch/preserve_file_partitioning/fact/';
+
+# Dimension table (for join tests)
+statement ok
+CREATE EXTERNAL TABLE dimension_table (d_dkey STRING, env STRING, service
STRING, host STRING)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/preserve_file_partitioning/dimension/';
+
+# 'High'-cardinality fact table (5 partitions > 3 target_partitions)
+statement ok
+CREATE EXTERNAL TABLE high_cardinality_table (timestamp TIMESTAMP, value
DOUBLE)
+STORED AS PARQUET
+PARTITIONED BY (f_dkey STRING)
+LOCATION 'test_files/scratch/preserve_file_partitioning/high_cardinality/';
+
+##########
+# TEST 1: Basic Aggregate - Without Optimization
+# Shows RepartitionExec and two-phase aggregation
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 0;
+
+query TT
+EXPLAIN SELECT f_dkey, count(*), sum(value) FROM fact_table GROUP BY f_dkey;
+----
+logical_plan
+01)Projection: fact_table.f_dkey, count(Int64(1)) AS count(*),
sum(fact_table.value)
+02)--Aggregate: groupBy=[[fact_table.f_dkey]], aggr=[[count(Int64(1)),
sum(fact_table.value)]]
+03)----TableScan: fact_table projection=[value, f_dkey]
+physical_plan
+01)ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*),
sum(fact_table.value)@2 as sum(fact_table.value)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey],
aggr=[count(Int64(1)), sum(fact_table.value)]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3
+05)--------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), sum(fact_table.value)]
+06)----------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], file_type=parquet
+
+# Verify results without optimization
+query TIR rowsort
+SELECT f_dkey, count(*), sum(value) FROM fact_table GROUP BY f_dkey;
+----
+A 7 772.4
+B 7 614.4
+C 7 2017.6
+
+##########
+# TEST 2: Basic Aggregate - With Optimization
+# Shows SinglePartitioned mode, no RepartitionExec
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 1;
+
+query TT
+EXPLAIN SELECT f_dkey, count(*), sum(value) FROM fact_table GROUP BY f_dkey;
+----
+logical_plan
+01)Projection: fact_table.f_dkey, count(Int64(1)) AS count(*),
sum(fact_table.value)
+02)--Aggregate: groupBy=[[fact_table.f_dkey]], aggr=[[count(Int64(1)),
sum(fact_table.value)]]
+03)----TableScan: fact_table projection=[value, f_dkey]
+physical_plan
+01)ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*),
sum(fact_table.value)@2 as sum(fact_table.value)]
+02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), sum(fact_table.value)]
+03)----DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], file_type=parquet
+
+# Verify results with optimization match results without optimization
+query TIR rowsort
+SELECT f_dkey, count(*), sum(value) FROM fact_table GROUP BY f_dkey;
+----
+A 7 772.4
+B 7 614.4
+C 7 2017.6
+
+##########
+# TEST 3: Aggregate with ORDER BY - Without Optimization
+# Shows SortExec and RepartitionExec
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 0;
+
+query TT
+EXPLAIN SELECT f_dkey, count(*), avg(value) FROM fact_table_ordered GROUP BY
f_dkey ORDER BY f_dkey;
+----
+logical_plan
+01)Sort: fact_table_ordered.f_dkey ASC NULLS LAST
+02)--Projection: fact_table_ordered.f_dkey, count(Int64(1)) AS count(*),
avg(fact_table_ordered.value)
+03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey]],
aggr=[[count(Int64(1)), avg(fact_table_ordered.value)]]
+04)------TableScan: fact_table_ordered projection=[value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
+02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*),
avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
+03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
+05)--------CoalesceBatchesExec: target_batch_size=8192
+06)----------RepartitionExec: partitioning=Hash([f_dkey@0], 3),
input_partitions=3
+07)------------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+08)--------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST],
file_type=parquet
+
+# Verify results without optimization
+query TIR
+SELECT f_dkey, count(*), avg(value) FROM fact_table_ordered GROUP BY f_dkey
ORDER BY f_dkey;
+----
+A 7 110.342857142857
+B 7 87.771428571429
+C 7 288.228571428571
+
+##########
+# TEST 4: Aggregate with ORDER BY - With Optimization
+# No SortExec, no RepartitionExec, just SortPreservingMergeExec
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 1;
+
+query TT
+EXPLAIN SELECT f_dkey, count(*), avg(value) FROM fact_table_ordered GROUP BY
f_dkey ORDER BY f_dkey;
+----
+logical_plan
+01)Sort: fact_table_ordered.f_dkey ASC NULLS LAST
+02)--Projection: fact_table_ordered.f_dkey, count(Int64(1)) AS count(*),
avg(fact_table_ordered.value)
+03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey]],
aggr=[[count(Int64(1)), avg(fact_table_ordered.value)]]
+04)------TableScan: fact_table_ordered projection=[value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
+02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*),
avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
+03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+04)------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST],
file_type=parquet
Review Comment:
🎉
##########
datafusion/sqllogictest/test_files/preserve_file_partitioning.slt:
##########
@@ -0,0 +1,604 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+# Tests for preserve_file_partitions optimization
+#
+# Data Model:
+# - Fact table: Hive-partitioned by f_dkey, sorted by f_dkey, timestamp
+# Schema: timestamp TIMESTAMP, value FLOAT64, partition column: f_dkey STRING
+#
+# - Dimension table: Single file for CollectLeft joins
+# Schema: d_dkey STRING, env STRING, service STRING, host STRING
+#
+# Key benefits demonstrated:
+# - Eliminates RepartitionExec for aggregates/joins/windows on partition
columns
+# - Eliminates SortExec when data is already sorted by partition + order
columns
+# - Uses SinglePartitioned aggregation mode
+##########
+
+##########
+# SETUP: Configuration and Data Generation
+##########
+
+statement ok
+set datafusion.execution.target_partitions = 3;
+
+# Create fact table partitioned by f_dkey
+# Each partition has data sorted by timestamp
+# Partition: f_dkey=A
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 95.5),
+ (TIMESTAMP '2023-01-01T09:00:10', 102.3),
+ (TIMESTAMP '2023-01-01T09:00:20', 98.7),
+ (TIMESTAMP '2023-01-01T09:12:20', 105.1),
+ (TIMESTAMP '2023-01-01T09:12:30', 100.0),
+ (TIMESTAMP '2023-01-01T09:12:40', 150.0),
+ (TIMESTAMP '2023-01-01T09:12:50', 120.8)
+))
+TO 'test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet'
+STORED AS PARQUET;
+----
+7
+
+# Partition: f_dkey=B
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 75.2),
+ (TIMESTAMP '2023-01-01T09:00:10', 82.4),
+ (TIMESTAMP '2023-01-01T09:00:20', 78.9),
+ (TIMESTAMP '2023-01-01T09:00:30', 85.6),
+ (TIMESTAMP '2023-01-01T09:12:30', 80.0),
+ (TIMESTAMP '2023-01-01T09:12:40', 120.0),
+ (TIMESTAMP '2023-01-01T09:12:50', 92.3)
+))
+TO 'test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet'
+STORED AS PARQUET;
+----
+7
+
+# Partition: f_dkey=C
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 300.5),
+ (TIMESTAMP '2023-01-01T09:00:10', 285.7),
+ (TIMESTAMP '2023-01-01T09:00:20', 310.2),
+ (TIMESTAMP '2023-01-01T09:00:30', 295.8),
+ (TIMESTAMP '2023-01-01T09:00:40', 300.0),
+ (TIMESTAMP '2023-01-01T09:12:40', 250.0),
+ (TIMESTAMP '2023-01-01T09:12:50', 275.4)
+))
+TO 'test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet'
+STORED AS PARQUET;
+----
+7
+
+# Create dimension table (single file for CollectLeft joins)
+query I
+COPY (SELECT column1 as d_dkey, column2 as env, column3 as service, column4 as
host FROM (VALUES
+ ('A', 'dev', 'log', 'ma'),
+ ('B', 'prod', 'log', 'ma'),
+ ('C', 'prod', 'log', 'vim'),
+ ('D', 'prod', 'trace', 'vim')
+))
+TO 'test_files/scratch/preserve_file_partitioning/dimension/data.parquet'
+STORED AS PARQUET;
+----
+4
+
+# Create high-cardinality fact table (5 partitions > 3 target_partitions)
+# For testing partition merging with consistent hashing
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 100.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=A/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 200.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=B/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 300.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=C/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 400.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=D/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 500.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=E/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+##########
+# TABLE DECLARATIONS
+##########
+
+# Fact table without ordering (for basic aggregate tests)
+statement ok
+CREATE EXTERNAL TABLE fact_table (timestamp TIMESTAMP, value DOUBLE)
+STORED AS PARQUET
+PARTITIONED BY (f_dkey STRING)
+LOCATION 'test_files/scratch/preserve_file_partitioning/fact/';
+
+# Fact table with ordering (for sort elimination tests)
+statement ok
+CREATE EXTERNAL TABLE fact_table_ordered (timestamp TIMESTAMP, value DOUBLE)
+STORED AS PARQUET
+PARTITIONED BY (f_dkey STRING)
+WITH ORDER (f_dkey ASC, timestamp ASC)
+LOCATION 'test_files/scratch/preserve_file_partitioning/fact/';
+
+# Dimension table (for join tests)
+statement ok
+CREATE EXTERNAL TABLE dimension_table (d_dkey STRING, env STRING, service
STRING, host STRING)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/preserve_file_partitioning/dimension/';
+
+# 'High'-cardinality fact table (5 partitions > 3 target_partitions)
+statement ok
+CREATE EXTERNAL TABLE high_cardinality_table (timestamp TIMESTAMP, value
DOUBLE)
+STORED AS PARQUET
+PARTITIONED BY (f_dkey STRING)
+LOCATION 'test_files/scratch/preserve_file_partitioning/high_cardinality/';
+
+##########
+# TEST 1: Basic Aggregate - Without Optimization
+# Shows RepartitionExec and two-phase aggregation
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 0;
+
+query TT
+EXPLAIN SELECT f_dkey, count(*), sum(value) FROM fact_table GROUP BY f_dkey;
+----
+logical_plan
+01)Projection: fact_table.f_dkey, count(Int64(1)) AS count(*),
sum(fact_table.value)
+02)--Aggregate: groupBy=[[fact_table.f_dkey]], aggr=[[count(Int64(1)),
sum(fact_table.value)]]
+03)----TableScan: fact_table projection=[value, f_dkey]
+physical_plan
+01)ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*),
sum(fact_table.value)@2 as sum(fact_table.value)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey],
aggr=[count(Int64(1)), sum(fact_table.value)]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3
+05)--------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), sum(fact_table.value)]
+06)----------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], file_type=parquet
+
+# Verify results without optimization
+query TIR rowsort
+SELECT f_dkey, count(*), sum(value) FROM fact_table GROUP BY f_dkey;
+----
+A 7 772.4
+B 7 614.4
+C 7 2017.6
+
+##########
+# TEST 2: Basic Aggregate - With Optimization
+# Shows SinglePartitioned mode, no RepartitionExec
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 1;
+
+query TT
+EXPLAIN SELECT f_dkey, count(*), sum(value) FROM fact_table GROUP BY f_dkey;
+----
+logical_plan
+01)Projection: fact_table.f_dkey, count(Int64(1)) AS count(*),
sum(fact_table.value)
+02)--Aggregate: groupBy=[[fact_table.f_dkey]], aggr=[[count(Int64(1)),
sum(fact_table.value)]]
+03)----TableScan: fact_table projection=[value, f_dkey]
+physical_plan
+01)ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*),
sum(fact_table.value)@2 as sum(fact_table.value)]
+02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), sum(fact_table.value)]
+03)----DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], file_type=parquet
+
+# Verify results with optimization match results without optimization
+query TIR rowsort
+SELECT f_dkey, count(*), sum(value) FROM fact_table GROUP BY f_dkey;
+----
+A 7 772.4
+B 7 614.4
+C 7 2017.6
+
+##########
+# TEST 3: Aggregate with ORDER BY - Without Optimization
+# Shows SortExec and RepartitionExec
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 0;
+
+query TT
+EXPLAIN SELECT f_dkey, count(*), avg(value) FROM fact_table_ordered GROUP BY
f_dkey ORDER BY f_dkey;
+----
+logical_plan
+01)Sort: fact_table_ordered.f_dkey ASC NULLS LAST
+02)--Projection: fact_table_ordered.f_dkey, count(Int64(1)) AS count(*),
avg(fact_table_ordered.value)
+03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey]],
aggr=[[count(Int64(1)), avg(fact_table_ordered.value)]]
+04)------TableScan: fact_table_ordered projection=[value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
+02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*),
avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
+03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
+05)--------CoalesceBatchesExec: target_batch_size=8192
+06)----------RepartitionExec: partitioning=Hash([f_dkey@0], 3),
input_partitions=3
+07)------------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+08)--------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST],
file_type=parquet
+
+# Verify results without optimization
+query TIR
+SELECT f_dkey, count(*), avg(value) FROM fact_table_ordered GROUP BY f_dkey
ORDER BY f_dkey;
+----
+A 7 110.342857142857
+B 7 87.771428571429
+C 7 288.228571428571
+
+##########
+# TEST 4: Aggregate with ORDER BY - With Optimization
+# No SortExec, no RepartitionExec, just SortPreservingMergeExec
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 1;
+
+query TT
+EXPLAIN SELECT f_dkey, count(*), avg(value) FROM fact_table_ordered GROUP BY
f_dkey ORDER BY f_dkey;
+----
+logical_plan
+01)Sort: fact_table_ordered.f_dkey ASC NULLS LAST
+02)--Projection: fact_table_ordered.f_dkey, count(Int64(1)) AS count(*),
avg(fact_table_ordered.value)
+03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey]],
aggr=[[count(Int64(1)), avg(fact_table_ordered.value)]]
+04)------TableScan: fact_table_ordered projection=[value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
+02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*),
avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
+03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+04)------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST],
file_type=parquet
+
+query TIR
+SELECT f_dkey, count(*), avg(value) FROM fact_table_ordered GROUP BY f_dkey
ORDER BY f_dkey;
+----
+A 7 110.342857142857
+B 7 87.771428571429
+C 7 288.228571428571
+
+##########
+# TEST 5: Join with Hash Partitioning Propagation - Without Optimization
+# CollectLeft join followed by RepartitionExec and SortExec for aggregate
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 0;
+
+query TT
+EXPLAIN SELECT f.f_dkey, MAX(d.env), MAX(d.service), count(*), sum(f.value)
+FROM fact_table_ordered f
+INNER JOIN dimension_table d ON f.f_dkey = d.d_dkey
+WHERE d.service = 'log'
+GROUP BY f.f_dkey
+ORDER BY f.f_dkey;
+----
+logical_plan
+01)Sort: f.f_dkey ASC NULLS LAST
+02)--Projection: f.f_dkey, max(d.env), max(d.service), count(Int64(1)) AS
count(*), sum(f.value)
+03)----Aggregate: groupBy=[[f.f_dkey]], aggr=[[max(d.env), max(d.service),
count(Int64(1)), sum(f.value)]]
+04)------Projection: f.value, f.f_dkey, d.env, d.service
+05)--------Inner Join: f.f_dkey = d.d_dkey
+06)----------SubqueryAlias: f
+07)------------TableScan: fact_table_ordered projection=[value, f_dkey]
+08)----------SubqueryAlias: d
+09)------------Filter: dimension_table.service = Utf8View("log")
+10)--------------TableScan: dimension_table projection=[d_dkey, env, service],
partial_filters=[dimension_table.service = Utf8View("log")]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
+02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, max(d.env)@1 as max(d.env),
max(d.service)@2 as max(d.service), count(Int64(1))@3 as count(*),
sum(f.value)@4 as sum(f.value)]
+03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey],
aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)],
ordering_mode=Sorted
+04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
+05)--------CoalesceBatchesExec: target_batch_size=8192
+06)----------RepartitionExec: partitioning=Hash([f_dkey@0], 3),
input_partitions=3
+07)------------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey],
aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)],
ordering_mode=Sorted
+08)--------------ProjectionExec: expr=[value@2 as value, f_dkey@3 as f_dkey,
env@0 as env, service@1 as service]
+09)----------------CoalesceBatchesExec: target_batch_size=8192
+10)------------------HashJoinExec: mode=CollectLeft, join_type=Inner,
on=[(d_dkey@0, f_dkey@1)], projection=[env@1, service@2, value@3, f_dkey@4]
+11)--------------------CoalescePartitionsExec
+12)----------------------FilterExec: service@2 = log
+13)------------------------RepartitionExec: partitioning=RoundRobinBatch(3),
input_partitions=1
+14)--------------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]},
projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 =
log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <=
log AND log <= service_max@1, required_guarantees=[service in (log)]
+15)--------------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST],
file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# Verify results without optimization
+query TTTIR rowsort
+SELECT f.f_dkey, MAX(d.env), MAX(d.service), count(*), sum(f.value)
+FROM fact_table_ordered f
+INNER JOIN dimension_table d ON f.f_dkey = d.d_dkey
+WHERE d.service = 'log'
+GROUP BY f.f_dkey
+ORDER BY f.f_dkey;
+----
+A dev log 7 772.4
+B prod log 7 614.4
+C prod log 7 2017.6
+
+##########
+# TEST 6: Join with Hash Partitioning Propagation - With Optimization
+# Hash partitioning propagates through join, no RepartitionExec/SortExec after
join
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 1;
+
+query TT
+EXPLAIN SELECT f.f_dkey, MAX(d.env), MAX(d.service), count(*), sum(f.value)
+FROM fact_table_ordered f
+INNER JOIN dimension_table d ON f.f_dkey = d.d_dkey
+WHERE d.service = 'log'
+GROUP BY f.f_dkey
+ORDER BY f.f_dkey;
+----
+logical_plan
+01)Sort: f.f_dkey ASC NULLS LAST
+02)--Projection: f.f_dkey, max(d.env), max(d.service), count(Int64(1)) AS
count(*), sum(f.value)
+03)----Aggregate: groupBy=[[f.f_dkey]], aggr=[[max(d.env), max(d.service),
count(Int64(1)), sum(f.value)]]
+04)------Projection: f.value, f.f_dkey, d.env, d.service
+05)--------Inner Join: f.f_dkey = d.d_dkey
+06)----------SubqueryAlias: f
+07)------------TableScan: fact_table_ordered projection=[value, f_dkey]
+08)----------SubqueryAlias: d
+09)------------Filter: dimension_table.service = Utf8View("log")
+10)--------------TableScan: dimension_table projection=[d_dkey, env, service],
partial_filters=[dimension_table.service = Utf8View("log")]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
+02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, max(d.env)@1 as max(d.env),
max(d.service)@2 as max(d.service), count(Int64(1))@3 as count(*),
sum(f.value)@4 as sum(f.value)]
+03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey],
aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)],
ordering_mode=Sorted
+04)------ProjectionExec: expr=[value@2 as value, f_dkey@3 as f_dkey, env@0 as
env, service@1 as service]
+05)--------CoalesceBatchesExec: target_batch_size=8192
+06)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@0,
f_dkey@1)], projection=[env@1, service@2, value@3, f_dkey@4]
+07)------------CoalescePartitionsExec
+08)--------------FilterExec: service@2 = log
+09)----------------RepartitionExec: partitioning=RoundRobinBatch(3),
input_partitions=1
+10)------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]},
projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 =
log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <=
log AND log <= service_max@1, required_guarantees=[service in (log)]
+11)------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST],
file_type=parquet, predicate=DynamicFilter [ empty ]
Review Comment:
Awesome
##########
datafusion/sqllogictest/test_files/preserve_file_partitioning.slt:
##########
@@ -0,0 +1,604 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+# Tests for preserve_file_partitions optimization
+#
+# Data Model:
+# - Fact table: Hive-partitioned by f_dkey, sorted by f_dkey, timestamp
+# Schema: timestamp TIMESTAMP, value FLOAT64, partition column: f_dkey STRING
+#
+# - Dimension table: Single file for CollectLeft joins
+# Schema: d_dkey STRING, env STRING, service STRING, host STRING
+#
+# Key benefits demonstrated:
+# - Eliminates RepartitionExec for aggregates/joins/windows on partition
columns
+# - Eliminates SortExec when data is already sorted by partition + order
columns
+# - Uses SinglePartitioned aggregation mode
+##########
+
+##########
+# SETUP: Configuration and Data Generation
+##########
+
+statement ok
+set datafusion.execution.target_partitions = 3;
+
+# Create fact table partitioned by f_dkey
+# Each partition has data sorted by timestamp
+# Partition: f_dkey=A
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 95.5),
+ (TIMESTAMP '2023-01-01T09:00:10', 102.3),
+ (TIMESTAMP '2023-01-01T09:00:20', 98.7),
+ (TIMESTAMP '2023-01-01T09:12:20', 105.1),
+ (TIMESTAMP '2023-01-01T09:12:30', 100.0),
+ (TIMESTAMP '2023-01-01T09:12:40', 150.0),
+ (TIMESTAMP '2023-01-01T09:12:50', 120.8)
+))
+TO 'test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet'
+STORED AS PARQUET;
+----
+7
+
+# Partition: f_dkey=B
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 75.2),
+ (TIMESTAMP '2023-01-01T09:00:10', 82.4),
+ (TIMESTAMP '2023-01-01T09:00:20', 78.9),
+ (TIMESTAMP '2023-01-01T09:00:30', 85.6),
+ (TIMESTAMP '2023-01-01T09:12:30', 80.0),
+ (TIMESTAMP '2023-01-01T09:12:40', 120.0),
+ (TIMESTAMP '2023-01-01T09:12:50', 92.3)
+))
+TO 'test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet'
+STORED AS PARQUET;
+----
+7
+
+# Partition: f_dkey=C
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 300.5),
+ (TIMESTAMP '2023-01-01T09:00:10', 285.7),
+ (TIMESTAMP '2023-01-01T09:00:20', 310.2),
+ (TIMESTAMP '2023-01-01T09:00:30', 295.8),
+ (TIMESTAMP '2023-01-01T09:00:40', 300.0),
+ (TIMESTAMP '2023-01-01T09:12:40', 250.0),
+ (TIMESTAMP '2023-01-01T09:12:50', 275.4)
+))
+TO 'test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet'
+STORED AS PARQUET;
+----
+7
+
+# Create dimension table (single file for CollectLeft joins)
+query I
+COPY (SELECT column1 as d_dkey, column2 as env, column3 as service, column4 as
host FROM (VALUES
+ ('A', 'dev', 'log', 'ma'),
+ ('B', 'prod', 'log', 'ma'),
+ ('C', 'prod', 'log', 'vim'),
+ ('D', 'prod', 'trace', 'vim')
+))
+TO 'test_files/scratch/preserve_file_partitioning/dimension/data.parquet'
+STORED AS PARQUET;
+----
+4
+
+# Create high-cardinality fact table (5 partitions > 3 target_partitions)
+# For testing partition merging with consistent hashing
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 100.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=A/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 200.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=B/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 300.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=C/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 400.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=D/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 500.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=E/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+##########
+# TABLE DECLARATIONS
+##########
+
+# Fact table without ordering (for basic aggregate tests)
+statement ok
+CREATE EXTERNAL TABLE fact_table (timestamp TIMESTAMP, value DOUBLE)
+STORED AS PARQUET
+PARTITIONED BY (f_dkey STRING)
+LOCATION 'test_files/scratch/preserve_file_partitioning/fact/';
+
+# Fact table with ordering (for sort elimination tests)
+statement ok
+CREATE EXTERNAL TABLE fact_table_ordered (timestamp TIMESTAMP, value DOUBLE)
+STORED AS PARQUET
+PARTITIONED BY (f_dkey STRING)
+WITH ORDER (f_dkey ASC, timestamp ASC)
+LOCATION 'test_files/scratch/preserve_file_partitioning/fact/';
+
+# Dimension table (for join tests)
+statement ok
+CREATE EXTERNAL TABLE dimension_table (d_dkey STRING, env STRING, service
STRING, host STRING)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/preserve_file_partitioning/dimension/';
+
+# 'High'-cardinality fact table (5 partitions > 3 target_partitions)
+statement ok
+CREATE EXTERNAL TABLE high_cardinality_table (timestamp TIMESTAMP, value
DOUBLE)
+STORED AS PARQUET
+PARTITIONED BY (f_dkey STRING)
+LOCATION 'test_files/scratch/preserve_file_partitioning/high_cardinality/';
+
+##########
+# TEST 1: Basic Aggregate - Without Optimization
+# Shows RepartitionExec and two-phase aggregation
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 0;
+
+query TT
+EXPLAIN SELECT f_dkey, count(*), sum(value) FROM fact_table GROUP BY f_dkey;
+----
+logical_plan
+01)Projection: fact_table.f_dkey, count(Int64(1)) AS count(*),
sum(fact_table.value)
+02)--Aggregate: groupBy=[[fact_table.f_dkey]], aggr=[[count(Int64(1)),
sum(fact_table.value)]]
+03)----TableScan: fact_table projection=[value, f_dkey]
+physical_plan
+01)ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*),
sum(fact_table.value)@2 as sum(fact_table.value)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey],
aggr=[count(Int64(1)), sum(fact_table.value)]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3
+05)--------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), sum(fact_table.value)]
+06)----------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], file_type=parquet
+
+# Verify results without optimization
+query TIR rowsort
+SELECT f_dkey, count(*), sum(value) FROM fact_table GROUP BY f_dkey;
+----
+A 7 772.4
+B 7 614.4
+C 7 2017.6
+
+##########
+# TEST 2: Basic Aggregate - With Optimization
+# Shows SinglePartitioned mode, no RepartitionExec
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 1;
+
+query TT
+EXPLAIN SELECT f_dkey, count(*), sum(value) FROM fact_table GROUP BY f_dkey;
+----
+logical_plan
+01)Projection: fact_table.f_dkey, count(Int64(1)) AS count(*),
sum(fact_table.value)
+02)--Aggregate: groupBy=[[fact_table.f_dkey]], aggr=[[count(Int64(1)),
sum(fact_table.value)]]
+03)----TableScan: fact_table projection=[value, f_dkey]
+physical_plan
+01)ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*),
sum(fact_table.value)@2 as sum(fact_table.value)]
+02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), sum(fact_table.value)]
+03)----DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], file_type=parquet
Review Comment:
Nice
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -591,7 +625,52 @@ impl DataSource for FileScanConfig {
Ok(source.map(|s| Arc::new(s) as _))
}
+ /// Returns the output partitioning for this file scan.
+ ///
+ /// When `partitioned_by_file_group` is true, this returns
`Partitioning::Hash` on
Review Comment:
Note to reviewers: `Partitioning::Hash` here does not mean the data is hash partitioned. It means we will use the **enum variant** `Partitioning::Hash`. In a future PR, we will refactor to rename this `Partitioning::Hash` to something like `Partitioning::Expr`, to say that the data is partitioned by an expression.
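
For illustration, this is roughly what that declaration amounts to; a minimal sketch, assuming the partition column `f_dkey` sits at index 1 of the scan schema (crate paths and the helper name are illustrative, not the PR's exact code):

```rust
use std::sync::Arc;

use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{Partitioning, PhysicalExpr};

/// Declare the scan output as "partitioned by the Hive partition column",
/// reusing the existing `Partitioning::Hash` variant. No rows are hashed here;
/// the variant only tells the optimizer which expressions the file groups are
/// already partitioned on.
fn declared_partitioning(num_file_groups: usize) -> Partitioning {
    let f_dkey: Arc<dyn PhysicalExpr> = Arc::new(Column::new("f_dkey", 1));
    Partitioning::Hash(vec![f_dkey], num_file_groups)
}
```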
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -591,7 +625,52 @@ impl DataSource for FileScanConfig {
Ok(source.map(|s| Arc::new(s) as _))
}
+ /// Returns the output partitioning for this file scan.
+ ///
+ /// When `partitioned_by_file_group` is true, this returns
`Partitioning::Hash` on
+ /// the Hive partition columns, allowing the optimizer to skip hash
repartitioning
+ /// for aggregates and joins on those columns.
+ ///
+ /// Tradeoffs
+ /// - Benefit: Eliminates `RepartitionExec` and `SortExec` for queries with
+ /// `GROUP BY` or `ORDER BY` on partition columns.
+ /// - Cost: Files are grouped by partition values rather than split by byte
+ /// ranges, which may reduce I/O parallelism when partition sizes are
uneven.
+ /// For simple aggregations without `ORDER BY`, this cost may outweigh
the benefit.
+ ///
+ /// Follow-up Work
+ /// - Idea: Could allow byte-range splitting within partition-aware groups,
+ /// preserving I/O parallelism while maintaining partition semantics.
fn output_partitioning(&self) -> Partitioning {
Review Comment:
Can we have unit tests for this function?
##########
datafusion/datasource/src/file_groups.rs:
##########
@@ -464,6 +464,57 @@ impl FileGroup {
chunks
}
+
+ /// Groups files by their partition values, ensuring all files with same
+ /// partition values are in the same group.
+ ///
+ /// Note: May return fewer groups than `max_target_partitions` when the
+ /// number of unique partition values is less than the target.
+ pub fn group_by_partition_values(
+ self,
+ max_target_partitions: usize,
+ ) -> Vec<FileGroup> {
+ if self.is_empty() || max_target_partitions == 0 {
+ return vec![];
+ }
+
+ let mut partition_groups: HashMap<
+ Vec<datafusion_common::ScalarValue>,
+ Vec<PartitionedFile>,
+ > = HashMap::new();
+
+ for file in self.files {
+ partition_groups
+ .entry(file.partition_values.clone())
+ .or_default()
+ .push(file);
+ }
+
+ let num_unique_partitions = partition_groups.len();
+
+ // Sort for deterministic bucket assignment across query executions.
+ let mut sorted_partitions: Vec<_> =
partition_groups.into_iter().collect();
+ sorted_partitions
+ .sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
+
+ if num_unique_partitions <= max_target_partitions {
+ sorted_partitions
+ .into_iter()
+ .map(|(_, files)| FileGroup::new(files))
+ .collect()
+ } else {
+ // Merge into max_target_partitions buckets using round-robin.
+ let mut target_groups: Vec<Vec<PartitionedFile>> =
+ vec![vec![]; max_target_partitions];
+
+ for (idx, (_, files)) in sorted_partitions.into_iter().enumerate()
{
+ let bucket = idx % max_target_partitions;
+ target_groups[bucket].extend(files);
+ }
+
+ target_groups.into_iter().map(FileGroup::new).collect()
Review Comment:
Does this `else` condition mean that the statement in your comment, `ensuring all files with same partition values are in the same group`, no longer holds? If the expression is simple (e.g. a column name), the round robin will preserve file partitions, but it might not in more complicated use cases. You may want to add a comment here.
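
For illustration, one way the requested comment could read, sketched directly on the round-robin branch shown above (wording is up to the author):

```rust
} else {
    // More distinct partition values than `max_target_partitions`: merge the
    // per-value groups into buckets in round-robin order. Files that share the
    // same partition values still end up in the same bucket, but a bucket may
    // now hold several distinct partition values, so the "one partition value
    // per group" property no longer holds once this branch is taken.
    let mut target_groups: Vec<Vec<PartitionedFile>> =
        vec![vec![]; max_target_partitions];

    for (idx, (_, files)) in sorted_partitions.into_iter().enumerate() {
        let bucket = idx % max_target_partitions;
        target_groups[bucket].extend(files);
    }

    target_groups.into_iter().map(FileGroup::new).collect()
}
```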
##########
datafusion/common/src/config.rs:
##########
@@ -965,6 +965,15 @@ config_namespace! {
/// record tables provided to the MemTable on creation.
pub repartition_file_scans: bool, default = true
+ /// Minimum number of distinct partition values required to group
files by their
+ /// Hive partition column values (enabling Hash partitioning
declaration).
+ /// Set to 0 to disable. When enabled and the threshold is met, allows
the optimizer
+ /// to skip hash repartitioning for aggregates and joins on partition
columns.
Review Comment:
You may want to add a comment here saying if `preserve_file_partitions=1`,
the optimizer will always skip hash partitioning.
🤔 I am wondering if it is useful to add the comments with 3 bullets:
- preserve_file_partitions=0: disable the hash/expression partitioning
- preserve_file_partitions=1: always enable it
- preserve_file_partitions=N, target_partitionings=M (M > 1): only enable
when N >= M
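
For illustration, the three bullets could be folded into the option's doc comment roughly like this; a sketch only, where the field type and default are assumptions to be checked against the PR:

```rust
/// Minimum number of distinct partition values required to group files by their
/// Hive partition column values (enabling Hash partitioning declaration).
///
/// - `preserve_file_partitions = 0`: disable the hash/expression partitioning
/// - `preserve_file_partitions = 1`: always enable it (any partitioned table
///   has at least one distinct partition value)
/// - `preserve_file_partitions = N`, `target_partitions = M` (`M > 1`): only
///   enable it when `N >= M`
pub preserve_file_partitions: usize, default = 0 // type and default assumed
```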
##########
datafusion/datasource/src/file_groups.rs:
##########
@@ -1160,4 +1212,42 @@ mod test {
assert_partitioned_files(repartitioned.clone(),
repartitioned_preserving_sort);
repartitioned
}
+
+ #[test]
+ fn test_group_by_partition_values() {
+ // Edge cases: empty and zero target
+ assert!(FileGroup::default().group_by_partition_values(4).is_empty());
+ assert!(FileGroup::new(vec![pfile("a", 100)])
+ .group_by_partition_values(0)
+ .is_empty());
+
+ // Helper to create file with partition value
+ let pfile_with_pv = |path: &str, pv: &str| {
+ let mut f = pfile(path, 10);
+ f.partition_values = vec![ScalarValue::from(pv)];
+ f
+ };
+
+ // Case 1: fewer partitions than target
+ let fg = FileGroup::new(vec![
+ pfile_with_pv("a", "p1"),
+ pfile_with_pv("b", "p1"),
+ pfile_with_pv("c", "p2"),
+ ]);
+ let groups = fg.group_by_partition_values(4);
+ assert_eq!(groups.len(), 2);
+ assert_eq!(groups[0].len(), 2);
+ assert_eq!(groups[1].len(), 1);
+
+ // Case 2: more partitions than target
+ let fg = FileGroup::new(vec![
+ pfile_with_pv("a", "p1"),
+ pfile_with_pv("b", "p2"),
+ pfile_with_pv("c", "p3"),
+ pfile_with_pv("d", "p4"),
+ pfile_with_pv("e", "p5"),
+ ]);
+ let groups = fg.group_by_partition_values(3);
+ assert_eq!(groups.len(), 3);
Review Comment:
I would assert the contents of each partition, too. You have a deterministic round-robin distribution, so the expected assignment is known in advance.
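
For illustration, the content assertions could look roughly like this; a sketch that assumes `FileGroup` exposes its files via `iter()` and that the sorted partition values `p1..p5` are assigned round-robin as `p1, p4` -> group 0, `p2, p5` -> group 1, `p3` -> group 2:

```rust
// Collect the partition values seen in each resulting group; the
// sort-then-round-robin assignment makes this fully deterministic.
let pvs = |g: &FileGroup| -> Vec<String> {
    g.iter()
        .map(|f| f.partition_values[0].to_string())
        .collect()
};
assert_eq!(pvs(&groups[0]), vec!["p1", "p4"]);
assert_eq!(pvs(&groups[1]), vec!["p2", "p5"]);
assert_eq!(pvs(&groups[2]), vec!["p3"]);
```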
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -581,6 +608,13 @@ impl DataSource for FileScanConfig {
repartition_file_min_size: usize,
output_ordering: Option<LexOrdering>,
) -> Result<Option<Arc<dyn DataSource>>> {
+ // When files are grouped by partition values, we cannot not allow
byte-range
Review Comment:
```suggestion
// When files are grouped by partition values, we cannot allow
byte-range
```
##########
datafusion/sqllogictest/test_files/preserve_file_partitioning.slt:
##########
@@ -0,0 +1,604 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+# Tests for preserve_file_partitions optimization
+#
+# Data Model:
+# - Fact table: Hive-partitioned by f_dkey, sorted by f_dkey, timestamp
+# Schema: timestamp TIMESTAMP, value FLOAT64, partition column: f_dkey STRING
+#
+# - Dimension table: Single file for CollectLeft joins
+# Schema: d_dkey STRING, env STRING, service STRING, host STRING
+#
+# Key benefits demonstrated:
+# - Eliminates RepartitionExec for aggregates/joins/windows on partition
columns
+# - Eliminates SortExec when data is already sorted by partition + order
columns
+# - Uses SinglePartitioned aggregation mode
+##########
+
+##########
+# SETUP: Configuration and Data Generation
+##########
+
+statement ok
+set datafusion.execution.target_partitions = 3;
+
+# Create fact table partitioned by f_dkey
+# Each partition has data sorted by timestamp
+# Partition: f_dkey=A
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 95.5),
+ (TIMESTAMP '2023-01-01T09:00:10', 102.3),
+ (TIMESTAMP '2023-01-01T09:00:20', 98.7),
+ (TIMESTAMP '2023-01-01T09:12:20', 105.1),
+ (TIMESTAMP '2023-01-01T09:12:30', 100.0),
+ (TIMESTAMP '2023-01-01T09:12:40', 150.0),
+ (TIMESTAMP '2023-01-01T09:12:50', 120.8)
+))
+TO 'test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet'
+STORED AS PARQUET;
+----
+7
+
+# Partition: f_dkey=B
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 75.2),
+ (TIMESTAMP '2023-01-01T09:00:10', 82.4),
+ (TIMESTAMP '2023-01-01T09:00:20', 78.9),
+ (TIMESTAMP '2023-01-01T09:00:30', 85.6),
+ (TIMESTAMP '2023-01-01T09:12:30', 80.0),
+ (TIMESTAMP '2023-01-01T09:12:40', 120.0),
+ (TIMESTAMP '2023-01-01T09:12:50', 92.3)
+))
+TO 'test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet'
+STORED AS PARQUET;
+----
+7
+
+# Partition: f_dkey=C
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 300.5),
+ (TIMESTAMP '2023-01-01T09:00:10', 285.7),
+ (TIMESTAMP '2023-01-01T09:00:20', 310.2),
+ (TIMESTAMP '2023-01-01T09:00:30', 295.8),
+ (TIMESTAMP '2023-01-01T09:00:40', 300.0),
+ (TIMESTAMP '2023-01-01T09:12:40', 250.0),
+ (TIMESTAMP '2023-01-01T09:12:50', 275.4)
+))
+TO 'test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet'
+STORED AS PARQUET;
+----
+7
+
+# Create dimension table (single file for CollectLeft joins)
+query I
+COPY (SELECT column1 as d_dkey, column2 as env, column3 as service, column4 as
host FROM (VALUES
+ ('A', 'dev', 'log', 'ma'),
+ ('B', 'prod', 'log', 'ma'),
+ ('C', 'prod', 'log', 'vim'),
+ ('D', 'prod', 'trace', 'vim')
+))
+TO 'test_files/scratch/preserve_file_partitioning/dimension/data.parquet'
+STORED AS PARQUET;
+----
+4
+
+# Create high-cardinality fact table (5 partitions > 3 target_partitions)
+# For testing partition merging with consistent hashing
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 100.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=A/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 200.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=B/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 300.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=C/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 400.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=D/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+ (TIMESTAMP '2023-01-01T09:00:00', 500.0)
+))
+TO
'test_files/scratch/preserve_file_partitioning/high_cardinality/f_dkey=E/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+##########
+# TABLE DECLARATIONS
+##########
+
+# Fact table without ordering (for basic aggregate tests)
+statement ok
+CREATE EXTERNAL TABLE fact_table (timestamp TIMESTAMP, value DOUBLE)
+STORED AS PARQUET
+PARTITIONED BY (f_dkey STRING)
+LOCATION 'test_files/scratch/preserve_file_partitioning/fact/';
+
+# Fact table with ordering (for sort elimination tests)
+statement ok
+CREATE EXTERNAL TABLE fact_table_ordered (timestamp TIMESTAMP, value DOUBLE)
+STORED AS PARQUET
+PARTITIONED BY (f_dkey STRING)
+WITH ORDER (f_dkey ASC, timestamp ASC)
+LOCATION 'test_files/scratch/preserve_file_partitioning/fact/';
+
+# Dimension table (for join tests)
+statement ok
+CREATE EXTERNAL TABLE dimension_table (d_dkey STRING, env STRING, service
STRING, host STRING)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/preserve_file_partitioning/dimension/';
+
+# 'High'-cardinality fact table (5 partitions > 3 target_partitions)
+statement ok
+CREATE EXTERNAL TABLE high_cardinality_table (timestamp TIMESTAMP, value
DOUBLE)
+STORED AS PARQUET
+PARTITIONED BY (f_dkey STRING)
+LOCATION 'test_files/scratch/preserve_file_partitioning/high_cardinality/';
+
+##########
+# TEST 1: Basic Aggregate - Without Optimization
+# Shows RepartitionExec and two-phase aggregation
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 0;
+
+query TT
+EXPLAIN SELECT f_dkey, count(*), sum(value) FROM fact_table GROUP BY f_dkey;
+----
+logical_plan
+01)Projection: fact_table.f_dkey, count(Int64(1)) AS count(*),
sum(fact_table.value)
+02)--Aggregate: groupBy=[[fact_table.f_dkey]], aggr=[[count(Int64(1)),
sum(fact_table.value)]]
+03)----TableScan: fact_table projection=[value, f_dkey]
+physical_plan
+01)ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*),
sum(fact_table.value)@2 as sum(fact_table.value)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey],
aggr=[count(Int64(1)), sum(fact_table.value)]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3
+05)--------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), sum(fact_table.value)]
+06)----------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], file_type=parquet
+
+# Verify results without optimization
+query TIR rowsort
+SELECT f_dkey, count(*), sum(value) FROM fact_table GROUP BY f_dkey;
+----
+A 7 772.4
+B 7 614.4
+C 7 2017.6
+
+##########
+# TEST 2: Basic Aggregate - With Optimization
+# Shows SinglePartitioned mode, no RepartitionExec
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 1;
+
+query TT
+EXPLAIN SELECT f_dkey, count(*), sum(value) FROM fact_table GROUP BY f_dkey;
+----
+logical_plan
+01)Projection: fact_table.f_dkey, count(Int64(1)) AS count(*),
sum(fact_table.value)
+02)--Aggregate: groupBy=[[fact_table.f_dkey]], aggr=[[count(Int64(1)),
sum(fact_table.value)]]
+03)----TableScan: fact_table projection=[value, f_dkey]
+physical_plan
+01)ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*),
sum(fact_table.value)@2 as sum(fact_table.value)]
+02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), sum(fact_table.value)]
+03)----DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], file_type=parquet
+
+# Verify results with optimization match results without optimization
+query TIR rowsort
+SELECT f_dkey, count(*), sum(value) FROM fact_table GROUP BY f_dkey;
+----
+A 7 772.4
+B 7 614.4
+C 7 2017.6
+
+##########
+# TEST 3: Aggregate with ORDER BY - Without Optimization
+# Shows SortExec and RepartitionExec
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 0;
+
+query TT
+EXPLAIN SELECT f_dkey, count(*), avg(value) FROM fact_table_ordered GROUP BY
f_dkey ORDER BY f_dkey;
+----
+logical_plan
+01)Sort: fact_table_ordered.f_dkey ASC NULLS LAST
+02)--Projection: fact_table_ordered.f_dkey, count(Int64(1)) AS count(*),
avg(fact_table_ordered.value)
+03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey]],
aggr=[[count(Int64(1)), avg(fact_table_ordered.value)]]
+04)------TableScan: fact_table_ordered projection=[value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
+02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*),
avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
+03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
+05)--------CoalesceBatchesExec: target_batch_size=8192
+06)----------RepartitionExec: partitioning=Hash([f_dkey@0], 3),
input_partitions=3
+07)------------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+08)--------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST],
file_type=parquet
+
+# Verify results without optimization
+query TIR
+SELECT f_dkey, count(*), avg(value) FROM fact_table_ordered GROUP BY f_dkey
ORDER BY f_dkey;
+----
+A 7 110.342857142857
+B 7 87.771428571429
+C 7 288.228571428571
+
+##########
+# TEST 4: Aggregate with ORDER BY - With Optimization
+# No SortExec, no RepartitionExec, just SortPreservingMergeExec
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 1;
+
+query TT
+EXPLAIN SELECT f_dkey, count(*), avg(value) FROM fact_table_ordered GROUP BY
f_dkey ORDER BY f_dkey;
+----
+logical_plan
+01)Sort: fact_table_ordered.f_dkey ASC NULLS LAST
+02)--Projection: fact_table_ordered.f_dkey, count(Int64(1)) AS count(*),
avg(fact_table_ordered.value)
+03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey]],
aggr=[[count(Int64(1)), avg(fact_table_ordered.value)]]
+04)------TableScan: fact_table_ordered projection=[value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
+02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*),
avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
+03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+04)------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST],
file_type=parquet
+
+query TIR
+SELECT f_dkey, count(*), avg(value) FROM fact_table_ordered GROUP BY f_dkey
ORDER BY f_dkey;
+----
+A 7 110.342857142857
+B 7 87.771428571429
+C 7 288.228571428571
+
+##########
+# TEST 5: Join with Hash Partitioning Propagation - Without Optimization
+# CollectLeft join followed by RepartitionExec and SortExec for aggregate
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 0;
+
+query TT
+EXPLAIN SELECT f.f_dkey, MAX(d.env), MAX(d.service), count(*), sum(f.value)
+FROM fact_table_ordered f
+INNER JOIN dimension_table d ON f.f_dkey = d.d_dkey
+WHERE d.service = 'log'
+GROUP BY f.f_dkey
+ORDER BY f.f_dkey;
+----
+logical_plan
+01)Sort: f.f_dkey ASC NULLS LAST
+02)--Projection: f.f_dkey, max(d.env), max(d.service), count(Int64(1)) AS
count(*), sum(f.value)
+03)----Aggregate: groupBy=[[f.f_dkey]], aggr=[[max(d.env), max(d.service),
count(Int64(1)), sum(f.value)]]
+04)------Projection: f.value, f.f_dkey, d.env, d.service
+05)--------Inner Join: f.f_dkey = d.d_dkey
+06)----------SubqueryAlias: f
+07)------------TableScan: fact_table_ordered projection=[value, f_dkey]
+08)----------SubqueryAlias: d
+09)------------Filter: dimension_table.service = Utf8View("log")
+10)--------------TableScan: dimension_table projection=[d_dkey, env, service],
partial_filters=[dimension_table.service = Utf8View("log")]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
+02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, max(d.env)@1 as max(d.env),
max(d.service)@2 as max(d.service), count(Int64(1))@3 as count(*),
sum(f.value)@4 as sum(f.value)]
+03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey],
aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)],
ordering_mode=Sorted
+04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
+05)--------CoalesceBatchesExec: target_batch_size=8192
+06)----------RepartitionExec: partitioning=Hash([f_dkey@0], 3),
input_partitions=3
+07)------------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey],
aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)],
ordering_mode=Sorted
+08)--------------ProjectionExec: expr=[value@2 as value, f_dkey@3 as f_dkey,
env@0 as env, service@1 as service]
+09)----------------CoalesceBatchesExec: target_batch_size=8192
+10)------------------HashJoinExec: mode=CollectLeft, join_type=Inner,
on=[(d_dkey@0, f_dkey@1)], projection=[env@1, service@2, value@3, f_dkey@4]
+11)--------------------CoalescePartitionsExec
+12)----------------------FilterExec: service@2 = log
+13)------------------------RepartitionExec: partitioning=RoundRobinBatch(3),
input_partitions=1
+14)--------------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]},
projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 =
log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <=
log AND log <= service_max@1, required_guarantees=[service in (log)]
+15)--------------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST],
file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# Verify results without optimization
+query TTTIR rowsort
+SELECT f.f_dkey, MAX(d.env), MAX(d.service), count(*), sum(f.value)
+FROM fact_table_ordered f
+INNER JOIN dimension_table d ON f.f_dkey = d.d_dkey
+WHERE d.service = 'log'
+GROUP BY f.f_dkey
+ORDER BY f.f_dkey;
+----
+A dev log 7 772.4
+B prod log 7 614.4
+C prod log 7 2017.6
+
+##########
+# TEST 6: Join with Hash Partitioning Propagation - With Optimization
+# Hash partitioning propagates through join, no RepartitionExec/SortExec after
join
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 1;
+
+query TT
+EXPLAIN SELECT f.f_dkey, MAX(d.env), MAX(d.service), count(*), sum(f.value)
+FROM fact_table_ordered f
+INNER JOIN dimension_table d ON f.f_dkey = d.d_dkey
+WHERE d.service = 'log'
+GROUP BY f.f_dkey
+ORDER BY f.f_dkey;
+----
+logical_plan
+01)Sort: f.f_dkey ASC NULLS LAST
+02)--Projection: f.f_dkey, max(d.env), max(d.service), count(Int64(1)) AS
count(*), sum(f.value)
+03)----Aggregate: groupBy=[[f.f_dkey]], aggr=[[max(d.env), max(d.service),
count(Int64(1)), sum(f.value)]]
+04)------Projection: f.value, f.f_dkey, d.env, d.service
+05)--------Inner Join: f.f_dkey = d.d_dkey
+06)----------SubqueryAlias: f
+07)------------TableScan: fact_table_ordered projection=[value, f_dkey]
+08)----------SubqueryAlias: d
+09)------------Filter: dimension_table.service = Utf8View("log")
+10)--------------TableScan: dimension_table projection=[d_dkey, env, service],
partial_filters=[dimension_table.service = Utf8View("log")]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
+02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, max(d.env)@1 as max(d.env),
max(d.service)@2 as max(d.service), count(Int64(1))@3 as count(*),
sum(f.value)@4 as sum(f.value)]
+03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey],
aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)],
ordering_mode=Sorted
+04)------ProjectionExec: expr=[value@2 as value, f_dkey@3 as f_dkey, env@0 as
env, service@1 as service]
+05)--------CoalesceBatchesExec: target_batch_size=8192
+06)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@0,
f_dkey@1)], projection=[env@1, service@2, value@3, f_dkey@4]
+07)------------CoalescePartitionsExec
+08)--------------FilterExec: service@2 = log
+09)----------------RepartitionExec: partitioning=RoundRobinBatch(3),
input_partitions=1
+10)------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]},
projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 =
log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <=
log AND log <= service_max@1, required_guarantees=[service in (log)]
+11)------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST],
file_type=parquet, predicate=DynamicFilter [ empty ]
+
+query TTTIR rowsort
+SELECT f.f_dkey, MAX(d.env), MAX(d.service), count(*), sum(f.value)
+FROM fact_table_ordered f
+INNER JOIN dimension_table d ON f.f_dkey = d.d_dkey
+WHERE d.service = 'log'
+GROUP BY f.f_dkey
+ORDER BY f.f_dkey;
+----
+A dev log 7 772.4
+B prod log 7 614.4
+C prod log 7 2017.6
+
+##########
+# TEST 7: Window Function - Without Optimization
+# Shows RepartitionExec and SortExec (hash repartition destroys ordering)
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 0;
+
+query TT
+EXPLAIN SELECT f_dkey, timestamp, value,
+ ROW_NUMBER() OVER (PARTITION BY f_dkey ORDER BY timestamp) as rn
+FROM fact_table_ordered;
+----
+logical_plan
+01)Projection: fact_table_ordered.f_dkey, fact_table_ordered.timestamp,
fact_table_ordered.value, row_number() PARTITION BY [fact_table_ordered.f_dkey]
ORDER BY [fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW AS rn
+02)--WindowAggr: windowExpr=[[row_number() PARTITION BY
[fact_table_ordered.f_dkey] ORDER BY [fact_table_ordered.timestamp ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+03)----TableScan: fact_table_ordered projection=[timestamp, value, f_dkey]
+physical_plan
+01)ProjectionExec: expr=[f_dkey@2 as f_dkey, timestamp@0 as timestamp, value@1
as value, row_number() PARTITION BY [fact_table_ordered.f_dkey] ORDER BY
[fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@3 as rn]
+02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY
[fact_table_ordered.f_dkey] ORDER BY [fact_table_ordered.timestamp ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number()
PARTITION BY [fact_table_ordered.f_dkey] ORDER BY [fact_table_ordered.timestamp
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 },
frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
+03)----SortExec: expr=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST],
preserve_partitioning=[true]
+04)------CoalesceBatchesExec: target_batch_size=8192
+05)--------RepartitionExec: partitioning=Hash([f_dkey@2], 3),
input_partitions=3
+06)----------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS
LAST, timestamp@0 ASC NULLS LAST], file_type=parquet
+
+# Verify results without optimization (limited for readability)
+query TPRI rowsort
+SELECT f_dkey, timestamp, value,
+ ROW_NUMBER() OVER (PARTITION BY f_dkey ORDER BY timestamp) as rn
+FROM fact_table_ordered
+WHERE timestamp < TIMESTAMP '2023-01-01T09:00:30';
+----
+A 2023-01-01T09:00:00 95.5 1
+A 2023-01-01T09:00:10 102.3 2
+A 2023-01-01T09:00:20 98.7 3
+B 2023-01-01T09:00:00 75.2 1
+B 2023-01-01T09:00:10 82.4 2
+B 2023-01-01T09:00:20 78.9 3
+C 2023-01-01T09:00:00 300.5 1
+C 2023-01-01T09:00:10 285.7 2
+C 2023-01-01T09:00:20 310.2 3
+
+##########
+# TEST 8: Window Function - With Optimization
+# No RepartitionExec, no SortExec (data already sorted by f_dkey, timestamp)
+##########
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 1;
+
+query TT
+EXPLAIN SELECT f_dkey, timestamp, value,
+ ROW_NUMBER() OVER (PARTITION BY f_dkey ORDER BY timestamp) as rn
+FROM fact_table_ordered;
+----
+logical_plan
+01)Projection: fact_table_ordered.f_dkey, fact_table_ordered.timestamp,
fact_table_ordered.value, row_number() PARTITION BY [fact_table_ordered.f_dkey]
ORDER BY [fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW AS rn
+02)--WindowAggr: windowExpr=[[row_number() PARTITION BY
[fact_table_ordered.f_dkey] ORDER BY [fact_table_ordered.timestamp ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+03)----TableScan: fact_table_ordered projection=[timestamp, value, f_dkey]
+physical_plan
+01)ProjectionExec: expr=[f_dkey@2 as f_dkey, timestamp@0 as timestamp, value@1
as value, row_number() PARTITION BY [fact_table_ordered.f_dkey] ORDER BY
[fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@3 as rn]
+02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY
[fact_table_ordered.f_dkey] ORDER BY [fact_table_ordered.timestamp ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number()
PARTITION BY [fact_table_ordered.f_dkey] ORDER BY [fact_table_ordered.timestamp
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 },
frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
+03)----DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS
LAST, timestamp@0 ASC NULLS LAST], file_type=parquet
Review Comment:
🎉
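Aside for readers reproducing these plans outside sqllogictest: the benchmark later in this review drives the same setup through the Rust API. A trimmed sketch of that pattern follows; the table name and path are illustrative, and only the config keys and ParquetReadOptions fields already used by the benchmark below are assumed.

```rust
use datafusion::arrow::datatypes::DataType;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Mirror the .slt setup: 3 target partitions, optimization enabled.
    let config = SessionConfig::new()
        .with_target_partitions(3)
        .set_usize("datafusion.optimizer.preserve_file_partitions", 1);
    let ctx = SessionContext::new_with_config(config);

    // Hive-partitioned fact table, partitioned by f_dkey (path is illustrative).
    let options = ParquetReadOptions {
        table_partition_cols: vec![("f_dkey".to_string(), DataType::Utf8)],
        ..Default::default()
    };
    ctx.register_parquet(
        "fact_table",
        "test_files/scratch/preserve_file_partitioning/fact/",
        options,
    )
    .await?;

    // With the optimization on, the plan should show mode=SinglePartitioned
    // and no RepartitionExec, as in TEST 2 above.
    ctx.sql("EXPLAIN SELECT f_dkey, count(*) FROM fact_table GROUP BY f_dkey")
        .await?
        .show()
        .await?;
    Ok(())
}
```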
##########
datafusion/sqllogictest/test_files/preserve_file_partitioning.slt:
##########
@@ -0,0 +1,604 @@
+
+query TPRI rowsort
+SELECT f_dkey, timestamp, value,
+ ROW_NUMBER() OVER (PARTITION BY f_dkey ORDER BY timestamp) as rn
+FROM fact_table_ordered
+WHERE timestamp < TIMESTAMP '2023-01-01T09:00:30';
+----
+A 2023-01-01T09:00:00 95.5 1
+A 2023-01-01T09:00:10 102.3 2
+A 2023-01-01T09:00:20 98.7 3
+B 2023-01-01T09:00:00 75.2 1
+B 2023-01-01T09:00:10 82.4 2
+B 2023-01-01T09:00:20 78.9 3
+C 2023-01-01T09:00:00 300.5 1
+C 2023-01-01T09:00:10 285.7 2
+C 2023-01-01T09:00:20 310.2 3
+
+##########
+# TEST 9: High-Cardinality Partitions (more partitions than target_partitions)
+# When num_partitions > target_partitions (5 > 3), files are merged using
+# round-robin assignment to ensure exactly target_partitions groups are created.
+# This enables Hash partitioning to be recognized and RepartitionExec eliminated.
Review Comment:
Nice, and it works for these use cases. See my comment above to check whether it
works for all use cases. I think we only need to add comments as needed to
document accurate usage.
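To make the grouping concrete, here is a minimal standalone sketch of round-robin grouping by distinct partition value, assuming only what the TEST 9 comment states; the function name and signature are illustrative, not the PR's actual `group_by_partition_values` API.

```rust
use std::collections::HashMap;

/// Illustrative sketch only (not the PR's API): keep all files that share a
/// partition value together, then fold the distinct values into at most
/// `target` groups in round-robin fashion.
fn group_by_partition_values(
    files: Vec<(String, String)>, // (path, partition value)
    target: usize,
) -> Vec<Vec<String>> {
    if target == 0 || files.is_empty() {
        return vec![];
    }
    // Collect files per distinct partition value, preserving first-seen order.
    let mut order: Vec<String> = Vec::new();
    let mut by_value: HashMap<String, Vec<String>> = HashMap::new();
    for (path, value) in files {
        if !by_value.contains_key(&value) {
            order.push(value.clone());
        }
        by_value.entry(value).or_default().push(path);
    }
    // Fold the distinct values into at most `target` groups, round-robin.
    let num_groups = target.min(order.len());
    let mut groups: Vec<Vec<String>> = vec![Vec::new(); num_groups];
    for (i, value) in order.iter().enumerate() {
        groups[i % num_groups].extend(by_value.remove(value).unwrap());
    }
    groups
}

fn main() {
    // 5 Hive partitions (A..E) folded into target_partitions = 3 groups.
    let files = ["A", "B", "C", "D", "E"]
        .iter()
        .map(|v| (format!("f_dkey={v}/data.parquet"), v.to_string()))
        .collect();
    for (i, group) in group_by_partition_values(files, 3).iter().enumerate() {
        println!("group {i}: {group:?}");
    }
}
```

With 5 distinct values and target_partitions = 3 this yields groups like {A, D}, {B, E}, {C}: files sharing a partition value always stay in one group, which is the property the declared Hash partitioning relies on; whether every consumer of that declaration is satisfied by round-robin placement is the question raised above.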
##########
datafusion/core/benches/preserve_file_partitioning.rs:
##########
@@ -0,0 +1,837 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmark for `preserve_file_partitions` optimization.
+//!
+//! When enabled, this optimization declares Hive-partitioned tables as
+//! `Hash([partition_col])` partitioned, allowing the query optimizer to
+//! skip unnecessary repartitioning and sorting operations.
+//!
+//! When This Optimization Helps
+//! - Window functions: PARTITION BY on the partition column eliminates RepartitionExec and SortExec
+//! - Aggregates with ORDER BY: GROUP BY on the partition column plus ORDER BY eliminates the post-aggregate sort
+//!
+//! When This Optimization Does NOT Help
+//! - GROUP BY on non-partition columns: the required Hash distribution doesn't match the declared partitioning
+//! - I/O-intensive queries: it limits parallelization at the I/O level, so the benefits may not outweigh the cost
+//!
+//! Usage
+//! - BENCH_SIZE=small|medium|large cargo bench -p datafusion --bench preserve_file_partitions
+//! - SAVE_PLANS=1 cargo bench ... # Save query plans to files
+
+use arrow::array::{ArrayRef, Float64Array, StringArray,
TimestampMillisecondArray};
+use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::pretty_format_batches;
+use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::prelude::{col, ParquetReadOptions, SessionConfig,
SessionContext};
+use datafusion_expr::SortExpr;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::WriterProperties;
+use std::fs::{self, File};
+use std::io::Write;
+use std::path::Path;
+use std::sync::Arc;
+use tempfile::TempDir;
+use tokio::runtime::Runtime;
+
+#[derive(Debug, Clone, Copy)]
+struct BenchConfig {
+ fact_partitions: usize,
+ rows_per_partition: usize,
+ target_partitions: usize,
+ measurement_time_secs: u64,
+}
+
+impl BenchConfig {
+ fn small() -> Self {
+ Self {
+ fact_partitions: 10,
+ rows_per_partition: 1_000_000,
+ target_partitions: 10,
+ measurement_time_secs: 15,
+ }
+ }
+
+ fn medium() -> Self {
+ Self {
+ fact_partitions: 30,
+ rows_per_partition: 3_000_000,
+ target_partitions: 30,
+ measurement_time_secs: 30,
+ }
+ }
+
+ fn large() -> Self {
+ Self {
+ fact_partitions: 50,
+ rows_per_partition: 6_000_000,
+ target_partitions: 50,
+ measurement_time_secs: 90,
+ }
+ }
+
+ fn from_env() -> Self {
+ match std::env::var("BENCH_SIZE").as_deref() {
+ Ok("small") | Ok("SMALL") => Self::small(),
+ Ok("medium") | Ok("MEDIUM") => Self::medium(),
+ Ok("large") | Ok("LARGE") => Self::large(),
+ _ => {
+ println!("Using SMALL dataset (set
BENCH_SIZE=small|medium|large)");
+ Self::small()
+ }
+ }
+ }
+
+ fn total_rows(&self) -> usize {
+ self.fact_partitions * self.rows_per_partition
+ }
+
+ fn high_cardinality(base: &Self) -> Self {
+ Self {
+ fact_partitions: (base.fact_partitions as f64 * 2.5) as usize,
+ rows_per_partition: base.rows_per_partition / 2,
+ target_partitions: base.target_partitions,
+ measurement_time_secs: base.measurement_time_secs,
+ }
+ }
+}
+
+fn dkey_names(count: usize) -> Vec<String> {
+ (0..count)
+ .map(|i| {
+ if i < 26 {
+ ((b'A' + i as u8) as char).to_string()
+ } else {
+ format!(
+ "{}{}",
+ (b'A' + ((i / 26) - 1) as u8) as char,
+ (b'A' + (i % 26) as u8) as char
+ )
+ }
+ })
+ .collect()
+}
+
+/// Hive-partitioned fact table, sorted by timestamp within each partition.
+fn generate_fact_table(
+ base_dir: &Path,
+ num_partitions: usize,
+ rows_per_partition: usize,
+) {
+ let fact_dir = base_dir.join("fact");
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new(
+ "timestamp",
+ DataType::Timestamp(TimeUnit::Millisecond, None),
+ false,
+ ),
+ Field::new("value", DataType::Float64, false),
+ ]));
+
+ let props = WriterProperties::builder()
+ .set_compression(parquet::basic::Compression::SNAPPY)
+ .build();
+
+ let dkeys = dkey_names(num_partitions);
+
+ for dkey in &dkeys {
+ let part_dir = fact_dir.join(format!("f_dkey={dkey}"));
+ fs::create_dir_all(&part_dir).unwrap();
+ let file_path = part_dir.join("data.parquet");
+ let file = File::create(file_path).unwrap();
+
+ let mut writer =
+ ArrowWriter::try_new(file, schema.clone(),
Some(props.clone())).unwrap();
+
+ let base_ts = 1672567200000i64; // 2023-01-01T09:00:00
+ let timestamps: Vec<i64> = (0..rows_per_partition)
+ .map(|i| base_ts + (i as i64 * 10000))
+ .collect();
+
+ let values: Vec<f64> = (0..rows_per_partition)
+ .map(|i| 50.0 + (i % 100) as f64 + ((i % 7) as f64 * 10.0))
+ .collect();
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(TimestampMillisecondArray::from(timestamps)) as
ArrayRef,
+ Arc::new(Float64Array::from(values)),
+ ],
+ )
+ .unwrap();
+
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+ }
+}
+
+/// Single-file dimension table for CollectLeft joins.
+fn generate_dimension_table(base_dir: &Path, num_partitions: usize) {
+ let dim_dir = base_dir.join("dimension");
+ fs::create_dir_all(&dim_dir).unwrap();
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("d_dkey", DataType::Utf8, false),
+ Field::new("env", DataType::Utf8, false),
+ Field::new("service", DataType::Utf8, false),
+ Field::new("host", DataType::Utf8, false),
+ ]));
+
+ let props = WriterProperties::builder()
+ .set_compression(parquet::basic::Compression::SNAPPY)
+ .build();
+
+ let file_path = dim_dir.join("data.parquet");
+ let file = File::create(file_path).unwrap();
+ let mut writer = ArrowWriter::try_new(file, schema.clone(),
Some(props)).unwrap();
+
+ let dkeys = dkey_names(num_partitions);
+ let envs = ["dev", "prod", "staging", "test"];
+ let services = ["log", "trace", "metric"];
+ let hosts = ["ma", "vim", "nano", "emacs"];
+
+ let d_dkey_vals: Vec<String> = dkeys.clone();
+ let env_vals: Vec<String> = dkeys
+ .iter()
+ .enumerate()
+ .map(|(i, _)| envs[i % envs.len()].to_string())
+ .collect();
+ let service_vals: Vec<String> = dkeys
+ .iter()
+ .enumerate()
+ .map(|(i, _)| services[i % services.len()].to_string())
+ .collect();
+ let host_vals: Vec<String> = dkeys
+ .iter()
+ .enumerate()
+ .map(|(i, _)| hosts[i % hosts.len()].to_string())
+ .collect();
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(StringArray::from(d_dkey_vals)) as ArrayRef,
+ Arc::new(StringArray::from(env_vals)),
+ Arc::new(StringArray::from(service_vals)),
+ Arc::new(StringArray::from(host_vals)),
+ ],
+ )
+ .unwrap();
+
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+}
+
+struct BenchVariant {
+ name: &'static str,
+ preserve_file_partitions: usize,
+ prefer_existing_sort: bool,
+}
+
+const BENCH_VARIANTS: [BenchVariant; 3] = [
+ BenchVariant {
+ name: "with_optimization",
+ preserve_file_partitions: 1,
+ prefer_existing_sort: false,
+ },
+ BenchVariant {
+ name: "prefer_existing_sort",
+ preserve_file_partitions: 0,
+ prefer_existing_sort: true,
+ },
+ BenchVariant {
+ name: "without_optimization",
+ preserve_file_partitions: 0,
+ prefer_existing_sort: false,
+ },
+];
+
+async fn save_plans(
+ output_file: &Path,
+ fact_path: &str,
+ dim_path: Option<&str>,
+ target_partitions: usize,
+ query: &str,
+ file_sort_order: Option<Vec<Vec<SortExpr>>>,
+) {
+ let mut file = File::create(output_file).unwrap();
+ writeln!(file, "Query: {query}\n").unwrap();
+
+ for variant in &BENCH_VARIANTS {
+ let session_config = SessionConfig::new()
+ .with_target_partitions(target_partitions)
+ .set_usize(
+ "datafusion.optimizer.preserve_file_partitions",
+ variant.preserve_file_partitions,
+ )
+ .set_bool(
+ "datafusion.optimizer.prefer_existing_sort",
+ variant.prefer_existing_sort,
+ );
+ let ctx = SessionContext::new_with_config(session_config);
+
+ let mut fact_options = ParquetReadOptions {
+ table_partition_cols: vec![("f_dkey".to_string(), DataType::Utf8)],
+ ..Default::default()
+ };
+ if let Some(ref order) = file_sort_order {
+ fact_options.file_sort_order = order.clone();
+ }
+ ctx.register_parquet("fact", fact_path, fact_options)
+ .await
+ .unwrap();
+
+ if let Some(dim) = dim_path {
+ let dim_schema = Arc::new(Schema::new(vec![
+ Field::new("d_dkey", DataType::Utf8, false),
+ Field::new("env", DataType::Utf8, false),
+ Field::new("service", DataType::Utf8, false),
+ Field::new("host", DataType::Utf8, false),
+ ]));
+ let dim_options = ParquetReadOptions {
+ schema: Some(&dim_schema),
+ ..Default::default()
+ };
+ ctx.register_parquet("dimension", dim, dim_options)
+ .await
+ .unwrap();
+ }
+
+ let df = ctx.sql(query).await.unwrap();
+ let plan = df.explain(false, false).unwrap().collect().await.unwrap();
+ writeln!(file, "=== {} ===", variant.name).unwrap();
+ writeln!(file, "{}\n", pretty_format_batches(&plan).unwrap()).unwrap();
+ }
+}
+
+#[allow(clippy::too_many_arguments)]
+fn run_benchmark(
+ c: &mut Criterion,
+ rt: &Runtime,
+ name: &str,
+ fact_path: &str,
+ dim_path: Option<&str>,
+ target_partitions: usize,
+ query: &str,
+ file_sort_order: &Option<Vec<Vec<SortExpr>>>,
+) {
+ if std::env::var("SAVE_PLANS").is_ok() {
+ let output_path = format!("{name}_plans.txt");
+ rt.block_on(save_plans(
+ Path::new(&output_path),
+ fact_path,
+ dim_path,
+ target_partitions,
+ query,
+ file_sort_order.clone(),
+ ));
+ println!("Plans saved to {output_path}");
+ }
+
+ let mut group = c.benchmark_group(name);
+
+ for variant in &BENCH_VARIANTS {
+ let fact_path_owned = fact_path.to_string();
+ let dim_path_owned = dim_path.map(|s| s.to_string());
+ let sort_order = file_sort_order.clone();
+ let query_owned = query.to_string();
+ let preserve_file_partitions = variant.preserve_file_partitions;
+ let prefer_existing_sort = variant.prefer_existing_sort;
+
+ group.bench_function(variant.name, |b| {
+ b.to_async(rt).iter(|| {
+ let fact_path = fact_path_owned.clone();
+ let dim_path = dim_path_owned.clone();
+ let sort_order = sort_order.clone();
+ let query = query_owned.clone();
+ async move {
+ let session_config = SessionConfig::new()
+ .with_target_partitions(target_partitions)
+ .set_usize(
+ "datafusion.optimizer.preserve_file_partitions",
+ preserve_file_partitions,
+ )
+ .set_bool(
+ "datafusion.optimizer.prefer_existing_sort",
+ prefer_existing_sort,
+ );
+ let ctx = SessionContext::new_with_config(session_config);
+
+ let mut fact_options = ParquetReadOptions {
+ table_partition_cols: vec![(
+ "f_dkey".to_string(),
+ DataType::Utf8,
+ )],
+ ..Default::default()
+ };
+ if let Some(ref order) = sort_order {
+ fact_options.file_sort_order = order.clone();
+ }
+ ctx.register_parquet("fact", &fact_path, fact_options)
+ .await
+ .unwrap();
+
+ if let Some(ref dim) = dim_path {
+ let dim_schema = Arc::new(Schema::new(vec![
+ Field::new("d_dkey", DataType::Utf8, false),
+ Field::new("env", DataType::Utf8, false),
+ Field::new("service", DataType::Utf8, false),
+ Field::new("host", DataType::Utf8, false),
+ ]));
+ let dim_options = ParquetReadOptions {
+ schema: Some(&dim_schema),
+ ..Default::default()
+ };
+ ctx.register_parquet("dimension", dim, dim_options)
+ .await
+ .unwrap();
+ }
+
+ let df = ctx.sql(&query).await.unwrap();
+ df.collect().await.unwrap()
+ }
+ })
+ });
+ }
+
+ group.finish();
+}
+
+/// Aggregate on high-cardinality partitions which eliminates repartition and sort.
+///
+/// Query: SELECT f_dkey, COUNT(*), SUM(value) FROM fact GROUP BY f_dkey ORDER BY f_dkey
+///
+/// with_optimization (preserve_file_partitions=enabled)
+///
+///   ┌───────────────────────────┐
+///   │ SortPreservingMergeExec   │  Sort preserved
+///   └─────────────┬─────────────┘
+///                 │
+///   ┌─────────────▼─────────────┐
+///   │ AggregateExec             │  No repartitioning needed
+///   │ (SinglePartitioned)       │
+///   └─────────────┬─────────────┘
+///                 │
+///   ┌─────────────▼─────────────┐
+///   │ DataSourceExec            │  partitioning=Hash([f_dkey])
+///   │ file_groups={N groups}    │
+///   └───────────────────────────┘
+///
+/// prefer_existing_sort (preserve_file_partitions=disabled, prefer_existing_sort=true)
+///
+///   ┌───────────────────────────┐
+///   │ SortPreservingMergeExec   │  Sort preserved
+///   └─────────────┬─────────────┘
+///                 │
+///   ┌─────────────▼─────────────┐
+///   │ AggregateExec             │
+///   │ (FinalPartitioned)        │
+///   └─────────────┬─────────────┘
+///                 │
+///   ┌─────────────▼─────────────┐
+///   │ RepartitionExec           │  Hash shuffle with order preservation
+///   │ Hash([f_dkey], N)         │  Uses k-way merge to maintain sort, has overhead
+///   │ preserve_order=true       │
+///   └─────────────┬─────────────┘
+///                 │
+///   ┌─────────────▼─────────────┐
+///   │ AggregateExec             │
+///   │ (Partial)                 │
+///   └─────────────┬─────────────┘
+///                 │
+///   ┌─────────────▼─────────────┐
+///   │ DataSourceExec            │  partitioning=UnknownPartitioning
+///   └───────────────────────────┘
+///
+/// without_optimization (preserve_file_partitions=disabled, prefer_existing_sort=false)
+///
+///   ┌───────────────────────────┐
+///   │ SortPreservingMergeExec   │
+///   └─────────────┬─────────────┘
+///                 │
+///   ┌─────────────▼─────────────┐
+///   │ AggregateExec             │
+///   │ (FinalPartitioned)        │
+///   └─────────────┬─────────────┘
+///                 │
+///   ┌─────────────▼─────────────┐
+///   │ SortExec                  │  Must sort after shuffle
+///   │ [f_dkey ASC]              │
+///   └─────────────┬─────────────┘
+///                 │
+///   ┌─────────────▼─────────────┐
+///   │ RepartitionExec           │  Hash shuffle destroys ordering
+///   │ Hash([f_dkey], N)         │
+///   └─────────────┬─────────────┘
+///                 │
+///   ┌─────────────▼─────────────┐
+///   │ AggregateExec             │
+///   │ (Partial)                 │
+///   └─────────────┬─────────────┘
+///                 │
+///   ┌─────────────▼─────────────┐
+///   │ DataSourceExec            │  partitioning=UnknownPartitioning
+///   └───────────────────────────┘
Review Comment:
Nice description
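
   For reference, a minimal sketch of exercising the new setting outside the benchmark harness. It assumes the `datafusion.optimizer.preserve_file_partitions` key from this diff (the value passed is illustrative), reuses the benchmark's `f_dkey` partition column, and points at a hypothetical `data/fact/` Hive-partitioned directory:

   ```rust
   // Sketch only: config key and value semantics follow the benchmark code in this
   // diff; the data path and partition count below are placeholders.
   use datafusion::arrow::datatypes::DataType;
   use datafusion::prelude::*;

   #[tokio::main]
   async fn main() -> datafusion::error::Result<()> {
       // Mirror the benchmark's configuration: target_partitions plus the two
       // optimizer settings being compared.
       let config = SessionConfig::new()
           .with_target_partitions(8)
           .set_usize("datafusion.optimizer.preserve_file_partitions", 8)
           .set_bool("datafusion.optimizer.prefer_existing_sort", true);
       let ctx = SessionContext::new_with_config(config);

       // Register a fact table Hive-partitioned by `f_dkey` (path is a placeholder).
       let options = ParquetReadOptions::default()
           .table_partition_cols(vec![("f_dkey".to_string(), DataType::Utf8)]);
       ctx.register_parquet("fact", "data/fact/", options).await?;

       // When the optimization applies, EXPLAIN should show DataSourceExec with
       // Hash([f_dkey]) partitioning and no RepartitionExec before the aggregate.
       ctx.sql(
           "EXPLAIN SELECT f_dkey, COUNT(*), SUM(value) \
            FROM fact GROUP BY f_dkey ORDER BY f_dkey",
       )
       .await?
       .show()
       .await?;

       Ok(())
   }
   ```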