NGA-TRAN commented on code in PR #19304:
URL: https://github.com/apache/datafusion/pull/19304#discussion_r2620695566


##########
datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt:
##########
@@ -0,0 +1,528 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+# Tests for Subset Partitioning Optimization
+#
+# Subset partitioning allows Hash([a]) to satisfy Hash([a, b]) requirements
+# when the current partitioning expressions are a strict subset of the
+# required partitioning expressions.
+##########
+
+##########
+# SETUP: Configuration and Data Generation
+##########
+
+statement ok
+set datafusion.optimizer.enable_round_robin_repartition = false;
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 1;
+
+statement ok
+set datafusion.execution.target_partitions = 3;
+
+# Create fact table partitioned by f_dkey (3 partitions)
+# Each partition has data sorted by timestamp
+# Partition: f_dkey=A
+statement ok
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+    (TIMESTAMP '2023-01-01T09:00:00', 95.5),
+    (TIMESTAMP '2023-01-01T09:00:10', 102.3),
+    (TIMESTAMP '2023-01-01T09:00:20', 98.7),
+    (TIMESTAMP '2023-01-01T09:12:20', 105.1),
+    (TIMESTAMP '2023-01-01T09:12:30', 100.0),
+    (TIMESTAMP '2023-01-01T09:12:40', 150.0),
+    (TIMESTAMP '2023-01-01T09:12:50', 120.8)
+))
+TO 
'test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet'
+STORED AS PARQUET;
+
+# Partition: f_dkey=B
+statement ok
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+    (TIMESTAMP '2023-01-01T09:00:00', 75.2),
+    (TIMESTAMP '2023-01-01T09:00:10', 82.4),
+    (TIMESTAMP '2023-01-01T09:00:20', 78.9),
+    (TIMESTAMP '2023-01-01T09:00:30', 85.6),
+    (TIMESTAMP '2023-01-01T09:12:30', 80.0),
+    (TIMESTAMP '2023-01-01T09:12:40', 120.0),
+    (TIMESTAMP '2023-01-01T09:12:50', 92.3)
+))
+TO 
'test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet'
+STORED AS PARQUET;
+
+# Partition: f_dkey=C
+statement ok
+COPY (SELECT column1 as timestamp, column2 as value FROM (VALUES
+    (TIMESTAMP '2023-01-01T09:00:00', 300.5),
+    (TIMESTAMP '2023-01-01T09:00:10', 285.7),
+    (TIMESTAMP '2023-01-01T09:00:20', 310.2),
+    (TIMESTAMP '2023-01-01T09:00:30', 295.8),
+    (TIMESTAMP '2023-01-01T09:00:40', 300.0),
+    (TIMESTAMP '2023-01-01T09:12:40', 250.0),
+    (TIMESTAMP '2023-01-01T09:12:50', 275.4)
+))
+TO 
'test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet'
+STORED AS PARQUET;
+
+# Create dimension table partitioned by d_dkey (4 partitions)
+query I
+COPY (SELECT column1 as env, column2 as service, column3 as host FROM (VALUES
+    ('dev', 'log', 'ma')
+))
+TO 
'test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as env, column2 as service, column3 as host FROM (VALUES
+    ('prod', 'log', 'ma')
+))
+TO 
'test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as env, column2 as service, column3 as host FROM (VALUES
+    ('prod', 'log', 'vim')
+))
+TO 
'test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT column1 as env, column2 as service, column3 as host FROM (VALUES
+    ('prod', 'trace', 'vim')
+))
+TO 
'test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+##########
+# TABLE DECLARATIONS
+##########
+
+# Fact table with ordering
+statement ok
+CREATE EXTERNAL TABLE fact_table_ordered (timestamp TIMESTAMP, value DOUBLE)
+STORED AS PARQUET
+PARTITIONED BY (f_dkey STRING)
+WITH ORDER (f_dkey ASC, timestamp ASC)
+LOCATION 'test_files/scratch/repartition_subset_satisfaction/fact/';
+
+# Dimension table (for join tests)
+statement ok
+CREATE EXTERNAL TABLE dimension_table (env STRING, service STRING, host STRING)
+STORED AS PARQUET
+PARTITIONED BY (d_dkey STRING)
+LOCATION 'test_files/scratch/repartition_subset_satisfaction/dimension/';
+
+##########
+# TEST 1: Basic Aggregate with Subset Partitioning
+# Demonstrates that GROUP BY [f_dkey, time_bin] can use
+# file partitioning on just [f_dkey]
+##########
+
+# With subset repartitioning forced (disables subset optimization)
+statement ok
+set datafusion.optimizer.repartition_subset_satisfactions = true;
+
+query TT
+EXPLAIN SELECT f_dkey, date_bin(INTERVAL '30 seconds', timestamp) as time_bin,
+       COUNT(*), AVG(value)
+FROM fact_table_ordered
+GROUP BY f_dkey, date_bin(INTERVAL '30 seconds', timestamp)
+ORDER BY f_dkey, time_bin;
+----
+logical_plan
+01)Sort: fact_table_ordered.f_dkey ASC NULLS LAST, time_bin ASC NULLS LAST
+02)--Projection: fact_table_ordered.f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp) AS time_bin, 
count(Int64(1)) AS count(*), avg(fact_table_ordered.value)
+03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"), fact_table_ordered.timestamp)]], 
aggr=[[count(Int64(1)), avg(fact_table_ordered.value)]]
+04)------TableScan: fact_table_ordered projection=[timestamp, value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, time_bin@1 ASC NULLS 
LAST]
+02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as time_bin, 
count(Int64(1))@2 as count(*), avg(fact_table_ordered.value)@3 as 
avg(fact_table_ordered.value)]
+03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], 
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 ASC NULLS LAST], 
preserve_partitioning=[true]
+05)--------CoalesceBatchesExec: target_batch_size=8192
+06)----------RepartitionExec: partitioning=Hash([f_dkey@0, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1], 3), 
input_partitions=3
+07)------------AggregateExec: mode=Partial, gby=[f_dkey@2 as f_dkey, 
date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, 
timestamp@0) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 
0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], 
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+08)--------------DataSourceExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]},
 projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS 
