This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch aggregate_partition_mode
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/aggregate_partition_mode by this push:
     new 94ac9d19ba Add partition aggregation mode
94ac9d19ba is described below

commit 94ac9d19ba704d9aa1fdf4cb8731a96369ddeaa1
Author: Daniël Heres <[email protected]>
AuthorDate: Wed Jul 12 22:29:27 2023 +0200

    Add partition aggregation mode
---
 .../aggregates/bounded_aggregate_stream.rs         |  12 +++--
 .../core/src/physical_plan/aggregates/mod.rs       |  23 +++++---
 .../src/physical_plan/aggregates/no_grouping.rs    |   4 +-
 .../core/src/physical_plan/aggregates/row_hash.rs  |   9 ++--
 datafusion/core/src/physical_planner.rs            |  60 ++++++++-------------
 datafusion/proto/proto/datafusion.proto            |   1 +
 datafusion/proto/proto/proto_descriptor.bin        | Bin 0 -> 88806 bytes
 .../src/{generated/prost.rs => datafusion.rs}      |   3 ++
 .../{generated/pbjson.rs => datafusion.serde.rs}   |   3 ++
 datafusion/proto/src/generated/pbjson.rs           |   3 ++
 datafusion/proto/src/generated/prost.rs            |   3 ++
 datafusion/proto/src/physical_plan/mod.rs          |   2 +
 12 files changed, 66 insertions(+), 57 deletions(-)

