This is an automated email from the ASF dual-hosted git repository.
yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 3b42f3d62 Re-organize and rename aggregates physical plan (#2388)
3b42f3d62 is described below
commit 3b42f3d62027982fa88b70e696a6bce2d8866a31
Author: Yijie Shen <[email protected]>
AuthorDate: Mon May 2 08:41:14 2022 +0800
Re-organize and rename aggregates physical plan (#2388)
* first move: re-group aggregate functionality in
core/physical_plan/aggregates
* address review feedback
* naming
---
ballista/rust/core/proto/ballista.proto | 4 +-
ballista/rust/core/src/serde/physical_plan/mod.rs | 34 +-
ballista/rust/core/src/utils.rs | 6 +-
ballista/rust/scheduler/src/planner.rs | 24 +-
datafusion/core/src/lib.rs | 2 +-
.../src/physical_optimizer/aggregate_statistics.rs | 53 +-
.../src/physical_optimizer/coalesce_batches.rs | 4 +-
.../core/src/physical_optimizer/repartition.rs | 26 +-
.../core/src/physical_plan/aggregates/hash.rs | 477 +++++++
.../core/src/physical_plan/aggregates/mod.rs | 719 +++++++++++
.../src/physical_plan/aggregates/no_grouping.rs | 165 +++
.../core/src/physical_plan/hash_aggregate.rs | 1299 --------------------
datafusion/core/src/physical_plan/mod.rs | 1 -
datafusion/core/src/physical_plan/planner.rs | 10 +-
datafusion/core/tests/sql/avro.rs | 4 +-
datafusion/core/tests/sql/explain_analyze.rs | 10 +-
datafusion/core/tests/sql/json.rs | 4 +-
17 files changed, 1451 insertions(+), 1391 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index e7b1a7508..d0357fb12 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -266,7 +266,7 @@ message PhysicalPlanNode {
ProjectionExecNode projection = 4;
GlobalLimitExecNode global_limit = 6;
LocalLimitExecNode local_limit = 7;
- HashAggregateExecNode hash_aggregate = 8;
+ AggregateExecNode aggregate = 8;
HashJoinExecNode hash_join = 9;
ShuffleReaderExecNode shuffle_reader = 10;
SortExecNode sort = 11;
@@ -519,7 +519,7 @@ message WindowAggExecNode {
datafusion.Schema input_schema = 4;
}
-message HashAggregateExecNode {
+message AggregateExecNode {
repeated PhysicalExprNode group_expr = 1;
repeated PhysicalExprNode aggr_expr = 2;
AggregateMode mode = 3;
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs
b/ballista/rust/core/src/serde/physical_plan/mod.rs
index ed268820f..f5b495b67 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -28,7 +28,8 @@ use datafusion::datasource::listing::PartitionedFile;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_plan::window_frames::WindowFrame;
use datafusion::logical_plan::FunctionRegistry;
-use datafusion::physical_plan::aggregates::create_aggregate_expr;
+use datafusion::physical_plan::aggregates::AggregateExec;
+use datafusion::physical_plan::aggregates::{create_aggregate_expr,
AggregateMode};
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::cross_join::CrossJoinExec;
@@ -39,7 +40,6 @@ use datafusion::physical_plan::file_format::{
AvroExec, CsvExec, FileScanConfig, ParquetExec,
};
use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::hash_aggregate::{AggregateMode,
HashAggregateExec};
use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion::physical_plan::projection::ProjectionExec;
@@ -306,19 +306,21 @@ impl AsExecutionPlan for PhysicalPlanNode {
Arc::new((&input_schema).try_into()?),
)?))
}
- PhysicalPlanType::HashAggregate(hash_agg) => {
+ PhysicalPlanType::Aggregate(hash_agg) => {
let input: Arc<dyn ExecutionPlan> = into_physical_plan!(
hash_agg.input,
registry,
runtime,
extension_codec
)?;
- let mode =
protobuf::AggregateMode::from_i32(hash_agg.mode).ok_or_else(|| {
- proto_error(format!(
- "Received a HashAggregateNode message with unknown
AggregateMode {}",
+ let mode =
protobuf::AggregateMode::from_i32(hash_agg.mode).ok_or_else(
+ || {
+ proto_error(format!(
+ "Received a AggregateNode message with unknown
AggregateMode {}",
hash_agg.mode
))
- })?;
+ },
+ )?;
let agg_mode: AggregateMode = match mode {
protobuf::AggregateMode::Partial => AggregateMode::Partial,
protobuf::AggregateMode::Final => AggregateMode::Final,
@@ -341,7 +343,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
.as_ref()
.ok_or_else(|| {
BallistaError::General(
- "input_schema in HashAggregateNode is
missing.".to_owned(),
+ "input_schema in AggregateNode is
missing.".to_owned(),
)
})?
.clone();
@@ -384,14 +386,14 @@ impl AsExecutionPlan for PhysicalPlanNode {
)?)
}
_ => Err(BallistaError::General(
- "Invalid aggregate expression for
HashAggregateExec"
+ "Invalid aggregate expression for
AggregateExec"
.to_string(),
)),
}
})
.collect::<Result<Vec<_>, _>>()?;
- Ok(Arc::new(HashAggregateExec::try_new(
+ Ok(Arc::new(AggregateExec::try_new(
agg_mode,
group,
physical_aggr_expr,
@@ -730,7 +732,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
},
))),
})
- } else if let Some(exec) = plan.downcast_ref::<HashAggregateExec>() {
+ } else if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
let groups = exec
.group_expr()
.iter()
@@ -768,8 +770,8 @@ impl AsExecutionPlan for PhysicalPlanNode {
extension_codec,
)?;
Ok(protobuf::PhysicalPlanNode {
- physical_plan_type:
Some(PhysicalPlanType::HashAggregate(Box::new(
- protobuf::HashAggregateExecNode {
+ physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
+ protobuf::AggregateExecNode {
group_expr: groups,
group_expr_name: group_names,
aggr_expr: agg,
@@ -1080,12 +1082,12 @@ mod roundtrip_tests {
datasource::listing::PartitionedFile,
logical_plan::{JoinType, Operator},
physical_plan::{
+ aggregates::{AggregateExec, AggregateMode},
empty::EmptyExec,
expressions::{binary, col, lit, InListExpr, NotExpr},
expressions::{Avg, Column, PhysicalSortExpr},
file_format::{FileScanConfig, ParquetExec},
filter::FilterExec,
- hash_aggregate::{AggregateMode, HashAggregateExec},
hash_join::{HashJoinExec, PartitionMode},
limit::{GlobalLimitExec, LocalLimitExec},
sorts::sort::SortExec,
@@ -1212,7 +1214,7 @@ mod roundtrip_tests {
}
#[test]
- fn rountrip_hash_aggregate() -> Result<()> {
+ fn rountrip_aggregate() -> Result<()> {
let field_a = Field::new("a", DataType::Int64, false);
let field_b = Field::new("b", DataType::Int64, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
@@ -1226,7 +1228,7 @@ mod roundtrip_tests {
DataType::Float64,
))];
- roundtrip_test(Arc::new(HashAggregateExec::try_new(
+ roundtrip_test(Arc::new(AggregateExec::try_new(
AggregateMode::Final,
groups.clone(),
aggregates.clone(),
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 6670ab5ce..85a557e43 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -48,9 +48,9 @@ use datafusion::physical_plan::common::batch_byte_size;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::file_format::{CsvExec, ParquetExec};
use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
use datafusion::physical_plan::hash_join::HashJoinExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
@@ -151,8 +151,8 @@ fn build_exec_plan_diagram(
id: &mut AtomicUsize,
draw_entity: bool,
) -> Result<usize> {
- let operator_str = if
plan.as_any().downcast_ref::<HashAggregateExec>().is_some() {
- "HashAggregateExec"
+ let operator_str = if
plan.as_any().downcast_ref::<AggregateExec>().is_some() {
+ "AggregateExec"
} else if plan.as_any().downcast_ref::<SortExec>().is_some() {
"SortExec"
} else if plan.as_any().downcast_ref::<ProjectionExec>().is_some() {
diff --git a/ballista/rust/scheduler/src/planner.rs
b/ballista/rust/scheduler/src/planner.rs
index 07168adb6..af1ce1c61 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -276,8 +276,8 @@ mod test {
use ballista_core::error::BallistaError;
use ballista_core::execution_plans::UnresolvedShuffleExec;
use ballista_core::serde::{protobuf, AsExecutionPlan, BallistaCodec};
+ use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
- use datafusion::physical_plan::hash_aggregate::{AggregateMode,
HashAggregateExec};
use datafusion::physical_plan::hash_join::HashJoinExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{
@@ -298,7 +298,7 @@ mod test {
}
#[tokio::test]
- async fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> {
+ async fn distributed_aggregate_plan() -> Result<(), BallistaError> {
let ctx = datafusion_test_context("testdata").await?;
// simplified form of TPC-H query 1
@@ -327,12 +327,12 @@ mod test {
/* Expected result:
ShuffleWriterExec: Some(Hash([Column { name: "l_returnflag", index: 0
}], 2))
- HashAggregateExec: mode=Partial, gby=[l_returnflag@1 as
l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
+ AggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag],
aggr=[SUM(l_extendedprice Multiply Int64(1))]
CsvExec: source=Path(testdata/lineitem:
[testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]),
has_header=false
ShuffleWriterExec: None
ProjectionExec: expr=[l_returnflag@0 as l_returnflag,
SUM(lineitem.l_extendedprice Multiply Int64(1))@1 as sum_disc_price]
- HashAggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as
l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
+ AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as
l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
CoalesceBatchesExec: target_batch_size=4096
UnresolvedShuffleExec
@@ -346,14 +346,14 @@ mod test {
// verify stage 0
let stage0 = stages[0].children()[0].clone();
- let partial_hash = downcast_exec!(stage0, HashAggregateExec);
+ let partial_hash = downcast_exec!(stage0, AggregateExec);
assert!(*partial_hash.mode() == AggregateMode::Partial);
// verify stage 1
let stage1 = stages[1].children()[0].clone();
let projection = downcast_exec!(stage1, ProjectionExec);
let final_hash = projection.children()[0].clone();
- let final_hash = downcast_exec!(final_hash, HashAggregateExec);
+ let final_hash = downcast_exec!(final_hash, AggregateExec);
assert!(*final_hash.mode() == AggregateMode::FinalPartitioned);
let coalesce = final_hash.children()[0].clone();
let coalesce = downcast_exec!(coalesce, CoalesceBatchesExec);
@@ -449,7 +449,7 @@ order by
CsvExec: source=Path(testdata/orders: [testdata/orders/orders.tbl]),
has_header=false
ShuffleWriterExec: Some(Hash([Column { name: "l_shipmode", index: 0
}], 2))
- HashAggregateExec: mode=Partial, gby=[l_shipmode@4 as l_shipmode],
aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or
#orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END),
SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And
#orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
+ AggregateExec: mode=Partial, gby=[l_shipmode@4 as l_shipmode],
aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or
#orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END),
SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And
#orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
CoalesceBatchesExec: target_batch_size=4096
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column {
name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })]
CoalesceBatchesExec: target_batch_size=4096
@@ -459,7 +459,7 @@ order by
ShuffleWriterExec: None
ProjectionExec: expr=[l_shipmode@0 as l_shipmode, SUM(CASE WHEN
#orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq
Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@1 as high_line_count, SUM(CASE
WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority
NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@2 as low_line_count]
- HashAggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as
l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or
#orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END),
SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And
#orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
+ AggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as
l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or
#orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END),
SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And
#orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
CoalesceBatchesExec: target_batch_size=4096
UnresolvedShuffleExec
@@ -514,7 +514,7 @@ order by
.partition_count()
);
- let hash_agg = downcast_exec!(input, HashAggregateExec);
+ let hash_agg = downcast_exec!(input, AggregateExec);
let coalesce_batches = hash_agg.children()[0].clone();
let coalesce_batches = downcast_exec!(coalesce_batches,
CoalesceBatchesExec);
@@ -560,7 +560,7 @@ order by
}
#[tokio::test]
- async fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> {
+ async fn roundtrip_serde_aggregate() -> Result<(), BallistaError> {
let ctx = datafusion_test_context("testdata").await?;
// simplified form of TPC-H query 1
@@ -586,8 +586,8 @@ order by
let partial_hash = stages[0].children()[0].clone();
let partial_hash_serde = roundtrip_operator(partial_hash.clone())?;
- let partial_hash = downcast_exec!(partial_hash, HashAggregateExec);
- let partial_hash_serde = downcast_exec!(partial_hash_serde,
HashAggregateExec);
+ let partial_hash = downcast_exec!(partial_hash, AggregateExec);
+ let partial_hash_serde = downcast_exec!(partial_hash_serde,
AggregateExec);
assert_eq!(
format!("{:?}", partial_hash),
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index fd08d3a0a..055a17f4e 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -158,7 +158,7 @@
//!
//! * Projection: [`ProjectionExec`](physical_plan::projection::ProjectionExec)
//! * Filter: [`FilterExec`](physical_plan::filter::FilterExec)
-//! * Hash and Grouped aggregations:
[`HashAggregateExec`](physical_plan::hash_aggregate::HashAggregateExec)
+//! * Grouped and non-grouped aggregations:
[`AggregateExec`](physical_plan::aggregates::AggregateExec)
//! * Sort: [`SortExec`](physical_plan::sorts::sort::SortExec)
//! * Coalesce partitions:
[`CoalescePartitionsExec`](physical_plan::coalesce_partitions::CoalescePartitionsExec)
//! * Limit: [`LocalLimitExec`](physical_plan::limit::LocalLimitExec) and
[`GlobalLimitExec`](physical_plan::limit::GlobalLimitExec)
diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index 9af053f93..9c548ab9b 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -21,8 +21,8 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use crate::execution::context::SessionConfig;
+use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
use crate::physical_plan::empty::EmptyExec;
-use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::{
expressions, AggregateExpr, ColumnStatistics, ExecutionPlan, Statistics,
@@ -53,8 +53,8 @@ impl PhysicalOptimizerRule for AggregateStatistics {
if let Some(partial_agg_exec) = take_optimizable(&*plan) {
let partial_agg_exec = partial_agg_exec
.as_any()
- .downcast_ref::<HashAggregateExec>()
- .expect("take_optimizable() ensures that this is a
HashAggregateExec");
+ .downcast_ref::<AggregateExec>()
+ .expect("take_optimizable() ensures that this is a
AggregateExec");
let stats = partial_agg_exec.input().statistics();
let mut projections = vec![];
for expr in partial_agg_exec.aggr_expr() {
@@ -96,22 +96,22 @@ impl PhysicalOptimizerRule for AggregateStatistics {
}
}
-/// assert if the node passed as argument is a final `HashAggregateExec` node
that can be optimized:
-/// - its child (with posssible intermediate layers) is a partial
`HashAggregateExec` node
+/// assert if the node passed as argument is a final `AggregateExec` node that
can be optimized:
+/// - its child (with posssible intermediate layers) is a partial
`AggregateExec` node
/// - they both have no grouping expression
/// - the statistics are exact
-/// If this is the case, return a ref to the partial `HashAggregateExec`, else
`None`.
-/// We would have prefered to return a casted ref to HashAggregateExec but the
recursion requires
+/// If this is the case, return a ref to the partial `AggregateExec`, else
`None`.
+/// We would have prefered to return a casted ref to AggregateExec but the
recursion requires
/// the `ExecutionPlan.children()` method that returns an owned reference.
fn take_optimizable(node: &dyn ExecutionPlan) -> Option<Arc<dyn
ExecutionPlan>> {
- if let Some(final_agg_exec) =
node.as_any().downcast_ref::<HashAggregateExec>() {
+ if let Some(final_agg_exec) =
node.as_any().downcast_ref::<AggregateExec>() {
if final_agg_exec.mode() == &AggregateMode::Final
&& final_agg_exec.group_expr().is_empty()
{
let mut child = Arc::clone(final_agg_exec.input());
loop {
if let Some(partial_agg_exec) =
- child.as_any().downcast_ref::<HashAggregateExec>()
+ child.as_any().downcast_ref::<AggregateExec>()
{
if partial_agg_exec.mode() == &AggregateMode::Partial
&& partial_agg_exec.group_expr().is_empty()
@@ -260,11 +260,11 @@ mod tests {
use crate::error::Result;
use crate::logical_plan::Operator;
+ use crate::physical_plan::aggregates::AggregateExec;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::common;
use crate::physical_plan::expressions::Count;
use crate::physical_plan::filter::FilterExec;
- use crate::physical_plan::hash_aggregate::HashAggregateExec;
use crate::physical_plan::memory::MemoryExec;
use crate::prelude::SessionContext;
@@ -291,10 +291,7 @@ mod tests {
}
/// Checks that the count optimization was applied and we still get the
right result
- async fn assert_count_optim_success(
- plan: HashAggregateExec,
- nulls: bool,
- ) -> Result<()> {
+ async fn assert_count_optim_success(plan: AggregateExec, nulls: bool) ->
Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let conf = session_ctx.copied_config();
@@ -336,7 +333,7 @@ mod tests {
let source = mock_data()?;
let schema = source.schema();
- let partial_agg = HashAggregateExec::try_new(
+ let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
vec![],
vec![count_expr(None, None)],
@@ -344,7 +341,7 @@ mod tests {
Arc::clone(&schema),
)?;
- let final_agg = HashAggregateExec::try_new(
+ let final_agg = AggregateExec::try_new(
AggregateMode::Final,
vec![],
vec![count_expr(None, None)],
@@ -363,7 +360,7 @@ mod tests {
let source = mock_data()?;
let schema = source.schema();
- let partial_agg = HashAggregateExec::try_new(
+ let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
vec![],
vec![count_expr(Some(&schema), Some("a"))],
@@ -371,7 +368,7 @@ mod tests {
Arc::clone(&schema),
)?;
- let final_agg = HashAggregateExec::try_new(
+ let final_agg = AggregateExec::try_new(
AggregateMode::Final,
vec![],
vec![count_expr(Some(&schema), Some("a"))],
@@ -389,7 +386,7 @@ mod tests {
let source = mock_data()?;
let schema = source.schema();
- let partial_agg = HashAggregateExec::try_new(
+ let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
vec![],
vec![count_expr(None, None)],
@@ -400,7 +397,7 @@ mod tests {
// We introduce an intermediate optimization step between the partial
and final aggregtator
let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg));
- let final_agg = HashAggregateExec::try_new(
+ let final_agg = AggregateExec::try_new(
AggregateMode::Final,
vec![],
vec![count_expr(None, None)],
@@ -418,7 +415,7 @@ mod tests {
let source = mock_data()?;
let schema = source.schema();
- let partial_agg = HashAggregateExec::try_new(
+ let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
vec![],
vec![count_expr(Some(&schema), Some("a"))],
@@ -429,7 +426,7 @@ mod tests {
// We introduce an intermediate optimization step between the partial
and final aggregtator
let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg));
- let final_agg = HashAggregateExec::try_new(
+ let final_agg = AggregateExec::try_new(
AggregateMode::Final,
vec![],
vec![count_expr(Some(&schema), Some("a"))],
@@ -458,7 +455,7 @@ mod tests {
source,
)?);
- let partial_agg = HashAggregateExec::try_new(
+ let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
vec![],
vec![count_expr(None, None)],
@@ -466,7 +463,7 @@ mod tests {
Arc::clone(&schema),
)?;
- let final_agg = HashAggregateExec::try_new(
+ let final_agg = AggregateExec::try_new(
AggregateMode::Final,
vec![],
vec![count_expr(None, None)],
@@ -479,7 +476,7 @@ mod tests {
AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
// check that the original ExecutionPlan was not replaced
- assert!(optimized.as_any().is::<HashAggregateExec>());
+ assert!(optimized.as_any().is::<AggregateExec>());
Ok(())
}
@@ -500,7 +497,7 @@ mod tests {
source,
)?);
- let partial_agg = HashAggregateExec::try_new(
+ let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
vec![],
vec![count_expr(Some(&schema), Some("a"))],
@@ -508,7 +505,7 @@ mod tests {
Arc::clone(&schema),
)?;
- let final_agg = HashAggregateExec::try_new(
+ let final_agg = AggregateExec::try_new(
AggregateMode::Final,
vec![],
vec![count_expr(Some(&schema), Some("a"))],
@@ -521,7 +518,7 @@ mod tests {
AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
// check that the original ExecutionPlan was not replaced
- assert!(optimized.as_any().is::<HashAggregateExec>());
+ assert!(optimized.as_any().is::<AggregateExec>());
Ok(())
}
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 50f8abe7f..2b8582126 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -62,9 +62,9 @@ impl PhysicalOptimizerRule for CoalesceBatches {
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
|| plan_any.downcast_ref::<RepartitionExec>().is_some();
- // TODO we should also do this for HashAggregateExec but we need to
update tests
+ // TODO we should also do this for AggregateExec but we need to update
tests
// as part of this work - see
https://issues.apache.org/jira/browse/ARROW-11068
- // || plan_any.downcast_ref::<HashAggregateExec>().is_some();
+ // || plan_any.downcast_ref::<AggregateExec>().is_some();
if plan.children().is_empty() {
// leaf node, children cannot be replaced
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs
b/datafusion/core/src/physical_optimizer/repartition.rs
index 2506348fe..3b2c4515e 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -241,10 +241,10 @@ mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
+ use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
use crate::physical_plan::expressions::{col, PhysicalSortExpr};
use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
use crate::physical_plan::filter::FilterExec;
- use crate::physical_plan::hash_aggregate::{AggregateMode,
HashAggregateExec};
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::sorts::sort::SortExec;
@@ -300,15 +300,15 @@ mod tests {
Arc::new(ProjectionExec::try_new(exprs, input).unwrap())
}
- fn hash_aggregate(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan>
{
+ fn aggregate(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
let schema = schema();
Arc::new(
- HashAggregateExec::try_new(
+ AggregateExec::try_new(
AggregateMode::Final,
vec![],
vec![],
Arc::new(
- HashAggregateExec::try_new(
+ AggregateExec::try_new(
AggregateMode::Partial,
vec![],
vec![],
@@ -361,11 +361,11 @@ mod tests {
#[test]
fn added_repartition_to_single_partition() -> Result<()> {
- let plan = hash_aggregate(parquet_exec());
+ let plan = aggregate(parquet_exec());
let expected = [
- "HashAggregateExec: mode=Final, gby=[], aggr=[]",
- "HashAggregateExec: mode=Partial, gby=[], aggr=[]",
+ "AggregateExec: mode=Final, gby=[], aggr=[]",
+ "AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
"ParquetExec: limit=None, partitions=[x], projection=[c1]",
];
@@ -376,11 +376,11 @@ mod tests {
#[test]
fn repartition_deepest_node() -> Result<()> {
- let plan = hash_aggregate(filter_exec(parquet_exec()));
+ let plan = aggregate(filter_exec(parquet_exec()));
let expected = &[
- "HashAggregateExec: mode=Final, gby=[], aggr=[]",
- "HashAggregateExec: mode=Partial, gby=[], aggr=[]",
+ "AggregateExec: mode=Final, gby=[], aggr=[]",
+ "AggregateExec: mode=Partial, gby=[], aggr=[]",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
"ParquetExec: limit=None, partitions=[x], projection=[c1]",
@@ -443,11 +443,11 @@ mod tests {
#[test]
fn repartition_ignores_limit() -> Result<()> {
- let plan =
hash_aggregate(limit_exec(filter_exec(limit_exec(parquet_exec()))));
+ let plan =
aggregate(limit_exec(filter_exec(limit_exec(parquet_exec()))));
let expected = &[
- "HashAggregateExec: mode=Final, gby=[], aggr=[]",
- "HashAggregateExec: mode=Partial, gby=[], aggr=[]",
+ "AggregateExec: mode=Final, gby=[], aggr=[]",
+ "AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
"GlobalLimitExec: limit=100",
"LocalLimitExec: limit=100",
diff --git a/datafusion/core/src/physical_plan/aggregates/hash.rs
b/datafusion/core/src/physical_plan/aggregates/hash.rs
new file mode 100644
index 000000000..d5a325367
--- /dev/null
+++ b/datafusion/core/src/physical_plan/aggregates/hash.rs
@@ -0,0 +1,477 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the execution plan for the hash aggregate operation
+
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::vec;
+
+use ahash::RandomState;
+use futures::{
+ ready,
+ stream::{Stream, StreamExt},
+};
+
+use crate::error::Result;
+use crate::physical_plan::aggregates::{AccumulatorItem, AggregateMode};
+use crate::physical_plan::hash_utils::create_hashes;
+use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
+use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+use crate::scalar::ScalarValue;
+
+use arrow::{array::ArrayRef, compute, compute::cast};
+use arrow::{
+ array::{Array, UInt32Builder},
+ error::{ArrowError, Result as ArrowResult},
+};
+use arrow::{
+ datatypes::{Schema, SchemaRef},
+ record_batch::RecordBatch,
+};
+use hashbrown::raw::RawTable;
+
+/*
+The architecture is the following:
+
+1. An accumulator has state that is updated on each batch.
+2. At the end of the aggregation (e.g. end of batches in a partition), the
accumulator converts its state to a RecordBatch of a single row
+3. The RecordBatches of all accumulators are merged (`concatenate` in
`rust/arrow`) together to a single RecordBatch.
+4. The state's RecordBatch is `merge`d to a new state
+5. The state is mapped to the final value
+
+Why:
+
+* Accumulators' state can be statically typed, but it is more efficient to
transmit data from the accumulators via `Array`
+* The `merge` operation must have access to the state of the aggregators
because it uses it to correctly merge
+* It uses Arrow's native dynamically typed object, `Array`.
+* Arrow shines in batch operations and both `merge` and `concatenate` of
uniform types are very performant.
+
+Example: average
+
+* the state is `n: u32` and `sum: f64`
+* For every batch, we update them accordingly.
+* At the end of the accumulation (of a partition), we convert `n` and `sum` to
a RecordBatch of 1 row and two columns: `[n, sum]`
+* The RecordBatch is (sent back / transmitted over network)
+* Once all N record batches arrive, `merge` is performed, which builds a
RecordBatch with N rows and 2 columns.
+* Finally, `get_value` returns an array with one entry computed from the state
+*/
+pub(crate) struct GroupedHashAggregateStream {
+ schema: SchemaRef,
+ input: SendableRecordBatchStream,
+ mode: AggregateMode,
+ accumulators: Accumulators,
+ aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+
+ aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+ group_expr: Vec<Arc<dyn PhysicalExpr>>,
+
+ baseline_metrics: BaselineMetrics,
+ random_state: RandomState,
+ finished: bool,
+}
+
+impl GroupedHashAggregateStream {
+ /// Create a new GroupedHashAggregateStream
+ pub fn new(
+ mode: AggregateMode,
+ schema: SchemaRef,
+ group_expr: Vec<Arc<dyn PhysicalExpr>>,
+ aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+ input: SendableRecordBatchStream,
+ baseline_metrics: BaselineMetrics,
+ ) -> Result<Self> {
+ let timer = baseline_metrics.elapsed_compute().timer();
+
+ // The expressions to evaluate the batch, one vec of expressions per
aggregation.
+ // Assume create_schema() always puts group columns in front of aggr
columns, we set
+ // col_idx_base to group expression count.
+ let aggregate_expressions =
+ aggregates::aggregate_expressions(&aggr_expr, &mode,
group_expr.len())?;
+
+ timer.done();
+
+ Ok(Self {
+ schema,
+ mode,
+ input,
+ aggr_expr,
+ group_expr,
+ baseline_metrics,
+ aggregate_expressions,
+ accumulators: Default::default(),
+ random_state: Default::default(),
+ finished: false,
+ })
+ }
+}
+
+impl Stream for GroupedHashAggregateStream {
+ type Item = ArrowResult<RecordBatch>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let this = &mut *self;
+ if this.finished {
+ return Poll::Ready(None);
+ }
+
+ let elapsed_compute = this.baseline_metrics.elapsed_compute();
+
+ loop {
+ let result = match ready!(this.input.poll_next_unpin(cx)) {
+ Some(Ok(batch)) => {
+ let timer = elapsed_compute.timer();
+ let result = group_aggregate_batch(
+ &this.mode,
+ &this.random_state,
+ &this.group_expr,
+ &this.aggr_expr,
+ batch,
+ &mut this.accumulators,
+ &this.aggregate_expressions,
+ );
+
+ timer.done();
+
+ match result {
+ Ok(_) => continue,
+ Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
+ }
+ }
+ Some(Err(e)) => Err(e),
+ None => {
+ this.finished = true;
+ let timer =
this.baseline_metrics.elapsed_compute().timer();
+ let result = create_batch_from_map(
+ &this.mode,
+ &this.accumulators,
+ this.group_expr.len(),
+ &this.schema,
+ )
+ .record_output(&this.baseline_metrics);
+
+ timer.done();
+ result
+ }
+ };
+
+ this.finished = true;
+ return Poll::Ready(Some(result));
+ }
+ }
+}
+
+impl RecordBatchStream for GroupedHashAggregateStream {
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+}
+
+/// TODO: Make this a member function of [`GroupedHashAggregateStream`]
+fn group_aggregate_batch(
+ mode: &AggregateMode,
+ random_state: &RandomState,
+ group_expr: &[Arc<dyn PhysicalExpr>],
+ aggr_expr: &[Arc<dyn AggregateExpr>],
+ batch: RecordBatch,
+ accumulators: &mut Accumulators,
+ aggregate_expressions: &[Vec<Arc<dyn PhysicalExpr>>],
+) -> Result<()> {
+ // evaluate the grouping expressions
+ let group_values = evaluate(group_expr, &batch)?;
+
+ // evaluate the aggregation expressions.
+ // We could evaluate them after the `take`, but since we need to evaluate
all
+ // of them anyways, it is more performant to do it while they are together.
+ let aggr_input_values = evaluate_many(aggregate_expressions, &batch)?;
+
+ // 1.1 construct the key from the group values
+ // 1.2 construct the mapping key if it does not exist
+ // 1.3 add the row's index to `indices`
+
+ // track which entries in `accumulators` have rows in this batch to
aggregate
+ let mut groups_with_rows = vec![];
+
+ // 1.1 Calculate the group keys for the group values
+ let mut batch_hashes = vec![0; batch.num_rows()];
+ create_hashes(&group_values, random_state, &mut batch_hashes)?;
+
+ for (row, hash) in batch_hashes.into_iter().enumerate() {
+ let Accumulators { map, group_states } = accumulators;
+
+ let entry = map.get_mut(hash, |(_hash, group_idx)| {
+ // verify that a group that we are inserting with hash is
+ // actually the same key value as the group in
+ // `group_idx` (aka group_values @ row)
+ let group_state = &group_states[*group_idx];
+ group_values
+ .iter()
+ .zip(group_state.group_by_values.iter())
+ .all(|(array, scalar)| scalar.eq_array(array, row))
+ });
+
+ match entry {
+ // Existing entry for this group value
+ Some((_hash, group_idx)) => {
+ let group_state = &mut group_states[*group_idx];
+ // 1.3
+ if group_state.indices.is_empty() {
+ groups_with_rows.push(*group_idx);
+ };
+ group_state.indices.push(row as u32); // remember this row
+ }
+ // 1.2 Need to create new entry
+ None => {
+ let accumulator_set =
aggregates::create_accumulators(aggr_expr)?;
+
+ // Copy group values out of arrays into `ScalarValue`s
+ let group_by_values = group_values
+ .iter()
+ .map(|col| ScalarValue::try_from_array(col, row))
+ .collect::<Result<Vec<_>>>()?;
+
+ // Add new entry to group_states and save newly created index
+ let group_state = GroupState {
+ group_by_values: group_by_values.into_boxed_slice(),
+ accumulator_set,
+ indices: vec![row as u32], // 1.3
+ };
+ let group_idx = group_states.len();
+ group_states.push(group_state);
+ groups_with_rows.push(group_idx);
+
+ // for hasher function, use precomputed hash value
+ map.insert(hash, (hash, group_idx), |(hash, _group_idx)|
*hash);
+ }
+ };
+ }
+
+ // Collect all indices + offsets based on keys in this vec
+ let mut batch_indices: UInt32Builder = UInt32Builder::new(0);
+ let mut offsets = vec![0];
+ let mut offset_so_far = 0;
+ for group_idx in groups_with_rows.iter() {
+ let indices = &accumulators.group_states[*group_idx].indices;
+ batch_indices.append_slice(indices)?;
+ offset_so_far += indices.len();
+ offsets.push(offset_so_far);
+ }
+ let batch_indices = batch_indices.finish();
+
+ // `Take` all values based on indices into Arrays
+ let values: Vec<Vec<Arc<dyn Array>>> = aggr_input_values
+ .iter()
+ .map(|array| {
+ array
+ .iter()
+ .map(|array| {
+ compute::take(
+ array.as_ref(),
+ &batch_indices,
+ None, // None: no index check
+ )
+ .unwrap()
+ })
+ .collect()
+ // 2.3
+ })
+ .collect();
+
+ // 2.1 for each key in this batch
+ // 2.2 for each aggregation
+ // 2.3 `slice` from each of its arrays the keys' values
+ // 2.4 update / merge the accumulator with the values
+ // 2.5 clear indices
+ groups_with_rows
+ .iter()
+ .zip(offsets.windows(2))
+ .try_for_each(|(group_idx, offsets)| {
+ let group_state = &mut accumulators.group_states[*group_idx];
+ // 2.2
+ group_state
+ .accumulator_set
+ .iter_mut()
+ .zip(values.iter())
+ .map(|(accumulator, aggr_array)| {
+ (
+ accumulator,
+ aggr_array
+ .iter()
+ .map(|array| {
+ // 2.3
+ array.slice(offsets[0], offsets[1] -
offsets[0])
+ })
+ .collect::<Vec<ArrayRef>>(),
+ )
+ })
+ .try_for_each(|(accumulator, values)| match mode {
+ AggregateMode::Partial =>
accumulator.update_batch(&values),
+ AggregateMode::FinalPartitioned | AggregateMode::Final => {
+ // note: the aggregation here is over states, not
values, thus the merge
+ accumulator.merge_batch(&values)
+ }
+ })
+ // 2.5
+ .and({
+ group_state.indices.clear();
+ Ok(())
+ })
+ })?;
+
+ Ok(())
+}
+
+/// The state that is built for each output group.
+#[derive(Debug)]
+struct GroupState {
+ /// The actual group by values, one for each group column
+ group_by_values: Box<[ScalarValue]>,
+
+ /// Accumulator state, one for each aggregate
+ accumulator_set: Vec<AccumulatorItem>,
+
+ /// scratch space used to collect indices for input rows in a
+ /// batch that have values to aggregate. Reset on each batch
+ indices: Vec<u32>,
+}
+
+/// The state of all the groups
+#[derive(Default)]
+struct Accumulators {
+ /// Logically maps group values to an index in `group_states`
+ ///
+ /// Uses the raw API of hashbrown to avoid actually storing the
+ /// keys in the table
+ ///
+ /// keys: u64 hashes of the GroupValue
+ /// values: (hash, index into `group_states`)
+ map: RawTable<(u64, usize)>,
+
+ /// State for each group
+ group_states: Vec<GroupState>,
+}
+
+impl std::fmt::Debug for Accumulators {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ // hashes are not stored inline, so we could only get values
+ let map_string = "RawTable";
+ f.debug_struct("Accumulators")
+ .field("map", &map_string)
+ .field("group_states", &self.group_states)
+ .finish()
+ }
+}
+
+/// Evaluates expressions against a record batch.
+fn evaluate(
+ expr: &[Arc<dyn PhysicalExpr>],
+ batch: &RecordBatch,
+) -> Result<Vec<ArrayRef>> {
+ expr.iter()
+ .map(|expr| expr.evaluate(batch))
+ .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+ .collect::<Result<Vec<_>>>()
+}
+
+/// Evaluates expressions against a record batch.
+fn evaluate_many(
+ expr: &[Vec<Arc<dyn PhysicalExpr>>],
+ batch: &RecordBatch,
+) -> Result<Vec<Vec<ArrayRef>>> {
+ expr.iter()
+ .map(|expr| evaluate(expr, batch))
+ .collect::<Result<Vec<_>>>()
+}
+
+/// Create a RecordBatch with all group keys and accumulators' states or values.
+fn create_batch_from_map(
+ mode: &AggregateMode,
+ accumulators: &Accumulators,
+ num_group_expr: usize,
+ output_schema: &Schema,
+) -> ArrowResult<RecordBatch> {
+ if accumulators.group_states.is_empty() {
+ return Ok(RecordBatch::new_empty(Arc::new(output_schema.to_owned())));
+ }
+ let accs = &accumulators.group_states[0].accumulator_set;
+ let mut acc_data_types: Vec<usize> = vec![];
+
+ // Calculate number/shape of state arrays
+ match mode {
+ AggregateMode::Partial => {
+ for acc in accs.iter() {
+ let state = acc.state()?;
+ acc_data_types.push(state.len());
+ }
+ }
+ AggregateMode::Final | AggregateMode::FinalPartitioned => {
+ acc_data_types = vec![1; accs.len()];
+ }
+ }
+
+ let mut columns = (0..num_group_expr)
+ .map(|i| {
+ ScalarValue::iter_to_array(
+ accumulators
+ .group_states
+ .iter()
+ .map(|group_state| group_state.group_by_values[i].clone()),
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ // add state / evaluated arrays
+ for (x, &state_len) in acc_data_types.iter().enumerate() {
+ for y in 0..state_len {
+ match mode {
+ AggregateMode::Partial => {
+ let res = ScalarValue::iter_to_array(
+ accumulators.group_states.iter().map(|group_state| {
+ let x =
group_state.accumulator_set[x].state().unwrap();
+ x[y].clone()
+ }),
+ )?;
+
+ columns.push(res);
+ }
+ AggregateMode::Final | AggregateMode::FinalPartitioned => {
+ let res = ScalarValue::iter_to_array(
+ accumulators.group_states.iter().map(|group_state| {
+ group_state.accumulator_set[x].evaluate().unwrap()
+ }),
+ )?;
+ columns.push(res);
+ }
+ }
+ }
+ }
+
+ // cast output if needed (e.g. for types like Dictionary where
+ // the intermediate GroupByScalar type was not the same as the
+ // output)
+ let columns = columns
+ .iter()
+ .zip(output_schema.fields().iter())
+ .map(|(col, desired_field)| cast(col, desired_field.data_type()))
+ .collect::<ArrowResult<Vec<_>>>()?;
+
+ RecordBatch::try_new(Arc::new(output_schema.to_owned()), columns)
+}
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs
b/datafusion/core/src/physical_plan/aggregates/mod.rs
index c0208b239..5e1da793c 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -17,5 +17,724 @@
//! Aggregates functionalities
+use crate::execution::context::TaskContext;
+use crate::physical_plan::aggregates::hash::GroupedHashAggregateStream;
+use crate::physical_plan::aggregates::no_grouping::AggregateStream;
+use crate::physical_plan::metrics::{
+ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
+};
+use crate::physical_plan::{
+ DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ SendableRecordBatchStream, Statistics,
+};
+use arrow::array::ArrayRef;
+use arrow::datatypes::{Field, Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion_common::Result;
+use datafusion_expr::Accumulator;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::{
+ expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr,
+};
+use std::any::Any;
+use std::sync::Arc;
+
+mod hash;
+mod no_grouping;
+
pub use datafusion_expr::AggregateFunction;
pub use datafusion_physical_expr::expressions::create_aggregate_expr;
+
+/// Hash aggregate modes
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum AggregateMode {
+ /// Partial aggregate that can be applied in parallel across input
partitions
+ Partial,
+ /// Final aggregate that produces a single partition of output
+ Final,
+ /// Final aggregate that works on pre-partitioned data.
+ ///
+ /// This requires the invariant that all rows with a particular
+ /// grouping key are in the same partitions, such as is the case
+ /// with Hash repartitioning on the group keys. If a group key is
+ /// duplicated, duplicate groups would be produced
+ FinalPartitioned,
+}
+
+/// Hash aggregate execution plan
+#[derive(Debug)]
+pub struct AggregateExec {
+ /// Aggregation mode (full, partial)
+ mode: AggregateMode,
+ /// Grouping expressions
+ group_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
+ /// Aggregate expressions
+ aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+ /// Input plan, could be a partial aggregate or the input to the aggregate
+ input: Arc<dyn ExecutionPlan>,
+ /// Schema after the aggregate is applied
+ schema: SchemaRef,
+ /// Input schema before any aggregation is applied. For partial aggregate
this will be the
+ /// same as input.schema() but for the final aggregate it will be the same
as the input
+ /// to the partial aggregate
+ input_schema: SchemaRef,
+ /// Execution Metrics
+ metrics: ExecutionPlanMetricsSet,
+}
+
+impl AggregateExec {
+ /// Create a new hash aggregate execution plan
+ pub fn try_new(
+ mode: AggregateMode,
+ group_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
+ aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+ input: Arc<dyn ExecutionPlan>,
+ input_schema: SchemaRef,
+ ) -> Result<Self> {
+ let schema = create_schema(&input.schema(), &group_expr, &aggr_expr,
mode)?;
+
+ let schema = Arc::new(schema);
+
+ Ok(AggregateExec {
+ mode,
+ group_expr,
+ aggr_expr,
+ input,
+ schema,
+ input_schema,
+ metrics: ExecutionPlanMetricsSet::new(),
+ })
+ }
+
+ /// Aggregation mode (full, partial)
+ pub fn mode(&self) -> &AggregateMode {
+ &self.mode
+ }
+
+ /// Grouping expressions
+ pub fn group_expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] {
+ &self.group_expr
+ }
+
+ /// Grouping expressions as they occur in the output schema
+ pub fn output_group_expr(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+ // Update column indices. Since the group by columns come first in the
output schema, their
+ // indices are simply 0..self.group_expr.len().
+ self.group_expr
+ .iter()
+ .enumerate()
+ .map(|(index, (_col, name))| {
+ Arc::new(expressions::Column::new(name, index)) as Arc<dyn
PhysicalExpr>
+ })
+ .collect()
+ }
+
+ /// Aggregate expressions
+ pub fn aggr_expr(&self) -> &[Arc<dyn AggregateExpr>] {
+ &self.aggr_expr
+ }
+
+ /// Input plan
+ pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+ &self.input
+ }
+
+ /// Get the input schema before any aggregates are applied
+ pub fn input_schema(&self) -> SchemaRef {
+ self.input_schema.clone()
+ }
+}
+
+#[async_trait]
+impl ExecutionPlan for AggregateExec {
+ /// Return a reference to Any that can be used for down-casting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ self.input.output_partitioning()
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ None
+ }
+
+ fn required_child_distribution(&self) -> Distribution {
+ match &self.mode {
+ AggregateMode::Partial => Distribution::UnspecifiedDistribution,
+ AggregateMode::FinalPartitioned => Distribution::HashPartitioned(
+ self.group_expr.iter().map(|x| x.0.clone()).collect(),
+ ),
+ AggregateMode::Final => Distribution::SinglePartition,
+ }
+ }
+
+ fn relies_on_input_order(&self) -> bool {
+ false
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![self.input.clone()]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(AggregateExec::try_new(
+ self.mode,
+ self.group_expr.clone(),
+ self.aggr_expr.clone(),
+ children[0].clone(),
+ self.input_schema.clone(),
+ )?))
+ }
+
+ async fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ let input = self.input.execute(partition, context).await?;
+ let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect();
+
+ let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+
+ if self.group_expr.is_empty() {
+ Ok(Box::pin(AggregateStream::new(
+ self.mode,
+ self.schema.clone(),
+ self.aggr_expr.clone(),
+ input,
+ baseline_metrics,
+ )?))
+ } else {
+ Ok(Box::pin(GroupedHashAggregateStream::new(
+ self.mode,
+ self.schema.clone(),
+ group_expr,
+ self.aggr_expr.clone(),
+ input,
+ baseline_metrics,
+ )?))
+ }
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "AggregateExec: mode={:?}", self.mode)?;
+ let g: Vec<String> = self
+ .group_expr
+ .iter()
+ .map(|(e, alias)| {
+ let e = e.to_string();
+ if &e != alias {
+ format!("{} as {}", e, alias)
+ } else {
+ e
+ }
+ })
+ .collect();
+ write!(f, ", gby=[{}]", g.join(", "))?;
+
+ let a: Vec<String> = self
+ .aggr_expr
+ .iter()
+ .map(|agg| agg.name().to_string())
+ .collect();
+ write!(f, ", aggr=[{}]", a.join(", "))?;
+ }
+ }
+ Ok(())
+ }
+
+ fn statistics(&self) -> Statistics {
+ // TODO stats: group expressions:
+ // - once expressions will be able to compute their own stats, use it
here
+ // - case where we group by on a column for which we have the
`distinct` stat
+ // TODO stats: aggr expression:
+ // - aggregations sometimes also preserve invariants such as min, max...
+ match self.mode {
+ AggregateMode::Final | AggregateMode::FinalPartitioned
+ if self.group_expr.is_empty() =>
+ {
+ Statistics {
+ num_rows: Some(1),
+ is_exact: true,
+ ..Default::default()
+ }
+ }
+ _ => Statistics::default(),
+ }
+ }
+}
+
+fn create_schema(
+ input_schema: &Schema,
+ group_expr: &[(Arc<dyn PhysicalExpr>, String)],
+ aggr_expr: &[Arc<dyn AggregateExpr>],
+ mode: AggregateMode,
+) -> datafusion_common::Result<Schema> {
+ let mut fields = Vec::with_capacity(group_expr.len() + aggr_expr.len());
+ for (expr, name) in group_expr {
+ fields.push(Field::new(
+ name,
+ expr.data_type(input_schema)?,
+ expr.nullable(input_schema)?,
+ ))
+ }
+
+ match mode {
+ AggregateMode::Partial => {
+ // in partial mode, the fields of the accumulator's state
+ for expr in aggr_expr {
+ fields.extend(expr.state_fields()?.iter().cloned())
+ }
+ }
+ AggregateMode::Final | AggregateMode::FinalPartitioned => {
+ // in final mode, the field with the final result of the
accumulator
+ for expr in aggr_expr {
+ fields.push(expr.field()?)
+ }
+ }
+ }
+
+ Ok(Schema::new(fields))
+}
+
+/// returns physical expressions to evaluate against a batch
+/// The expressions are different depending on `mode`:
+/// * Partial: AggregateExpr::expressions
+/// * Final: columns of `AggregateExpr::state_fields()`
+fn aggregate_expressions(
+ aggr_expr: &[Arc<dyn AggregateExpr>],
+ mode: &AggregateMode,
+ col_idx_base: usize,
+) -> datafusion_common::Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
+ match mode {
+ AggregateMode::Partial => {
+ Ok(aggr_expr.iter().map(|agg| agg.expressions()).collect())
+ }
+ // in this mode, we build the merge expressions of the aggregation
+ AggregateMode::Final | AggregateMode::FinalPartitioned => {
+ let mut col_idx_base = col_idx_base;
+ Ok(aggr_expr
+ .iter()
+ .map(|agg| {
+ let exprs = merge_expressions(col_idx_base, agg)?;
+ col_idx_base += exprs.len();
+ Ok(exprs)
+ })
+ .collect::<datafusion_common::Result<Vec<_>>>()?)
+ }
+ }
+}
+
+/// uses `state_fields` to build a vec of physical column expressions required
to merge the
+/// AggregateExpr's accumulator state.
+///
+/// `index_base` is the starting physical column index for the next expanded
state field.
+fn merge_expressions(
+ index_base: usize,
+ expr: &Arc<dyn AggregateExpr>,
+) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+ Ok(expr
+ .state_fields()?
+ .iter()
+ .enumerate()
+ .map(|(idx, f)| {
+ Arc::new(Column::new(f.name(), index_base + idx)) as Arc<dyn
PhysicalExpr>
+ })
+ .collect::<Vec<_>>())
+}
+
+pub(crate) type AccumulatorItem = Box<dyn Accumulator>;
+
+fn create_accumulators(
+ aggr_expr: &[Arc<dyn AggregateExpr>],
+) -> datafusion_common::Result<Vec<AccumulatorItem>> {
+ aggr_expr
+ .iter()
+ .map(|expr| expr.create_accumulator())
+ .collect::<datafusion_common::Result<Vec<_>>>()
+}
+
+/// returns a vector of ArrayRefs, where each entry corresponds to either the
+/// final value (mode = Final) or states (mode = Partial)
+fn finalize_aggregation(
+ accumulators: &[AccumulatorItem],
+ mode: &AggregateMode,
+) -> datafusion_common::Result<Vec<ArrayRef>> {
+ match mode {
+ AggregateMode::Partial => {
+ // build the vector of states
+ let a = accumulators
+ .iter()
+ .map(|accumulator| accumulator.state())
+ .map(|value| {
+ value.map(|e| {
+ e.iter().map(|v|
v.to_array()).collect::<Vec<ArrayRef>>()
+ })
+ })
+ .collect::<datafusion_common::Result<Vec<_>>>()?;
+ Ok(a.iter().flatten().cloned().collect::<Vec<_>>())
+ }
+ AggregateMode::Final | AggregateMode::FinalPartitioned => {
+ // merge the state to the final value
+ accumulators
+ .iter()
+ .map(|accumulator| accumulator.evaluate().map(|v|
v.to_array()))
+ .collect::<datafusion_common::Result<Vec<ArrayRef>>>()
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::execution::context::TaskContext;
+ use crate::from_slice::FromSlice;
+ use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
+ use crate::physical_plan::expressions::{col, Avg};
+ use crate::test::assert_is_pending;
+ use crate::test::exec::{assert_strong_count_converges_to_zero,
BlockingExec};
+ use crate::{assert_batches_sorted_eq, physical_plan::common};
+ use arrow::array::{Float64Array, UInt32Array};
+ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+ use arrow::error::Result as ArrowResult;
+ use arrow::record_batch::RecordBatch;
+ use async_trait::async_trait;
+ use datafusion_common::{DataFusionError, Result};
+ use datafusion_physical_expr::{AggregateExpr, PhysicalExpr,
PhysicalSortExpr};
+ use futures::{FutureExt, Stream};
+ use std::any::Any;
+ use std::sync::Arc;
+ use std::task::{Context, Poll};
+
+ use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+ use crate::physical_plan::{
+ ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
+ Statistics,
+ };
+ use crate::prelude::SessionContext;
+
+ /// some mock data to aggregate
+ fn some_data() -> (Arc<Schema>, Vec<RecordBatch>) {
+ // define a schema.
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::UInt32, false),
+ Field::new("b", DataType::Float64, false),
+ ]));
+
+ // define data.
+ (
+ schema.clone(),
+ vec![
+ RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(UInt32Array::from_slice(&[2, 3, 4, 4])),
+ Arc::new(Float64Array::from_slice(&[1.0, 2.0, 3.0,
4.0])),
+ ],
+ )
+ .unwrap(),
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(UInt32Array::from_slice(&[2, 3, 3, 4])),
+ Arc::new(Float64Array::from_slice(&[1.0, 2.0, 3.0,
4.0])),
+ ],
+ )
+ .unwrap(),
+ ],
+ )
+ }
+
+ /// build the aggregates on the data from some_data() and check the results
+ async fn check_aggregates(input: Arc<dyn ExecutionPlan>) -> Result<()> {
+ let input_schema = input.schema();
+
+ let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
+ vec![(col("a", &input_schema)?, "a".to_string())];
+
+ let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
+ col("b", &input_schema)?,
+ "AVG(b)".to_string(),
+ DataType::Float64,
+ ))];
+
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+
+ let partial_aggregate = Arc::new(AggregateExec::try_new(
+ AggregateMode::Partial,
+ groups.clone(),
+ aggregates.clone(),
+ input,
+ input_schema.clone(),
+ )?);
+
+ let result =
+ common::collect(partial_aggregate.execute(0,
task_ctx.clone()).await?)
+ .await?;
+
+ let expected = vec![
+ "+---+---------------+-------------+",
+ "| a | AVG(b)[count] | AVG(b)[sum] |",
+ "+---+---------------+-------------+",
+ "| 2 | 2 | 2 |",
+ "| 3 | 3 | 7 |",
+ "| 4 | 3 | 11 |",
+ "+---+---------------+-------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &result);
+
+ let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate));
+
+ let final_group: Vec<Arc<dyn PhysicalExpr>> = (0..groups.len())
+ .map(|i| col(&groups[i].1, &input_schema))
+ .collect::<Result<_>>()?;
+
+ let merged_aggregate = Arc::new(AggregateExec::try_new(
+ AggregateMode::Final,
+ final_group
+ .iter()
+ .enumerate()
+ .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
+ .collect(),
+ aggregates,
+ merge,
+ input_schema,
+ )?);
+
+ let result =
+ common::collect(merged_aggregate.execute(0,
task_ctx.clone()).await?).await?;
+ assert_eq!(result.len(), 1);
+
+ let batch = &result[0];
+ assert_eq!(batch.num_columns(), 2);
+ assert_eq!(batch.num_rows(), 3);
+
+ let expected = vec![
+ "+---+--------------------+",
+ "| a | AVG(b) |",
+ "+---+--------------------+",
+ "| 2 | 1 |",
+ "| 3 | 2.3333333333333335 |", // 3, (2 + 3 + 2) / 3
+ "| 4 | 3.6666666666666665 |", // 4, (3 + 4 + 4) / 3
+ "+---+--------------------+",
+ ];
+
+ assert_batches_sorted_eq!(&expected, &result);
+
+ let metrics = merged_aggregate.metrics().unwrap();
+ let output_rows = metrics.output_rows().unwrap();
+ assert_eq!(3, output_rows);
+
+ Ok(())
+ }
+
+ /// Define a test source that can yield back to runtime before returning
its first item ///
+
+ #[derive(Debug)]
+ struct TestYieldingExec {
+ /// True if this exec should yield back to runtime the first time it
is polled
+ pub yield_first: bool,
+ }
+
+ #[async_trait]
+ impl ExecutionPlan for TestYieldingExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+ fn schema(&self) -> SchemaRef {
+ some_data().0
+ }
+
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(1)
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ None
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Err(DataFusionError::Internal(format!(
+ "Children cannot be replaced in {:?}",
+ self
+ )))
+ }
+
+ async fn execute(
+ &self,
+ _partition: usize,
+ _context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ let stream = if self.yield_first {
+ TestYieldingStream::New
+ } else {
+ TestYieldingStream::Yielded
+ };
+
+ Ok(Box::pin(stream))
+ }
+
+ fn statistics(&self) -> Statistics {
+ let (_, batches) = some_data();
+ common::compute_record_batch_statistics(&[batches],
&self.schema(), None)
+ }
+ }
+
+ /// A stream using the demo data. If initialized as `New`, it will first yield to
runtime before returning records
+ enum TestYieldingStream {
+ New,
+ Yielded,
+ ReturnedBatch1,
+ ReturnedBatch2,
+ }
+
+ impl Stream for TestYieldingStream {
+ type Item = ArrowResult<RecordBatch>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ match &*self {
+ TestYieldingStream::New => {
+ *(self.as_mut()) = TestYieldingStream::Yielded;
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ TestYieldingStream::Yielded => {
+ *(self.as_mut()) = TestYieldingStream::ReturnedBatch1;
+ Poll::Ready(Some(Ok(some_data().1[0].clone())))
+ }
+ TestYieldingStream::ReturnedBatch1 => {
+ *(self.as_mut()) = TestYieldingStream::ReturnedBatch2;
+ Poll::Ready(Some(Ok(some_data().1[1].clone())))
+ }
+ TestYieldingStream::ReturnedBatch2 => Poll::Ready(None),
+ }
+ }
+ }
+
+ impl RecordBatchStream for TestYieldingStream {
+ fn schema(&self) -> SchemaRef {
+ some_data().0
+ }
+ }
+
+ //// Tests ////
+
+ #[tokio::test]
+ async fn aggregate_source_not_yielding() -> Result<()> {
+ let input: Arc<dyn ExecutionPlan> =
+ Arc::new(TestYieldingExec { yield_first: false });
+
+ check_aggregates(input).await
+ }
+
+ #[tokio::test]
+ async fn aggregate_source_with_yielding() -> Result<()> {
+ let input: Arc<dyn ExecutionPlan> =
+ Arc::new(TestYieldingExec { yield_first: true });
+
+ check_aggregates(input).await
+ }
+
+ #[tokio::test]
+ async fn test_drop_cancel_without_groups() -> Result<()> {
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+ let schema =
+ Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
true)]));
+
+ let groups = vec![];
+
+ let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
+ col("a", &schema)?,
+ "AVG(a)".to_string(),
+ DataType::Float64,
+ ))];
+
+ let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema),
1));
+ let refs = blocking_exec.refs();
+ let aggregate_exec = Arc::new(AggregateExec::try_new(
+ AggregateMode::Partial,
+ groups.clone(),
+ aggregates.clone(),
+ blocking_exec,
+ schema,
+ )?);
+
+ let fut = crate::physical_plan::collect(aggregate_exec, task_ctx);
+ let mut fut = fut.boxed();
+
+ assert_is_pending(&mut fut);
+ drop(fut);
+ assert_strong_count_converges_to_zero(refs).await;
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_drop_cancel_with_groups() -> Result<()> {
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Float32, true),
+ Field::new("b", DataType::Float32, true),
+ ]));
+
+ let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
+ vec![(col("a", &schema)?, "a".to_string())];
+
+ let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
+ col("b", &schema)?,
+ "AVG(b)".to_string(),
+ DataType::Float64,
+ ))];
+
+ let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema),
1));
+ let refs = blocking_exec.refs();
+ let aggregate_exec = Arc::new(AggregateExec::try_new(
+ AggregateMode::Partial,
+ groups.clone(),
+ aggregates.clone(),
+ blocking_exec,
+ schema,
+ )?);
+
+ let fut = crate::physical_plan::collect(aggregate_exec, task_ctx);
+ let mut fut = fut.boxed();
+
+ assert_is_pending(&mut fut);
+ drop(fut);
+ assert_strong_count_converges_to_zero(refs).await;
+
+ Ok(())
+ }
+}
diff --git a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
new file mode 100644
index 000000000..f687c982c
--- /dev/null
+++ b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Aggregate without grouping columns
+
+use crate::physical_plan::aggregates::{
+ aggregate_expressions, create_accumulators, finalize_aggregation,
AccumulatorItem,
+ AggregateMode,
+};
+use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+use arrow::datatypes::SchemaRef;
+use arrow::error::{ArrowError, Result as ArrowResult};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use futures::{
+ ready,
+ stream::{Stream, StreamExt},
+};
+
+/// Stream for aggregation without grouping columns; yields one result item, then ends.
+pub(crate) struct AggregateStream {
+ schema: SchemaRef, // schema of the output batch; also returned by `RecordBatchStream::schema`
+ mode: AggregateMode, // selects `update_batch` (Partial) vs `merge_batch` (Final*) per input batch
+ input: SendableRecordBatchStream, // upstream batches, polled until exhausted or an error occurs
+ baseline_metrics: BaselineMetrics, // source of the elapsed-compute timer and output recording
+ aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>, // one expression list per accumulator (zipped in `aggregate_batch`)
+ accumulators: Vec<AccumulatorItem>, // built by `create_accumulators` from the aggregate expressions
+ finished: bool, // set once the single item has been produced; later polls return `None`
+}
+
+impl AggregateStream {
+    /// Builds the stream for an aggregation that has no grouping columns.
+    ///
+    /// The per-aggregate input expressions are obtained from
+    /// `aggregate_expressions` with a column index base of `0`, and the
+    /// accumulators from `create_accumulators`. An error from either helper is
+    /// returned to the caller; the expressions are resolved first.
+    pub fn new(
+        mode: AggregateMode,
+        schema: SchemaRef,
+        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+        input: SendableRecordBatchStream,
+        baseline_metrics: BaselineMetrics,
+    ) -> datafusion_common::Result<Self> {
+        let input_exprs = aggregate_expressions(&aggr_expr, &mode, 0)?;
+        let fresh_accumulators = create_accumulators(&aggr_expr)?;
+
+        Ok(Self {
+            schema,
+            mode,
+            input,
+            baseline_metrics,
+            aggregate_expressions: input_exprs,
+            accumulators: fresh_accumulators,
+            finished: false,
+        })
+    }
+}
+
+impl Stream for AggregateStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    /// Drives the aggregation and yields exactly one item.
+    ///
+    /// Input batches are folded into the accumulators via `aggregate_batch`
+    /// until one of the following ends the loop:
+    /// * the input is exhausted - the accumulators are finalized with
+    ///   `finalize_aggregation` and assembled into a `RecordBatch`;
+    /// * the input yields an error - that error is forwarded unchanged;
+    /// * `aggregate_batch` fails - the error is wrapped in
+    ///   `ArrowError::ExternalError`.
+    ///
+    /// Whatever the outcome, the stream is then marked finished and every later
+    /// poll returns `Poll::Ready(None)`. While the input is pending, `ready!`
+    /// returns `Poll::Pending` from this function.
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let this = &mut *self;
+        if this.finished {
+            return Poll::Ready(None);
+        }
+
+        // One handle to the elapsed-compute metric, shared by both timed sections.
+        let elapsed_compute = this.baseline_metrics.elapsed_compute();
+
+        let result = loop {
+            match ready!(this.input.poll_next_unpin(cx)) {
+                Some(Ok(batch)) => {
+                    let timer = elapsed_compute.timer();
+                    let outcome = aggregate_batch(
+                        &this.mode,
+                        &batch,
+                        &mut this.accumulators,
+                        &this.aggregate_expressions,
+                    );
+                    timer.done();
+
+                    // On success keep pulling input; on failure stop with the error.
+                    if let Err(e) = outcome {
+                        break Err(ArrowError::ExternalError(Box::new(e)));
+                    }
+                }
+                Some(Err(e)) => break Err(e),
+                None => {
+                    let timer = elapsed_compute.timer();
+                    let batch = finalize_aggregation(&this.accumulators, &this.mode)
+                        .map_err(|e| ArrowError::ExternalError(Box::new(e)))
+                        .and_then(|columns| {
+                            RecordBatch::try_new(this.schema.clone(), columns)
+                        })
+                        .record_output(&this.baseline_metrics);
+                    timer.done();
+
+                    break batch;
+                }
+            }
+        };
+
+        // Single place where the stream is marked finished, for all three outcomes.
+        this.finished = true;
+        Poll::Ready(Some(result))
+    }
+}
+
+impl RecordBatchStream for AggregateStream {
+    /// Returns another handle to the schema this stream was created with.
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
+/// Feeds one input batch into every accumulator.
+///
+/// `accumulators` and `expressions` are walked in lockstep. For each pair the
+/// expressions are evaluated against `batch` and converted to arrays of
+/// `batch.num_rows()` rows; the accumulator then consumes them through
+/// `update_batch` in `Partial` mode or `merge_batch` in the two final modes.
+/// The first evaluation or accumulator error stops processing and is returned.
+///
+/// TODO: Make this a member function
+fn aggregate_batch(
+    mode: &AggregateMode,
+    batch: &RecordBatch,
+    accumulators: &mut [AccumulatorItem],
+    expressions: &[Vec<Arc<dyn PhysicalExpr>>],
+) -> Result<()> {
+    let num_rows = batch.num_rows();
+
+    for (accumulator, exprs) in accumulators.iter_mut().zip(expressions) {
+        // Evaluate this aggregate's input expressions against the batch.
+        let mut inputs = Vec::with_capacity(exprs.len());
+        for expr in exprs {
+            inputs.push(expr.evaluate(batch)?.into_array(num_rows));
+        }
+
+        // Partial mode consumes raw values; the final modes consume states.
+        match mode {
+            AggregateMode::Partial => accumulator.update_batch(&inputs)?,
+            AggregateMode::Final | AggregateMode::FinalPartitioned => {
+                accumulator.merge_batch(&inputs)?
+            }
+        }
+    }
+
+    Ok(())
+}
diff --git a/datafusion/core/src/physical_plan/hash_aggregate.rs
b/datafusion/core/src/physical_plan/hash_aggregate.rs
deleted file mode 100644
index 643174557..000000000
--- a/datafusion/core/src/physical_plan/hash_aggregate.rs
+++ /dev/null
@@ -1,1299 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Defines the execution plan for the hash aggregate operation
-
-use std::any::Any;
-use std::sync::Arc;
-use std::task::{Context, Poll};
-use std::vec;
-
-use ahash::RandomState;
-use futures::{
- ready,
- stream::{Stream, StreamExt},
-};
-
-use crate::error::Result;
-use crate::physical_plan::hash_utils::create_hashes;
-use crate::physical_plan::{
- Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan,
- Partitioning, PhysicalExpr,
-};
-use crate::scalar::ScalarValue;
-
-use arrow::{array::ArrayRef, compute, compute::cast};
-use arrow::{
- array::{Array, UInt32Builder},
- error::{ArrowError, Result as ArrowResult},
-};
-use arrow::{
- datatypes::{Field, Schema, SchemaRef},
- record_batch::RecordBatch,
-};
-use hashbrown::raw::RawTable;
-
-use crate::execution::context::TaskContext;
-use async_trait::async_trait;
-
-use super::expressions::PhysicalSortExpr;
-use super::metrics::{
- BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
-};
-use super::Statistics;
-use super::{expressions::Column, RecordBatchStream, SendableRecordBatchStream};
-
-/// Hash aggregate modes
-#[derive(Debug, Copy, Clone, PartialEq, Eq)]
-pub enum AggregateMode {
- /// Partial aggregate that can be applied in parallel across input
partitions
- Partial,
- /// Final aggregate that produces a single partition of output
- Final,
- /// Final aggregate that works on pre-partitioned data.
- ///
- /// This requires the invariant that all rows with a particular
- /// grouping key are in the same partitions, such as is the case
- /// with Hash repartitioning on the group keys. If a group key is
- /// duplicated, duplicate groups would be produced
- FinalPartitioned,
-}
-
-/// Hash aggregate execution plan
-#[derive(Debug)]
-pub struct HashAggregateExec {
- /// Aggregation mode (full, partial)
- mode: AggregateMode,
- /// Grouping expressions
- group_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
- /// Aggregate expressions
- aggr_expr: Vec<Arc<dyn AggregateExpr>>,
- /// Input plan, could be a partial aggregate or the input to the aggregate
- input: Arc<dyn ExecutionPlan>,
- /// Schema after the aggregate is applied
- schema: SchemaRef,
- /// Input schema before any aggregation is applied. For partial aggregate
this will be the
- /// same as input.schema() but for the final aggregate it will be the same
as the input
- /// to the partial aggregate
- input_schema: SchemaRef,
- /// Execution Metrics
- metrics: ExecutionPlanMetricsSet,
-}
-
-fn create_schema(
- input_schema: &Schema,
- group_expr: &[(Arc<dyn PhysicalExpr>, String)],
- aggr_expr: &[Arc<dyn AggregateExpr>],
- mode: AggregateMode,
-) -> Result<Schema> {
- let mut fields = Vec::with_capacity(group_expr.len() + aggr_expr.len());
- for (expr, name) in group_expr {
- fields.push(Field::new(
- name,
- expr.data_type(input_schema)?,
- expr.nullable(input_schema)?,
- ))
- }
-
- match mode {
- AggregateMode::Partial => {
- // in partial mode, the fields of the accumulator's state
- for expr in aggr_expr {
- fields.extend(expr.state_fields()?.iter().cloned())
- }
- }
- AggregateMode::Final | AggregateMode::FinalPartitioned => {
- // in final mode, the field with the final result of the
accumulator
- for expr in aggr_expr {
- fields.push(expr.field()?)
- }
- }
- }
-
- Ok(Schema::new(fields))
-}
-
-impl HashAggregateExec {
- /// Create a new hash aggregate execution plan
- pub fn try_new(
- mode: AggregateMode,
- group_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
- aggr_expr: Vec<Arc<dyn AggregateExpr>>,
- input: Arc<dyn ExecutionPlan>,
- input_schema: SchemaRef,
- ) -> Result<Self> {
- let schema = create_schema(&input.schema(), &group_expr, &aggr_expr,
mode)?;
-
- let schema = Arc::new(schema);
-
- Ok(HashAggregateExec {
- mode,
- group_expr,
- aggr_expr,
- input,
- schema,
- input_schema,
- metrics: ExecutionPlanMetricsSet::new(),
- })
- }
-
- /// Aggregation mode (full, partial)
- pub fn mode(&self) -> &AggregateMode {
- &self.mode
- }
-
- /// Grouping expressions
- pub fn group_expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] {
- &self.group_expr
- }
-
- /// Grouping expressions as they occur in the output schema
- pub fn output_group_expr(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- // Update column indices. Since the group by columns come first in the
output schema, their
- // indices are simply 0..self.group_expr(len).
- self.group_expr
- .iter()
- .enumerate()
- .map(|(index, (_col, name))| {
- Arc::new(Column::new(name, index)) as Arc<dyn PhysicalExpr>
- })
- .collect()
- }
-
- /// Aggregate expressions
- pub fn aggr_expr(&self) -> &[Arc<dyn AggregateExpr>] {
- &self.aggr_expr
- }
-
- /// Input plan
- pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
- &self.input
- }
-
- /// Get the input schema before any aggregates are applied
- pub fn input_schema(&self) -> SchemaRef {
- self.input_schema.clone()
- }
-}
-
-#[async_trait]
-impl ExecutionPlan for HashAggregateExec {
- /// Return a reference to Any that can be used for downcasting
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn schema(&self) -> SchemaRef {
- self.schema.clone()
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
- }
-
- fn required_child_distribution(&self) -> Distribution {
- match &self.mode {
- AggregateMode::Partial => Distribution::UnspecifiedDistribution,
- AggregateMode::FinalPartitioned => Distribution::HashPartitioned(
- self.group_expr.iter().map(|x| x.0.clone()).collect(),
- ),
- AggregateMode::Final => Distribution::SinglePartition,
- }
- }
-
- /// Get the output partitioning of this plan
- fn output_partitioning(&self) -> Partitioning {
- self.input.output_partitioning()
- }
-
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- fn relies_on_input_order(&self) -> bool {
- false
- }
-
- async fn execute(
- &self,
- partition: usize,
- context: Arc<TaskContext>,
- ) -> Result<SendableRecordBatchStream> {
- let input = self.input.execute(partition, context).await?;
- let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect();
-
- let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
-
- if self.group_expr.is_empty() {
- Ok(Box::pin(HashAggregateStream::new(
- self.mode,
- self.schema.clone(),
- self.aggr_expr.clone(),
- input,
- baseline_metrics,
- )?))
- } else {
- Ok(Box::pin(GroupedHashAggregateStream::new(
- self.mode,
- self.schema.clone(),
- group_expr,
- self.aggr_expr.clone(),
- input,
- baseline_metrics,
- )?))
- }
- }
-
- fn with_new_children(
- self: Arc<Self>,
- children: Vec<Arc<dyn ExecutionPlan>>,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(HashAggregateExec::try_new(
- self.mode,
- self.group_expr.clone(),
- self.aggr_expr.clone(),
- children[0].clone(),
- self.input_schema.clone(),
- )?))
- }
-
- fn metrics(&self) -> Option<MetricsSet> {
- Some(self.metrics.clone_inner())
- }
-
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default => {
- write!(f, "HashAggregateExec: mode={:?}", self.mode)?;
- let g: Vec<String> = self
- .group_expr
- .iter()
- .map(|(e, alias)| {
- let e = e.to_string();
- if &e != alias {
- format!("{} as {}", e, alias)
- } else {
- e
- }
- })
- .collect();
- write!(f, ", gby=[{}]", g.join(", "))?;
-
- let a: Vec<String> = self
- .aggr_expr
- .iter()
- .map(|agg| agg.name().to_string())
- .collect();
- write!(f, ", aggr=[{}]", a.join(", "))?;
- }
- }
- Ok(())
- }
-
- fn statistics(&self) -> Statistics {
- // TODO stats: group expressions:
- // - once expressions will be able to compute their own stats, use it
here
- // - case where we group by on a column for which with have the
`distinct` stat
- // TODO stats: aggr expression:
- // - aggregations somtimes also preserve invariants such as min, max...
- match self.mode {
- AggregateMode::Final | AggregateMode::FinalPartitioned
- if self.group_expr.is_empty() =>
- {
- Statistics {
- num_rows: Some(1),
- is_exact: true,
- ..Default::default()
- }
- }
- _ => Statistics::default(),
- }
- }
-}
-
-/*
-The architecture is the following:
-
-1. An accumulator has state that is updated on each batch.
-2. At the end of the aggregation (e.g. end of batches in a partition), the
accumulator converts its state to a RecordBatch of a single row
-3. The RecordBatches of all accumulators are merged (`concatenate` in
`rust/arrow`) together to a single RecordBatch.
-4. The state's RecordBatch is `merge`d to a new state
-5. The state is mapped to the final value
-
-Why:
-
-* Accumulators' state can be statically typed, but it is more efficient to
transmit data from the accumulators via `Array`
-* The `merge` operation must have access to the state of the aggregators
because it uses it to correctly merge
-* It uses Arrow's native dynamically typed object, `Array`.
-* Arrow shines in batch operations and both `merge` and `concatenate` of
uniform types are very performant.
-
-Example: average
-
-* the state is `n: u32` and `sum: f64`
-* For every batch, we update them accordingly.
-* At the end of the accumulation (of a partition), we convert `n` and `sum` to
a RecordBatch of 1 row and two columns: `[n, sum]`
-* The RecordBatch is (sent back / transmitted over network)
-* Once all N record batches arrive, `merge` is performed, which builds a
RecordBatch with N rows and 2 columns.
-* Finally, `get_value` returns an array with one entry computed from the state
-*/
-struct GroupedHashAggregateStream {
- schema: SchemaRef,
- input: SendableRecordBatchStream,
- mode: AggregateMode,
- accumulators: Accumulators,
- aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
-
- aggr_expr: Vec<Arc<dyn AggregateExpr>>,
- group_expr: Vec<Arc<dyn PhysicalExpr>>,
-
- baseline_metrics: BaselineMetrics,
- random_state: RandomState,
- finished: bool,
-}
-
-impl GroupedHashAggregateStream {
- /// Create a new HashAggregateStream
- pub fn new(
- mode: AggregateMode,
- schema: SchemaRef,
- group_expr: Vec<Arc<dyn PhysicalExpr>>,
- aggr_expr: Vec<Arc<dyn AggregateExpr>>,
- input: SendableRecordBatchStream,
- baseline_metrics: BaselineMetrics,
- ) -> Result<Self> {
- let timer = baseline_metrics.elapsed_compute().timer();
-
- // The expressions to evaluate the batch, one vec of expressions per
aggregation.
- // Assume create_schema() always put group columns in front of aggr
columns, we set
- // col_idx_base to group expression count.
- let aggregate_expressions =
- aggregate_expressions(&aggr_expr, &mode, group_expr.len())?;
-
- timer.done();
-
- Ok(Self {
- schema,
- mode,
- input,
- aggr_expr,
- group_expr,
- baseline_metrics,
- aggregate_expressions,
- accumulators: Default::default(),
- random_state: Default::default(),
- finished: false,
- })
- }
-}
-
-impl Stream for GroupedHashAggregateStream {
- type Item = ArrowResult<RecordBatch>;
-
- fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- let this = &mut *self;
- if this.finished {
- return Poll::Ready(None);
- }
-
- let elapsed_compute = this.baseline_metrics.elapsed_compute();
-
- loop {
- let result = match ready!(this.input.poll_next_unpin(cx)) {
- Some(Ok(batch)) => {
- let timer = elapsed_compute.timer();
- let result = group_aggregate_batch(
- &this.mode,
- &this.random_state,
- &this.group_expr,
- &this.aggr_expr,
- batch,
- &mut this.accumulators,
- &this.aggregate_expressions,
- );
-
- timer.done();
-
- match result {
- Ok(_) => continue,
- Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
- }
- }
- Some(Err(e)) => Err(e),
- None => {
- this.finished = true;
- let timer =
this.baseline_metrics.elapsed_compute().timer();
- let result = create_batch_from_map(
- &this.mode,
- &this.accumulators,
- this.group_expr.len(),
- &this.schema,
- )
- .record_output(&this.baseline_metrics);
-
- timer.done();
- result
- }
- };
-
- this.finished = true;
- return Poll::Ready(Some(result));
- }
- }
-}
-
-impl RecordBatchStream for GroupedHashAggregateStream {
- fn schema(&self) -> SchemaRef {
- self.schema.clone()
- }
-}
-
-/// TODO: Make this a member function of [`GroupedHashAggregateStream`]
-fn group_aggregate_batch(
- mode: &AggregateMode,
- random_state: &RandomState,
- group_expr: &[Arc<dyn PhysicalExpr>],
- aggr_expr: &[Arc<dyn AggregateExpr>],
- batch: RecordBatch,
- accumulators: &mut Accumulators,
- aggregate_expressions: &[Vec<Arc<dyn PhysicalExpr>>],
-) -> Result<()> {
- // evaluate the grouping expressions
- let group_values = evaluate(group_expr, &batch)?;
-
- // evaluate the aggregation expressions.
- // We could evaluate them after the `take`, but since we need to evaluate
all
- // of them anyways, it is more performant to do it while they are together.
- let aggr_input_values = evaluate_many(aggregate_expressions, &batch)?;
-
- // 1.1 construct the key from the group values
- // 1.2 construct the mapping key if it does not exist
- // 1.3 add the row' index to `indices`
-
- // track which entries in `accumulators` have rows in this batch to
aggregate
- let mut groups_with_rows = vec![];
-
- // 1.1 Calculate the group keys for the group values
- let mut batch_hashes = vec![0; batch.num_rows()];
- create_hashes(&group_values, random_state, &mut batch_hashes)?;
-
- for (row, hash) in batch_hashes.into_iter().enumerate() {
- let Accumulators { map, group_states } = accumulators;
-
- let entry = map.get_mut(hash, |(_hash, group_idx)| {
- // verify that a group that we are inserting with hash is
- // actually the same key value as the group in
- // existing_idx (aka group_values @ row)
- let group_state = &group_states[*group_idx];
- group_values
- .iter()
- .zip(group_state.group_by_values.iter())
- .all(|(array, scalar)| scalar.eq_array(array, row))
- });
-
- match entry {
- // Existing entry for this group value
- Some((_hash, group_idx)) => {
- let group_state = &mut group_states[*group_idx];
- // 1.3
- if group_state.indices.is_empty() {
- groups_with_rows.push(*group_idx);
- };
- group_state.indices.push(row as u32); // remember this row
- }
- // 1.2 Need to create new entry
- None => {
- let accumulator_set = create_accumulators(aggr_expr)?;
-
- // Copy group values out of arrays into `ScalarValue`s
- let group_by_values = group_values
- .iter()
- .map(|col| ScalarValue::try_from_array(col, row))
- .collect::<Result<Vec<_>>>()?;
-
- // Add new entry to group_states and save newly created index
- let group_state = GroupState {
- group_by_values: group_by_values.into_boxed_slice(),
- accumulator_set,
- indices: vec![row as u32], // 1.3
- };
- let group_idx = group_states.len();
- group_states.push(group_state);
- groups_with_rows.push(group_idx);
-
- // for hasher function, use precomputed hash value
- map.insert(hash, (hash, group_idx), |(hash, _group_idx)|
*hash);
- }
- };
- }
-
- // Collect all indices + offsets based on keys in this vec
- let mut batch_indices: UInt32Builder = UInt32Builder::new(0);
- let mut offsets = vec![0];
- let mut offset_so_far = 0;
- for group_idx in groups_with_rows.iter() {
- let indices = &accumulators.group_states[*group_idx].indices;
- batch_indices.append_slice(indices)?;
- offset_so_far += indices.len();
- offsets.push(offset_so_far);
- }
- let batch_indices = batch_indices.finish();
-
- // `Take` all values based on indices into Arrays
- let values: Vec<Vec<Arc<dyn Array>>> = aggr_input_values
- .iter()
- .map(|array| {
- array
- .iter()
- .map(|array| {
- compute::take(
- array.as_ref(),
- &batch_indices,
- None, // None: no index check
- )
- .unwrap()
- })
- .collect()
- // 2.3
- })
- .collect();
-
- // 2.1 for each key in this batch
- // 2.2 for each aggregation
- // 2.3 `slice` from each of its arrays the keys' values
- // 2.4 update / merge the accumulator with the values
- // 2.5 clear indices
- groups_with_rows
- .iter()
- .zip(offsets.windows(2))
- .try_for_each(|(group_idx, offsets)| {
- let group_state = &mut accumulators.group_states[*group_idx];
- // 2.2
- group_state
- .accumulator_set
- .iter_mut()
- .zip(values.iter())
- .map(|(accumulator, aggr_array)| {
- (
- accumulator,
- aggr_array
- .iter()
- .map(|array| {
- // 2.3
- array.slice(offsets[0], offsets[1] -
offsets[0])
- })
- .collect::<Vec<ArrayRef>>(),
- )
- })
- .try_for_each(|(accumulator, values)| match mode {
- AggregateMode::Partial =>
accumulator.update_batch(&values),
- AggregateMode::FinalPartitioned | AggregateMode::Final => {
- // note: the aggregation here is over states, not
values, thus the merge
- accumulator.merge_batch(&values)
- }
- })
- // 2.5
- .and({
- group_state.indices.clear();
- Ok(())
- })
- })?;
-
- Ok(())
-}
-
-type AccumulatorItem = Box<dyn Accumulator>;
-
-/// The state that is built for each output group.
-#[derive(Debug)]
-struct GroupState {
- /// The actual group by values, one for each group column
- group_by_values: Box<[ScalarValue]>,
-
- // Accumulator state, one for each aggregate
- accumulator_set: Vec<AccumulatorItem>,
-
- /// scratch space used to collect indices for input rows in a
- /// bach that have values to aggregate. Reset on each batch
- indices: Vec<u32>,
-}
-
-/// The state of all the groups
-#[derive(Default)]
-struct Accumulators {
- /// Logically maps group values to an index in `group_states`
- ///
- /// Uses the raw API of hashbrown to avoid actually storing the
- /// keys in the table
- ///
- /// keys: u64 hashes of the GroupValue
- /// values: (hash, index into `group_states`)
- map: RawTable<(u64, usize)>,
-
- /// State for each group
- group_states: Vec<GroupState>,
-}
-
-impl std::fmt::Debug for Accumulators {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- // hashes are not store inline, so could only get values
- let map_string = "RawTable";
- f.debug_struct("Accumulators")
- .field("map", &map_string)
- .field("group_states", &self.group_states)
- .finish()
- }
-}
-
-/// Evaluates expressions against a record batch.
-fn evaluate(
- expr: &[Arc<dyn PhysicalExpr>],
- batch: &RecordBatch,
-) -> Result<Vec<ArrayRef>> {
- expr.iter()
- .map(|expr| expr.evaluate(batch))
- .map(|r| r.map(|v| v.into_array(batch.num_rows())))
- .collect::<Result<Vec<_>>>()
-}
-
-/// Evaluates expressions against a record batch.
-fn evaluate_many(
- expr: &[Vec<Arc<dyn PhysicalExpr>>],
- batch: &RecordBatch,
-) -> Result<Vec<Vec<ArrayRef>>> {
- expr.iter()
- .map(|expr| evaluate(expr, batch))
- .collect::<Result<Vec<_>>>()
-}
-
-/// uses `state_fields` to build a vec of physical column expressions required
to merge the
-/// AggregateExpr' accumulator's state.
-///
-/// `index_base` is the starting physical column index for the next expanded
state field.
-fn merge_expressions(
- index_base: usize,
- expr: &Arc<dyn AggregateExpr>,
-) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
- Ok(expr
- .state_fields()?
- .iter()
- .enumerate()
- .map(|(idx, f)| {
- Arc::new(Column::new(f.name(), index_base + idx)) as Arc<dyn
PhysicalExpr>
- })
- .collect::<Vec<_>>())
-}
-
-/// returns physical expressions to evaluate against a batch
-/// The expressions are different depending on `mode`:
-/// * Partial: AggregateExpr::expressions
-/// * Final: columns of `AggregateExpr::state_fields()`
-fn aggregate_expressions(
- aggr_expr: &[Arc<dyn AggregateExpr>],
- mode: &AggregateMode,
- col_idx_base: usize,
-) -> Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
- match mode {
- AggregateMode::Partial => {
- Ok(aggr_expr.iter().map(|agg| agg.expressions()).collect())
- }
- // in this mode, we build the merge expressions of the aggregation
- AggregateMode::Final | AggregateMode::FinalPartitioned => {
- let mut col_idx_base = col_idx_base;
- Ok(aggr_expr
- .iter()
- .map(|agg| {
- let exprs = merge_expressions(col_idx_base, agg)?;
- col_idx_base += exprs.len();
- Ok(exprs)
- })
- .collect::<Result<Vec<_>>>()?)
- }
- }
-}
-
-/// stream struct for hash aggregation
-pub struct HashAggregateStream {
- schema: SchemaRef,
- mode: AggregateMode,
- input: SendableRecordBatchStream,
- baseline_metrics: BaselineMetrics,
- aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
- accumulators: Vec<AccumulatorItem>,
- finished: bool,
-}
-
-impl HashAggregateStream {
- /// Create a new HashAggregateStream
- pub fn new(
- mode: AggregateMode,
- schema: SchemaRef,
- aggr_expr: Vec<Arc<dyn AggregateExpr>>,
- input: SendableRecordBatchStream,
- baseline_metrics: BaselineMetrics,
- ) -> Result<Self> {
- let aggregate_expressions = aggregate_expressions(&aggr_expr, &mode,
0)?;
- let accumulators = create_accumulators(&aggr_expr)?;
-
- Ok(Self {
- schema,
- mode,
- input,
- baseline_metrics,
- aggregate_expressions,
- accumulators,
- finished: false,
- })
- }
-}
-
-/// TODO: Make this a member function
-fn aggregate_batch(
- mode: &AggregateMode,
- batch: &RecordBatch,
- accumulators: &mut [AccumulatorItem],
- expressions: &[Vec<Arc<dyn PhysicalExpr>>],
-) -> Result<()> {
- // 1.1 iterate accumulators and respective expressions together
- // 1.2 evaluate expressions
- // 1.3 update / merge accumulators with the expressions' values
-
- // 1.1
- accumulators
- .iter_mut()
- .zip(expressions)
- .try_for_each(|(accum, expr)| {
- // 1.2
- let values = &expr
- .iter()
- .map(|e| e.evaluate(batch))
- .map(|r| r.map(|v| v.into_array(batch.num_rows())))
- .collect::<Result<Vec<_>>>()?;
-
- // 1.3
- match mode {
- AggregateMode::Partial => accum.update_batch(values),
- AggregateMode::Final | AggregateMode::FinalPartitioned => {
- accum.merge_batch(values)
- }
- }
- })
-}
-
-impl Stream for HashAggregateStream {
- type Item = ArrowResult<RecordBatch>;
-
- fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- let this = &mut *self;
- if this.finished {
- return Poll::Ready(None);
- }
-
- let elapsed_compute = this.baseline_metrics.elapsed_compute();
-
- loop {
- let result = match ready!(this.input.poll_next_unpin(cx)) {
- Some(Ok(batch)) => {
- let timer = elapsed_compute.timer();
- let result = aggregate_batch(
- &this.mode,
- &batch,
- &mut this.accumulators,
- &this.aggregate_expressions,
- );
-
- timer.done();
-
- match result {
- Ok(_) => continue,
- Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
- }
- }
- Some(Err(e)) => Err(e),
- None => {
- this.finished = true;
- let timer =
this.baseline_metrics.elapsed_compute().timer();
- let result = finalize_aggregation(&this.accumulators,
&this.mode)
- .map_err(|e| ArrowError::ExternalError(Box::new(e)))
- .and_then(|columns| {
- RecordBatch::try_new(this.schema.clone(), columns)
- })
- .record_output(&this.baseline_metrics);
-
- timer.done();
- result
- }
- };
-
- this.finished = true;
- return Poll::Ready(Some(result));
- }
- }
-}
-
-impl RecordBatchStream for HashAggregateStream {
- fn schema(&self) -> SchemaRef {
- self.schema.clone()
- }
-}
-
-/// Create a RecordBatch with all group keys and accumulator' states or values.
-fn create_batch_from_map(
- mode: &AggregateMode,
- accumulators: &Accumulators,
- num_group_expr: usize,
- output_schema: &Schema,
-) -> ArrowResult<RecordBatch> {
- if accumulators.group_states.is_empty() {
- return Ok(RecordBatch::new_empty(Arc::new(output_schema.to_owned())));
- }
- let accs = &accumulators.group_states[0].accumulator_set;
- let mut acc_data_types: Vec<usize> = vec![];
-
- // Calculate number/shape of state arrays
- match mode {
- AggregateMode::Partial => {
- for acc in accs.iter() {
- let state = acc.state()?;
- acc_data_types.push(state.len());
- }
- }
- AggregateMode::Final | AggregateMode::FinalPartitioned => {
- acc_data_types = vec![1; accs.len()];
- }
- }
-
- let mut columns = (0..num_group_expr)
- .map(|i| {
- ScalarValue::iter_to_array(
- accumulators
- .group_states
- .iter()
- .map(|group_state| group_state.group_by_values[i].clone()),
- )
- })
- .collect::<Result<Vec<_>>>()?;
-
- // add state / evaluated arrays
- for (x, &state_len) in acc_data_types.iter().enumerate() {
- for y in 0..state_len {
- match mode {
- AggregateMode::Partial => {
- let res = ScalarValue::iter_to_array(
- accumulators.group_states.iter().map(|group_state| {
- let x =
group_state.accumulator_set[x].state().unwrap();
- x[y].clone()
- }),
- )?;
-
- columns.push(res);
- }
- AggregateMode::Final | AggregateMode::FinalPartitioned => {
- let res = ScalarValue::iter_to_array(
- accumulators.group_states.iter().map(|group_state| {
- group_state.accumulator_set[x].evaluate().unwrap()
- }),
- )?;
- columns.push(res);
- }
- }
- }
- }
-
- // cast output if needed (e.g. for types like Dictionary where
- // the intermediate GroupByScalar type was not the same as the
- // output
- let columns = columns
- .iter()
- .zip(output_schema.fields().iter())
- .map(|(col, desired_field)| cast(col, desired_field.data_type()))
- .collect::<ArrowResult<Vec<_>>>()?;
-
- RecordBatch::try_new(Arc::new(output_schema.to_owned()), columns)
-}
-
-fn create_accumulators(
- aggr_expr: &[Arc<dyn AggregateExpr>],
-) -> Result<Vec<AccumulatorItem>> {
- aggr_expr
- .iter()
- .map(|expr| expr.create_accumulator())
- .collect::<Result<Vec<_>>>()
-}
-
-/// returns a vector of ArrayRefs, where each entry corresponds to either the
-/// final value (mode = Final) or states (mode = Partial)
-fn finalize_aggregation(
- accumulators: &[AccumulatorItem],
- mode: &AggregateMode,
-) -> Result<Vec<ArrayRef>> {
- match mode {
- AggregateMode::Partial => {
- // build the vector of states
- let a = accumulators
- .iter()
- .map(|accumulator| accumulator.state())
- .map(|value| {
- value.map(|e| {
- e.iter().map(|v|
v.to_array()).collect::<Vec<ArrayRef>>()
- })
- })
- .collect::<Result<Vec<_>>>()?;
- Ok(a.iter().flatten().cloned().collect::<Vec<_>>())
- }
- AggregateMode::Final | AggregateMode::FinalPartitioned => {
- // merge the state to the final value
- accumulators
- .iter()
- .map(|accumulator| accumulator.evaluate().map(|v|
v.to_array()))
- .collect::<Result<Vec<ArrayRef>>>()
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
-
- use super::*;
- use crate::from_slice::FromSlice;
- use crate::physical_plan::expressions::{col, Avg};
- use crate::test::assert_is_pending;
- use crate::test::exec::{assert_strong_count_converges_to_zero,
BlockingExec};
- use crate::{assert_batches_sorted_eq, physical_plan::common};
- use arrow::array::{Float64Array, UInt32Array};
- use arrow::datatypes::DataType;
- use datafusion_common::DataFusionError;
- use futures::FutureExt;
-
- use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
- use crate::prelude::SessionContext;
-
- /// some mock data to aggregates
- fn some_data() -> (Arc<Schema>, Vec<RecordBatch>) {
- // define a schema.
- let schema = Arc::new(Schema::new(vec![
- Field::new("a", DataType::UInt32, false),
- Field::new("b", DataType::Float64, false),
- ]));
-
- // define data.
- (
- schema.clone(),
- vec![
- RecordBatch::try_new(
- schema.clone(),
- vec![
- Arc::new(UInt32Array::from_slice(&[2, 3, 4, 4])),
- Arc::new(Float64Array::from_slice(&[1.0, 2.0, 3.0,
4.0])),
- ],
- )
- .unwrap(),
- RecordBatch::try_new(
- schema,
- vec![
- Arc::new(UInt32Array::from_slice(&[2, 3, 3, 4])),
- Arc::new(Float64Array::from_slice(&[1.0, 2.0, 3.0,
4.0])),
- ],
- )
- .unwrap(),
- ],
- )
- }
-
- /// build the aggregates on the data from some_data() and check the results
- async fn check_aggregates(input: Arc<dyn ExecutionPlan>) -> Result<()> {
- let input_schema = input.schema();
-
- let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
- vec![(col("a", &input_schema)?, "a".to_string())];
-
- let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
- col("b", &input_schema)?,
- "AVG(b)".to_string(),
- DataType::Float64,
- ))];
-
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
-
- let partial_aggregate = Arc::new(HashAggregateExec::try_new(
- AggregateMode::Partial,
- groups.clone(),
- aggregates.clone(),
- input,
- input_schema.clone(),
- )?);
-
- let result =
- common::collect(partial_aggregate.execute(0,
task_ctx.clone()).await?)
- .await?;
-
- let expected = vec![
- "+---+---------------+-------------+",
- "| a | AVG(b)[count] | AVG(b)[sum] |",
- "+---+---------------+-------------+",
- "| 2 | 2 | 2 |",
- "| 3 | 3 | 7 |",
- "| 4 | 3 | 11 |",
- "+---+---------------+-------------+",
- ];
- assert_batches_sorted_eq!(expected, &result);
-
- let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate));
-
- let final_group: Vec<Arc<dyn PhysicalExpr>> = (0..groups.len())
- .map(|i| col(&groups[i].1, &input_schema))
- .collect::<Result<_>>()?;
-
- let merged_aggregate = Arc::new(HashAggregateExec::try_new(
- AggregateMode::Final,
- final_group
- .iter()
- .enumerate()
- .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
- .collect(),
- aggregates,
- merge,
- input_schema,
- )?);
-
- let result =
- common::collect(merged_aggregate.execute(0,
task_ctx.clone()).await?).await?;
- assert_eq!(result.len(), 1);
-
- let batch = &result[0];
- assert_eq!(batch.num_columns(), 2);
- assert_eq!(batch.num_rows(), 3);
-
- let expected = vec![
- "+---+--------------------+",
- "| a | AVG(b) |",
- "+---+--------------------+",
- "| 2 | 1 |",
- "| 3 | 2.3333333333333335 |", // 3, (2 + 3 + 2) / 3
- "| 4 | 3.6666666666666665 |", // 4, (3 + 4 + 4) / 3
- "+---+--------------------+",
- ];
-
- assert_batches_sorted_eq!(&expected, &result);
-
- let metrics = merged_aggregate.metrics().unwrap();
- let output_rows = metrics.output_rows().unwrap();
- assert_eq!(3, output_rows);
-
- Ok(())
- }
-
- /// Define a test source that can yield back to runtime before returning its first item ///
-
- #[derive(Debug)]
- struct TestYieldingExec {
- /// True if this exec should yield back to runtime the first time it is polled
- pub yield_first: bool,
- }
-
- #[async_trait]
- impl ExecutionPlan for TestYieldingExec {
- fn as_any(&self) -> &dyn Any {
- self
- }
- fn schema(&self) -> SchemaRef {
- some_data().0
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![]
- }
-
- fn output_partitioning(&self) -> Partitioning {
- Partitioning::UnknownPartitioning(1)
- }
-
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
- }
-
- fn with_new_children(
- self: Arc<Self>,
- _: Vec<Arc<dyn ExecutionPlan>>,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- Err(DataFusionError::Internal(format!(
- "Children cannot be replaced in {:?}",
- self
- )))
- }
-
- async fn execute(
- &self,
- _partition: usize,
- _context: Arc<TaskContext>,
- ) -> Result<SendableRecordBatchStream> {
- let stream = if self.yield_first {
- TestYieldingStream::New
- } else {
- TestYieldingStream::Yielded
- };
-
- Ok(Box::pin(stream))
- }
-
- fn statistics(&self) -> Statistics {
- let (_, batches) = some_data();
- common::compute_record_batch_statistics(&[batches],
&self.schema(), None)
- }
- }
-
- /// A stream using the demo data. If inited as new, it will first yield to runtime before returning records
- enum TestYieldingStream {
- New,
- Yielded,
- ReturnedBatch1,
- ReturnedBatch2,
- }
-
- impl Stream for TestYieldingStream {
- type Item = ArrowResult<RecordBatch>;
-
- fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- match &*self {
- TestYieldingStream::New => {
- *(self.as_mut()) = TestYieldingStream::Yielded;
- cx.waker().wake_by_ref();
- Poll::Pending
- }
- TestYieldingStream::Yielded => {
- *(self.as_mut()) = TestYieldingStream::ReturnedBatch1;
- Poll::Ready(Some(Ok(some_data().1[0].clone())))
- }
- TestYieldingStream::ReturnedBatch1 => {
- *(self.as_mut()) = TestYieldingStream::ReturnedBatch2;
- Poll::Ready(Some(Ok(some_data().1[1].clone())))
- }
- TestYieldingStream::ReturnedBatch2 => Poll::Ready(None),
- }
- }
- }
-
- impl RecordBatchStream for TestYieldingStream {
- fn schema(&self) -> SchemaRef {
- some_data().0
- }
- }
-
- //// Tests ////
-
- #[tokio::test]
- async fn aggregate_source_not_yielding() -> Result<()> {
- let input: Arc<dyn ExecutionPlan> =
- Arc::new(TestYieldingExec { yield_first: false });
-
- check_aggregates(input).await
- }
-
- #[tokio::test]
- async fn aggregate_source_with_yielding() -> Result<()> {
- let input: Arc<dyn ExecutionPlan> =
- Arc::new(TestYieldingExec { yield_first: true });
-
- check_aggregates(input).await
- }
-
- #[tokio::test]
- async fn test_drop_cancel_without_groups() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
- let schema =
- Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
true)]));
-
- let groups = vec![];
-
- let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
- col("a", &schema)?,
- "AVG(a)".to_string(),
- DataType::Float64,
- ))];
-
- let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema),
1));
- let refs = blocking_exec.refs();
- let hash_aggregate_exec = Arc::new(HashAggregateExec::try_new(
- AggregateMode::Partial,
- groups.clone(),
- aggregates.clone(),
- blocking_exec,
- schema,
- )?);
-
- let fut = crate::physical_plan::collect(hash_aggregate_exec, task_ctx);
- let mut fut = fut.boxed();
-
- assert_is_pending(&mut fut);
- drop(fut);
- assert_strong_count_converges_to_zero(refs).await;
-
- Ok(())
- }
-
- #[tokio::test]
- async fn test_drop_cancel_with_groups() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
- let schema = Arc::new(Schema::new(vec![
- Field::new("a", DataType::Float32, true),
- Field::new("b", DataType::Float32, true),
- ]));
-
- let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
- vec![(col("a", &schema)?, "a".to_string())];
-
- let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
- col("b", &schema)?,
- "AVG(b)".to_string(),
- DataType::Float64,
- ))];
-
- let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema),
1));
- let refs = blocking_exec.refs();
- let hash_aggregate_exec = Arc::new(HashAggregateExec::try_new(
- AggregateMode::Partial,
- groups.clone(),
- aggregates.clone(),
- blocking_exec,
- schema,
- )?);
-
- let fut = crate::physical_plan::collect(hash_aggregate_exec, task_ctx);
- let mut fut = fut.boxed();
-
- assert_is_pending(&mut fut);
- drop(fut);
- assert_strong_count_converges_to_zero(refs).await;
-
- Ok(())
- }
-}
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index b7b25a636..dc963c7e1 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -555,7 +555,6 @@ pub use datafusion_physical_expr::expressions;
pub mod file_format;
pub mod filter;
pub mod functions;
-pub mod hash_aggregate;
pub mod hash_join;
pub mod hash_utils;
pub mod join_utils;
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index c02e1732a..e3b9bc434 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -34,6 +34,7 @@ use crate::logical_plan::{
};
use crate::logical_plan::{Limit, Values};
use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
use crate::physical_plan::cross_join::CrossJoinExec;
use crate::physical_plan::explain::ExplainExec;
use crate::physical_plan::expressions;
@@ -41,7 +42,6 @@ use crate::physical_plan::expressions::{
CaseExpr, Column, GetIndexedFieldExpr, Literal, PhysicalSortExpr,
};
use crate::physical_plan::filter::FilterExec;
-use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use crate::physical_plan::hash_join::HashJoinExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::projection::ProjectionExec;
@@ -531,7 +531,7 @@ impl DefaultPhysicalPlanner {
})
.collect::<Result<Vec<_>>>()?;
- let initial_aggr = Arc::new(HashAggregateExec::try_new(
+ let initial_aggr = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
groups.clone(),
aggregates.clone(),
@@ -573,7 +573,7 @@ impl DefaultPhysicalPlanner {
(initial_aggr, AggregateMode::Final)
};
- Ok(Arc::new(HashAggregateExec::try_new(
+ Ok(Arc::new(AggregateExec::try_new(
next_partition_mode,
final_group
.iter()
@@ -1866,7 +1866,7 @@ mod tests {
let execution_plan = plan(&logical_plan).await?;
let final_hash_agg = execution_plan
.as_any()
- .downcast_ref::<HashAggregateExec>()
+ .downcast_ref::<AggregateExec>()
.expect("hash aggregate");
assert_eq!(
"SUM(aggregate_test_100.c2)",
@@ -1900,7 +1900,7 @@ mod tests {
let formatted = format!("{:?}", execution_plan);
// Make sure the plan contains a FinalPartitioned, which means it will not use the Final
- // mode in HashAggregate (which is slower)
+ // mode in Aggregate (which is slower)
assert!(formatted.contains("FinalPartitioned"));
Ok(())
diff --git a/datafusion/core/tests/sql/avro.rs b/datafusion/core/tests/sql/avro.rs
index b5ea5477e..f5c25dbad 100644
--- a/datafusion/core/tests/sql/avro.rs
+++ b/datafusion/core/tests/sql/avro.rs
@@ -150,9 +150,9 @@ async fn avro_explain() {
vec![
"physical_plan",
"ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))]\
- \n HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
+ \n AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
\n CoalescePartitionsExec\
- \n HashAggregateExec: mode=Partial, gby=[],
aggr=[COUNT(UInt8(1))]\
+ \n AggregateExec: mode=Partial, gby=[],
aggr=[COUNT(UInt8(1))]\
\n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
\n AvroExec:
files=[ARROW_TEST_DATA/avro/alltypes_plain.avro], limit=None\
\n",
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index a124311aa..f72e0f8f8 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -51,12 +51,12 @@ async fn explain_analyze_baseline_metrics() {
assert_metrics!(
&formatted,
- "HashAggregateExec: mode=Partial, gby=[]",
+ "AggregateExec: mode=Partial, gby=[]",
"metrics=[output_rows=3, elapsed_compute="
);
assert_metrics!(
&formatted,
- "HashAggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
+ "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
"metrics=[output_rows=5, elapsed_compute="
);
assert_metrics!(
@@ -110,7 +110,7 @@ async fn explain_analyze_baseline_metrics() {
use datafusion::physical_plan::sorts;
plan.as_any().downcast_ref::<sorts::sort::SortExec>().is_some()
- ||
plan.as_any().downcast_ref::<physical_plan::hash_aggregate::HashAggregateExec>().is_some()
+ ||
plan.as_any().downcast_ref::<physical_plan::aggregates::AggregateExec>().is_some()
// CoalescePartitionsExec doesn't do any work so is not included
||
plan.as_any().downcast_ref::<physical_plan::filter::FilterExec>().is_some()
||
plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
@@ -660,10 +660,10 @@ async fn test_physical_plan_display_indent() {
" SortExec: [the_min@2 DESC]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1
as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]",
- " HashAggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1],
aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
+ " AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1],
aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name:
\"c1\", index: 0 }], 3)",
- " HashAggregateExec: mode=Partial, gby=[c1@0 as c1],
aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
+ " AggregateExec: mode=Partial, gby=[c1@0 as c1],
aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
" FilterExec: c12@1 < CAST(10 AS Float64)",
" RepartitionExec: partitioning=RoundRobinBatch(3)",
diff --git a/datafusion/core/tests/sql/json.rs b/datafusion/core/tests/sql/json.rs
index e2209b876..79deaae79 100644
--- a/datafusion/core/tests/sql/json.rs
+++ b/datafusion/core/tests/sql/json.rs
@@ -92,9 +92,9 @@ async fn json_explain() {
vec![
"physical_plan",
"ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))]\
- \n HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
+ \n AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
\n CoalescePartitionsExec\
- \n HashAggregateExec: mode=Partial, gby=[],
aggr=[COUNT(UInt8(1))]\
+ \n AggregateExec: mode=Partial, gby=[],
aggr=[COUNT(UInt8(1))]\
\n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
\n JsonExec: limit=None, files=[tests/jsons/2.json]\n",
],