LAST, timestamp@0 ASC NULLS LAST], file_type=parquet
+
+# Verify results without subset satisfaction
+query TPIR rowsort
+SELECT f_dkey, date_bin(INTERVAL '30 seconds', timestamp) as time_bin,
+       COUNT(*), AVG(value)
+FROM fact_table_ordered
+GROUP BY f_dkey, date_bin(INTERVAL '30 seconds', timestamp)
+ORDER BY f_dkey, time_bin;
+----
+A 2023-01-01T09:00:00 3 98.833333333333
+A 2023-01-01T09:12:00 1 105.1
+A 2023-01-01T09:12:30 3 123.6
+B 2023-01-01T09:00:00 3 78.833333333333
+B 2023-01-01T09:00:30 1 85.6
+B 2023-01-01T09:12:30 3 97.433333333333
+C 2023-01-01T09:00:00 3 298.8
+C 2023-01-01T09:00:30 2 297.9
+C 2023-01-01T09:12:30 2 262.7
+
+# With subset logic enabled (default - enables subset optimization)
+statement ok
+set datafusion.optimizer.repartition_subset_satisfactions = false;
+
+query TT
+EXPLAIN SELECT f_dkey, date_bin(INTERVAL '30 seconds', timestamp) as time_bin,
+       COUNT(*), AVG(value)
+FROM fact_table_ordered
+GROUP BY f_dkey, date_bin(INTERVAL '30 seconds', timestamp)
+ORDER BY f_dkey, time_bin;
+----
+logical_plan
+01)Sort: fact_table_ordered.f_dkey ASC NULLS LAST, time_bin ASC NULLS LAST
+02)--Projection: fact_table_ordered.f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp) AS time_bin, 
count(Int64(1)) AS count(*), avg(fact_table_ordered.value)
+03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"), fact_table_ordered.timestamp)]], 
aggr=[[count(Int64(1)), avg(fact_table_ordered.value)]]
+04)------TableScan: fact_table_ordered projection=[timestamp, value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, time_bin@1 ASC NULLS 
LAST]
+02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as time_bin, 
count(Int64(1))@2 as count(*), avg(fact_table_ordered.value)@3 as 
avg(fact_table_ordered.value)]
+03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@2 as f_dkey, 
date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, 
timestamp@0) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 
0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], 
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+04)------DataSourceExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]},
 projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS 
