This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9289c207c7 Fix required partitioning of Single aggregation mode (#6950)
9289c207c7 is described below
commit 9289c207c73acc46f083493e7ecd86d9add5b122
Author: Daniël Heres <[email protected]>
AuthorDate: Sat Jul 15 14:21:10 2023 +0200
Fix required partitioning of Single aggregation mode (#6950)
* Fix required partitioning of Single aggregation mode
* Add SinglePartition mode
* Merge
* Fmt
* Tests
* Remove
* Remove
* Fix
---------
Co-authored-by: Daniël Heres <[email protected]>
---
.../combine_partial_final_agg.rs | 8 +++++++-
.../aggregates/bounded_aggregate_stream.rs | 18 +++++++++++-----
.../core/src/physical_plan/aggregates/mod.rs | 24 ++++++++++++++++------
.../src/physical_plan/aggregates/no_grouping.rs | 10 +++++----
.../core/src/physical_plan/aggregates/row_hash.rs | 11 +++++++---
.../core/tests/sqllogictests/test_files/joins.slt | 4 ++--
.../sqllogictests/test_files/tpch/q13.slt.part | 2 +-
.../core/tests/sqllogictests/test_files/union.slt | 2 +-
.../core/tests/sqllogictests/test_files/window.slt | 2 +-
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/src/generated/pbjson.rs | 3 +++
datafusion/proto/src/generated/prost.rs | 3 +++
datafusion/proto/src/physical_plan/mod.rs | 6 ++++++
13 files changed, 70 insertions(+), 24 deletions(-)
diff --git
a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index 05b0dc4f19..de47f3fbee 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -91,8 +91,14 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
),
)
{
+ let mode = if *final_mode ==
AggregateMode::Final
+ {
+ AggregateMode::Single
+ } else {
+ AggregateMode::SinglePartitioned
+ };
AggregateExec::try_new(
- AggregateMode::Single,
+ mode,
input_group_by.clone(),
input_aggr_expr.to_vec(),
input_filter_expr.to_vec(),
diff --git
a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
index a89ef3aaff..5982701e21 100644
--- a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
+++ b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
@@ -155,7 +155,9 @@ impl BoundedAggregateStream {
let all_aggregate_expressions =
aggregates::aggregate_expressions(&agg.aggr_expr, &agg.mode,
start_idx)?;
let filter_expressions = match agg.mode {
- AggregateMode::Partial | AggregateMode::Single => agg_filter_expr,
+ AggregateMode::Partial
+ | AggregateMode::Single
+ | AggregateMode::SinglePartitioned => agg_filter_expr,
AggregateMode::Final | AggregateMode::FinalPartitioned => {
vec![None; agg.aggr_expr.len()]
}
@@ -599,7 +601,9 @@ impl BoundedAggregateStream {
state_accessor
.point_to(0,
group_state.aggregation_buffer.as_mut_slice());
match self.mode {
- AggregateMode::Partial | AggregateMode::Single => {
+ AggregateMode::Partial
+ | AggregateMode::Single
+ | AggregateMode::SinglePartitioned => {
accumulator.update_batch(&values, &mut
state_accessor)
}
AggregateMode::FinalPartitioned |
AggregateMode::Final => {
@@ -622,7 +626,9 @@ impl BoundedAggregateStream {
)?;
let size_pre = accumulator.size();
let res = match self.mode {
- AggregateMode::Partial | AggregateMode::Single => {
+ AggregateMode::Partial
+ | AggregateMode::Single
+ | AggregateMode::SinglePartitioned => {
accumulator.update_batch(&values)
}
AggregateMode::FinalPartitioned |
AggregateMode::Final => {
@@ -973,7 +979,8 @@ impl BoundedAggregateStream {
}
AggregateMode::Final
| AggregateMode::FinalPartitioned
- | AggregateMode::Single => {
+ | AggregateMode::Single
+ | AggregateMode::SinglePartitioned => {
let mut results = vec![];
for (idx, acc) in self.row_accumulators.iter().enumerate() {
let mut state_accessor =
RowAccessor::new(&self.row_aggr_schema);
@@ -1016,7 +1023,8 @@ impl BoundedAggregateStream {
),
AggregateMode::Final
| AggregateMode::FinalPartitioned
- | AggregateMode::Single => ScalarValue::iter_to_array(
+ | AggregateMode::Single
+ | AggregateMode::SinglePartitioned =>
ScalarValue::iter_to_array(
group_state_chunk.iter().map(|group_state| {
group_state.group_state.accumulator_set[idx]
.evaluate()
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs
b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 5edb047677..b406320dda 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -77,7 +77,13 @@ pub enum AggregateMode {
/// Applies the entire logical aggregation operation in a single operator,
/// as opposed to Partial / Final modes which apply the logical
aggregation using
/// two operators.
+ /// This mode requires that the input is a single partition (like Final)
Single,
+ /// Applies the entire logical aggregation operation in a single operator,
+ /// as opposed to Partial / Final modes which apply the logical
aggregation using
+ /// two operators.
+ /// This mode requires that the input is partitioned by group key (like
FinalPartitioned)
+ SinglePartitioned,
}
/// Group By expression modes
@@ -872,13 +878,15 @@ impl ExecutionPlan for AggregateExec {
fn required_input_distribution(&self) -> Vec<Distribution> {
match &self.mode {
- AggregateMode::Partial | AggregateMode::Single => {
+ AggregateMode::Partial => {
vec![Distribution::UnspecifiedDistribution]
}
- AggregateMode::FinalPartitioned => {
+ AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned
=> {
vec![Distribution::HashPartitioned(self.output_group_expr())]
}
- AggregateMode::Final => vec![Distribution::SinglePartition],
+ AggregateMode::Final | AggregateMode::Single => {
+ vec![Distribution::SinglePartition]
+ }
}
}
@@ -982,7 +990,8 @@ fn create_schema(
}
AggregateMode::Final
| AggregateMode::FinalPartitioned
- | AggregateMode::Single => {
+ | AggregateMode::Single
+ | AggregateMode::SinglePartitioned => {
// in final mode, the field with the final result of the
accumulator
for expr in aggr_expr {
fields.push(expr.field()?)
@@ -1008,7 +1017,9 @@ fn aggregate_expressions(
col_idx_base: usize,
) -> Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
match mode {
- AggregateMode::Partial | AggregateMode::Single => Ok(aggr_expr
+ AggregateMode::Partial
+ | AggregateMode::Single
+ | AggregateMode::SinglePartitioned => Ok(aggr_expr
.iter()
.map(|agg| {
let pre_cast_type = if let Some(Sum {
@@ -1141,7 +1152,8 @@ fn finalize_aggregation(
}
AggregateMode::Final
| AggregateMode::FinalPartitioned
- | AggregateMode::Single => {
+ | AggregateMode::Single
+ | AggregateMode::SinglePartitioned => {
// merge the state to the final value
accumulators
.iter()
diff --git a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
index 89d392f0b6..610c24faab 100644
--- a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
+++ b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
@@ -79,7 +79,9 @@ impl AggregateStream {
let aggregate_expressions = aggregate_expressions(&agg.aggr_expr,
&agg.mode, 0)?;
let filter_expressions = match agg.mode {
- AggregateMode::Partial | AggregateMode::Single => agg_filter_expr,
+ AggregateMode::Partial
+ | AggregateMode::Single
+ | AggregateMode::SinglePartitioned => agg_filter_expr,
AggregateMode::Final | AggregateMode::FinalPartitioned => {
vec![None; agg.aggr_expr.len()]
}
@@ -222,9 +224,9 @@ fn aggregate_batch(
// 1.4
let size_pre = accum.size();
let res = match mode {
- AggregateMode::Partial | AggregateMode::Single => {
- accum.update_batch(values)
- }
+ AggregateMode::Partial
+ | AggregateMode::Single
+ | AggregateMode::SinglePartitioned =>
accum.update_batch(values),
AggregateMode::Final | AggregateMode::FinalPartitioned => {
accum.merge_batch(values)
}
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index e272b60b05..c57f436324 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -230,7 +230,9 @@ impl GroupedHashAggregateStream {
)?;
let filter_expressions = match agg.mode {
- AggregateMode::Partial | AggregateMode::Single => agg_filter_expr,
+ AggregateMode::Partial
+ | AggregateMode::Single
+ | AggregateMode::SinglePartitioned => agg_filter_expr,
AggregateMode::Final | AggregateMode::FinalPartitioned => {
vec![None; agg.aggr_expr.len()]
}
@@ -496,7 +498,9 @@ impl GroupedHashAggregateStream {
// Call the appropriate method on each aggregator with
// the entire input row and the relevant group indexes
match self.mode {
- AggregateMode::Partial | AggregateMode::Single => {
+ AggregateMode::Partial
+ | AggregateMode::Single
+ | AggregateMode::SinglePartitioned => {
acc.update_batch(
values,
group_indices,
@@ -543,7 +547,8 @@ impl GroupedHashAggregateStream {
AggregateMode::Partial => output.extend(acc.state()?),
AggregateMode::Final
| AggregateMode::FinalPartitioned
- | AggregateMode::Single => output.push(acc.evaluate()?),
+ | AggregateMode::Single
+ | AggregateMode::SinglePartitioned =>
output.push(acc.evaluate()?),
}
}
diff --git a/datafusion/core/tests/sqllogictests/test_files/joins.slt
b/datafusion/core/tests/sqllogictests/test_files/joins.slt
index 95f8999aee..c2517ead0f 100644
--- a/datafusion/core/tests/sqllogictests/test_files/joins.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/joins.slt
@@ -1309,7 +1309,7 @@ Aggregate: groupBy=[[join_t1.t1_id]], aggr=[[]]
------TableScan: join_t1 projection=[t1_id]
------TableScan: join_t2 projection=[t2_id]
physical_plan
-AggregateExec: mode=Single, gby=[t1_id@0 as t1_id], aggr=[]
+AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as t1_id], aggr=[]
--ProjectionExec: expr=[t1_id@0 as t1_id]
----CoalesceBatchesExec: target_batch_size=4096
------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)]
@@ -1337,7 +1337,7 @@ Projection: COUNT(UInt8(1))
--------TableScan: join_t2 projection=[t2_id]
physical_plan
ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1))]
---AggregateExec: mode=Single, gby=[t1_id@0 as t1_id], aggr=[COUNT(UInt8(1))]
+--AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as t1_id],
aggr=[COUNT(UInt8(1))]
----ProjectionExec: expr=[t1_id@0 as t1_id]
------CoalesceBatchesExec: target_batch_size=4096
--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0,
t2_id@0)]
diff --git a/datafusion/core/tests/sqllogictests/test_files/tpch/q13.slt.part
b/datafusion/core/tests/sqllogictests/test_files/tpch/q13.slt.part
index bd358962b5..732ae27f01 100644
--- a/datafusion/core/tests/sqllogictests/test_files/tpch/q13.slt.part
+++ b/datafusion/core/tests/sqllogictests/test_files/tpch/q13.slt.part
@@ -65,7 +65,7 @@ GlobalLimitExec: skip=0, fetch=10
--------------AggregateExec: mode=Partial, gby=[c_count@0 as c_count],
aggr=[COUNT(UInt8(1))]
----------------ProjectionExec: expr=[COUNT(orders.o_orderkey)@0 as c_count]
------------------ProjectionExec: expr=[COUNT(orders.o_orderkey)@1 as
COUNT(orders.o_orderkey)]
---------------------AggregateExec: mode=Single, gby=[c_custkey@0 as
c_custkey], aggr=[COUNT(orders.o_orderkey)]
+--------------------AggregateExec: mode=SinglePartitioned, gby=[c_custkey@0 as
c_custkey], aggr=[COUNT(orders.o_orderkey)]
----------------------ProjectionExec: expr=[c_custkey@0 as c_custkey,
o_orderkey@1 as o_orderkey]
------------------------CoalesceBatchesExec: target_batch_size=8192
--------------------------HashJoinExec: mode=Partitioned, join_type=Left,
on=[(c_custkey@0, o_custkey@1)]
diff --git a/datafusion/core/tests/sqllogictests/test_files/union.slt
b/datafusion/core/tests/sqllogictests/test_files/union.slt
index 2b3022ddd1..1f0fa1d456 100644
--- a/datafusion/core/tests/sqllogictests/test_files/union.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/union.slt
@@ -347,7 +347,7 @@ Projection: COUNT(UInt8(1))
--------TableScan: t2 projection=[name]
physical_plan
ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1))]
---AggregateExec: mode=Single, gby=[name@0 as name], aggr=[COUNT(UInt8(1))]
+--AggregateExec: mode=SinglePartitioned, gby=[name@0 as name],
aggr=[COUNT(UInt8(1))]
----InterleaveExec
------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
--------CoalesceBatchesExec: target_batch_size=8192
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index c4d745f8f1..21f76eb9cb 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -357,7 +357,7 @@ Sort: d.b ASC NULLS LAST
physical_plan
SortPreservingMergeExec: [b@0 ASC NULLS LAST]
--ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a, MAX(d.seq)@2 as
MAX(d.seq)]
-----AggregateExec: mode=Single, gby=[b@2 as b], aggr=[MAX(d.a), MAX(d.seq)],
ordering_mode=FullyOrdered
+----AggregateExec: mode=SinglePartitioned, gby=[b@2 as b], aggr=[MAX(d.a),
MAX(d.seq)], ordering_mode=FullyOrdered
------ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as seq, a@0 as
a, b@1 as b]
--------BoundedWindowAggExec: wdw=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY
[s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:
Ok(Field { name: "ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64,
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound:
CurrentRow }], mode=[Sorted]
----------SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST]
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index b26d18947f..a1caa4c621 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1322,6 +1322,7 @@ enum AggregateMode {
FINAL = 1;
FINAL_PARTITIONED = 2;
SINGLE = 3;
+ SINGLE_PARTITIONED = 4;
}
message WindowAggExecNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 09e85f11a7..4155a052cf 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -588,6 +588,7 @@ impl serde::Serialize for AggregateMode {
Self::Final => "FINAL",
Self::FinalPartitioned => "FINAL_PARTITIONED",
Self::Single => "SINGLE",
+ Self::SinglePartitioned => "SINGLE_PARTITIONED",
};
serializer.serialize_str(variant)
}
@@ -603,6 +604,7 @@ impl<'de> serde::Deserialize<'de> for AggregateMode {
"FINAL",
"FINAL_PARTITIONED",
"SINGLE",
+ "SINGLE_PARTITIONED",
];
struct GeneratedVisitor;
@@ -649,6 +651,7 @@ impl<'de> serde::Deserialize<'de> for AggregateMode {
"FINAL" => Ok(AggregateMode::Final),
"FINAL_PARTITIONED" => Ok(AggregateMode::FinalPartitioned),
"SINGLE" => Ok(AggregateMode::Single),
+ "SINGLE_PARTITIONED" =>
Ok(AggregateMode::SinglePartitioned),
_ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
}
}
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 516aa325bc..af0703460a 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2855,6 +2855,7 @@ pub enum AggregateMode {
Final = 1,
FinalPartitioned = 2,
Single = 3,
+ SinglePartitioned = 4,
}
impl AggregateMode {
/// String value of the enum field names used in the ProtoBuf definition.
@@ -2867,6 +2868,7 @@ impl AggregateMode {
AggregateMode::Final => "FINAL",
AggregateMode::FinalPartitioned => "FINAL_PARTITIONED",
AggregateMode::Single => "SINGLE",
+ AggregateMode::SinglePartitioned => "SINGLE_PARTITIONED",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
@@ -2876,6 +2878,7 @@ impl AggregateMode {
"FINAL" => Some(Self::Final),
"FINAL_PARTITIONED" => Some(Self::FinalPartitioned),
"SINGLE" => Some(Self::Single),
+ "SINGLE_PARTITIONED" => Some(Self::SinglePartitioned),
_ => None,
}
}
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index b5b4aeb2da..9e9b391a6c 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -358,6 +358,9 @@ impl AsExecutionPlan for PhysicalPlanNode {
AggregateMode::FinalPartitioned
}
protobuf::AggregateMode::Single => AggregateMode::Single,
+ protobuf::AggregateMode::SinglePartitioned => {
+ AggregateMode::SinglePartitioned
+ }
};
let num_expr = hash_agg.group_expr.len();
@@ -996,6 +999,9 @@ impl AsExecutionPlan for PhysicalPlanNode {
protobuf::AggregateMode::FinalPartitioned
}
AggregateMode::Single => protobuf::AggregateMode::Single,
+ AggregateMode::SinglePartitioned => {
+ protobuf::AggregateMode::SinglePartitioned
+ }
};
let input_schema = exec.input_schema();
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(