This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch aggregate_partition_mode
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/aggregate_partition_mode by
this push:
new 94ac9d19ba Add partition aggregation mode
94ac9d19ba is described below
commit 94ac9d19ba704d9aa1fdf4cb8731a96369ddeaa1
Author: Daniël Heres <[email protected]>
AuthorDate: Wed Jul 12 22:29:27 2023 +0200
Add partition aggregation mode
---
.../aggregates/bounded_aggregate_stream.rs | 12 +++--
.../core/src/physical_plan/aggregates/mod.rs | 23 +++++---
.../src/physical_plan/aggregates/no_grouping.rs | 4 +-
.../core/src/physical_plan/aggregates/row_hash.rs | 9 ++--
datafusion/core/src/physical_planner.rs | 60 ++++++++-------------
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/proto/proto_descriptor.bin | Bin 0 -> 88806 bytes
.../src/{generated/prost.rs => datafusion.rs} | 3 ++
.../{generated/pbjson.rs => datafusion.serde.rs} | 3 ++
datafusion/proto/src/generated/pbjson.rs | 3 ++
datafusion/proto/src/generated/prost.rs | 3 ++
datafusion/proto/src/physical_plan/mod.rs | 2 +
12 files changed, 66 insertions(+), 57 deletions(-)
diff --git
a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
index a89ef3aaff..2d3470b15d 100644
--- a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
+++ b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
@@ -155,7 +155,7 @@ impl BoundedAggregateStream {
let all_aggregate_expressions =
aggregates::aggregate_expressions(&agg.aggr_expr, &agg.mode,
start_idx)?;
let filter_expressions = match agg.mode {
- AggregateMode::Partial | AggregateMode::Single => agg_filter_expr,
+ AggregateMode::Partial | AggregateMode::Single |
AggregateMode::Partitioned => agg_filter_expr,
AggregateMode::Final | AggregateMode::FinalPartitioned => {
vec![None; agg.aggr_expr.len()]
}
@@ -599,7 +599,7 @@ impl BoundedAggregateStream {
state_accessor
.point_to(0,
group_state.aggregation_buffer.as_mut_slice());
match self.mode {
- AggregateMode::Partial | AggregateMode::Single => {
+ AggregateMode::Partial | AggregateMode::Single |
AggregateMode::Partitioned => {
accumulator.update_batch(&values, &mut
state_accessor)
}
AggregateMode::FinalPartitioned |
AggregateMode::Final => {
@@ -622,7 +622,7 @@ impl BoundedAggregateStream {
)?;
let size_pre = accumulator.size();
let res = match self.mode {
- AggregateMode::Partial | AggregateMode::Single => {
+ AggregateMode::Partial | AggregateMode::Single |
AggregateMode::Partitioned => {
accumulator.update_batch(&values)
}
AggregateMode::FinalPartitioned |
AggregateMode::Final => {
@@ -973,7 +973,8 @@ impl BoundedAggregateStream {
}
AggregateMode::Final
| AggregateMode::FinalPartitioned
- | AggregateMode::Single => {
+ | AggregateMode::Single
+ | AggregateMode::Partitioned => {
let mut results = vec![];
for (idx, acc) in self.row_accumulators.iter().enumerate() {
let mut state_accessor =
RowAccessor::new(&self.row_aggr_schema);
@@ -1016,7 +1017,8 @@ impl BoundedAggregateStream {
),
AggregateMode::Final
| AggregateMode::FinalPartitioned
- | AggregateMode::Single => ScalarValue::iter_to_array(
+ | AggregateMode::Single
+ | AggregateMode::Partitioned => ScalarValue::iter_to_array(
group_state_chunk.iter().map(|group_state| {
group_state.group_state.accumulator_set[idx]
.evaluate()
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs
b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 5edb047677..1f8fd9c67a 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -67,6 +67,8 @@ pub enum AggregateMode {
Partial,
/// Final aggregate that produces a single partition of output
Final,
+ /// Aggregate that works on pre-partitioned data.
+ Partitioned,
/// Final aggregate that works on pre-partitioned data.
///
/// This requires the invariant that all rows with a particular
@@ -822,7 +824,7 @@ impl ExecutionPlan for AggregateExec {
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
match &self.mode {
- AggregateMode::Partial | AggregateMode::Single => {
+ AggregateMode::Partial | AggregateMode::Single |
AggregateMode::Partitioned => {
// Partial and Single Aggregation will not change the output
partitioning but need to respect the Alias
let input_partition = self.input.output_partitioning();
match input_partition {
@@ -875,7 +877,7 @@ impl ExecutionPlan for AggregateExec {
AggregateMode::Partial | AggregateMode::Single => {
vec![Distribution::UnspecifiedDistribution]
}
- AggregateMode::FinalPartitioned => {
+ AggregateMode::FinalPartitioned | AggregateMode::Partitioned => {
vec![Distribution::HashPartitioned(self.output_group_expr())]
}
AggregateMode::Final => vec![Distribution::SinglePartition],
@@ -935,8 +937,11 @@ impl ExecutionPlan for AggregateExec {
// TODO stats: aggr expression:
// - aggregations somtimes also preserve invariants such as min, max...
match self.mode {
- AggregateMode::Final | AggregateMode::FinalPartitioned
- if self.group_by.expr.is_empty() =>
+ AggregateMode::Final
+ | AggregateMode::FinalPartitioned
+ | AggregateMode::Single
+ | AggregateMode::Partitioned
+ if self.group_by.expr.is_empty() =>
{
Statistics {
num_rows: Some(1),
@@ -949,7 +954,7 @@ impl ExecutionPlan for AggregateExec {
num_rows: self.input.statistics().num_rows,
is_exact: false,
..Default::default()
- },
+ }
}
}
}
@@ -982,7 +987,8 @@ fn create_schema(
}
AggregateMode::Final
| AggregateMode::FinalPartitioned
- | AggregateMode::Single => {
+ | AggregateMode::Single
+ | AggregateMode::Partitioned => {
// in final mode, the field with the final result of the
accumulator
for expr in aggr_expr {
fields.push(expr.field()?)
@@ -1008,7 +1014,7 @@ fn aggregate_expressions(
col_idx_base: usize,
) -> Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
match mode {
- AggregateMode::Partial | AggregateMode::Single => Ok(aggr_expr
+ AggregateMode::Partial | AggregateMode::Single |
AggregateMode::Partitioned => Ok(aggr_expr
.iter()
.map(|agg| {
let pre_cast_type = if let Some(Sum {
@@ -1141,7 +1147,8 @@ fn finalize_aggregation(
}
AggregateMode::Final
| AggregateMode::FinalPartitioned
- | AggregateMode::Single => {
+ | AggregateMode::Single
+ | AggregateMode::Partitioned => {
// merge the state to the final value
accumulators
.iter()
diff --git a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
index 89d392f0b6..966f1dc199 100644
--- a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
+++ b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
@@ -79,7 +79,7 @@ impl AggregateStream {
let aggregate_expressions = aggregate_expressions(&agg.aggr_expr,
&agg.mode, 0)?;
let filter_expressions = match agg.mode {
- AggregateMode::Partial | AggregateMode::Single => agg_filter_expr,
+ AggregateMode::Partial | AggregateMode::Single |
AggregateMode::Partitioned => agg_filter_expr,
AggregateMode::Final | AggregateMode::FinalPartitioned => {
vec![None; agg.aggr_expr.len()]
}
@@ -222,7 +222,7 @@ fn aggregate_batch(
// 1.4
let size_pre = accum.size();
let res = match mode {
- AggregateMode::Partial | AggregateMode::Single => {
+ AggregateMode::Partial | AggregateMode::Single |
AggregateMode::Partitioned=> {
accum.update_batch(values)
}
AggregateMode::Final | AggregateMode::FinalPartitioned => {
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index e272b60b05..068a6dacf8 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -230,7 +230,7 @@ impl GroupedHashAggregateStream {
)?;
let filter_expressions = match agg.mode {
- AggregateMode::Partial | AggregateMode::Single => agg_filter_expr,
+ AggregateMode::Partial | AggregateMode::Single |
AggregateMode::Partitioned => agg_filter_expr,
AggregateMode::Final | AggregateMode::FinalPartitioned => {
vec![None; agg.aggr_expr.len()]
}
@@ -496,7 +496,7 @@ impl GroupedHashAggregateStream {
// Call the appropriate method on each aggregator with
// the entire input row and the relevant group indexes
match self.mode {
- AggregateMode::Partial | AggregateMode::Single => {
+ AggregateMode::Partial | AggregateMode::Single |
AggregateMode::Partitioned => {
acc.update_batch(
values,
group_indices,
@@ -543,7 +543,8 @@ impl GroupedHashAggregateStream {
AggregateMode::Partial => output.extend(acc.state()?),
AggregateMode::Final
| AggregateMode::FinalPartitioned
- | AggregateMode::Single => output.push(acc.evaluate()?),
+ | AggregateMode::Single
+ | AggregateMode::Partitioned => output.push(acc.evaluate()?),
}
}
@@ -573,4 +574,4 @@ impl ScratchSpace {
+ self.current_group_indices.allocated_size()
+ self.hashes_buffer.allocated_size()
}
-}
+}
\ No newline at end of file
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index de34353d59..f1b5698a05 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -718,8 +718,8 @@ impl DefaultPhysicalPlanner {
let (aggregates, filters, order_bys) : (Vec<_>, Vec<_>,
Vec<_>) = multiunzip(agg_filter.into_iter());
- let initial_aggr = Arc::new(AggregateExec::try_new(
- AggregateMode::Partial,
+ let final_grouping_set = Arc::new(AggregateExec::try_new(
+ AggregateMode::Partitioned,
groups.clone(),
aggregates.clone(),
filters.clone(),
@@ -728,42 +728,26 @@ impl DefaultPhysicalPlanner {
physical_input_schema.clone(),
)?);
- // update group column indices based on partial aggregate
plan evaluation
- let final_group: Vec<Arc<dyn PhysicalExpr>> =
initial_aggr.output_group_expr();
-
- let can_repartition = !groups.is_empty()
- && session_state.config().target_partitions() > 1
- && session_state.config().repartition_aggregations();
-
- let (initial_aggr, next_partition_mode): (
- Arc<dyn ExecutionPlan>,
- AggregateMode,
- ) = if can_repartition {
- // construct a second aggregation with
'AggregateMode::FinalPartitioned'
- (initial_aggr, AggregateMode::FinalPartitioned)
- } else {
- // construct a second aggregation, keeping the final
column name equal to the
- // first aggregation and the expressions corresponding
to the respective aggregate
- (initial_aggr, AggregateMode::Final)
- };
-
- let final_grouping_set = PhysicalGroupBy::new_single(
- final_group
- .iter()
- .enumerate()
- .map(|(i, expr)| (expr.clone(),
groups.expr()[i].1.clone()))
- .collect()
- );
-
- Ok(Arc::new(AggregateExec::try_new(
- next_partition_mode,
- final_grouping_set,
- aggregates,
- filters,
- order_bys,
- initial_aggr,
- physical_input_schema.clone(),
- )?))
+ // // update group column indices based on partial
aggregate plan evaluation
+ // let final_group: Vec<Arc<dyn PhysicalExpr>> =
initial_aggr.output_group_expr();
+
+ // let can_repartition = !groups.is_empty()
+ // && session_state.config().target_partitions() > 1
+ // &&
session_state.config().repartition_aggregations();
+
+ // let (initial_aggr, next_partition_mode): (
+ // Arc<dyn ExecutionPlan>,
+ // AggregateMode,
+ // ) = if can_repartition {
+ // // construct a second aggregation with
'AggregateMode::FinalPartitioned'
+ // (initial_aggr, AggregateMode::FinalPartitioned)
+ // } else {
+ // // construct a second aggregation, keeping the
final column name equal to the
+ // // first aggregation and the expressions
corresponding to the respective aggregate
+ // (initial_aggr, AggregateMode::Final)
+ // };
+
+ Ok(final_grouping_set)
}
LogicalPlan::Projection(Projection { input, expr, .. }) => {
let input_exec = self.create_initial_plan(input,
session_state).await?;
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 89bca57cf3..df2758534b 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1321,6 +1321,7 @@ enum AggregateMode {
FINAL = 1;
FINAL_PARTITIONED = 2;
SINGLE = 3;
+ PARTITIONED_MODE = 4;
}
message WindowAggExecNode {
diff --git a/datafusion/proto/proto/proto_descriptor.bin
b/datafusion/proto/proto/proto_descriptor.bin
new file mode 100644
index 0000000000..69343b9312
Binary files /dev/null and b/datafusion/proto/proto/proto_descriptor.bin differ
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/datafusion.rs
similarity index 99%
copy from datafusion/proto/src/generated/prost.rs
copy to datafusion/proto/src/datafusion.rs
index 251760f090..1acbb238c5 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/datafusion.rs
@@ -2852,6 +2852,7 @@ pub enum AggregateMode {
Final = 1,
FinalPartitioned = 2,
Single = 3,
+ PartitionedMode = 4,
}
impl AggregateMode {
/// String value of the enum field names used in the ProtoBuf definition.
@@ -2864,6 +2865,7 @@ impl AggregateMode {
AggregateMode::Final => "FINAL",
AggregateMode::FinalPartitioned => "FINAL_PARTITIONED",
AggregateMode::Single => "SINGLE",
+ AggregateMode::PartitionedMode => "PARTITIONED_MODE",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
@@ -2873,6 +2875,7 @@ impl AggregateMode {
"FINAL" => Some(Self::Final),
"FINAL_PARTITIONED" => Some(Self::FinalPartitioned),
"SINGLE" => Some(Self::Single),
+ "PARTITIONED_MODE" => Some(Self::PartitionedMode),
_ => None,
}
}
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/datafusion.serde.rs
similarity index 99%
copy from datafusion/proto/src/generated/pbjson.rs
copy to datafusion/proto/src/datafusion.serde.rs
index 590b462ad8..b415aaf56b 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/datafusion.serde.rs
@@ -588,6 +588,7 @@ impl serde::Serialize for AggregateMode {
Self::Final => "FINAL",
Self::FinalPartitioned => "FINAL_PARTITIONED",
Self::Single => "SINGLE",
+ Self::PartitionedMode => "PARTITIONED_MODE",
};
serializer.serialize_str(variant)
}
@@ -603,6 +604,7 @@ impl<'de> serde::Deserialize<'de> for AggregateMode {
"FINAL",
"FINAL_PARTITIONED",
"SINGLE",
+ "PARTITIONED_MODE",
];
struct GeneratedVisitor;
@@ -649,6 +651,7 @@ impl<'de> serde::Deserialize<'de> for AggregateMode {
"FINAL" => Ok(AggregateMode::Final),
"FINAL_PARTITIONED" => Ok(AggregateMode::FinalPartitioned),
"SINGLE" => Ok(AggregateMode::Single),
+ "PARTITIONED_MODE" => Ok(AggregateMode::PartitionedMode),
_ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
}
}
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 590b462ad8..b415aaf56b 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -588,6 +588,7 @@ impl serde::Serialize for AggregateMode {
Self::Final => "FINAL",
Self::FinalPartitioned => "FINAL_PARTITIONED",
Self::Single => "SINGLE",
+ Self::PartitionedMode => "PARTITIONED_MODE",
};
serializer.serialize_str(variant)
}
@@ -603,6 +604,7 @@ impl<'de> serde::Deserialize<'de> for AggregateMode {
"FINAL",
"FINAL_PARTITIONED",
"SINGLE",
+ "PARTITIONED_MODE",
];
struct GeneratedVisitor;
@@ -649,6 +651,7 @@ impl<'de> serde::Deserialize<'de> for AggregateMode {
"FINAL" => Ok(AggregateMode::Final),
"FINAL_PARTITIONED" => Ok(AggregateMode::FinalPartitioned),
"SINGLE" => Ok(AggregateMode::Single),
+ "PARTITIONED_MODE" => Ok(AggregateMode::PartitionedMode),
_ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
}
}
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 251760f090..1acbb238c5 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2852,6 +2852,7 @@ pub enum AggregateMode {
Final = 1,
FinalPartitioned = 2,
Single = 3,
+ PartitionedMode = 4,
}
impl AggregateMode {
/// String value of the enum field names used in the ProtoBuf definition.
@@ -2864,6 +2865,7 @@ impl AggregateMode {
AggregateMode::Final => "FINAL",
AggregateMode::FinalPartitioned => "FINAL_PARTITIONED",
AggregateMode::Single => "SINGLE",
+ AggregateMode::PartitionedMode => "PARTITIONED_MODE",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
@@ -2873,6 +2875,7 @@ impl AggregateMode {
"FINAL" => Some(Self::Final),
"FINAL_PARTITIONED" => Some(Self::FinalPartitioned),
"SINGLE" => Some(Self::Single),
+ "PARTITIONED_MODE" => Some(Self::PartitionedMode),
_ => None,
}
}
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index b5b4aeb2da..980617a1e2 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -358,6 +358,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
AggregateMode::FinalPartitioned
}
protobuf::AggregateMode::Single => AggregateMode::Single,
+ protobuf::AggregateMode::PartitionedMode =>
AggregateMode::Partitioned,
};
let num_expr = hash_agg.group_expr.len();
@@ -996,6 +997,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
protobuf::AggregateMode::FinalPartitioned
}
AggregateMode::Single => protobuf::AggregateMode::Single,
+ AggregateMode::Partitioned =>
protobuf::AggregateMode::PartitionedMode,
};
let input_schema = exec.input_schema();
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(