diff --git 
a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs 
b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
index a89ef3aaff..2d3470b15d 100644
--- a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
+++ b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
@@ -155,7 +155,7 @@ impl BoundedAggregateStream {
         let all_aggregate_expressions =
             aggregates::aggregate_expressions(&agg.aggr_expr, &agg.mode, 
start_idx)?;
         let filter_expressions = match agg.mode {
-            AggregateMode::Partial | AggregateMode::Single => agg_filter_expr,
+            AggregateMode::Partial | AggregateMode::Single | 
AggregateMode::Partitioned => agg_filter_expr,
             AggregateMode::Final | AggregateMode::FinalPartitioned => {
                 vec![None; agg.aggr_expr.len()]
             }
@@ -599,7 +599,7 @@ impl BoundedAggregateStream {
                         state_accessor
                             .point_to(0, 
group_state.aggregation_buffer.as_mut_slice());
                         match self.mode {
-                            AggregateMode::Partial | AggregateMode::Single => {
+                            AggregateMode::Partial | AggregateMode::Single | 
AggregateMode::Partitioned => {
                                 accumulator.update_batch(&values, &mut 
state_accessor)
                             }
                             AggregateMode::FinalPartitioned | 
AggregateMode::Final => {
@@ -622,7 +622,7 @@ impl BoundedAggregateStream {
                         )?;
                         let size_pre = accumulator.size();
                         let res = match self.mode {
-                            AggregateMode::Partial | AggregateMode::Single => {
+                            AggregateMode::Partial | AggregateMode::Single | 
AggregateMode::Partitioned => {
                                 accumulator.update_batch(&values)
                             }
                             AggregateMode::FinalPartitioned | 
AggregateMode::Final => {
@@ -973,7 +973,8 @@ impl BoundedAggregateStream {
             }
             AggregateMode::Final
             | AggregateMode::FinalPartitioned
-            | AggregateMode::Single => {
+            | AggregateMode::Single
+            | AggregateMode::Partitioned => {
                 let mut results = vec![];
                 for (idx, acc) in self.row_accumulators.iter().enumerate() {
                     let mut state_accessor = 
RowAccessor::new(&self.row_aggr_schema);
@@ -1016,7 +1017,8 @@ impl BoundedAggregateStream {
                     ),
                     AggregateMode::Final
                     | AggregateMode::FinalPartitioned
-                    | AggregateMode::Single => ScalarValue::iter_to_array(
+                    | AggregateMode::Single
+                    | AggregateMode::Partitioned => ScalarValue::iter_to_array(
                         group_state_chunk.iter().map(|group_state| {
                             group_state.group_state.accumulator_set[idx]
                                 .evaluate()
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs 
b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 5edb047677..1f8fd9c67a 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -67,6 +67,8 @@ pub enum AggregateMode {
     Partial,
     /// Final aggregate that produces a single partition of output
     Final,
+    /// Aggregate that works on pre-partitioned data.
+    Partitioned,
     /// Final aggregate that works on pre-partitioned data.
     ///
     /// This requires the invariant that all rows with a particular
@@ -822,7 +824,7 @@ impl ExecutionPlan for AggregateExec {
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
         match &self.mode {
-            AggregateMode::Partial | AggregateMode::Single => {
+            AggregateMode::Partial | AggregateMode::Single | 
AggregateMode::Partitioned => {
                 // Partial and Single Aggregation will not change the output 
partitioning but need to respect the Alias
                 let input_partition = self.input.output_partitioning();
                 match input_partition {
@@ -875,7 +877,7 @@ impl ExecutionPlan for AggregateExec {
             AggregateMode::Partial | AggregateMode::Single => {
                 vec![Distribution::UnspecifiedDistribution]
             }
-            AggregateMode::FinalPartitioned => {
+            AggregateMode::FinalPartitioned | AggregateMode::Partitioned  => {
                 vec![Distribution::HashPartitioned(self.output_group_expr())]
             }
             AggregateMode::Final => vec![Distribution::SinglePartition],
@@ -935,8 +937,11 @@ impl ExecutionPlan for AggregateExec {
         // TODO stats: aggr expression:
         // - aggregations somtimes also preserve invariants such as min, max...
         match self.mode {
-            AggregateMode::Final | AggregateMode::FinalPartitioned
-                if self.group_by.expr.is_empty() =>
+            AggregateMode::Final 
+            | AggregateMode::FinalPartitioned 
+            | AggregateMode::Single
+            | AggregateMode::Partitioned
+            if self.group_by.expr.is_empty() =>
             {
                 Statistics {
                     num_rows: Some(1),
@@ -949,7 +954,7 @@ impl ExecutionPlan for AggregateExec {
                 num_rows: self.input.statistics().num_rows,
                 is_exact: false,
                 ..Default::default()
-            },
+            }
         }
     }
 }
@@ -982,7 +987,8 @@ fn create_schema(
         }
         AggregateMode::Final
         | AggregateMode::FinalPartitioned
-        | AggregateMode::Single => {
+        | AggregateMode::Single
+        | AggregateMode::Partitioned => {
             // in final mode, the field with the final result of the 
accumulator
             for expr in aggr_expr {
                 fields.push(expr.field()?)
@@ -1008,7 +1014,7 @@ fn aggregate_expressions(
     col_idx_base: usize,
 ) -> Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
     match mode {
-        AggregateMode::Partial | AggregateMode::Single => Ok(aggr_expr
+        AggregateMode::Partial | AggregateMode::Single | 
AggregateMode::Partitioned  => Ok(aggr_expr
             .iter()
             .map(|agg| {
                 let pre_cast_type = if let Some(Sum {
@@ -1141,7 +1147,8 @@ fn finalize_aggregation(
         }
         AggregateMode::Final
         | AggregateMode::FinalPartitioned
-        | AggregateMode::Single => {
+        | AggregateMode::Single
+        | AggregateMode::Partitioned => {
             // merge the state to the final value
             accumulators
                 .iter()
diff --git a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs 
b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
index 89d392f0b6..966f1dc199 100644
--- a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
+++ b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
@@ -79,7 +79,7 @@ impl AggregateStream {
 
         let aggregate_expressions = aggregate_expressions(&agg.aggr_expr, 
&agg.mode, 0)?;
         let filter_expressions = match agg.mode {
-            AggregateMode::Partial | AggregateMode::Single => agg_filter_expr,
+            AggregateMode::Partial | AggregateMode::Single | 
AggregateMode::Partitioned => agg_filter_expr,
             AggregateMode::Final | AggregateMode::FinalPartitioned => {
                 vec![None; agg.aggr_expr.len()]
             }
@@ -222,7 +222,7 @@ fn aggregate_batch(
             // 1.4
             let size_pre = accum.size();
             let res = match mode {
-                AggregateMode::Partial | AggregateMode::Single => {
+                AggregateMode::Partial | AggregateMode::Single  | 
AggregateMode::Partitioned=> {
                     accum.update_batch(values)
                 }
                 AggregateMode::Final | AggregateMode::FinalPartitioned => {
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs 
b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index e272b60b05..068a6dacf8 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -230,7 +230,7 @@ impl GroupedHashAggregateStream {
         )?;
 
         let filter_expressions = match agg.mode {
-            AggregateMode::Partial | AggregateMode::Single => agg_filter_expr,
+            AggregateMode::Partial | AggregateMode::Single | 
AggregateMode::Partitioned => agg_filter_expr,
             AggregateMode::Final | AggregateMode::FinalPartitioned => {
                 vec![None; agg.aggr_expr.len()]
             }
@@ -496,7 +496,7 @@ impl GroupedHashAggregateStream {
                 // Call the appropriate method on each aggregator with
                 // the entire input row and the relevant group indexes
                 match self.mode {
-                    AggregateMode::Partial | AggregateMode::Single => {
+                    AggregateMode::Partial | AggregateMode::Single | 
AggregateMode::Partitioned => {
                         acc.update_batch(
                             values,
                             group_indices,
@@ -543,7 +543,8 @@ impl GroupedHashAggregateStream {
                 AggregateMode::Partial => output.extend(acc.state()?),
                 AggregateMode::Final
                 | AggregateMode::FinalPartitioned
-                | AggregateMode::Single => output.push(acc.evaluate()?),
+                | AggregateMode::Single
+                | AggregateMode::Partitioned => output.push(acc.evaluate()?),
             }
         }
 
@@ -573,4 +574,4 @@ impl ScratchSpace {
             + self.current_group_indices.allocated_size()
             + self.hashes_buffer.allocated_size()
     }
-}
+}
\ No newline at end of file
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index de34353d59..f1b5698a05 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -718,8 +718,8 @@ impl DefaultPhysicalPlanner {
 
                     let (aggregates, filters, order_bys) : (Vec<_>, Vec<_>, 
Vec<_>) = multiunzip(agg_filter.into_iter());
 
-                    let initial_aggr = Arc::new(AggregateExec::try_new(
-                        AggregateMode::Partial,
+                    let final_grouping_set = Arc::new(AggregateExec::try_new(
+                        AggregateMode::Partitioned,
                         groups.clone(),
                         aggregates.clone(),
                         filters.clone(),
@@ -728,42 +728,26 @@ impl DefaultPhysicalPlanner {
                         physical_input_schema.clone(),
                     )?);
 
-                    // update group column indices based on partial aggregate 
plan evaluation
-                    let final_group: Vec<Arc<dyn PhysicalExpr>> = 
initial_aggr.output_group_expr();
-
-                    let can_repartition = !groups.is_empty()
-                        && session_state.config().target_partitions() > 1
-                        && session_state.config().repartition_aggregations();
-
-                    let (initial_aggr, next_partition_mode): (
-                        Arc<dyn ExecutionPlan>,
-                        AggregateMode,
-                    ) = if can_repartition {
-                        // construct a second aggregation with 
'AggregateMode::FinalPartitioned'
-                        (initial_aggr, AggregateMode::FinalPartitioned)
-                    } else {
-                        // construct a second aggregation, keeping the final 
column name equal to the
-                        // first aggregation and the expressions corresponding 
to the respective aggregate
-                        (initial_aggr, AggregateMode::Final)
-                    };
-
-                    let final_grouping_set = PhysicalGroupBy::new_single(
-                        final_group
-                            .iter()
-                            .enumerate()
-                            .map(|(i, expr)| (expr.clone(), 
groups.expr()[i].1.clone()))
-                            .collect()
-                    );
-
-                    Ok(Arc::new(AggregateExec::try_new(
-                        next_partition_mode,
-                        final_grouping_set,
-                        aggregates,
-                        filters,
-                        order_bys,
-                        initial_aggr,
-                        physical_input_schema.clone(),
-                    )?))
+                    // // update group column indices based on partial 
aggregate plan evaluation
+                    // let final_group: Vec<Arc<dyn PhysicalExpr>> = 
initial_aggr.output_group_expr();
+
+                    // let can_repartition = !groups.is_empty()
+                    //     && session_state.config().target_partitions() > 1
+                    //     && 
session_state.config().repartition_aggregations();
+
+                    // let (initial_aggr, next_partition_mode): (
+                    //     Arc<dyn ExecutionPlan>,
+                    //     AggregateMode,
+                    // ) = if can_repartition {
+                    //     // construct a second aggregation with 
'AggregateMode::FinalPartitioned'
+                    //     (initial_aggr, AggregateMode::FinalPartitioned)
+                    // } else {
+                    //     // construct a second aggregation, keeping the 
final column name equal to the
+                    //     // first aggregation and the expressions 
corresponding to the respective aggregate
+                    //     (initial_aggr, AggregateMode::Final)
+                    // };
+
+                    Ok(final_grouping_set)
                 }
                 LogicalPlan::Projection(Projection { input, expr, .. }) => {
                     let input_exec = self.create_initial_plan(input, 
session_state).await?;
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index 89bca57cf3..df2758534b 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1321,6 +1321,7 @@ enum AggregateMode {
   FINAL = 1;
   FINAL_PARTITIONED = 2;
   SINGLE = 3;
+  PARTITIONED_MODE = 4;
 }
 
 message WindowAggExecNode {
diff --git a/datafusion/proto/proto/proto_descriptor.bin 
b/datafusion/proto/proto/proto_descriptor.bin
new file mode 100644
index 0000000000..69343b9312
Binary files /dev/null and b/datafusion/proto/proto/proto_descriptor.bin differ
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/datafusion.rs
similarity index 99%
copy from datafusion/proto/src/generated/prost.rs
copy to datafusion/proto/src/datafusion.rs
index 251760f090..1acbb238c5 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/datafusion.rs
@@ -2852,6 +2852,7 @@ pub enum AggregateMode {
     Final = 1,
     FinalPartitioned = 2,
     Single = 3,
+    PartitionedMode = 4,
 }
 impl AggregateMode {
     /// String value of the enum field names used in the ProtoBuf definition.
@@ -2864,6 +2865,7 @@ impl AggregateMode {
             AggregateMode::Final => "FINAL",
             AggregateMode::FinalPartitioned => "FINAL_PARTITIONED",
             AggregateMode::Single => "SINGLE",
+            AggregateMode::PartitionedMode => "PARTITIONED_MODE",
         }
     }
     /// Creates an enum from field names used in the ProtoBuf definition.
@@ -2873,6 +2875,7 @@ impl AggregateMode {
             "FINAL" => Some(Self::Final),
             "FINAL_PARTITIONED" => Some(Self::FinalPartitioned),
             "SINGLE" => Some(Self::Single),
+            "PARTITIONED_MODE" => Some(Self::PartitionedMode),
             _ => None,
         }
     }
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/datafusion.serde.rs
similarity index 99%
copy from datafusion/proto/src/generated/pbjson.rs
copy to datafusion/proto/src/datafusion.serde.rs
index 590b462ad8..b415aaf56b 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/datafusion.serde.rs
@@ -588,6 +588,7 @@ impl serde::Serialize for AggregateMode {
             Self::Final => "FINAL",
             Self::FinalPartitioned => "FINAL_PARTITIONED",
             Self::Single => "SINGLE",
+            Self::PartitionedMode => "PARTITIONED_MODE",
         };
         serializer.serialize_str(variant)
     }
@@ -603,6 +604,7 @@ impl<'de> serde::Deserialize<'de> for AggregateMode {
             "FINAL",
             "FINAL_PARTITIONED",
             "SINGLE",
+            "PARTITIONED_MODE",
         ];
 
         struct GeneratedVisitor;
@@ -649,6 +651,7 @@ impl<'de> serde::Deserialize<'de> for AggregateMode {
                     "FINAL" => Ok(AggregateMode::Final),
                     "FINAL_PARTITIONED" => Ok(AggregateMode::FinalPartitioned),
                     "SINGLE" => Ok(AggregateMode::Single),
+                    "PARTITIONED_MODE" => Ok(AggregateMode::PartitionedMode),
                     _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
                 }
             }
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index 590b462ad8..b415aaf56b 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -588,6 +588,7 @@ impl serde::Serialize for AggregateMode {
             Self::Final => "FINAL",
             Self::FinalPartitioned => "FINAL_PARTITIONED",
             Self::Single => "SINGLE",
+            Self::PartitionedMode => "PARTITIONED_MODE",
         };
         serializer.serialize_str(variant)
     }
@@ -603,6 +604,7 @@ impl<'de> serde::Deserialize<'de> for AggregateMode {
             "FINAL",
             "FINAL_PARTITIONED",
             "SINGLE",
+            "PARTITIONED_MODE",
         ];
 
         struct GeneratedVisitor;
@@ -649,6 +651,7 @@ impl<'de> serde::Deserialize<'de> for AggregateMode {
                     "FINAL" => Ok(AggregateMode::Final),
                     "FINAL_PARTITIONED" => Ok(AggregateMode::FinalPartitioned),
                     "SINGLE" => Ok(AggregateMode::Single),
+                    "PARTITIONED_MODE" => Ok(AggregateMode::PartitionedMode),
                     _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
                 }
             }
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index 251760f090..1acbb238c5 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2852,6 +2852,7 @@ pub enum AggregateMode {
     Final = 1,
     FinalPartitioned = 2,
     Single = 3,
+    PartitionedMode = 4,
 }
 impl AggregateMode {
     /// String value of the enum field names used in the ProtoBuf definition.
@@ -2864,6 +2865,7 @@ impl AggregateMode {
             AggregateMode::Final => "FINAL",
             AggregateMode::FinalPartitioned => "FINAL_PARTITIONED",
             AggregateMode::Single => "SINGLE",
+            AggregateMode::PartitionedMode => "PARTITIONED_MODE",
         }
     }
     /// Creates an enum from field names used in the ProtoBuf definition.
@@ -2873,6 +2875,7 @@ impl AggregateMode {
             "FINAL" => Some(Self::Final),
             "FINAL_PARTITIONED" => Some(Self::FinalPartitioned),
             "SINGLE" => Some(Self::Single),
+            "PARTITIONED_MODE" => Some(Self::PartitionedMode),
             _ => None,
         }
     }
diff --git a/datafusion/proto/src/physical_plan/mod.rs 
b/datafusion/proto/src/physical_plan/mod.rs
index b5b4aeb2da..980617a1e2 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -358,6 +358,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                         AggregateMode::FinalPartitioned
                     }
                     protobuf::AggregateMode::Single => AggregateMode::Single,
+                    protobuf::AggregateMode::PartitionedMode => 
AggregateMode::Partitioned,
                 };
 
                 let num_expr = hash_agg.group_expr.len();
@@ -996,6 +997,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     protobuf::AggregateMode::FinalPartitioned
                 }
                 AggregateMode::Single => protobuf::AggregateMode::Single,
+                AggregateMode::Partitioned => 
protobuf::AggregateMode::PartitionedMode,
             };
             let input_schema = exec.input_schema();
             let input = protobuf::PhysicalPlanNode::try_from_physical_plan(

Reply via email to