LAST, timestamp@0 ASC NULLS LAST], file_type=parquet
+
+# Verify results match with subset satisfaction
+query TPIR rowsort
+SELECT f_dkey, date_bin(INTERVAL '30 seconds', timestamp) as time_bin,
+       COUNT(*), AVG(value)
+FROM fact_table_ordered
+GROUP BY f_dkey, date_bin(INTERVAL '30 seconds', timestamp)
+ORDER BY f_dkey, time_bin;
+----
+A 2023-01-01T09:00:00 3 98.833333333333
+A 2023-01-01T09:12:00 1 105.1
+A 2023-01-01T09:12:30 3 123.6
+B 2023-01-01T09:00:00 3 78.833333333333
+B 2023-01-01T09:00:30 1 85.6
+B 2023-01-01T09:12:30 3 97.433333333333
+C 2023-01-01T09:00:00 3 298.8
+C 2023-01-01T09:00:30 2 297.9
+C 2023-01-01T09:12:30 2 262.7
+
+##########
+# TEST 2: Window Functions with Subset Partitioning
+# Demonstrates that PARTITION BY [f_dkey, time_bin] can use
+# file partitioning on just [f_dkey]
+##########
+
+# With subset repartitioning forced (disables subset optimization)
+statement ok
+set datafusion.optimizer.repartition_subset_satisfactions = true;
+
+query TT
+EXPLAIN SELECT f_dkey, timestamp, value,
+       ROW_NUMBER() OVER (
+           PARTITION BY f_dkey, date_bin(INTERVAL '30 seconds', timestamp)
+           ORDER BY timestamp
+       ) as rn
+FROM fact_table_ordered;
+----
+logical_plan
+01)Projection: fact_table_ordered.f_dkey, fact_table_ordered.timestamp, 
fact_table_ordered.value, row_number() PARTITION BY [fact_table_ordered.f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)] ORDER BY 
[fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS rn
+02)--WindowAggr: windowExpr=[[row_number() PARTITION BY 
[fact_table_ordered.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano 
{ months: 0, days: 0, nanoseconds: 30000000000 }"), 
fact_table_ordered.timestamp)] ORDER BY [fact_table_ordered.timestamp ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+03)----TableScan: fact_table_ordered projection=[timestamp, value, f_dkey]
+physical_plan
+01)ProjectionExec: expr=[f_dkey@2 as f_dkey, timestamp@0 as timestamp, value@1 
as value, row_number() PARTITION BY [fact_table_ordered.f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)] ORDER BY 
[fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@3 as rn]
+02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY 
[fact_table_ordered.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano 
{ months: 0, days: 0, nanoseconds: 30000000000 
}"),fact_table_ordered.timestamp)] ORDER BY [fact_table_ordered.timestamp ASC 
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { 
"row_number() PARTITION BY [fact_table_ordered.f_dkey, 
date_bin(IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }\"),fact_table_ordered.timestamp)] ORDER BY 
[fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], mode=[Sorted]
+03)----SortExec: expr=[f_dkey@2 ASC NULLS LAST, date_bin(IntervalMonthDayNano 
{ months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@0) ASC NULLS LAST, 
timestamp@0 ASC NULLS LAST], preserve_partitioning=[true]
+04)------CoalesceBatchesExec: target_batch_size=8192
+05)--------RepartitionExec: partitioning=Hash([f_dkey@2, 
date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, 
timestamp@0)], 3), input_partitions=3
+06)----------DataSourceExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]},
 projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS 
