gene-bordegaray opened a new issue, #18777:
URL: https://github.com/apache/datafusion/issues/18777

   ### Is your feature request related to a problem or challenge?
   
   When data files are non-overlapping on the `GROUP BY` key (e.g. file 1 contains only id='A', file 2 contains only id='B'), DataFusion should use `AggregateMode::SinglePartitioned` instead of the current `Partial -> Merge -> Final` pattern.
   
   This would allow fully parallel aggregation without an unnecessary merge step.
   
   ## Current Behavior / Minimal Reproducer
   To reproduce this behavior, run the following statements:
   
   ```sql
   -- Config to prevent repartitioning
   SET datafusion.execution.target_partitions = 3;
   SET datafusion.optimizer.repartition_aggregations=false;
   SET datafusion.optimizer.enable_round_robin_repartition = false;
   SET datafusion.explain.format = 'indent';
   
   -- Create 3 files, each with DISTINCT f_dkey values (non-overlapping)
   COPY (
       SELECT 'A' as f_dkey, TIMESTAMP '2024-01-01 00:00:01' as ts, 10.5 as value
       UNION ALL SELECT 'A', TIMESTAMP '2024-01-01 00:00:05', 15.2
       UNION ALL SELECT 'A', TIMESTAMP '2024-01-01 00:00:35', 20.1
   ) TO 'fact_A.parquet';
   
   COPY (
       SELECT 'B' as f_dkey, TIMESTAMP '2024-01-01 00:00:02' as ts, 100.5 as value
       UNION ALL SELECT 'B', TIMESTAMP '2024-01-01 00:00:08', 150.2
       UNION ALL SELECT 'B', TIMESTAMP '2024-01-01 00:00:38', 200.1
   ) TO 'fact_B.parquet';
   
   COPY (
       SELECT 'C' as f_dkey, TIMESTAMP '2024-01-01 00:00:03' as ts, 1000.5 as value
       UNION ALL SELECT 'C', TIMESTAMP '2024-01-01 00:00:12', 1500.2
       UNION ALL SELECT 'C', TIMESTAMP '2024-01-01 00:00:42', 2000.1
   ) TO 'fact_C.parquet';
   
   -- Create table reading all 3 files
   CREATE EXTERNAL TABLE facts
   STORED AS PARQUET
   LOCATION 'fact_*.parquet';
   
   -- Query that should use SinglePartitioned mode
   EXPLAIN
   SELECT
       f_dkey,
       date_bin(INTERVAL '30 seconds', ts) AS time_bin,
       MAX(value) as max_value
   FROM facts
   GROUP BY f_dkey, time_bin;
   ```
   
   This will produce the plan:
   
   ```text
   
   logical_plan
     Projection: facts.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts) AS time_bin, max(facts.value) AS max_value
       Aggregate: groupBy=[[facts.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"), facts.ts)]], aggr=[[max(facts.value)]]
         TableScan: facts projection=[f_dkey, ts, value]
   physical_plan
     ProjectionExec: expr=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts)@1 as time_bin, max(facts.value)@2 as max_value]
       AggregateExec: mode=Final, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts)@1 as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts)], aggr=[max(facts.value)]
         CoalescePartitionsExec
           AggregateExec: mode=Partial, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, ts@1) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts)], aggr=[max(facts.value)]
             DataSourceExec: file_groups={3 groups: [[Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/fact_A.parquet], [Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/fact_B.parquet], [Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/fact_C.parquet]]}, projection=[f_dkey, ts, value], file_type=parquet
   ```
   
   The `Aggregate Partial -> CoalescePartitionsExec -> Aggregate Final` pattern is inefficient because the final aggregate is forced to run on a single partition. Since we are grouping by `[f_dkey, date_bin(ts)]` and no group key spans multiple files, each file/partition could independently compute complete aggregate results.
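
   The property relied on here can be checked mechanically from per-file (per-partition) min/max statistics of the `GROUP BY` columns. Below is a minimal, self-contained Rust sketch of that disjointness check; it is illustrative only, and `partitions_are_disjoint` is a hypothetical helper, not an existing DataFusion API.

   ```rust
   // Minimal sketch: given the (min, max) range of a GROUP BY key observed in
   // each input partition (e.g. taken from Parquet file statistics), return true
   // when no two partitions can contain the same key value.
   fn partitions_are_disjoint<T: Ord + Clone>(ranges: &[(T, T)]) -> bool {
       let mut sorted: Vec<(T, T)> = ranges.to_vec();
       sorted.sort_by(|a, b| a.0.cmp(&b.0));
       // After sorting by min, each partition's max must be strictly below the
       // next partition's min (ranges are inclusive).
       sorted.windows(2).all(|w| w[0].1 < w[1].0)
   }

   fn main() {
       // fact_A / fact_B / fact_C: each file holds a single f_dkey value.
       assert!(partitions_are_disjoint(&[("A", "A"), ("B", "B"), ("C", "C")]));
       // If "B" appeared in two files, the Partial -> Final pattern would still be needed.
       assert!(!partitions_are_disjoint(&[("A", "B"), ("B", "C")]));
   }
   ```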
   
   ### Describe the solution you'd like
   
   The scenario described above should produce a plan like this:
   
   ```text
   
   logical_plan
     Projection: facts.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts) AS time_bin, max(facts.value) AS max_value
       Aggregate: groupBy=[[facts.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"), facts.ts)]], aggr=[[max(facts.value)]]
         TableScan: facts projection=[f_dkey, ts, value]
   physical_plan
     ProjectionExec: expr=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts)@1 as time_bin, max(facts.value)@2 as max_value]
       AggregateExec: mode=SinglePartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts)@1 as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),facts.ts)], aggr=[max(facts.value)]
         DataSourceExec: file_groups={3 groups: [[Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/fact_A.parquet], [Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/fact_B.parquet], [Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/fact_C.parquet]]}, projection=[f_dkey, ts, value], file_type=parquet
   ```
   
   This eliminates the merge overhead of the `CoalescePartitionsExec` and increases parallelism, since each partition computes its aggregates independently.
   
   ## Implementation (in `combine_partial_final_agg.rs`)
   - Detect the `Aggregate Final -> CoalescePartitionsExec (or SortPreservingMergeExec) -> Aggregate Partial` pattern
   - Extract the `GROUP BY` column expressions from the aggregate
   - For each input partition, check the min/max ranges of those columns to see whether they overlap
   - If they don't overlap, replace the pattern with a single `AggregateExec` in `SinglePartitioned` mode (see the sketch below)
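
   A rough sketch of the plan-matching step of such a rule is shown below. It covers only pattern detection; the statistics lookup (`group_key_ranges_are_disjoint`) and the rebuild into a `SinglePartitioned` aggregate are left as stubs, and the helper names are hypothetical, not existing DataFusion APIs.

   ```rust
   // Hypothetical sketch of the detection step, not DataFusion's actual rule.
   use std::sync::Arc;

   use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
   use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
   use datafusion::physical_plan::ExecutionPlan;

   /// Placeholder: would read per-partition min/max statistics for the GROUP BY
   /// columns (e.g. from Parquet file metadata) and run the disjointness check
   /// shown earlier. Hypothetical helper, not an existing API.
   fn group_key_ranges_are_disjoint(_partial: &AggregateExec) -> bool {
       false // conservative default in this sketch
   }

   /// Returns true if `plan` is `AggregateExec(Final)` over `CoalescePartitionsExec`
   /// over `AggregateExec(Partial)` and the group-key ranges are provably
   /// non-overlapping across the partial aggregate's input partitions.
   fn can_use_single_partitioned(plan: &Arc<dyn ExecutionPlan>) -> bool {
       let Some(final_agg) = plan.as_any().downcast_ref::<AggregateExec>() else {
           return false;
       };
       if *final_agg.mode() != AggregateMode::Final {
           return false;
       }
       let input = final_agg.input();
       let Some(coalesce) = input.as_any().downcast_ref::<CoalescePartitionsExec>() else {
           return false;
       };
       let Some(partial_agg) = coalesce.input().as_any().downcast_ref::<AggregateExec>() else {
           return false;
       };
       if *partial_agg.mode() != AggregateMode::Partial {
           return false;
       }
       // Only rewrite when no group key value can appear in more than one partition.
       group_key_ranges_are_disjoint(partial_agg)
   }
   ```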
   
   ### Describe alternatives you've considered
   
   1. User annotation (explicit) -> allow users to mark tables as partitioned. This is simple, but it relies on user input, is error-prone if the assumption is wrong, and might be under-utilized.
   2. New partitioning type -> create a new non-overlapping `Partitioning` variant and propagate it through the plan. This would require a large refactor for something that can be done with existing infrastructure.
   
   ### Additional context
   
   The `SinglePartitioned` mode already exists and is designed for this exact use case, but it isn't being applied when it could be. Its definition can be found [here](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/aggregates/mod.rs#L133).
   
   Here is the provided documentation on its use case:
   ```text
       /// *Single* layer of Aggregation, input is *Partitioned*
       ///
        /// Applies the entire logical aggregation operation in a single operator,
        /// as opposed to Partial / Final modes which apply the logical aggregation
        /// using two operators.
       ///
       /// This mode requires that the input has more than one partition, and is
       /// partitioned by group key (like FinalPartitioned).
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

