This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9289c207c7 Fix required partitioning of Single aggregation mode (#6950)
9289c207c7 is described below

commit 9289c207c73acc46f083493e7ecd86d9add5b122
Author: Daniël Heres <[email protected]>
AuthorDate: Sat Jul 15 14:21:10 2023 +0200

    Fix required partitioning of Single aggregation mode (#6950)
    
    * Fix required partitioning of Single aggregation mode
    
    * Add SinglePartition mode
    
    * Merge
    
    * Fmt
    
    * Tests
    
    * Remove
    
    * Remove
    
    * Fix
    
    ---------
    
    Co-authored-by: Daniël Heres <[email protected]>
---
 .../combine_partial_final_agg.rs                   |  8 +++++++-
 .../aggregates/bounded_aggregate_stream.rs         | 18 +++++++++++-----
 .../core/src/physical_plan/aggregates/mod.rs       | 24 ++++++++++++++++------
 .../src/physical_plan/aggregates/no_grouping.rs    | 10 +++++----
 .../core/src/physical_plan/aggregates/row_hash.rs  | 11 +++++++---
 .../core/tests/sqllogictests/test_files/joins.slt  |  4 ++--
 .../sqllogictests/test_files/tpch/q13.slt.part     |  2 +-
 .../core/tests/sqllogictests/test_files/union.slt  |  2 +-
 .../core/tests/sqllogictests/test_files/window.slt |  2 +-
 datafusion/proto/proto/datafusion.proto            |  1 +
 datafusion/proto/src/generated/pbjson.rs           |  3 +++
 datafusion/proto/src/generated/prost.rs            |  3 +++
 datafusion/proto/src/physical_plan/mod.rs          |  6 ++++++
 13 files changed, 70 insertions(+), 24 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index 05b0dc4f19..de47f3fbee 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -91,8 +91,14 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
                                             ),
                                         )
                                     {
+                                        let mode = if *final_mode == AggregateMode::Final
+                                        {
+                                            AggregateMode::Single
+                                        } else {
+                                            AggregateMode::SinglePartitioned
+                                        };
                                         AggregateExec::try_new(
-                                            AggregateMode::Single,
+                                            mode,
                                             input_group_by.clone(),
                                             input_aggr_expr.to_vec(),
                                             input_filter_expr.to_vec(),
diff --git a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
index a89ef3aaff..5982701e21 100644
--- a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
+++ b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
@@ -155,7 +155,9 @@ impl BoundedAggregateStream {
         let all_aggregate_expressions =
             aggregates::aggregate_expressions(&agg.aggr_expr, &agg.mode, start_idx)?;
         let filter_expressions = match agg.mode {
-            AggregateMode::Partial | AggregateMode::Single => agg_filter_expr,
+            AggregateMode::Partial
+            | AggregateMode::Single
+            | AggregateMode::SinglePartitioned => agg_filter_expr,
             AggregateMode::Final | AggregateMode::FinalPartitioned => {
                 vec![None; agg.aggr_expr.len()]
             }
@@ -599,7 +601,9 @@ impl BoundedAggregateStream {
                         state_accessor
                             .point_to(0, group_state.aggregation_buffer.as_mut_slice());
                         match self.mode {
-                            AggregateMode::Partial | AggregateMode::Single => {
+                            AggregateMode::Partial
+                            | AggregateMode::Single
+                            | AggregateMode::SinglePartitioned => {
                                 accumulator.update_batch(&values, &mut state_accessor)
                             }
                             AggregateMode::FinalPartitioned | AggregateMode::Final => {
@@ -622,7 +626,9 @@ impl BoundedAggregateStream {
                         )?;
                         let size_pre = accumulator.size();
                         let res = match self.mode {
-                            AggregateMode::Partial | AggregateMode::Single => {
+                            AggregateMode::Partial
+                            | AggregateMode::Single
+                            | AggregateMode::SinglePartitioned => {
                                 accumulator.update_batch(&values)
                             }
                             AggregateMode::FinalPartitioned | AggregateMode::Final => {
@@ -973,7 +979,8 @@ impl BoundedAggregateStream {
             }
             AggregateMode::Final
             | AggregateMode::FinalPartitioned
-            | AggregateMode::Single => {
+            | AggregateMode::Single
+            | AggregateMode::SinglePartitioned => {
                 let mut results = vec![];
                 for (idx, acc) in self.row_accumulators.iter().enumerate() {
                     let mut state_accessor = RowAccessor::new(&self.row_aggr_schema);
@@ -1016,7 +1023,8 @@ impl BoundedAggregateStream {
                     ),
                     AggregateMode::Final
                     | AggregateMode::FinalPartitioned
-                    | AggregateMode::Single => ScalarValue::iter_to_array(
+                    | AggregateMode::Single
+                    | AggregateMode::SinglePartitioned => ScalarValue::iter_to_array(
                         group_state_chunk.iter().map(|group_state| {
                             group_state.group_state.accumulator_set[idx]
                                 .evaluate()
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 5edb047677..b406320dda 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -77,7 +77,13 @@ pub enum AggregateMode {
     /// Applies the entire logical aggregation operation in a single operator,
     /// as opposed to Partial / Final modes which apply the logical aggregation using
     /// two operators.
+    /// This mode requires tha the input is a single partition (like Final)
     Single,
+    /// Applies the entire logical aggregation operation in a single operator,
+    /// as opposed to Partial / Final modes which apply the logical aggregation using
+    /// two operators.
+    /// This mode requires tha the input is partitioned by group key (like FinalPartitioned)
+    SinglePartitioned,
 }
 
 /// Group By expression modes
@@ -872,13 +878,15 @@ impl ExecutionPlan for AggregateExec {
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
         match &self.mode {
-            AggregateMode::Partial | AggregateMode::Single => {
+            AggregateMode::Partial => {
                 vec![Distribution::UnspecifiedDistribution]
             }
-            AggregateMode::FinalPartitioned => {
+            AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned => {
                 vec![Distribution::HashPartitioned(self.output_group_expr())]
             }
-            AggregateMode::Final => vec![Distribution::SinglePartition],
+            AggregateMode::Final | AggregateMode::Single => {
+                vec![Distribution::SinglePartition]
+            }
         }
     }
 
@@ -982,7 +990,8 @@ fn create_schema(
         }
         AggregateMode::Final
         | AggregateMode::FinalPartitioned
-        | AggregateMode::Single => {
+        | AggregateMode::Single
+        | AggregateMode::SinglePartitioned => {
             // in final mode, the field with the final result of the accumulator
             for expr in aggr_expr {
                 fields.push(expr.field()?)
@@ -1008,7 +1017,9 @@ fn aggregate_expressions(
     col_idx_base: usize,
 ) -> Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
     match mode {
-        AggregateMode::Partial | AggregateMode::Single => Ok(aggr_expr
+        AggregateMode::Partial
+        | AggregateMode::Single
+        | AggregateMode::SinglePartitioned => Ok(aggr_expr
             .iter()
             .map(|agg| {
                 let pre_cast_type = if let Some(Sum {
@@ -1141,7 +1152,8 @@ fn finalize_aggregation(
         }
         AggregateMode::Final
         | AggregateMode::FinalPartitioned
-        | AggregateMode::Single => {
+        | AggregateMode::Single
+        | AggregateMode::SinglePartitioned => {
             // merge the state to the final value
             accumulators
                 .iter()
diff --git a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
index 89d392f0b6..610c24faab 100644
--- a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
+++ b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
@@ -79,7 +79,9 @@ impl AggregateStream {
 
         let aggregate_expressions = aggregate_expressions(&agg.aggr_expr, &agg.mode, 0)?;
         let filter_expressions = match agg.mode {
-            AggregateMode::Partial | AggregateMode::Single => agg_filter_expr,
+            AggregateMode::Partial
+            | AggregateMode::Single
+            | AggregateMode::SinglePartitioned => agg_filter_expr,
             AggregateMode::Final | AggregateMode::FinalPartitioned => {
                 vec![None; agg.aggr_expr.len()]
             }
@@ -222,9 +224,9 @@ fn aggregate_batch(
             // 1.4
             let size_pre = accum.size();
             let res = match mode {
-                AggregateMode::Partial | AggregateMode::Single => {
-                    accum.update_batch(values)
-                }
+                AggregateMode::Partial
+                | AggregateMode::Single
+                | AggregateMode::SinglePartitioned => accum.update_batch(values),
                 AggregateMode::Final | AggregateMode::FinalPartitioned => {
                     accum.merge_batch(values)
                 }
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index e272b60b05..c57f436324 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -230,7 +230,9 @@ impl GroupedHashAggregateStream {
         )?;
 
         let filter_expressions = match agg.mode {
-            AggregateMode::Partial | AggregateMode::Single => agg_filter_expr,
+            AggregateMode::Partial
+            | AggregateMode::Single
+            | AggregateMode::SinglePartitioned => agg_filter_expr,
             AggregateMode::Final | AggregateMode::FinalPartitioned => {
                 vec![None; agg.aggr_expr.len()]
             }
@@ -496,7 +498,9 @@ impl GroupedHashAggregateStream {
                 // Call the appropriate method on each aggregator with
                 // the entire input row and the relevant group indexes
                 match self.mode {
-                    AggregateMode::Partial | AggregateMode::Single => {
+                    AggregateMode::Partial
+                    | AggregateMode::Single
+                    | AggregateMode::SinglePartitioned => {
                         acc.update_batch(
                             values,
                             group_indices,
@@ -543,7 +547,8 @@ impl GroupedHashAggregateStream {
                 AggregateMode::Partial => output.extend(acc.state()?),
                 AggregateMode::Final
                 | AggregateMode::FinalPartitioned
-                | AggregateMode::Single => output.push(acc.evaluate()?),
+                | AggregateMode::Single
+                | AggregateMode::SinglePartitioned => output.push(acc.evaluate()?),
             }
         }
 
diff --git a/datafusion/core/tests/sqllogictests/test_files/joins.slt b/datafusion/core/tests/sqllogictests/test_files/joins.slt
index 95f8999aee..c2517ead0f 100644
--- a/datafusion/core/tests/sqllogictests/test_files/joins.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/joins.slt
@@ -1309,7 +1309,7 @@ Aggregate: groupBy=[[join_t1.t1_id]], aggr=[[]]
 ------TableScan: join_t1 projection=[t1_id]
 ------TableScan: join_t2 projection=[t2_id]
 physical_plan
-AggregateExec: mode=Single, gby=[t1_id@0 as t1_id], aggr=[]
+AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as t1_id], aggr=[]
 --ProjectionExec: expr=[t1_id@0 as t1_id]
 ----CoalesceBatchesExec: target_batch_size=4096
 ------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)]
@@ -1337,7 +1337,7 @@ Projection: COUNT(UInt8(1))
 --------TableScan: join_t2 projection=[t2_id]
 physical_plan
 ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1))]
---AggregateExec: mode=Single, gby=[t1_id@0 as t1_id], aggr=[COUNT(UInt8(1))]
+--AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as t1_id], aggr=[COUNT(UInt8(1))]
 ----ProjectionExec: expr=[t1_id@0 as t1_id]
 ------CoalesceBatchesExec: target_batch_size=4096
 --------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)]
diff --git a/datafusion/core/tests/sqllogictests/test_files/tpch/q13.slt.part b/datafusion/core/tests/sqllogictests/test_files/tpch/q13.slt.part
index bd358962b5..732ae27f01 100644
--- a/datafusion/core/tests/sqllogictests/test_files/tpch/q13.slt.part
+++ b/datafusion/core/tests/sqllogictests/test_files/tpch/q13.slt.part
@@ -65,7 +65,7 @@ GlobalLimitExec: skip=0, fetch=10
 --------------AggregateExec: mode=Partial, gby=[c_count@0 as c_count], aggr=[COUNT(UInt8(1))]
 ----------------ProjectionExec: expr=[COUNT(orders.o_orderkey)@0 as c_count]
 ------------------ProjectionExec: expr=[COUNT(orders.o_orderkey)@1 as COUNT(orders.o_orderkey)]
---------------------AggregateExec: mode=Single, gby=[c_custkey@0 as c_custkey], aggr=[COUNT(orders.o_orderkey)]
+--------------------AggregateExec: mode=SinglePartitioned, gby=[c_custkey@0 as c_custkey], aggr=[COUNT(orders.o_orderkey)]
 ----------------------ProjectionExec: expr=[c_custkey@0 as c_custkey, o_orderkey@1 as o_orderkey]
 ------------------------CoalesceBatchesExec: target_batch_size=8192
 --------------------------HashJoinExec: mode=Partitioned, join_type=Left, on=[(c_custkey@0, o_custkey@1)]
diff --git a/datafusion/core/tests/sqllogictests/test_files/union.slt b/datafusion/core/tests/sqllogictests/test_files/union.slt
index 2b3022ddd1..1f0fa1d456 100644
--- a/datafusion/core/tests/sqllogictests/test_files/union.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/union.slt
@@ -347,7 +347,7 @@ Projection: COUNT(UInt8(1))
 --------TableScan: t2 projection=[name]
 physical_plan
 ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1))]
---AggregateExec: mode=Single, gby=[name@0 as name], aggr=[COUNT(UInt8(1))]
+--AggregateExec: mode=SinglePartitioned, gby=[name@0 as name], aggr=[COUNT(UInt8(1))]
 ----InterleaveExec
 ------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
 --------CoalesceBatchesExec: target_batch_size=8192
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index c4d745f8f1..21f76eb9cb 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -357,7 +357,7 @@ Sort: d.b ASC NULLS LAST
 physical_plan
 SortPreservingMergeExec: [b@0 ASC NULLS LAST]
 --ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a, MAX(d.seq)@2 as MAX(d.seq)]
-----AggregateExec: mode=Single, gby=[b@2 as b], aggr=[MAX(d.a), MAX(d.seq)], ordering_mode=FullyOrdered
+----AggregateExec: mode=SinglePartitioned, gby=[b@2 as b], aggr=[MAX(d.a), MAX(d.seq)], ordering_mode=FullyOrdered
 ------ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as seq, a@0 as a, b@1 as b]
 --------BoundedWindowAggExec: wdw=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
 ----------SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST]
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index b26d18947f..a1caa4c621 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1322,6 +1322,7 @@ enum AggregateMode {
   FINAL = 1;
   FINAL_PARTITIONED = 2;
   SINGLE = 3;
+  SINGLE_PARTITIONED = 4;
 }
 
 message WindowAggExecNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 09e85f11a7..4155a052cf 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -588,6 +588,7 @@ impl serde::Serialize for AggregateMode {
             Self::Final => "FINAL",
             Self::FinalPartitioned => "FINAL_PARTITIONED",
             Self::Single => "SINGLE",
+            Self::SinglePartitioned => "SINGLE_PARTITIONED",
         };
         serializer.serialize_str(variant)
     }
@@ -603,6 +604,7 @@ impl<'de> serde::Deserialize<'de> for AggregateMode {
             "FINAL",
             "FINAL_PARTITIONED",
             "SINGLE",
+            "SINGLE_PARTITIONED",
         ];
 
         struct GeneratedVisitor;
@@ -649,6 +651,7 @@ impl<'de> serde::Deserialize<'de> for AggregateMode {
                     "FINAL" => Ok(AggregateMode::Final),
                     "FINAL_PARTITIONED" => Ok(AggregateMode::FinalPartitioned),
                     "SINGLE" => Ok(AggregateMode::Single),
+                    "SINGLE_PARTITIONED" => Ok(AggregateMode::SinglePartitioned),
                     _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
                 }
             }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 516aa325bc..af0703460a 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2855,6 +2855,7 @@ pub enum AggregateMode {
     Final = 1,
     FinalPartitioned = 2,
     Single = 3,
+    SinglePartitioned = 4,
 }
 impl AggregateMode {
     /// String value of the enum field names used in the ProtoBuf definition.
@@ -2867,6 +2868,7 @@ impl AggregateMode {
             AggregateMode::Final => "FINAL",
             AggregateMode::FinalPartitioned => "FINAL_PARTITIONED",
             AggregateMode::Single => "SINGLE",
+            AggregateMode::SinglePartitioned => "SINGLE_PARTITIONED",
         }
     }
     /// Creates an enum from field names used in the ProtoBuf definition.
@@ -2876,6 +2878,7 @@ impl AggregateMode {
             "FINAL" => Some(Self::Final),
             "FINAL_PARTITIONED" => Some(Self::FinalPartitioned),
             "SINGLE" => Some(Self::Single),
+            "SINGLE_PARTITIONED" => Some(Self::SinglePartitioned),
             _ => None,
         }
     }
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index b5b4aeb2da..9e9b391a6c 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -358,6 +358,9 @@ impl AsExecutionPlan for PhysicalPlanNode {
                         AggregateMode::FinalPartitioned
                     }
                     protobuf::AggregateMode::Single => AggregateMode::Single,
+                    protobuf::AggregateMode::SinglePartitioned => {
+                        AggregateMode::SinglePartitioned
+                    }
                 };
 
                 let num_expr = hash_agg.group_expr.len();
@@ -996,6 +999,9 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     protobuf::AggregateMode::FinalPartitioned
                 }
                 AggregateMode::Single => protobuf::AggregateMode::Single,
+                AggregateMode::SinglePartitioned => {
+                    protobuf::AggregateMode::SinglePartitioned
+                }
             };
             let input_schema = exec.input_schema();
             let input = protobuf::PhysicalPlanNode::try_from_physical_plan(

Reply via email to