LAST, timestamp@0 ASC NULLS LAST], file_type=parquet
+
+# Verify results without subset satisfaction
+query TPRI rowsort
+SELECT f_dkey, timestamp, value,
+       ROW_NUMBER() OVER (
+           PARTITION BY f_dkey, date_bin(INTERVAL '30 seconds', timestamp)
+           ORDER BY timestamp
+       ) as rn
+FROM fact_table_ordered
+WHERE timestamp < TIMESTAMP '2023-01-01T09:00:30';
+----
+A 2023-01-01T09:00:00 95.5 1
+A 2023-01-01T09:00:10 102.3 2
+A 2023-01-01T09:00:20 98.7 3
+B 2023-01-01T09:00:00 75.2 1
+B 2023-01-01T09:00:10 82.4 2
+B 2023-01-01T09:00:20 78.9 3
+C 2023-01-01T09:00:00 300.5 1
+C 2023-01-01T09:00:10 285.7 2
+C 2023-01-01T09:00:20 310.2 3
+
+# With subset logic enabled (default - enables subset optimization)
+statement ok
+set datafusion.optimizer.repartition_subset_satisfactions = false;
+
+query TT
+EXPLAIN SELECT f_dkey, timestamp, value,
+       ROW_NUMBER() OVER (
+           PARTITION BY f_dkey, date_bin(INTERVAL '30 seconds', timestamp)
+           ORDER BY timestamp
+       ) as rn
+FROM fact_table_ordered;
+----
+logical_plan
+01)Projection: fact_table_ordered.f_dkey, fact_table_ordered.timestamp, 
fact_table_ordered.value, row_number() PARTITION BY [fact_table_ordered.f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)] ORDER BY 
[fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS rn
+02)--WindowAggr: windowExpr=[[row_number() PARTITION BY 
[fact_table_ordered.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano 
{ months: 0, days: 0, nanoseconds: 30000000000 }"), 
fact_table_ordered.timestamp)] ORDER BY [fact_table_ordered.timestamp ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+03)----TableScan: fact_table_ordered projection=[timestamp, value, f_dkey]
+physical_plan
+01)ProjectionExec: expr=[f_dkey@2 as f_dkey, timestamp@0 as timestamp, value@1 
as value, row_number() PARTITION BY [fact_table_ordered.f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)] ORDER BY 
[fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@3 as rn]
+02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY 
[fact_table_ordered.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano 
{ months: 0, days: 0, nanoseconds: 30000000000 
}"),fact_table_ordered.timestamp)] ORDER BY [fact_table_ordered.timestamp ASC 
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { 
"row_number() PARTITION BY [fact_table_ordered.f_dkey, 
date_bin(IntervalMonthDayNano(\"IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }\"),fact_table_ordered.timestamp)] ORDER BY 
[fact_table_ordered.timestamp ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], mode=[Sorted]
+03)----DataSourceExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]},
 projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS 
LAST, timestamp@0 ASC NULLS LAST], file_type=parquet
+
+# Verify results match with subset satisfaction
+query TPRI rowsort
+SELECT f_dkey, timestamp, value,
+       ROW_NUMBER() OVER (
+           PARTITION BY f_dkey, date_bin(INTERVAL '30 seconds', timestamp)
+           ORDER BY timestamp
+       ) as rn
+FROM fact_table_ordered
+WHERE timestamp < TIMESTAMP '2023-01-01T09:00:30';
+----
+A 2023-01-01T09:00:00 95.5 1
+A 2023-01-01T09:00:10 102.3 2
+A 2023-01-01T09:00:20 98.7 3
+B 2023-01-01T09:00:00 75.2 1
+B 2023-01-01T09:00:10 82.4 2
+B 2023-01-01T09:00:20 78.9 3
+C 2023-01-01T09:00:00 300.5 1
+C 2023-01-01T09:00:10 285.7 2
+C 2023-01-01T09:00:20 310.2 3
+
+##########
+# TEST 3: Complex Join and Aggregate with Subset Partitioning
+# Demonstrates subset partitioning with joins and nested aggregations
+##########
+
+# With subset repartitioning forced (disables subset optimization)
+statement ok
+set datafusion.optimizer.repartition_subset_satisfactions = true;
+
+query TT
+EXPLAIN SELECT env, time_bin, AVG(max_bin_val)
+FROM
+  (
+    SELECT  f_dkey,
+            date_bin(INTERVAL '30 seconds', timestamp) AS time_bin,
+            MAX(env) AS env,
+            MAX(value) AS max_bin_val
+    FROM
+    (
+        SELECT f.f_dkey, f.timestamp, d.env, f.value
+        FROM
+            (SELECT DISTINCT d_dkey, env
+            FROM   dimension_table
+            WHERE  service = 'log'
+            ) AS d,
+            fact_table_ordered AS f
+        WHERE d.d_dkey = f.f_dkey
+    ) AS j
+    GROUP BY f_dkey, time_bin
+  ) AS a
+GROUP BY env, time_bin
+ORDER BY env, time_bin;
+----
+logical_plan
+01)Sort: a.env ASC NULLS LAST, a.time_bin ASC NULLS LAST
+02)--Aggregate: groupBy=[[a.env, a.time_bin]], aggr=[[avg(a.max_bin_val)]]
+03)----SubqueryAlias: a
+04)------Projection: date_bin(IntervalMonthDayNano("IntervalMonthDayNano { 
months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp) AS time_bin, 
max(j.env) AS env, max(j.value) AS max_bin_val
+05)--------Aggregate: groupBy=[[j.f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"), j.timestamp)]], aggr=[[max(j.env), max(j.value)]]
+06)----------SubqueryAlias: j
+07)------------Projection: f.f_dkey, f.timestamp, d.env, f.value
+08)--------------Inner Join: d.d_dkey = f.f_dkey
+09)----------------SubqueryAlias: d
+10)------------------Aggregate: groupBy=[[dimension_table.d_dkey, 
dimension_table.env]], aggr=[[]]
+11)--------------------Projection: dimension_table.d_dkey, dimension_table.env
+12)----------------------Filter: dimension_table.service = Utf8View("log")
+13)------------------------TableScan: dimension_table projection=[env, 
service, d_dkey], partial_filters=[dimension_table.service = Utf8View("log")]
+14)----------------SubqueryAlias: f
+15)------------------TableScan: fact_table_ordered projection=[timestamp, 
value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [env@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST]
+02)--SortExec: expr=[env@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST], 
preserve_partitioning=[true]
+03)----AggregateExec: mode=FinalPartitioned, gby=[env@0 as env, time_bin@1 as 
time_bin], aggr=[avg(a.max_bin_val)]
+04)------CoalesceBatchesExec: target_batch_size=8192
+05)--------RepartitionExec: partitioning=Hash([env@0, time_bin@1], 3), 
input_partitions=3
+06)----------AggregateExec: mode=Partial, gby=[env@1 as env, time_bin@0 as 
time_bin], aggr=[avg(a.max_bin_val)]
+07)------------ProjectionExec: 
expr=[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),j.timestamp)@1 as time_bin, max(j.env)@2 as env, 
max(j.value)@3 as max_bin_val]
+08)--------------AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as 
f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 
0, nanoseconds: 30000000000 }"),j.timestamp)@1 as 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),j.timestamp)], aggr=[max(j.env), max(j.value)], 
ordering_mode=Sorted
+09)----------------SortExec: expr=[f_dkey@0 ASC NULLS LAST, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),j.timestamp)@1 ASC NULLS LAST], 
preserve_partitioning=[true]
+10)------------------CoalesceBatchesExec: target_batch_size=8192
+11)--------------------RepartitionExec: partitioning=Hash([f_dkey@0, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),j.timestamp)@1], 3), input_partitions=3
+12)----------------------AggregateExec: mode=Partial, gby=[f_dkey@0 as f_dkey, 
date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, 
timestamp@1) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 
0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)], aggr=[max(j.env), 
max(j.value)], ordering_mode=Sorted
+13)------------------------ProjectionExec: expr=[f_dkey@3 as f_dkey, 
timestamp@1 as timestamp, env@0 as env, value@2 as value]
+14)--------------------------HashJoinExec: mode=CollectLeft, join_type=Inner, 
on=[(d_dkey@0, f_dkey@2)], projection=[env@1, timestamp@2, value@3, f_dkey@4]
+15)----------------------------CoalescePartitionsExec
+16)------------------------------AggregateExec: mode=FinalPartitioned, 
gby=[d_dkey@0 as d_dkey, env@1 as env], aggr=[]
+17)--------------------------------CoalesceBatchesExec: target_batch_size=8192
+18)----------------------------------RepartitionExec: 
partitioning=Hash([d_dkey@0, env@1], 3), input_partitions=3
+19)------------------------------------AggregateExec: mode=Partial, 
gby=[d_dkey@0 as d_dkey, env@1 as env], aggr=[]
+20)--------------------------------------ProjectionExec: expr=[d_dkey@1 as 
d_dkey, env@0 as env]
+21)----------------------------------------FilterExec: service@1 = log, 
projection=[env@0, d_dkey@2]
+22)------------------------------------------DataSourceExec: file_groups={3 
groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]},
 projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = 
