alamb commented on code in PR #19304:
URL: https://github.com/apache/datafusion/pull/19304#discussion_r2628676775


##########
datafusion/common/src/config.rs:
##########
@@ -1000,6 +1000,34 @@ config_namespace! {
         /// ```
         pub repartition_sorts: bool, default = true
 
+        /// Partition count threshold for subset satisfaction optimization.
+        ///
+        /// When the current partition count is >= this threshold, DataFusion will
+        /// skip repartitioning if the required partitioning expression is a subset
+        /// of the current partition expression such as Hash(a) satisfies Hash(a, b).
+        ///
+        /// When the current partition count is < this threshold, DataFusion will
+        /// repartition to increase parallelism even when subset satisfaction applies.
+        ///
+        /// Set to 0 to always repartition (disable subset satisfaction optimization).
+        /// Set to a high value to always use subset satisfaction.
+        ///
+        /// Example (subset_satisfaction_partition_threshold = 4):
+        /// ```text
+        ///     Hash([a]) satisfies Hash([a, b]) because Hash([a, b]) is a subset of Hash([a])
+        ///
+        ///     If current partitions (3) < threshold (4), repartition:
+        ///     AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)]
+        ///       RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3
+        ///         AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)]
+        ///           DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3)
+        ///
+        ///     If current partitions (8) >= threshold (4), use subset satisfaction:
+        ///     AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)]
+        ///       DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8)
+        /// ```
+        pub subset_satisfaction_partition_threshold: usize, default = 4

Review Comment:
   that is quite a name. Subset satisfaction. I like it
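
   For readers skimming the doc comment, the rule can be sketched as a tiny predicate. This is a toy model only: `skips_repartition` is a hypothetical name, not DataFusion's API, and treating 0 as "disabled" is an assumption taken from the doc comment rather than from the implementation.

   ```rust
   /// Toy model of the documented rule (hypothetical helper, not DataFusion code).
   ///
   /// * `subset_satisfied`: the current hash expressions are a strict subset
   ///   of the required ones, e.g. Hash([a]) against Hash([a, b]).
   /// * `current_partitions`: partition count of the input.
   /// * `threshold`: value of `subset_satisfaction_partition_threshold`.
   fn skips_repartition(
       subset_satisfied: bool,
       current_partitions: usize,
       threshold: usize,
   ) -> bool {
       // Assumption from the doc comment: 0 disables the optimization.
       if threshold == 0 {
           return false;
       }
       // Documented rule: only skip when there are already enough partitions.
       subset_satisfied && current_partitions >= threshold
   }
   ```

   With the values from the doc example, 3 partitions against a threshold of 4 still repartitions, while 8 partitions does not.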



##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -339,4 +414,403 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_partitioning_satisfy_by_subset() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int64, false),
+            Field::new("c", DataType::Int64, false),
+        ]));
+
+        let col_a: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("a", &schema)?);
+        let col_b: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("b", &schema)?);
+        let col_c: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("c", &schema)?);
+        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
+
+        let test_cases = vec![
+            (
+                "Hash([a]) vs Hash([a, b])",
+                Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
+                Distribution::HashPartitioned(vec![
+                    Arc::clone(&col_a),
+                    Arc::clone(&col_b),
+                ]),
+                PartitioningSatisfaction::Subset,
+                PartitioningSatisfaction::NotSatisfied,
+            ),
+            (
+                "Hash([a]) vs Hash([a, b, c])",
+                Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
+                Distribution::HashPartitioned(vec![
+                    Arc::clone(&col_a),
+                    Arc::clone(&col_b),
+                    Arc::clone(&col_c),
+                ]),
+                PartitioningSatisfaction::Subset,
+                PartitioningSatisfaction::NotSatisfied,
+            ),
+            (
+                "Hash([a, b]) vs Hash([a, b, c])",

Review Comment:
   Thank you for these comments ❤️ 
   Can you also please add cases for 
   * `Hash([b]) vs Hash([a, b, c])`
   * `Hash([b, a]) vs Hash([a, b, c])`
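
   As a rough illustration of what those two cases would exercise, here is the shape of the `is_subset_partitioning` check from this PR, rewritten over plain column names so it compiles without DataFusion. Using `&str` instead of `Arc<dyn PhysicalExpr>` is an assumption made purely for the sketch.

   ```rust
   /// Same structure as the helper in the diff, but over column names
   /// (a stand-in for `Arc<dyn PhysicalExpr>`).
   fn is_subset_partitioning(subset: &[&str], superset: &[&str]) -> bool {
       // Require a strict subset: non-empty and fewer expressions.
       if subset.is_empty() || subset.len() >= superset.len() {
           return false;
       }
       // Membership test only, so neither position nor order matters.
       subset.iter().all(|s| superset.iter().any(|t| s == t))
   }
   ```

   Under this model both `["b"]` and `["b", "a"]` count as subsets of `["a", "b", "c"]`, so the new cases would pin down whether a non-prefix subset and a reordered subset are meant to be accepted.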



##########
datafusion/common/src/config.rs:
##########
@@ -1000,6 +1000,34 @@ config_namespace! {
         /// ```
         pub repartition_sorts: bool, default = true
 
+        /// Partition count threshold for subset satisfaction optimization.

Review Comment:
   Something I don't fully understand after reading this PR is the rationale for defaulting this to 4.
   
   It seems like this optimization would only slow things down when there is large skew across partitions, such that hashing on more columns would even out the work.
   
   Is this about trading off the cost of repartitioning against getting more partitions (and therefore more cores) involved?
   
   A little rationale here might help readers understand that.
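
   To make the skew concern concrete, here is a small std-only sketch. It uses `DefaultHasher` as a stand-in for DataFusion's hash function, and all helper names are hypothetical. The point is only that hashing on fewer columns means fewer distinct keys, and therefore an upper bound on how many partitions can receive any rows.

   ```rust
   use std::collections::hash_map::DefaultHasher;
   use std::collections::HashSet;
   use std::hash::{Hash, Hasher};

   /// Partition index for a key (stand-in hash, not DataFusion's).
   fn partition_of<K: Hash>(key: &K, partitions: u64) -> u64 {
       let mut hasher = DefaultHasher::new();
       key.hash(&mut hasher);
       hasher.finish() % partitions
   }

   /// Number of partitions that receive at least one key.
   fn busy_partitions<K: Hash>(keys: &[K], partitions: u64) -> usize {
       keys.iter()
           .map(|key| partition_of(key, partitions))
           .collect::<HashSet<u64>>()
           .len()
   }

   /// 3 distinct values of `a`, each paired with 100 values of `b`.
   fn sample_rows() -> Vec<(u32, u32)> {
       (0..3u32)
           .flat_map(|a| (0..100u32).map(move |b| (a, b)))
           .collect()
   }
   ```

   Hashing `sample_rows()` on `a` alone can never occupy more than 3 of 8 partitions, however the hash behaves, whereas hashing on `(a, b)` has 300 distinct keys to spread around.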



##########
datafusion/common/src/config.rs:
##########
@@ -1000,6 +1000,34 @@ config_namespace! {
         /// ```
         pub repartition_sorts: bool, default = true
 
+        /// Partition count threshold for subset satisfaction optimization.
+        ///
+        /// When the current partition count is >= this threshold, DataFusion will
+        /// skip repartitioning if the required partitioning expression is a subset
+        /// of the current partition expression such as Hash(a) satisfies Hash(a, b).
+        ///
+        /// When the current partition count is < this threshold, DataFusion will
+        /// repartition to increase parallelism even when subset satisfaction applies.
+        ///
+        /// Set to 0 to always repartition (disable subset satisfaction optimization).

Review Comment:
   it is great that this documents how to turn the feature off



##########
datafusion/sqllogictest/test_files/aggregate.slt:
##########
@@ -6193,14 +6193,12 @@ logical_plan
 04)------TableScan: aggregate_test_100 projection=[c1, c3]
 physical_plan
 01)CoalescePartitionsExec: fetch=5
-02)--AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
-03)----RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4
-04)------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
-05)--------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[min(aggregate_test_100.c1)]
-06)----------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
-07)------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[min(aggregate_test_100.c1)]
-08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c1, c3], file_type=csv, has_header=true
+02)--AggregateExec: mode=SinglePartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]

Review Comment:
   that is certainly nicer -- it cuts out an entire repartition



##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1212,6 +1225,14 @@ pub fn ensure_distribution(
         plan = updated_window;
     };
 
+    // For joins in partitioned mode, we need exact hash matching between

Review Comment:
   Can you please add some comments that explain **why** joins need exact hash matching? This comment explains what the code does (which you can get from reading the code, though this is a nice summary), but it does not convey the intent.
   
   I don't think it is for correctness, as my understanding is that a subset of a hash will still ensure that all matching join keys are in the corresponding partitions. So is it performance related?
   
   Also, what about the other join types (like CrossJoin, NestedLoopJoin)?
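
   One scenario that may be worth ruling out in the new comment, sketched under the assumption that a partitioned join pairs partition `i` of the left input with partition `i` of the right input: if one side stays on Hash([a]) while the other is repartitioned on Hash([a, b]), rows with equal join keys need not share a partition index. The partitioner below is a deliberately simple toy (not DataFusion's hash) and all names are hypothetical.

   ```rust
   /// Toy partitioner (NOT DataFusion's hash): sum the key columns,
   /// then take the remainder.
   fn toy_partition(key_columns: &[u64], partitions: u64) -> u64 {
       key_columns.iter().sum::<u64>() % partitions
   }

   /// Partition index of a row (a, b) when hashing on [a] only.
   fn partition_on_a(row: (u64, u64), partitions: u64) -> u64 {
       toy_partition(&[row.0], partitions)
   }

   /// Partition index of the same row when hashing on [a, b].
   fn partition_on_a_b(row: (u64, u64), partitions: u64) -> u64 {
       toy_partition(&[row.0, row.1], partitions)
   }
   ```

   With 4 partitions, the row `(1, 1)` lands in different partitions under the two schemes, so the two inputs would only line up if both use the same expressions. If that is the reason for exact matching, stating it in the comment would answer the question above.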



##########
datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt:
##########
@@ -0,0 +1,526 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+# Tests for Subset Partitioning Optimization
+#
+# Subset partitioning allows Hash([a]) to satisfy Hash([a, b]) requirements
+# when the required partitioning expressions are a strict subset of the
+# current partitioning expressions.
+##########
+
+##########
+# SETUP: Configuration and Data Generation
+##########
+
+statement ok
+set datafusion.optimizer.enable_round_robin_repartition = false;
+
+statement ok
+set datafusion.execution.target_partitions = 3;
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 1;
+
+# Create fact table partitioned by f_dkey (3 partitions)
+# Each partition has data sorted by timestamp
+# Partition: f_dkey=A
+statement ok
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+    (TIMESTAMP '2023-01-01T09:00:00', 95.5),
+    (TIMESTAMP '2023-01-01T09:00:10', 102.3),
+    (TIMESTAMP '2023-01-01T09:00:20', 98.7),
+    (TIMESTAMP '2023-01-01T09:12:20', 105.1),
+    (TIMESTAMP '2023-01-01T09:12:30', 100.0),
+    (TIMESTAMP '2023-01-01T09:12:40', 150.0),
+    (TIMESTAMP '2023-01-01T09:12:50', 120.8)
+))
+TO 
'test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet'
+STORED AS PARQUET;
+
+# Partition: f_dkey=B
+statement ok
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+    (TIMESTAMP '2023-01-01T09:00:00', 75.2),
+    (TIMESTAMP '2023-01-01T09:00:10', 82.4),
+    (TIMESTAMP '2023-01-01T09:00:20', 78.9),
+    (TIMESTAMP '2023-01-01T09:00:30', 85.6),
+    (TIMESTAMP '2023-01-01T09:12:30', 80.0),
+    (TIMESTAMP '2023-01-01T09:12:40', 120.0),
+    (TIMESTAMP '2023-01-01T09:12:50', 92.3)
+))
+TO 
'test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet'
+STORED AS PARQUET;
+
+# Partition: f_dkey=C
+statement ok
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+    (TIMESTAMP '2023-01-01T09:00:00', 300.5),
+    (TIMESTAMP '2023-01-01T09:00:10', 285.7),
+    (TIMESTAMP '2023-01-01T09:00:20', 310.2),
+    (TIMESTAMP '2023-01-01T09:00:30', 295.8),
+    (TIMESTAMP '2023-01-01T09:00:40', 300.0),
+    (TIMESTAMP '2023-01-01T09:12:40', 250.0),
+    (TIMESTAMP '2023-01-01T09:12:50', 275.4)
+))
+TO 
'test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet'
+STORED AS PARQUET;
+
+# Create dimension table partitioned by d_dkey (4 partitions)
+query I
+COPY (SELECT column1 as env, column2 as service, column3 as host FROM (VALUES
+    ('dev', 'log', 'ma')
+))
+TO 
'test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as env, column2 as service, column3 as host FROM (VALUES
+    ('prod', 'log', 'ma')
+))
+TO 
'test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as env, column2 as service, column3 as host FROM (VALUES
+    ('prod', 'log', 'vim')
+))
+TO 
'test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as env, column2 as service, column3 as host FROM (VALUES
+    ('prod', 'trace', 'vim')
+))
+TO 
'test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+##########
+# TABLE DECLARATIONS
+##########
+
+# Fact table with ordering
+statement ok
+CREATE EXTERNAL TABLE fact_table_ordered (timestamp TIMESTAMP, value DOUBLE)
+STORED AS PARQUET
+PARTITIONED BY (f_dkey STRING)
+WITH ORDER (f_dkey ASC, timestamp ASC)
+LOCATION 'test_files/scratch/repartition_subset_satisfaction/fact/';
+
+# Dimension table (for join tests)
+statement ok
+CREATE EXTERNAL TABLE dimension_table (env STRING, service STRING, host STRING)
+STORED AS PARQUET
+PARTITIONED BY (d_dkey STRING)
+LOCATION 'test_files/scratch/repartition_subset_satisfaction/dimension/';
+
+##########
+# TEST 1: Basic Aggregate with Subset Partitioning
+# Demonstrates that GROUP BY [f_dkey, time_bin] can use
+# file partitioning on just [f_dkey]
+##########
+
+# With subset repartitioning forced (disables subset optimization)
+statement ok
+set datafusion.optimizer.subset_satisfaction_partition_threshold = 4;
+
+query TT
+EXPLAIN SELECT f_dkey, date_bin(INTERVAL '30 seconds', timestamp) as time_bin,
+       COUNT(*), AVG(value)
+FROM fact_table_ordered
+GROUP BY f_dkey, date_bin(INTERVAL '30 seconds', timestamp)
+ORDER BY f_dkey, time_bin;
+----
+logical_plan
+01)Sort: fact_table_ordered.f_dkey ASC NULLS LAST, time_bin ASC NULLS LAST
+02)--Projection: fact_table_ordered.f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp) AS time_bin, 
count(Int64(1)) AS count(*), avg(fact_table_ordered.value)
+03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"), fact_table_ordered.timestamp)]], 
aggr=[[count(Int64(1)), avg(fact_table_ordered.value)]]
+04)------TableScan: fact_table_ordered projection=[timestamp, value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, time_bin@1 ASC NULLS 
LAST]
+02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as time_bin, 
count(Int64(1))@2 as count(*), avg(fact_table_ordered.value)@3 as 
avg(fact_table_ordered.value)]
+03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], 
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 ASC NULLS LAST], 
preserve_partitioning=[true]
+05)--------RepartitionExec: partitioning=Hash([f_dkey@0, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1], 3), 
input_partitions=3
+06)----------AggregateExec: mode=Partial, gby=[f_dkey@2 as f_dkey, 
date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, 
timestamp@0) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 
0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], 
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+07)------------DataSourceExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]},
 projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS 
LAST, timestamp@0 ASC NULLS LAST], file_type=parquet
+
+# Verify results without subset satisfaction
+query TPIR rowsort
+SELECT f_dkey, date_bin(INTERVAL '30 seconds', timestamp) as time_bin,
+       COUNT(*), AVG(value)
+FROM fact_table_ordered
+GROUP BY f_dkey, date_bin(INTERVAL '30 seconds', timestamp)
+ORDER BY f_dkey, time_bin;
+----
+A 2023-01-01T09:00:00 3 98.833333333333
+A 2023-01-01T09:12:00 1 105.1
+A 2023-01-01T09:12:30 3 123.6
+B 2023-01-01T09:00:00 3 78.833333333333
+B 2023-01-01T09:00:30 1 85.6
+B 2023-01-01T09:12:30 3 97.433333333333
+C 2023-01-01T09:00:00 3 298.8
+C 2023-01-01T09:00:30 2 297.9
+C 2023-01-01T09:12:30 2 262.7
+
+# With subset logic enabled (default - enables subset optimization)
+statement ok
+set datafusion.optimizer.subset_satisfaction_partition_threshold = 1;
+
+query TT
+EXPLAIN SELECT f_dkey, date_bin(INTERVAL '30 seconds', timestamp) as time_bin,
+       COUNT(*), AVG(value)
+FROM fact_table_ordered
+GROUP BY f_dkey, date_bin(INTERVAL '30 seconds', timestamp)
+ORDER BY f_dkey, time_bin;
+----
+logical_plan
+01)Sort: fact_table_ordered.f_dkey ASC NULLS LAST, time_bin ASC NULLS LAST
+02)--Projection: fact_table_ordered.f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp) AS time_bin, 
count(Int64(1)) AS count(*), avg(fact_table_ordered.value)
+03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"), fact_table_ordered.timestamp)]], 
aggr=[[count(Int64(1)), avg(fact_table_ordered.value)]]
+04)------TableScan: fact_table_ordered projection=[timestamp, value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, time_bin@1 ASC NULLS 
LAST]

Review Comment:
   a nice single grouping 👌 



##########
datafusion/sqllogictest/test_files/tpch/plans/q18.slt.part:
##########
@@ -69,21 +69,19 @@ logical_plan
 physical_plan
 01)SortPreservingMergeExec: [o_totalprice@4 DESC, o_orderdate@3 ASC NULLS LAST]
 02)--SortExec: expr=[o_totalprice@4 DESC, o_orderdate@3 ASC NULLS LAST], 
preserve_partitioning=[true]
-03)----AggregateExec: mode=FinalPartitioned, gby=[c_name@0 as c_name, 
c_custkey@1 as c_custkey, o_orderkey@2 as o_orderkey, o_orderdate@3 as 
o_orderdate, o_totalprice@4 as o_totalprice], aggr=[sum(lineitem.l_quantity)]
-04)------RepartitionExec: partitioning=Hash([c_name@0, c_custkey@1, 
o_orderkey@2, o_orderdate@3, o_totalprice@4], 4), input_partitions=4
-05)--------AggregateExec: mode=Partial, gby=[c_name@1 as c_name, c_custkey@0 
as c_custkey, o_orderkey@2 as o_orderkey, o_orderdate@4 as o_orderdate, 
o_totalprice@3 as o_totalprice], aggr=[sum(lineitem.l_quantity)]
-06)----------HashJoinExec: mode=Partitioned, join_type=LeftSemi, 
on=[(o_orderkey@2, l_orderkey@0)]
-07)------------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(o_orderkey@2, l_orderkey@0)], projection=[c_custkey@0, c_name@1, 
o_orderkey@2, o_totalprice@3, o_orderdate@4, l_quantity@6]
-08)--------------RepartitionExec: partitioning=Hash([o_orderkey@2], 4), 
input_partitions=4
-09)----------------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(c_custkey@0, o_custkey@1)], projection=[c_custkey@0, c_name@1, 
o_orderkey@2, o_totalprice@4, o_orderdate@5]
-10)------------------RepartitionExec: partitioning=Hash([c_custkey@0], 4), 
input_partitions=1
-11)--------------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, 
projection=[c_custkey, c_name], file_type=csv, has_header=false
-12)------------------RepartitionExec: partitioning=Hash([o_custkey@1], 4), 
input_partitions=4
-13)--------------------DataSourceExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]},
 projection=[o_orderkey, o_custkey, o_totalprice, o_orderdate], file_type=csv, 
has_header=false
-14)--------------RepartitionExec: partitioning=Hash([l_orderkey@0], 4), 
input_partitions=4
-15)----------------DataSourceExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
 projection=[l_orderkey, l_quantity], file_type=csv, has_header=false
-16)------------FilterExec: sum(lineitem.l_quantity)@1 > Some(30000),25,2, 
projection=[l_orderkey@0]
-17)--------------AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as 
l_orderkey], aggr=[sum(lineitem.l_quantity)]
-18)----------------RepartitionExec: partitioning=Hash([l_orderkey@0], 4), 
input_partitions=4
-19)------------------AggregateExec: mode=Partial, gby=[l_orderkey@0 as 
l_orderkey], aggr=[sum(lineitem.l_quantity)]
-20)--------------------DataSourceExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
 projection=[l_orderkey, l_quantity], file_type=csv, has_header=false
+03)----AggregateExec: mode=SinglePartitioned, gby=[c_name@1 as c_name, c_custkey@0 as c_custkey, o_orderkey@2 as o_orderkey, o_orderdate@4 as o_orderdate, o_totalprice@3 as o_totalprice], aggr=[sum(lineitem.l_quantity)]

Review Comment:
   this is the same improvement as in q16 -- there is no repartitioning for the final `AggregateExec`



##########
datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part:
##########
@@ -71,19 +71,17 @@ physical_plan
 04)------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand, 
p_type@1 as p_type, p_size@2 as p_size], aggr=[count(alias1)]
 05)--------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1, p_size@2], 
4), input_partitions=4
 06)----------AggregateExec: mode=Partial, gby=[p_brand@0 as p_brand, p_type@1 
as p_type, p_size@2 as p_size], aggr=[count(alias1)]
-07)------------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as 
p_brand, p_type@1 as p_type, p_size@2 as p_size, alias1@3 as alias1], aggr=[]
-08)--------------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1, 
p_size@2, alias1@3], 4), input_partitions=4
-09)----------------AggregateExec: mode=Partial, gby=[p_brand@1 as p_brand, 
p_type@2 as p_type, p_size@3 as p_size, ps_suppkey@0 as alias1], aggr=[]
-10)------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti, 
on=[(ps_suppkey@0, s_suppkey@0)]
-11)--------------------RepartitionExec: partitioning=Hash([ps_suppkey@0], 4), 
input_partitions=4
-12)----------------------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(ps_partkey@0, p_partkey@0)], projection=[ps_suppkey@1, p_brand@3, 
p_type@4, p_size@5]
-13)------------------------RepartitionExec: partitioning=Hash([ps_partkey@0], 
4), input_partitions=4
-14)--------------------------DataSourceExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]},
 projection=[ps_partkey, ps_suppkey], file_type=csv, has_header=false
-15)------------------------RepartitionExec: partitioning=Hash([p_partkey@0], 
4), input_partitions=4
-16)--------------------------FilterExec: p_brand@1 != Brand#45 AND p_type@2 
NOT LIKE MEDIUM POLISHED% AND p_size@3 IN (SET) ([49, 14, 23, 45, 19, 3, 36, 9])
-17)----------------------------RepartitionExec: 
partitioning=RoundRobinBatch(4), input_partitions=1
-18)------------------------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl]]}, 
projection=[p_partkey, p_brand, p_type, p_size], file_type=csv, has_header=false
-19)--------------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4), 
input_partitions=4
-20)----------------------FilterExec: s_comment@1 LIKE %Customer%Complaints%, 
projection=[s_suppkey@0]
-21)------------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-22)--------------------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, 
projection=[s_suppkey, s_comment], file_type=csv, has_header=false
+07)------------AggregateExec: mode=SinglePartitioned, gby=[p_brand@1 as p_brand, p_type@2 as p_type, p_size@3 as p_size, ps_suppkey@0 as alias1], aggr=[]

Review Comment:
   this plan looks significantly better as it removes one layer of hash repartitioning / partial grouping.
   
   I also double-checked it, and I think it is correct (my reasoning is below)
   
   <details>
   <summary>details</summary>
   
   The first group is grouping on
   
   ```
   GROUP BY p_brand, p_type, p_size, ps_suppkey
   ```
   
   However, the join is partitioned on `ps_partkey` and `p_partkey`.
   
   So that means any particular key will only ever be in one partition.
   </details>
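   
   The general argument can be checked with a toy model: when the input is hash partitioned on a column that is part of the GROUP BY key, every distinct group lands in exactly one partition, which is what lets a single aggregation per partition be correct. The sketch below contrasts that with round-robin placement. The modulo "hash" and all names are assumptions for illustration, not DataFusion code.
   
   ```rust
   use std::collections::HashMap;

   /// A row of grouping columns (a, b); both are part of the GROUP BY key.
   type Row = (u64, u64);

   /// Assign rows to partitions using only column `a` (toy modulo hash).
   fn hash_on_first(rows: &[Row], partitions: u64) -> Vec<(Row, u64)> {
       rows.iter().map(|&row| (row, row.0 % partitions)).collect()
   }

   /// Assign rows to partitions by position, ignoring their values.
   fn round_robin(rows: &[Row], partitions: u64) -> Vec<(Row, u64)> {
       rows.iter()
           .enumerate()
           .map(|(i, &row)| (row, i as u64 % partitions))
           .collect()
   }

   /// True when every distinct (a, b) group appears in exactly one partition.
   fn groups_are_colocated(assignments: &[(Row, u64)]) -> bool {
       let mut seen: HashMap<Row, u64> = HashMap::new();
       assignments
           .iter()
           .all(|&(row, partition)| *seen.entry(row).or_insert(partition) == partition)
   }
   ```
   
   Hashing on `a` keeps each `(a, b)` group together, while round-robin can split a group across partitions and would therefore still need a final merge step.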
   



##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -148,51 +170,94 @@ impl Partitioning {
         }
     }
 
-    /// Returns true when the guarantees made by this [`Partitioning`] are sufficient to
-    /// satisfy the partitioning scheme mandated by the `required` [`Distribution`].
+    /// Returns true if `subset_exprs` is a subset of `exprs`.
+    /// For example: Hash(a, b) is subset of Hash(a) since a partition with all occurrences of
+    /// a distinct (a) must also contain all occurrences of a distinct (a, b) with the same (a).
+    fn is_subset_partitioning(
+        subset_exprs: &[Arc<dyn PhysicalExpr>],
+        superset_exprs: &[Arc<dyn PhysicalExpr>],
+    ) -> bool {
+        // Require strict subset: fewer expressions, not equal
+        if subset_exprs.is_empty() || subset_exprs.len() >= superset_exprs.len() {
+            return false;
+        }
+
+        subset_exprs.iter().all(|subset_expr| {
+            superset_exprs
+                .iter()
+                .any(|superset_expr| subset_expr.eq(superset_expr))
+        })
+    }
+
+    /// Returns how this [`Partitioning`] satisfies the partitioning scheme mandated
+    /// by the `required` [`Distribution`].
     pub fn satisfy(
         &self,
         required: &Distribution,
         eq_properties: &EquivalenceProperties,
-    ) -> bool {
+        allow_subset: bool,

Review Comment:
   this is technically an API change (which is fine)
   
   However, a nicer change for downstream consumers would be to keep the existing API, deprecate it, and add a new function, as described in
   https://datafusion.apache.org/contributor-guide/api-health.html#deprecation-guidelines
   
   
   Something like
   
   ```rust
       #[deprecated(since = "52.0.0", note = "Use satisfaction instead")]
       pub fn satisfy(
           &self,
           required: &Distribution,
           eq_properties: &EquivalenceProperties,
       ) -> bool {
           self.satisfaction(required, eq_properties, false)
               == PartitioningSatisfaction::Exact
       }
   
       /// Returns how this [`Partitioning`] satisfies the partitioning scheme mandated
       /// by the `required` [`Distribution`].
       pub fn satisfaction(
           &self,
           required: &Distribution,
           eq_properties: &EquivalenceProperties,
           allow_subset: bool,
       ) -> PartitioningSatisfaction {
   ....
   ```
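   
   A compilable version of the same pattern, with stand-in types so it builds without DataFusion, might look like the sketch below. Here `Partitioning` only holds column names, `required` is a plain slice, and "exact" is modeled as list equality. Those are all simplifying assumptions; only the deprecate-and-delegate shape is the point.
   
   ```rust
   #[derive(Debug, PartialEq, Eq)]
   enum PartitioningSatisfaction {
       Exact,
       Subset,
       NotSatisfied,
   }

   /// Stand-in for `Partitioning::Hash`: just the hashed column names.
   struct Partitioning {
       exprs: Vec<&'static str>,
   }

   impl Partitioning {
       /// Old entry point: keeps its boolean signature and delegates.
       #[deprecated(note = "Use `satisfaction` instead")]
       fn satisfy(&self, required: &[&str]) -> bool {
           self.satisfaction(required, false) == PartitioningSatisfaction::Exact
       }

       /// New entry point: reports how the requirement is met.
       fn satisfaction(
           &self,
           required: &[&str],
           allow_subset: bool,
       ) -> PartitioningSatisfaction {
           if self.exprs.as_slice() == required {
               return PartitioningSatisfaction::Exact;
           }
           // Strict subset: non-empty, fewer expressions, all present.
           let is_subset = !self.exprs.is_empty()
               && self.exprs.len() < required.len()
               && self.exprs.iter().all(|e| required.contains(e));
           if allow_subset && is_subset {
               PartitioningSatisfaction::Subset
           } else {
               PartitioningSatisfaction::NotSatisfied
           }
       }
   }
   ```
   
   Existing callers of `satisfy` keep compiling (with a deprecation warning) and keep the old strict behaviour, while new callers opt in to subset matching through `satisfaction`.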



##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -339,4 +414,403 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_partitioning_satisfy_by_subset() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int64, false),
+            Field::new("c", DataType::Int64, false),
+        ]));
+
+        let col_a: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("a", &schema)?);
+        let col_b: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("b", &schema)?);
+        let col_c: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("c", &schema)?);
+        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
+
+        let test_cases = vec![
+            (
+                "Hash([a]) vs Hash([a, b])",
+                Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
+                Distribution::HashPartitioned(vec![
+                    Arc::clone(&col_a),
+                    Arc::clone(&col_b),
+                ]),
+                PartitioningSatisfaction::Subset,
+                PartitioningSatisfaction::NotSatisfied,
+            ),
+            (
+                "Hash([a]) vs Hash([a, b, c])",
+                Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
+                Distribution::HashPartitioned(vec![
+                    Arc::clone(&col_a),
+                    Arc::clone(&col_b),
+                    Arc::clone(&col_c),
+                ]),
+                PartitioningSatisfaction::Subset,
+                PartitioningSatisfaction::NotSatisfied,
+            ),
+            (
+                "Hash([a, b]) vs Hash([a, b, c])",
+                Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
+                Distribution::HashPartitioned(vec![
+                    Arc::clone(&col_a),
+                    Arc::clone(&col_b),
+                    Arc::clone(&col_c),
+                ]),
+                PartitioningSatisfaction::Subset,
+                PartitioningSatisfaction::NotSatisfied,
+            ),
+        ];
+
+        for (desc, partition, required, expected_with_subset, expected_without_subset) in
+            test_cases
+        {
+            let result = partition.satisfy(&required, &eq_properties, true);
+            assert_eq!(
+                result, expected_with_subset,
+                "Failed for {desc} with subset enabled"
+            );
+
+            let result = partition.satisfy(&required, &eq_properties, false);
+            assert_eq!(
+                result, expected_without_subset,
+                "Failed for {desc} with subset disabled"
+            );
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_partitioning_current_superset() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int64, false),
+            Field::new("c", DataType::Int64, false),
+        ]));
+
+        let col_a: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("a", &schema)?);
+        let col_b: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("b", &schema)?);
+        let col_c: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("c", &schema)?);
+        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
+
+        let test_cases = vec![
+            (
+                "Hash([a, b]) vs Hash([a])",
+                Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
+                Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
+                PartitioningSatisfaction::NotSatisfied,
+                PartitioningSatisfaction::NotSatisfied,
+            ),
+            (
+                "Hash([a, b, c]) vs Hash([a])",
+                Partitioning::Hash(
+                    vec![Arc::clone(&col_a), Arc::clone(&col_b), Arc::clone(&col_c)],
+                    4,
+                ),
+                Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
+                PartitioningSatisfaction::NotSatisfied,
+                PartitioningSatisfaction::NotSatisfied,
+            ),
+            (
+                "Hash([a, b, c]) vs Hash([a, b])",
+                Partitioning::Hash(
+                    vec![Arc::clone(&col_a), Arc::clone(&col_b), Arc::clone(&col_c)],
+                    4,
+                ),
+                Distribution::HashPartitioned(vec![
+                    Arc::clone(&col_a),
+                    Arc::clone(&col_b),
+                ]),
+                PartitioningSatisfaction::NotSatisfied,
+                PartitioningSatisfaction::NotSatisfied,
+            ),
+        ];
+
+        for (desc, partition, required, expected_with_subset, expected_without_subset) in

Review Comment:
   There seems to be a lot of duplication in these tests (the input schemas are the same and the loop over cases is the same). Is there a reason not to just make one large list of cases in a single function? It would probably be significantly less verbose.
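   
   A rough sketch of the consolidated shape, using plain column names and a simplified two-variant enum so it compiles on its own. The types and the `satisfaction` helper are stand-ins, not the PR's code. The expected values are copied from the existing cases.
   
   ```rust
   #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   enum Satisfaction {
       Subset,
       NotSatisfied,
   }

   /// Stand-in for `Partitioning::satisfy` restricted to the subset rule.
   fn satisfaction(current: &[&str], required: &[&str], allow_subset: bool) -> Satisfaction {
       let strict_subset = !current.is_empty()
           && current.len() < required.len()
           && current.iter().all(|c| required.contains(c));
       if allow_subset && strict_subset {
           Satisfaction::Subset
       } else {
           Satisfaction::NotSatisfied
       }
   }

   /// (description, current, required, expected with subset, expected without)
   type Case = (
       &'static str,
       Vec<&'static str>,
       Vec<&'static str>,
       Satisfaction,
       Satisfaction,
   );

   /// One table covering both the subset and the superset direction.
   fn cases() -> Vec<Case> {
       use Satisfaction::{NotSatisfied, Subset};
       vec![
           ("Hash([a]) vs Hash([a, b])", vec!["a"], vec!["a", "b"], Subset, NotSatisfied),
           ("Hash([a, b]) vs Hash([a, b, c])", vec!["a", "b"], vec!["a", "b", "c"], Subset, NotSatisfied),
           ("Hash([a, b]) vs Hash([a])", vec!["a", "b"], vec!["a"], NotSatisfied, NotSatisfied),
           ("Hash([a, b, c]) vs Hash([a, b])", vec!["a", "b", "c"], vec!["a", "b"], NotSatisfied, NotSatisfied),
       ]
   }

   /// Single loop shared by every case.
   fn check_all() -> Result<(), String> {
       for (desc, current, required, with_subset, without_subset) in cases() {
           if satisfaction(&current, &required, true) != with_subset {
               return Err(format!("Failed for {desc} with subset enabled"));
           }
           if satisfaction(&current, &required, false) != without_subset {
               return Err(format!("Failed for {desc} with subset disabled"));
           }
       }
       Ok(())
   }
   ```
   
   The schema and column setup would then appear once, and adding a case becomes a one-line change to the table.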



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