log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= 
log AND log <= service_max@1, required_guarantees=[service in (log)]
+23)----------------------------DataSourceExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]},
 projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS 
LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ 
empty ]
+
+# Verify results without subset satisfaction
+query TPR rowsort
+SELECT env, time_bin, AVG(max_bin_val)
+FROM
+  (
+    SELECT  f_dkey,
+            date_bin(INTERVAL '30 seconds', timestamp) AS time_bin,
+            MAX(env) AS env,
+            MAX(value) AS max_bin_val
+    FROM
+    (
+        SELECT f.f_dkey, f.timestamp, d.env, f.value
+        FROM
+            (SELECT DISTINCT d_dkey, env
+            FROM   dimension_table
+            WHERE  service = 'log'
+            ) AS d,
+            fact_table_ordered AS f
+        WHERE d.d_dkey = f.f_dkey
+    ) AS j
+    GROUP BY f_dkey, time_bin
+  ) AS a
+GROUP BY env, time_bin
+ORDER BY env, time_bin;
+----
+dev 2023-01-01T09:00:00 102.3
+dev 2023-01-01T09:12:00 105.1
+dev 2023-01-01T09:12:30 150
+prod 2023-01-01T09:00:00 196.3
+prod 2023-01-01T09:00:30 192.8
+prod 2023-01-01T09:12:30 197.7
+
+# With subset logic enabled (default - enables subset optimization)
+statement ok
+set datafusion.optimizer.repartition_subset_satisfactions = false;
+
+query TT
+EXPLAIN SELECT env, time_bin, AVG(max_bin_val)
+FROM
+  (
+    SELECT  f_dkey,
+            date_bin(INTERVAL '30 seconds', timestamp) AS time_bin,
+            MAX(env) AS env,
+            MAX(value) AS max_bin_val
+    FROM
+    (
+        SELECT f.f_dkey, f.timestamp, d.env, f.value
+        FROM
+            (SELECT DISTINCT d_dkey, env
+            FROM   dimension_table
+            WHERE  service = 'log'
+            ) AS d,
+            fact_table_ordered AS f
+        WHERE d.d_dkey = f.f_dkey
+    ) AS j
+    GROUP BY f_dkey, time_bin
+  ) AS a
+GROUP BY env, time_bin
+ORDER BY env, time_bin;
+----
+logical_plan
+01)Sort: a.env ASC NULLS LAST, a.time_bin ASC NULLS LAST
+02)--Aggregate: groupBy=[[a.env, a.time_bin]], aggr=[[avg(a.max_bin_val)]]
+03)----SubqueryAlias: a
+04)------Projection: date_bin(IntervalMonthDayNano("IntervalMonthDayNano { 
months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp) AS time_bin, 
max(j.env) AS env, max(j.value) AS max_bin_val
+05)--------Aggregate: groupBy=[[j.f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"), j.timestamp)]], aggr=[[max(j.env), max(j.value)]]
+06)----------SubqueryAlias: j
+07)------------Projection: f.f_dkey, f.timestamp, d.env, f.value
+08)--------------Inner Join: d.d_dkey = f.f_dkey
+09)----------------SubqueryAlias: d
+10)------------------Aggregate: groupBy=[[dimension_table.d_dkey, 
dimension_table.env]], aggr=[[]]
+11)--------------------Projection: dimension_table.d_dkey, dimension_table.env
+12)----------------------Filter: dimension_table.service = Utf8View("log")
+13)------------------------TableScan: dimension_table projection=[env, 
service, d_dkey], partial_filters=[dimension_table.service = Utf8View("log")]
+14)----------------SubqueryAlias: f
+15)------------------TableScan: fact_table_ordered projection=[timestamp, 
value, f_dkey]
+physical_plan
+01)SortPreservingMergeExec: [env@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST]
+02)--SortExec: expr=[env@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST], 
preserve_partitioning=[true]
+03)----AggregateExec: mode=FinalPartitioned, gby=[env@0 as env, time_bin@1 as 
time_bin], aggr=[avg(a.max_bin_val)]
+04)------CoalesceBatchesExec: target_batch_size=8192
+05)--------RepartitionExec: partitioning=Hash([env@0, time_bin@1], 3), 
input_partitions=3
+06)----------AggregateExec: mode=Partial, gby=[env@1 as env, time_bin@0 as 
time_bin], aggr=[avg(a.max_bin_val)]
+07)------------ProjectionExec: 
expr=[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),j.timestamp)@1 as time_bin, max(j.env)@2 as env, 
max(j.value)@3 as max_bin_val]
+08)--------------AggregateExec: mode=SinglePartitioned, gby=[f_dkey@0 as 
f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 
30000000000 }, timestamp@1) as 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, 
nanoseconds: 30000000000 }"),j.timestamp)], aggr=[max(j.env), max(j.value)], 
ordering_mode=Sorted
+09)----------------ProjectionExec: expr=[f_dkey@3 as f_dkey, timestamp@1 as 
timestamp, env@0 as env, value@2 as value]
+10)------------------HashJoinExec: mode=CollectLeft, join_type=Inner, 
on=[(d_dkey@0, f_dkey@2)], projection=[env@1, timestamp@2, value@3, f_dkey@4]
+11)--------------------CoalescePartitionsExec
+12)----------------------AggregateExec: mode=SinglePartitioned, gby=[d_dkey@0 
as d_dkey, env@1 as env], aggr=[]
+13)------------------------ProjectionExec: expr=[d_dkey@1 as d_dkey, env@0 as 
env]
+14)--------------------------FilterExec: service@1 = log, projection=[env@0, 
d_dkey@2]
+15)----------------------------DataSourceExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet,
 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]},
 projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = 
log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= 
log AND log <= service_max@1, required_guarantees=[service in (log)]
+16)--------------------DataSourceExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]},
 projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS 
LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ 
empty ]
+
+# Verify results match with subset satisfaction
+query TPR rowsort
+SELECT env, time_bin, AVG(max_bin_val)
+FROM
+  (
+    SELECT  f_dkey,
+            date_bin(INTERVAL '30 seconds', timestamp) AS time_bin,
+            MAX(env) AS env,
+            MAX(value) AS max_bin_val
+    FROM
+    (
+        SELECT f.f_dkey, f.timestamp, d.env, f.value
+        FROM
+            (SELECT DISTINCT d_dkey, env
+            FROM   dimension_table
+            WHERE  service = 'log'
+            ) AS d,
+            fact_table_ordered AS f
+        WHERE d.d_dkey = f.f_dkey
+    ) AS j
+    GROUP BY f_dkey, time_bin
+  ) AS a
+GROUP BY env, time_bin
+ORDER BY env, time_bin;
+----
+dev 2023-01-01T09:00:00 102.3
+dev 2023-01-01T09:12:00 105.1
+dev 2023-01-01T09:12:30 150
+prod 2023-01-01T09:00:00 196.3
+prod 2023-01-01T09:00:30 192.8
+prod 2023-01-01T09:12:30 197.7

Review Comment:
   Very nice tests.
   Can you add a Test 4 for this query? This is a different representative query 
that works well with this superset optimization. Note that `max(env)` is 
now just `env`, and it is also in the GROUP BY.
   
   ```SQL
   SELECT env, time_bin, AVG(max_bin_value) AS avg_max_value
   FROM
   (
       SELECT  f_dkey, 
               date_bin(INTERVAL '30 seconds', timestamp) AS time_bin,
               env,
               MAX(value) AS max_bin_value
       FROM
           (
           SELECT 
               f.f_dkey,
               d.env,
               d.service,
               d.host,
               f.timestamp,
               f.value
           FROM dimension_table d
           INNER JOIN fact_table_ordered f ON d.d_dkey = f.f_dkey
           WHERE service = 'log'
           ) AS j
       GROUP BY f_dkey, time_bin, env
   ) AS a
   GROUP BY env, time_bin
   ORDER BY env, time_bin;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to