This is an automated email from the ASF dual-hosted git repository.

yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b42f3d62 Re-organize and rename aggregates physical plan (#2388)
3b42f3d62 is described below

commit 3b42f3d62027982fa88b70e696a6bce2d8866a31
Author: Yijie Shen <[email protected]>
AuthorDate: Mon May 2 08:41:14 2022 +0800

    Re-organize and rename aggregates physical plan (#2388)
    
    * first move: re-group aggregates functionalities in 
core/physical_p/aggregates
    
    * address review comments feedback
    
    * naming
---
 ballista/rust/core/proto/ballista.proto            |    4 +-
 ballista/rust/core/src/serde/physical_plan/mod.rs  |   34 +-
 ballista/rust/core/src/utils.rs                    |    6 +-
 ballista/rust/scheduler/src/planner.rs             |   24 +-
 datafusion/core/src/lib.rs                         |    2 +-
 .../src/physical_optimizer/aggregate_statistics.rs |   53 +-
 .../src/physical_optimizer/coalesce_batches.rs     |    4 +-
 .../core/src/physical_optimizer/repartition.rs     |   26 +-
 .../core/src/physical_plan/aggregates/hash.rs      |  477 +++++++
 .../core/src/physical_plan/aggregates/mod.rs       |  719 +++++++++++
 .../src/physical_plan/aggregates/no_grouping.rs    |  165 +++
 .../core/src/physical_plan/hash_aggregate.rs       | 1299 --------------------
 datafusion/core/src/physical_plan/mod.rs           |    1 -
 datafusion/core/src/physical_plan/planner.rs       |   10 +-
 datafusion/core/tests/sql/avro.rs                  |    4 +-
 datafusion/core/tests/sql/explain_analyze.rs       |   10 +-
 datafusion/core/tests/sql/json.rs                  |    4 +-
 17 files changed, 1451 insertions(+), 1391 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto 
b/ballista/rust/core/proto/ballista.proto
index e7b1a7508..d0357fb12 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -266,7 +266,7 @@ message PhysicalPlanNode {
     ProjectionExecNode projection = 4;
     GlobalLimitExecNode global_limit = 6;
     LocalLimitExecNode local_limit = 7;
-    HashAggregateExecNode hash_aggregate = 8;
+    AggregateExecNode aggregate = 8;
     HashJoinExecNode hash_join = 9;
     ShuffleReaderExecNode shuffle_reader = 10;
     SortExecNode sort = 11;
@@ -519,7 +519,7 @@ message WindowAggExecNode {
   datafusion.Schema input_schema = 4;
 }
 
-message HashAggregateExecNode {
+message AggregateExecNode {
   repeated PhysicalExprNode group_expr = 1;
   repeated PhysicalExprNode aggr_expr = 2;
   AggregateMode mode = 3;
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs 
b/ballista/rust/core/src/serde/physical_plan/mod.rs
index ed268820f..f5b495b67 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -28,7 +28,8 @@ use datafusion::datasource::listing::PartitionedFile;
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::logical_plan::window_frames::WindowFrame;
 use datafusion::logical_plan::FunctionRegistry;
-use datafusion::physical_plan::aggregates::create_aggregate_expr;
+use datafusion::physical_plan::aggregates::AggregateExec;
+use datafusion::physical_plan::aggregates::{create_aggregate_expr, 
AggregateMode};
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::cross_join::CrossJoinExec;
@@ -39,7 +40,6 @@ use datafusion::physical_plan::file_format::{
     AvroExec, CsvExec, FileScanConfig, ParquetExec,
 };
 use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::hash_aggregate::{AggregateMode, 
HashAggregateExec};
 use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
 use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion::physical_plan::projection::ProjectionExec;
@@ -306,19 +306,21 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     Arc::new((&input_schema).try_into()?),
                 )?))
             }
-            PhysicalPlanType::HashAggregate(hash_agg) => {
+            PhysicalPlanType::Aggregate(hash_agg) => {
                 let input: Arc<dyn ExecutionPlan> = into_physical_plan!(
                     hash_agg.input,
                     registry,
                     runtime,
                     extension_codec
                 )?;
-                let mode = 
protobuf::AggregateMode::from_i32(hash_agg.mode).ok_or_else(|| {
-                    proto_error(format!(
-                        "Received a HashAggregateNode message with unknown 
AggregateMode {}",
+                let mode = 
protobuf::AggregateMode::from_i32(hash_agg.mode).ok_or_else(
+                    || {
+                        proto_error(format!(
+                        "Received a AggregateNode message with unknown 
AggregateMode {}",
                         hash_agg.mode
                     ))
-                })?;
+                    },
+                )?;
                 let agg_mode: AggregateMode = match mode {
                     protobuf::AggregateMode::Partial => AggregateMode::Partial,
                     protobuf::AggregateMode::Final => AggregateMode::Final,
@@ -341,7 +343,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     .as_ref()
                     .ok_or_else(|| {
                         BallistaError::General(
-                            "input_schema in HashAggregateNode is 
missing.".to_owned(),
+                            "input_schema in AggregateNode is 
missing.".to_owned(),
                         )
                     })?
                     .clone();
@@ -384,14 +386,14 @@ impl AsExecutionPlan for PhysicalPlanNode {
                                 )?)
                             }
                             _ => Err(BallistaError::General(
-                                "Invalid aggregate  expression for 
HashAggregateExec"
+                                "Invalid aggregate expression for 
AggregateExec"
                                     .to_string(),
                             )),
                         }
                     })
                     .collect::<Result<Vec<_>, _>>()?;
 
-                Ok(Arc::new(HashAggregateExec::try_new(
+                Ok(Arc::new(AggregateExec::try_new(
                     agg_mode,
                     group,
                     physical_aggr_expr,
@@ -730,7 +732,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     },
                 ))),
             })
-        } else if let Some(exec) = plan.downcast_ref::<HashAggregateExec>() {
+        } else if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
             let groups = exec
                 .group_expr()
                 .iter()
@@ -768,8 +770,8 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 extension_codec,
             )?;
             Ok(protobuf::PhysicalPlanNode {
-                physical_plan_type: 
Some(PhysicalPlanType::HashAggregate(Box::new(
-                    protobuf::HashAggregateExecNode {
+                physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
+                    protobuf::AggregateExecNode {
                         group_expr: groups,
                         group_expr_name: group_names,
                         aggr_expr: agg,
@@ -1080,12 +1082,12 @@ mod roundtrip_tests {
         datasource::listing::PartitionedFile,
         logical_plan::{JoinType, Operator},
         physical_plan::{
+            aggregates::{AggregateExec, AggregateMode},
             empty::EmptyExec,
             expressions::{binary, col, lit, InListExpr, NotExpr},
             expressions::{Avg, Column, PhysicalSortExpr},
             file_format::{FileScanConfig, ParquetExec},
             filter::FilterExec,
-            hash_aggregate::{AggregateMode, HashAggregateExec},
             hash_join::{HashJoinExec, PartitionMode},
             limit::{GlobalLimitExec, LocalLimitExec},
             sorts::sort::SortExec,
@@ -1212,7 +1214,7 @@ mod roundtrip_tests {
     }
 
     #[test]
-    fn rountrip_hash_aggregate() -> Result<()> {
+    fn rountrip_aggregate() -> Result<()> {
         let field_a = Field::new("a", DataType::Int64, false);
         let field_b = Field::new("b", DataType::Int64, false);
         let schema = Arc::new(Schema::new(vec![field_a, field_b]));
@@ -1226,7 +1228,7 @@ mod roundtrip_tests {
             DataType::Float64,
         ))];
 
-        roundtrip_test(Arc::new(HashAggregateExec::try_new(
+        roundtrip_test(Arc::new(AggregateExec::try_new(
             AggregateMode::Final,
             groups.clone(),
             aggregates.clone(),
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 6670ab5ce..85a557e43 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -48,9 +48,9 @@ use datafusion::physical_plan::common::batch_byte_size;
 use datafusion::physical_plan::empty::EmptyExec;
 
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::physical_plan::aggregates::AggregateExec;
 use datafusion::physical_plan::file_format::{CsvExec, ParquetExec};
 use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
 use datafusion::physical_plan::hash_join::HashJoinExec;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
@@ -151,8 +151,8 @@ fn build_exec_plan_diagram(
     id: &mut AtomicUsize,
     draw_entity: bool,
 ) -> Result<usize> {
-    let operator_str = if 
plan.as_any().downcast_ref::<HashAggregateExec>().is_some() {
-        "HashAggregateExec"
+    let operator_str = if 
plan.as_any().downcast_ref::<AggregateExec>().is_some() {
+        "AggregateExec"
     } else if plan.as_any().downcast_ref::<SortExec>().is_some() {
         "SortExec"
     } else if plan.as_any().downcast_ref::<ProjectionExec>().is_some() {
diff --git a/ballista/rust/scheduler/src/planner.rs 
b/ballista/rust/scheduler/src/planner.rs
index 07168adb6..af1ce1c61 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -276,8 +276,8 @@ mod test {
     use ballista_core::error::BallistaError;
     use ballista_core::execution_plans::UnresolvedShuffleExec;
     use ballista_core::serde::{protobuf, AsExecutionPlan, BallistaCodec};
+    use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
     use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
-    use datafusion::physical_plan::hash_aggregate::{AggregateMode, 
HashAggregateExec};
     use datafusion::physical_plan::hash_join::HashJoinExec;
     use datafusion::physical_plan::sorts::sort::SortExec;
     use datafusion::physical_plan::{
@@ -298,7 +298,7 @@ mod test {
     }
 
     #[tokio::test]
-    async fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> {
+    async fn distributed_aggregate_plan() -> Result<(), BallistaError> {
         let ctx = datafusion_test_context("testdata").await?;
 
         // simplified form of TPC-H query 1
@@ -327,12 +327,12 @@ mod test {
         /* Expected result:
 
         ShuffleWriterExec: Some(Hash([Column { name: "l_returnflag", index: 0 
}], 2))
-          HashAggregateExec: mode=Partial, gby=[l_returnflag@1 as 
l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
+          AggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], 
aggr=[SUM(l_extendedprice Multiply Int64(1))]
             CsvExec: source=Path(testdata/lineitem: 
[testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), 
has_header=false
 
         ShuffleWriterExec: None
           ProjectionExec: expr=[l_returnflag@0 as l_returnflag, 
SUM(lineitem.l_extendedprice Multiply Int64(1))@1 as sum_disc_price]
-            HashAggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as 
l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
+            AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as 
l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
               CoalesceBatchesExec: target_batch_size=4096
                 UnresolvedShuffleExec
 
@@ -346,14 +346,14 @@ mod test {
 
         // verify stage 0
         let stage0 = stages[0].children()[0].clone();
-        let partial_hash = downcast_exec!(stage0, HashAggregateExec);
+        let partial_hash = downcast_exec!(stage0, AggregateExec);
         assert!(*partial_hash.mode() == AggregateMode::Partial);
 
         // verify stage 1
         let stage1 = stages[1].children()[0].clone();
         let projection = downcast_exec!(stage1, ProjectionExec);
         let final_hash = projection.children()[0].clone();
-        let final_hash = downcast_exec!(final_hash, HashAggregateExec);
+        let final_hash = downcast_exec!(final_hash, AggregateExec);
         assert!(*final_hash.mode() == AggregateMode::FinalPartitioned);
         let coalesce = final_hash.children()[0].clone();
         let coalesce = downcast_exec!(coalesce, CoalesceBatchesExec);
@@ -449,7 +449,7 @@ order by
           CsvExec: source=Path(testdata/orders: [testdata/orders/orders.tbl]), 
has_header=false
 
         ShuffleWriterExec: Some(Hash([Column { name: "l_shipmode", index: 0 
}], 2))
-          HashAggregateExec: mode=Partial, gby=[l_shipmode@4 as l_shipmode], 
aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or 
#orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), 
SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And 
#orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
+          AggregateExec: mode=Partial, gby=[l_shipmode@4 as l_shipmode], 
aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or 
#orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), 
SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And 
#orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
             CoalesceBatchesExec: target_batch_size=4096
               HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { 
name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })]
                 CoalesceBatchesExec: target_batch_size=4096
@@ -459,7 +459,7 @@ order by
 
         ShuffleWriterExec: None
           ProjectionExec: expr=[l_shipmode@0 as l_shipmode, SUM(CASE WHEN 
#orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq 
Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@1 as high_line_count, SUM(CASE 
WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority 
NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@2 as low_line_count]
-            HashAggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as 
l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or 
#orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), 
SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And 
#orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
+            AggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as 
l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or 
#orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), 
SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And 
#orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
               CoalesceBatchesExec: target_batch_size=4096
                 UnresolvedShuffleExec
 
@@ -514,7 +514,7 @@ order by
                 .partition_count()
         );
 
-        let hash_agg = downcast_exec!(input, HashAggregateExec);
+        let hash_agg = downcast_exec!(input, AggregateExec);
 
         let coalesce_batches = hash_agg.children()[0].clone();
         let coalesce_batches = downcast_exec!(coalesce_batches, 
CoalesceBatchesExec);
@@ -560,7 +560,7 @@ order by
     }
 
     #[tokio::test]
-    async fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> {
+    async fn roundtrip_serde_aggregate() -> Result<(), BallistaError> {
         let ctx = datafusion_test_context("testdata").await?;
 
         // simplified form of TPC-H query 1
@@ -586,8 +586,8 @@ order by
         let partial_hash = stages[0].children()[0].clone();
         let partial_hash_serde = roundtrip_operator(partial_hash.clone())?;
 
-        let partial_hash = downcast_exec!(partial_hash, HashAggregateExec);
-        let partial_hash_serde = downcast_exec!(partial_hash_serde, 
HashAggregateExec);
+        let partial_hash = downcast_exec!(partial_hash, AggregateExec);
+        let partial_hash_serde = downcast_exec!(partial_hash_serde, 
AggregateExec);
 
         assert_eq!(
             format!("{:?}", partial_hash),
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index fd08d3a0a..055a17f4e 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -158,7 +158,7 @@
 //!
 //! * Projection: [`ProjectionExec`](physical_plan::projection::ProjectionExec)
 //! * Filter: [`FilterExec`](physical_plan::filter::FilterExec)
-//! * Hash and Grouped aggregations: 
[`HashAggregateExec`](physical_plan::hash_aggregate::HashAggregateExec)
+//! * Grouped and non-grouped aggregations: 
[`AggregateExec`](physical_plan::aggregates::AggregateExec)
 //! * Sort: [`SortExec`](physical_plan::sorts::sort::SortExec)
 //! * Coalesce partitions: 
[`CoalescePartitionsExec`](physical_plan::coalesce_partitions::CoalescePartitionsExec)
 //! * Limit: [`LocalLimitExec`](physical_plan::limit::LocalLimitExec) and 
[`GlobalLimitExec`](physical_plan::limit::GlobalLimitExec)
diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs 
b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index 9af053f93..9c548ab9b 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -21,8 +21,8 @@ use std::sync::Arc;
 use arrow::datatypes::Schema;
 
 use crate::execution::context::SessionConfig;
+use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
 use crate::physical_plan::empty::EmptyExec;
-use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
 use crate::physical_plan::projection::ProjectionExec;
 use crate::physical_plan::{
     expressions, AggregateExpr, ColumnStatistics, ExecutionPlan, Statistics,
@@ -53,8 +53,8 @@ impl PhysicalOptimizerRule for AggregateStatistics {
         if let Some(partial_agg_exec) = take_optimizable(&*plan) {
             let partial_agg_exec = partial_agg_exec
                 .as_any()
-                .downcast_ref::<HashAggregateExec>()
-                .expect("take_optimizable() ensures that this is a 
HashAggregateExec");
+                .downcast_ref::<AggregateExec>()
+                .expect("take_optimizable() ensures that this is a 
AggregateExec");
             let stats = partial_agg_exec.input().statistics();
             let mut projections = vec![];
             for expr in partial_agg_exec.aggr_expr() {
@@ -96,22 +96,22 @@ impl PhysicalOptimizerRule for AggregateStatistics {
     }
 }
 
-/// assert if the node passed as argument is a final `HashAggregateExec` node 
that can be optimized:
-/// - its child (with posssible intermediate layers) is a partial 
`HashAggregateExec` node
+/// assert if the node passed as argument is a final `AggregateExec` node that 
can be optimized:
+/// - its child (with possible intermediate layers) is a partial 
`AggregateExec` node
 /// - they both have no grouping expression
 /// - the statistics are exact
-/// If this is the case, return a ref to the partial `HashAggregateExec`, else 
`None`.
-/// We would have prefered to return a casted ref to HashAggregateExec but the 
recursion requires
+/// If this is the case, return a ref to the partial `AggregateExec`, else 
`None`.
+/// We would have preferred to return a cast ref to AggregateExec but the 
recursion requires
 /// the `ExecutionPlan.children()` method that returns an owned reference.
 fn take_optimizable(node: &dyn ExecutionPlan) -> Option<Arc<dyn 
ExecutionPlan>> {
-    if let Some(final_agg_exec) = 
node.as_any().downcast_ref::<HashAggregateExec>() {
+    if let Some(final_agg_exec) = 
node.as_any().downcast_ref::<AggregateExec>() {
         if final_agg_exec.mode() == &AggregateMode::Final
             && final_agg_exec.group_expr().is_empty()
         {
             let mut child = Arc::clone(final_agg_exec.input());
             loop {
                 if let Some(partial_agg_exec) =
-                    child.as_any().downcast_ref::<HashAggregateExec>()
+                    child.as_any().downcast_ref::<AggregateExec>()
                 {
                     if partial_agg_exec.mode() == &AggregateMode::Partial
                         && partial_agg_exec.group_expr().is_empty()
@@ -260,11 +260,11 @@ mod tests {
 
     use crate::error::Result;
     use crate::logical_plan::Operator;
+    use crate::physical_plan::aggregates::AggregateExec;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::common;
     use crate::physical_plan::expressions::Count;
     use crate::physical_plan::filter::FilterExec;
-    use crate::physical_plan::hash_aggregate::HashAggregateExec;
     use crate::physical_plan::memory::MemoryExec;
     use crate::prelude::SessionContext;
 
@@ -291,10 +291,7 @@ mod tests {
     }
 
     /// Checks that the count optimization was applied and we still get the 
right result
-    async fn assert_count_optim_success(
-        plan: HashAggregateExec,
-        nulls: bool,
-    ) -> Result<()> {
+    async fn assert_count_optim_success(plan: AggregateExec, nulls: bool) -> 
Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let conf = session_ctx.copied_config();
@@ -336,7 +333,7 @@ mod tests {
         let source = mock_data()?;
         let schema = source.schema();
 
-        let partial_agg = HashAggregateExec::try_new(
+        let partial_agg = AggregateExec::try_new(
             AggregateMode::Partial,
             vec![],
             vec![count_expr(None, None)],
@@ -344,7 +341,7 @@ mod tests {
             Arc::clone(&schema),
         )?;
 
-        let final_agg = HashAggregateExec::try_new(
+        let final_agg = AggregateExec::try_new(
             AggregateMode::Final,
             vec![],
             vec![count_expr(None, None)],
@@ -363,7 +360,7 @@ mod tests {
         let source = mock_data()?;
         let schema = source.schema();
 
-        let partial_agg = HashAggregateExec::try_new(
+        let partial_agg = AggregateExec::try_new(
             AggregateMode::Partial,
             vec![],
             vec![count_expr(Some(&schema), Some("a"))],
@@ -371,7 +368,7 @@ mod tests {
             Arc::clone(&schema),
         )?;
 
-        let final_agg = HashAggregateExec::try_new(
+        let final_agg = AggregateExec::try_new(
             AggregateMode::Final,
             vec![],
             vec![count_expr(Some(&schema), Some("a"))],
@@ -389,7 +386,7 @@ mod tests {
         let source = mock_data()?;
         let schema = source.schema();
 
-        let partial_agg = HashAggregateExec::try_new(
+        let partial_agg = AggregateExec::try_new(
             AggregateMode::Partial,
             vec![],
             vec![count_expr(None, None)],
@@ -400,7 +397,7 @@ mod tests {
         // We introduce an intermediate optimization step between the partial 
and final aggregator
         let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg));
 
-        let final_agg = HashAggregateExec::try_new(
+        let final_agg = AggregateExec::try_new(
             AggregateMode::Final,
             vec![],
             vec![count_expr(None, None)],
@@ -418,7 +415,7 @@ mod tests {
         let source = mock_data()?;
         let schema = source.schema();
 
-        let partial_agg = HashAggregateExec::try_new(
+        let partial_agg = AggregateExec::try_new(
             AggregateMode::Partial,
             vec![],
             vec![count_expr(Some(&schema), Some("a"))],
@@ -429,7 +426,7 @@ mod tests {
         // We introduce an intermediate optimization step between the partial 
and final aggregator
         let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg));
 
-        let final_agg = HashAggregateExec::try_new(
+        let final_agg = AggregateExec::try_new(
             AggregateMode::Final,
             vec![],
             vec![count_expr(Some(&schema), Some("a"))],
@@ -458,7 +455,7 @@ mod tests {
             source,
         )?);
 
-        let partial_agg = HashAggregateExec::try_new(
+        let partial_agg = AggregateExec::try_new(
             AggregateMode::Partial,
             vec![],
             vec![count_expr(None, None)],
@@ -466,7 +463,7 @@ mod tests {
             Arc::clone(&schema),
         )?;
 
-        let final_agg = HashAggregateExec::try_new(
+        let final_agg = AggregateExec::try_new(
             AggregateMode::Final,
             vec![],
             vec![count_expr(None, None)],
@@ -479,7 +476,7 @@ mod tests {
             AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
 
         // check that the original ExecutionPlan was not replaced
-        assert!(optimized.as_any().is::<HashAggregateExec>());
+        assert!(optimized.as_any().is::<AggregateExec>());
 
         Ok(())
     }
@@ -500,7 +497,7 @@ mod tests {
             source,
         )?);
 
-        let partial_agg = HashAggregateExec::try_new(
+        let partial_agg = AggregateExec::try_new(
             AggregateMode::Partial,
             vec![],
             vec![count_expr(Some(&schema), Some("a"))],
@@ -508,7 +505,7 @@ mod tests {
             Arc::clone(&schema),
         )?;
 
-        let final_agg = HashAggregateExec::try_new(
+        let final_agg = AggregateExec::try_new(
             AggregateMode::Final,
             vec![],
             vec![count_expr(Some(&schema), Some("a"))],
@@ -521,7 +518,7 @@ mod tests {
             AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
 
         // check that the original ExecutionPlan was not replaced
-        assert!(optimized.as_any().is::<HashAggregateExec>());
+        assert!(optimized.as_any().is::<AggregateExec>());
 
         Ok(())
     }
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs 
b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 50f8abe7f..2b8582126 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -62,9 +62,9 @@ impl PhysicalOptimizerRule for CoalesceBatches {
             || plan_any.downcast_ref::<HashJoinExec>().is_some()
             || plan_any.downcast_ref::<RepartitionExec>().is_some();
 
-        // TODO we should also do this for HashAggregateExec but we need to 
update tests
+        // TODO we should also do this for AggregateExec but we need to update 
tests
         // as part of this work - see 
https://issues.apache.org/jira/browse/ARROW-11068
-        // || plan_any.downcast_ref::<HashAggregateExec>().is_some();
+        // || plan_any.downcast_ref::<AggregateExec>().is_some();
 
         if plan.children().is_empty() {
             // leaf node, children cannot be replaced
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs 
b/datafusion/core/src/physical_optimizer/repartition.rs
index 2506348fe..3b2c4515e 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -241,10 +241,10 @@ mod tests {
 
     use super::*;
     use crate::datasource::listing::PartitionedFile;
+    use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
     use crate::physical_plan::expressions::{col, PhysicalSortExpr};
     use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
     use crate::physical_plan::filter::FilterExec;
-    use crate::physical_plan::hash_aggregate::{AggregateMode, 
HashAggregateExec};
     use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
     use crate::physical_plan::projection::ProjectionExec;
     use crate::physical_plan::sorts::sort::SortExec;
@@ -300,15 +300,15 @@ mod tests {
         Arc::new(ProjectionExec::try_new(exprs, input).unwrap())
     }
 
-    fn hash_aggregate(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> 
{
+    fn aggregate(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
         let schema = schema();
         Arc::new(
-            HashAggregateExec::try_new(
+            AggregateExec::try_new(
                 AggregateMode::Final,
                 vec![],
                 vec![],
                 Arc::new(
-                    HashAggregateExec::try_new(
+                    AggregateExec::try_new(
                         AggregateMode::Partial,
                         vec![],
                         vec![],
@@ -361,11 +361,11 @@ mod tests {
 
     #[test]
     fn added_repartition_to_single_partition() -> Result<()> {
-        let plan = hash_aggregate(parquet_exec());
+        let plan = aggregate(parquet_exec());
 
         let expected = [
-            "HashAggregateExec: mode=Final, gby=[], aggr=[]",
-            "HashAggregateExec: mode=Partial, gby=[], aggr=[]",
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "AggregateExec: mode=Partial, gby=[], aggr=[]",
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
             "ParquetExec: limit=None, partitions=[x], projection=[c1]",
         ];
@@ -376,11 +376,11 @@ mod tests {
 
     #[test]
     fn repartition_deepest_node() -> Result<()> {
-        let plan = hash_aggregate(filter_exec(parquet_exec()));
+        let plan = aggregate(filter_exec(parquet_exec()));
 
         let expected = &[
-            "HashAggregateExec: mode=Final, gby=[], aggr=[]",
-            "HashAggregateExec: mode=Partial, gby=[], aggr=[]",
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "AggregateExec: mode=Partial, gby=[], aggr=[]",
             "FilterExec: c1@0",
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
             "ParquetExec: limit=None, partitions=[x], projection=[c1]",
@@ -443,11 +443,11 @@ mod tests {
 
     #[test]
     fn repartition_ignores_limit() -> Result<()> {
-        let plan = 
hash_aggregate(limit_exec(filter_exec(limit_exec(parquet_exec()))));
+        let plan = 
aggregate(limit_exec(filter_exec(limit_exec(parquet_exec()))));
 
         let expected = &[
-            "HashAggregateExec: mode=Final, gby=[], aggr=[]",
-            "HashAggregateExec: mode=Partial, gby=[], aggr=[]",
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "AggregateExec: mode=Partial, gby=[], aggr=[]",
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
             "GlobalLimitExec: limit=100",
             "LocalLimitExec: limit=100",
diff --git a/datafusion/core/src/physical_plan/aggregates/hash.rs 
b/datafusion/core/src/physical_plan/aggregates/hash.rs
new file mode 100644
index 000000000..d5a325367
--- /dev/null
+++ b/datafusion/core/src/physical_plan/aggregates/hash.rs
@@ -0,0 +1,477 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the execution plan for the hash aggregate operation
+
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::vec;
+
+use ahash::RandomState;
+use futures::{
+    ready,
+    stream::{Stream, StreamExt},
+};
+
+use crate::error::Result;
+use crate::physical_plan::aggregates::{AccumulatorItem, AggregateMode};
+use crate::physical_plan::hash_utils::create_hashes;
+use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
+use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+use crate::scalar::ScalarValue;
+
+use arrow::{array::ArrayRef, compute, compute::cast};
+use arrow::{
+    array::{Array, UInt32Builder},
+    error::{ArrowError, Result as ArrowResult},
+};
+use arrow::{
+    datatypes::{Schema, SchemaRef},
+    record_batch::RecordBatch,
+};
+use hashbrown::raw::RawTable;
+
+/*
+The architecture is the following:
+
+1. An accumulator has state that is updated on each batch.
+2. At the end of the aggregation (e.g. end of batches in a partition), the 
accumulator converts its state to a RecordBatch of a single row
+3. The RecordBatches of all accumulators are merged (`concatenate` in 
`rust/arrow`) together to a single RecordBatch.
+4. The state's RecordBatch is `merge`d to a new state
+5. The state is mapped to the final value
+
+Why:
+
+* Accumulators' state can be statically typed, but it is more efficient to 
transmit data from the accumulators via `Array`
+* The `merge` operation must have access to the state of the aggregators 
because it uses it to correctly merge
+* It uses Arrow's native dynamically typed object, `Array`.
+* Arrow shines in batch operations and both `merge` and `concatenate` of 
uniform types are very performant.
+
+Example: average
+
+* the state is `n: u32` and `sum: f64`
+* For every batch, we update them accordingly.
+* At the end of the accumulation (of a partition), we convert `n` and `sum` to 
a RecordBatch of 1 row and two columns: `[n, sum]`
+* The RecordBatch is (sent back / transmitted over network)
+* Once all N record batches arrive, `merge` is performed, which builds a 
RecordBatch with N rows and 2 columns.
+* Finally, `get_value` returns an array with one entry computed from the state
+*/
+/// Stream that performs a grouped (hash) aggregation over `input`.
+///
+/// All input batches are folded into `accumulators`; once the input is
+/// exhausted a single output batch is produced and the stream ends.
+pub(crate) struct GroupedHashAggregateStream {
+    /// Schema of the output batch; also returned by `RecordBatchStream::schema`
+    schema: SchemaRef,
+    /// Input stream whose batches are aggregated
+    input: SendableRecordBatchStream,
+    /// Aggregation mode; selects update vs. merge of accumulators and
+    /// state vs. final value in the output
+    mode: AggregateMode,
+    /// Per-group state built up while consuming `input`
+    accumulators: Accumulators,
+    /// Expressions evaluated against each batch, one vec per aggregation
+    aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+
+    /// Aggregate expressions, used to create the accumulators of new groups
+    aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+    /// Grouping expressions evaluated against each batch
+    group_expr: Vec<Arc<dyn PhysicalExpr>>,
+
+    /// Metrics; compute time is recorded around the aggregation work
+    baseline_metrics: BaselineMetrics,
+    /// Hasher state used to hash the group values of every batch
+    random_state: RandomState,
+    /// Set once the stream has yielded its result (or an error); later polls
+    /// return `None`
+    finished: bool,
+}
+
+impl GroupedHashAggregateStream {
+    /// Create a new GroupedHashAggregateStream
+    ///
+    /// Pre-computes the expressions to evaluate for every aggregate (which
+    /// depend on `mode`) and starts with an empty set of groups.
+    ///
+    /// # Errors
+    /// Returns an error if `aggregates::aggregate_expressions` fails.
+    pub fn new(
+        mode: AggregateMode,
+        schema: SchemaRef,
+        group_expr: Vec<Arc<dyn PhysicalExpr>>,
+        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+        input: SendableRecordBatchStream,
+        baseline_metrics: BaselineMetrics,
+    ) -> Result<Self> {
+        // account the setup work below as compute time
+        let timer = baseline_metrics.elapsed_compute().timer();
+
+        // The expressions to evaluate the batch, one vec of expressions per 
aggregation.
+        // Assume create_schema() always put group columns in front of aggr 
columns, we set
+        // col_idx_base to group expression count.
+        let aggregate_expressions =
+            aggregates::aggregate_expressions(&aggr_expr, &mode, 
group_expr.len())?;
+
+        timer.done();
+
+        Ok(Self {
+            schema,
+            mode,
+            input,
+            aggr_expr,
+            group_expr,
+            baseline_metrics,
+            aggregate_expressions,
+            accumulators: Default::default(),
+            random_state: Default::default(),
+            finished: false,
+        })
+    }
+}
+
+/// Yields at most one item: the aggregated batch once the input is exhausted,
+/// or the first error encountered. Afterwards the stream reports `None`.
+impl Stream for GroupedHashAggregateStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let this = &mut *self;
+        // the single result (or error) has already been handed out
+        if this.finished {
+            return Poll::Ready(None);
+        }
+
+        let elapsed_compute = this.baseline_metrics.elapsed_compute();
+
+        // keep pulling input batches; `ready!` returns `Poll::Pending` from
+        // this function whenever the input is not ready yet
+        loop {
+            let result = match ready!(this.input.poll_next_unpin(cx)) {
+                Some(Ok(batch)) => {
+                    let timer = elapsed_compute.timer();
+                    let result = group_aggregate_batch(
+                        &this.mode,
+                        &this.random_state,
+                        &this.group_expr,
+                        &this.aggr_expr,
+                        batch,
+                        &mut this.accumulators,
+                        &this.aggregate_expressions,
+                    );
+
+                    timer.done();
+
+                    match result {
+                        // batch absorbed into the accumulators: fetch the next one
+                        Ok(_) => continue,
+                        Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
+                    }
+                }
+                // errors from the input are passed through unchanged
+                Some(Err(e)) => Err(e),
+                None => {
+                    // input exhausted: materialize all groups as one batch
+                    this.finished = true;
+                    let timer = 
this.baseline_metrics.elapsed_compute().timer();
+                    let result = create_batch_from_map(
+                        &this.mode,
+                        &this.accumulators,
+                        this.group_expr.len(),
+                        &this.schema,
+                    )
+                    .record_output(&this.baseline_metrics);
+
+                    timer.done();
+                    result
+                }
+            };
+
+            // reached for the final batch and for any error: nothing more to emit
+            this.finished = true;
+            return Poll::Ready(Some(result));
+        }
+    }
+}
+
+impl RecordBatchStream for GroupedHashAggregateStream {
+    /// Schema of the batch produced by this stream (a cheap `Arc` clone).
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
+/// Aggregates one input `batch` into `accumulators`.
+///
+/// Rows are assigned to groups by hashing the evaluated `group_expr` values; a
+/// new [`GroupState`] is created the first time a key is seen. The aggregate
+/// inputs are then gathered per group with `take` and fed to each group's
+/// accumulators: `update_batch` in `Partial` mode, `merge_batch` in the final
+/// modes.
+///
+/// # Errors
+/// Returns an error if expression evaluation, hashing, accumulator creation,
+/// the `take` kernel, or an accumulator update/merge fails.
+///
+/// TODO: Make this a member function of [`GroupedHashAggregateStream`]
+fn group_aggregate_batch(
+    mode: &AggregateMode,
+    random_state: &RandomState,
+    group_expr: &[Arc<dyn PhysicalExpr>],
+    aggr_expr: &[Arc<dyn AggregateExpr>],
+    batch: RecordBatch,
+    accumulators: &mut Accumulators,
+    aggregate_expressions: &[Vec<Arc<dyn PhysicalExpr>>],
+) -> Result<()> {
+    // evaluate the grouping expressions
+    let group_values = evaluate(group_expr, &batch)?;
+
+    // evaluate the aggregation expressions.
+    // We could evaluate them after the `take`, but since we need to evaluate all
+    // of them anyways, it is more performant to do it while they are together.
+    let aggr_input_values = evaluate_many(aggregate_expressions, &batch)?;
+
+    // 1.1 construct the key from the group values
+    // 1.2 construct the mapping key if it does not exist
+    // 1.3 add the row's index to `indices`
+
+    // track which entries in `accumulators` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
+
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_hashes(&group_values, random_state, &mut batch_hashes)?;
+
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let Accumulators { map, group_states } = accumulators;
+
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that a group that we are inserting with hash is
+            // actually the same key value as the group in
+            // existing_idx  (aka group_values @ row)
+            let group_state = &group_states[*group_idx];
+            group_values
+                .iter()
+                .zip(group_state.group_by_values.iter())
+                .all(|(array, scalar)| scalar.eq_array(array, row))
+        });
+
+        match entry {
+            // Existing entry for this group value
+            Some((_hash, group_idx)) => {
+                let group_state = &mut group_states[*group_idx];
+                // 1.3
+                // `indices` is cleared after every batch, so an empty vec means
+                // this is the group's first row in the current batch
+                if group_state.indices.is_empty() {
+                    groups_with_rows.push(*group_idx);
+                };
+                group_state.indices.push(row as u32); // remember this row
+            }
+            //  1.2 Need to create new entry
+            None => {
+                let accumulator_set = aggregates::create_accumulators(aggr_expr)?;
+
+                // Copy group values out of arrays into `ScalarValue`s
+                let group_by_values = group_values
+                    .iter()
+                    .map(|col| ScalarValue::try_from_array(col, row))
+                    .collect::<Result<Vec<_>>>()?;
+
+                // Add new entry to group_states and save newly created index
+                let group_state = GroupState {
+                    group_by_values: group_by_values.into_boxed_slice(),
+                    accumulator_set,
+                    indices: vec![row as u32], // 1.3
+                };
+                let group_idx = group_states.len();
+                group_states.push(group_state);
+                groups_with_rows.push(group_idx);
+
+                // for hasher function, use precomputed hash value
+                map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
+            }
+        };
+    }
+
+    // Collect all indices + offsets based on keys in this vec
+    let mut batch_indices: UInt32Builder = UInt32Builder::new(0);
+    let mut offsets = vec![0];
+    let mut offset_so_far = 0;
+    for group_idx in groups_with_rows.iter() {
+        let indices = &accumulators.group_states[*group_idx].indices;
+        batch_indices.append_slice(indices)?;
+        offset_so_far += indices.len();
+        offsets.push(offset_so_far);
+    }
+    let batch_indices = batch_indices.finish();
+
+    // `Take` all values based on indices into Arrays.
+    // A failing kernel is reported to the caller instead of panicking.
+    let values: Vec<Vec<Arc<dyn Array>>> = aggr_input_values
+        .iter()
+        .map(|arrays| {
+            arrays
+                .iter()
+                .map(|array| {
+                    compute::take(
+                        array.as_ref(),
+                        &batch_indices,
+                        None, // None: no index check
+                    )
+                })
+                .collect::<ArrowResult<Vec<_>>>()
+        })
+        .collect::<ArrowResult<Vec<_>>>()?;
+
+    // 2.1 for each key in this batch
+    // 2.2 for each aggregation
+    // 2.3 `slice` from each of its arrays the keys' values
+    // 2.4 update / merge the accumulator with the values
+    // 2.5 clear indices
+    groups_with_rows
+        .iter()
+        .zip(offsets.windows(2))
+        .try_for_each(|(group_idx, offsets)| {
+            let group_state = &mut accumulators.group_states[*group_idx];
+            // 2.2
+            let outcome = group_state
+                .accumulator_set
+                .iter_mut()
+                .zip(values.iter())
+                .map(|(accumulator, aggr_array)| {
+                    (
+                        accumulator,
+                        aggr_array
+                            .iter()
+                            .map(|array| {
+                                // 2.3
+                                array.slice(offsets[0], offsets[1] - offsets[0])
+                            })
+                            .collect::<Vec<ArrayRef>>(),
+                    )
+                })
+                // 2.4
+                .try_for_each(|(accumulator, values)| match mode {
+                    AggregateMode::Partial => accumulator.update_batch(&values),
+                    AggregateMode::FinalPartitioned | AggregateMode::Final => {
+                        // note: the aggregation here is over states, not values, thus the merge
+                        accumulator.merge_batch(&values)
+                    }
+                });
+            // 2.5 (done whether or not the update succeeded)
+            group_state.indices.clear();
+            outcome
+        })?;
+
+    Ok(())
+}
+
+/// The state that is built for each output group.
+#[derive(Debug)]
+struct GroupState {
+    /// The actual group by values, one for each group column
+    group_by_values: Box<[ScalarValue]>,
+
+    /// Accumulator state, one for each aggregate
+    accumulator_set: Vec<AccumulatorItem>,
+
+    /// scratch space used to collect indices for input rows in a
+    /// batch that have values to aggregate. Reset on each batch
+    indices: Vec<u32>,
+}
+
+/// The state of all the groups
+///
+/// `Default` yields an empty table with no groups.
+#[derive(Default)]
+struct Accumulators {
+    /// Logically maps group values to an index in `group_states`
+    ///
+    /// Uses the raw API of hashbrown to avoid actually storing the
+    /// keys in the table
+    ///
+    /// keys: u64 hashes of the GroupValue
+    /// values: (hash, index into `group_states`)
+    map: RawTable<(u64, usize)>,
+
+    /// State for each group
+    ///
+    /// New groups are appended, so a group's index stays valid once assigned.
+    group_states: Vec<GroupState>,
+}
+
+impl std::fmt::Debug for Accumulators {
+    /// Formats the group states; the raw hash table is shown only as the
+    /// placeholder `"RawTable"` because the group keys are not stored in it.
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let mut out = f.debug_struct("Accumulators");
+        out.field("map", &"RawTable");
+        out.field("group_states", &self.group_states);
+        out.finish()
+    }
+}
+
+/// Evaluates each expression against `batch`, returning one array per
+/// expression. Each result is expanded to an array of `batch.num_rows()`
+/// rows. Stops at the first expression that fails.
+fn evaluate(
+    expr: &[Arc<dyn PhysicalExpr>],
+    batch: &RecordBatch,
+) -> Result<Vec<ArrayRef>> {
+    let num_rows = batch.num_rows();
+    let mut arrays = Vec::with_capacity(expr.len());
+    for e in expr {
+        let value = e.evaluate(batch)?;
+        arrays.push(value.into_array(num_rows));
+    }
+    Ok(arrays)
+}
+
+/// Evaluates several lists of expressions against `batch`, returning one
+/// vec of arrays per input list. Stops at the first failure.
+fn evaluate_many(
+    expr: &[Vec<Arc<dyn PhysicalExpr>>],
+    batch: &RecordBatch,
+) -> Result<Vec<Vec<ArrayRef>>> {
+    let mut evaluated = Vec::with_capacity(expr.len());
+    for exprs in expr {
+        evaluated.push(evaluate(exprs, batch)?);
+    }
+    Ok(evaluated)
+}
+
+/// Create a RecordBatch with all group keys and accumulators' states or values.
+///
+/// The output has one row per group. The `num_group_expr` group-key columns
+/// come first, followed for every aggregate by either its state columns
+/// (`Partial` mode) or its single evaluated value (final modes). Columns are
+/// finally cast to the types of `output_schema`.
+///
+/// # Errors
+/// Returns an error, rather than panicking, if an accumulator fails to
+/// report its state or value, if the scalars cannot be assembled into arrays,
+/// or if the cast / batch construction fails.
+fn create_batch_from_map(
+    mode: &AggregateMode,
+    accumulators: &Accumulators,
+    num_group_expr: usize,
+    output_schema: &Schema,
+) -> ArrowResult<RecordBatch> {
+    let group_states = &accumulators.group_states;
+    if group_states.is_empty() {
+        return Ok(RecordBatch::new_empty(Arc::new(output_schema.to_owned())));
+    }
+    // every group holds one accumulator per aggregate expression
+    let num_aggregates = group_states[0].accumulator_set.len();
+
+    // group key columns
+    let mut columns = (0..num_group_expr)
+        .map(|i| {
+            ScalarValue::iter_to_array(
+                group_states
+                    .iter()
+                    .map(|group_state| group_state.group_by_values[i].clone()),
+            )
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    // add state / evaluated arrays
+    for x in 0..num_aggregates {
+        match mode {
+            AggregateMode::Partial => {
+                // fetch the state of aggregate `x` once per group, then
+                // transpose it into one column per state field
+                let states = group_states
+                    .iter()
+                    .map(|group_state| group_state.accumulator_set[x].state())
+                    .collect::<Result<Vec<_>>>()?;
+                // number of state columns, taken from the first group
+                let state_len = states[0].len();
+                for y in 0..state_len {
+                    let res = ScalarValue::iter_to_array(
+                        states.iter().map(|state| state[y].clone()),
+                    )?;
+                    columns.push(res);
+                }
+            }
+            AggregateMode::Final | AggregateMode::FinalPartitioned => {
+                // exactly one output column per aggregate
+                let values = group_states
+                    .iter()
+                    .map(|group_state| group_state.accumulator_set[x].evaluate())
+                    .collect::<Result<Vec<_>>>()?;
+                columns.push(ScalarValue::iter_to_array(values)?);
+            }
+        }
+    }
+
+    // cast output if needed (e.g. for types like Dictionary where
+    // the intermediate GroupByScalar type was not the same as the
+    // output
+    let columns = columns
+        .iter()
+        .zip(output_schema.fields().iter())
+        .map(|(col, desired_field)| cast(col, desired_field.data_type()))
+        .collect::<ArrowResult<Vec<_>>>()?;
+
+    RecordBatch::try_new(Arc::new(output_schema.to_owned()), columns)
+}
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs 
b/datafusion/core/src/physical_plan/aggregates/mod.rs
index c0208b239..5e1da793c 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -17,5 +17,724 @@
 
 //! Aggregates functionalities
 
+use crate::execution::context::TaskContext;
+use crate::physical_plan::aggregates::hash::GroupedHashAggregateStream;
+use crate::physical_plan::aggregates::no_grouping::AggregateStream;
+use crate::physical_plan::metrics::{
+    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
+};
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    SendableRecordBatchStream, Statistics,
+};
+use arrow::array::ArrayRef;
+use arrow::datatypes::{Field, Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion_common::Result;
+use datafusion_expr::Accumulator;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::{
+    expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr,
+};
+use std::any::Any;
+use std::sync::Arc;
+
+mod hash;
+mod no_grouping;
+
 pub use datafusion_expr::AggregateFunction;
 pub use datafusion_physical_expr::expressions::create_aggregate_expr;
+
+/// Aggregate modes
+///
+/// The mode determines the required input distribution, whether accumulators
+/// are updated with raw values or merged from states, and whether the output
+/// holds accumulator states or final values.
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum AggregateMode {
+    /// Partial aggregate that can be applied in parallel across input 
partitions
+    Partial,
+    /// Final aggregate that produces a single partition of output
+    Final,
+    /// Final aggregate that works on pre-partitioned data.
+    ///
+    /// This requires the invariant that all rows with a particular
+    /// grouping key are in the same partitions, such as is the case
+    /// with Hash repartitioning on the group keys. If a group key is
+    /// duplicated, duplicate groups would be produced
+    FinalPartitioned,
+}
+
+/// Aggregate execution plan
+///
+/// Executes with a hash-based grouped stream when grouping expressions are
+/// present and with a single-group stream otherwise.
+#[derive(Debug)]
+pub struct AggregateExec {
+    /// Aggregation mode (partial, final or final-partitioned)
+    mode: AggregateMode,
+    /// Grouping expressions, each paired with its output column name
+    group_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
+    /// Aggregate expressions
+    aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+    /// Input plan, could be a partial aggregate or the input to the aggregate
+    input: Arc<dyn ExecutionPlan>,
+    /// Schema after the aggregate is applied
+    schema: SchemaRef,
+    /// Input schema before any aggregation is applied. For partial aggregate 
this will be the
+    /// same as input.schema() but for the final aggregate it will be the same 
as the input
+    /// to the partial aggregate
+    input_schema: SchemaRef,
+    /// Execution Metrics
+    metrics: ExecutionPlanMetricsSet,
+}
+
+impl AggregateExec {
+    /// Create a new aggregate execution plan
+    ///
+    /// The output schema is derived from `input.schema()`, the grouping and
+    /// aggregate expressions and `mode`; `input_schema` is stored as given.
+    ///
+    /// # Errors
+    /// Returns an error if the output schema cannot be built.
+    pub fn try_new(
+        mode: AggregateMode,
+        group_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
+        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+        input: Arc<dyn ExecutionPlan>,
+        input_schema: SchemaRef,
+    ) -> Result<Self> {
+        let schema = create_schema(&input.schema(), &group_expr, &aggr_expr, 
mode)?;
+
+        let schema = Arc::new(schema);
+
+        Ok(AggregateExec {
+            mode,
+            group_expr,
+            aggr_expr,
+            input,
+            schema,
+            input_schema,
+            metrics: ExecutionPlanMetricsSet::new(),
+        })
+    }
+
+    /// Aggregation mode (partial, final or final-partitioned)
+    pub fn mode(&self) -> &AggregateMode {
+        &self.mode
+    }
+
+    /// Grouping expressions
+    pub fn group_expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] {
+        &self.group_expr
+    }
+
+    /// Grouping expressions as they occur in the output schema
+    pub fn output_group_expr(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        // Update column indices. Since the group by columns come first in the 
output schema, their
+        // indices are simply 0..self.group_expr.len().
+        self.group_expr
+            .iter()
+            .enumerate()
+            .map(|(index, (_col, name))| {
+                Arc::new(expressions::Column::new(name, index)) as Arc<dyn 
PhysicalExpr>
+            })
+            .collect()
+    }
+
+    /// Aggregate expressions
+    pub fn aggr_expr(&self) -> &[Arc<dyn AggregateExpr>] {
+        &self.aggr_expr
+    }
+
+    /// Input plan
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Get the input schema before any aggregates are applied
+    pub fn input_schema(&self) -> SchemaRef {
+        self.input_schema.clone()
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for AggregateExec {
+    /// Return a reference to Any that can be used for down-casting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Schema after the aggregate is applied
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Get the output partitioning of this plan
+    ///
+    /// This is the partitioning reported by the input plan.
+    fn output_partitioning(&self) -> Partitioning {
+        self.input.output_partitioning()
+    }
+
+    /// No output ordering is reported
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    /// Distribution required of the input, depending on the mode: none for
+    /// `Partial`, hash-partitioned on the group expressions for
+    /// `FinalPartitioned`, a single partition for `Final`
+    fn required_child_distribution(&self) -> Distribution {
+        match &self.mode {
+            AggregateMode::Partial => Distribution::UnspecifiedDistribution,
+            AggregateMode::FinalPartitioned => Distribution::HashPartitioned(
+                self.group_expr.iter().map(|x| x.0.clone()).collect(),
+            ),
+            AggregateMode::Final => Distribution::SinglePartition,
+        }
+    }
+
+    fn relies_on_input_order(&self) -> bool {
+        false
+    }
+
+    /// The single input plan
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    /// Rebuilds this plan on top of `children[0]`, keeping mode, expressions
+    /// and input schema. Indexing panics if `children` is empty.
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(AggregateExec::try_new(
+            self.mode,
+            self.group_expr.clone(),
+            self.aggr_expr.clone(),
+            children[0].clone(),
+            self.input_schema.clone(),
+        )?))
+    }
+
+    /// Executes the given input partition and wraps it in an aggregating
+    /// stream
+    async fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let input = self.input.execute(partition, context).await?;
+        // the streams only need the expressions, not the output names
+        let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect();
+
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+
+        // without grouping expressions the non-hash stream is used
+        if self.group_expr.is_empty() {
+            Ok(Box::pin(AggregateStream::new(
+                self.mode,
+                self.schema.clone(),
+                self.aggr_expr.clone(),
+                input,
+                baseline_metrics,
+            )?))
+        } else {
+            Ok(Box::pin(GroupedHashAggregateStream::new(
+                self.mode,
+                self.schema.clone(),
+                group_expr,
+                self.aggr_expr.clone(),
+                input,
+                baseline_metrics,
+            )?))
+        }
+    }
+
+    /// Snapshot of the metrics recorded so far
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    /// Writes `AggregateExec: mode=.., gby=[..], aggr=[..]`; a group
+    /// expression is shown as `expr as alias` when its text differs from its
+    /// output name
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "AggregateExec: mode={:?}", self.mode)?;
+                let g: Vec<String> = self
+                    .group_expr
+                    .iter()
+                    .map(|(e, alias)| {
+                        let e = e.to_string();
+                        if &e != alias {
+                            format!("{} as {}", e, alias)
+                        } else {
+                            e
+                        }
+                    })
+                    .collect();
+                write!(f, ", gby=[{}]", g.join(", "))?;
+
+                let a: Vec<String> = self
+                    .aggr_expr
+                    .iter()
+                    .map(|agg| agg.name().to_string())
+                    .collect();
+                write!(f, ", aggr=[{}]", a.join(", "))?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Statistics of the output: exactly one row for a final aggregate
+    /// without grouping, otherwise unknown
+    fn statistics(&self) -> Statistics {
+        // TODO stats: group expressions:
+        // - once expressions will be able to compute their own stats, use it 
here
+        // - case where we group by on a column for which we have the 
`distinct` stat
+        // TODO stats: aggr expression:
+        // - aggregations sometimes also preserve invariants such as min, max...
+        match self.mode {
+            AggregateMode::Final | AggregateMode::FinalPartitioned
+                if self.group_expr.is_empty() =>
+            {
+                Statistics {
+                    num_rows: Some(1),
+                    is_exact: true,
+                    ..Default::default()
+                }
+            }
+            _ => Statistics::default(),
+        }
+    }
+}
+
+/// Builds the output schema of an aggregate: the group columns first (named
+/// after their aliases, typed against `input_schema`), followed by the
+/// accumulator state fields in `Partial` mode or by one result field per
+/// aggregate in the final modes.
+fn create_schema(
+    input_schema: &Schema,
+    group_expr: &[(Arc<dyn PhysicalExpr>, String)],
+    aggr_expr: &[Arc<dyn AggregateExpr>],
+    mode: AggregateMode,
+) -> datafusion_common::Result<Schema> {
+    let mut fields = Vec::with_capacity(group_expr.len() + aggr_expr.len());
+
+    for (expr, name) in group_expr {
+        let data_type = expr.data_type(input_schema)?;
+        let nullable = expr.nullable(input_schema)?;
+        fields.push(Field::new(name, data_type, nullable));
+    }
+
+    for expr in aggr_expr {
+        match mode {
+            // in partial mode, the fields of the accumulator's state
+            AggregateMode::Partial => {
+                fields.extend(expr.state_fields()?.iter().cloned());
+            }
+            // in final mode, the field with the final result of the accumulator
+            AggregateMode::Final | AggregateMode::FinalPartitioned => {
+                fields.push(expr.field()?);
+            }
+        }
+    }
+
+    Ok(Schema::new(fields))
+}
+
+/// Returns, for every aggregate, the physical expressions to evaluate against
+/// an input batch. What they are depends on `mode`:
+/// * Partial: `AggregateExpr::expressions`
+/// * Final / FinalPartitioned: columns addressing the aggregate's
+///   `AggregateExpr::state_fields()`, numbered consecutively from
+///   `col_idx_base`
+fn aggregate_expressions(
+    aggr_expr: &[Arc<dyn AggregateExpr>],
+    mode: &AggregateMode,
+    col_idx_base: usize,
+) -> datafusion_common::Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
+    match mode {
+        AggregateMode::Partial => {
+            Ok(aggr_expr.iter().map(|agg| agg.expressions()).collect())
+        }
+        // in this mode, we build the merge expressions of the aggregation
+        AggregateMode::Final | AggregateMode::FinalPartitioned => {
+            let mut next_idx = col_idx_base;
+            let mut merged = Vec::with_capacity(aggr_expr.len());
+            for agg in aggr_expr {
+                let exprs = merge_expressions(next_idx, agg)?;
+                // the next aggregate's state starts right after this one's
+                next_idx += exprs.len();
+                merged.push(exprs);
+            }
+            Ok(merged)
+        }
+    }
+}
+
+/// Builds one physical column expression per entry of `expr.state_fields()`;
+/// these are the columns needed to merge the aggregate's accumulator state.
+///
+/// `index_base` is the physical column index of the first state field; the
+/// following fields get consecutive indices.
+fn merge_expressions(
+    index_base: usize,
+    expr: &Arc<dyn AggregateExpr>,
+) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+    let state_fields = expr.state_fields()?;
+    let mut columns = Vec::new();
+    for (offset, field) in state_fields.iter().enumerate() {
+        let column = Column::new(field.name(), index_base + offset);
+        columns.push(Arc::new(column) as Arc<dyn PhysicalExpr>);
+    }
+    Ok(columns)
+}
+
+pub(crate) type AccumulatorItem = Box<dyn Accumulator>;
+
+/// Creates one fresh accumulator per aggregate expression, in the same order.
+/// Stops at the first expression that fails to create its accumulator.
+fn create_accumulators(
+    aggr_expr: &[Arc<dyn AggregateExpr>],
+) -> datafusion_common::Result<Vec<AccumulatorItem>> {
+    let mut accumulators = Vec::with_capacity(aggr_expr.len());
+    for expr in aggr_expr {
+        accumulators.push(expr.create_accumulator()?);
+    }
+    Ok(accumulators)
+}
+
+/// Converts the accumulators into output arrays.
+///
+/// In `Partial` mode every accumulator contributes one array per state value
+/// (flattened in accumulator order); in the final modes every accumulator
+/// contributes a single array holding its evaluated result.
+fn finalize_aggregation(
+    accumulators: &[AccumulatorItem],
+    mode: &AggregateMode,
+) -> datafusion_common::Result<Vec<ArrayRef>> {
+    let mut arrays: Vec<ArrayRef> = Vec::new();
+    match mode {
+        AggregateMode::Partial => {
+            // build the vector of states
+            for accumulator in accumulators {
+                let state = accumulator.state()?;
+                arrays.extend(state.iter().map(|v| v.to_array()));
+            }
+        }
+        AggregateMode::Final | AggregateMode::FinalPartitioned => {
+            // merge the state to the final value
+            for accumulator in accumulators {
+                let value = accumulator.evaluate()?;
+                arrays.push(value.to_array());
+            }
+        }
+    }
+    Ok(arrays)
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::execution::context::TaskContext;
+    use crate::from_slice::FromSlice;
+    use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
+    use crate::physical_plan::expressions::{col, Avg};
+    use crate::test::assert_is_pending;
+    use crate::test::exec::{assert_strong_count_converges_to_zero, 
BlockingExec};
+    use crate::{assert_batches_sorted_eq, physical_plan::common};
+    use arrow::array::{Float64Array, UInt32Array};
+    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+    use arrow::error::Result as ArrowResult;
+    use arrow::record_batch::RecordBatch;
+    use async_trait::async_trait;
+    use datafusion_common::{DataFusionError, Result};
+    use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, 
PhysicalSortExpr};
+    use futures::{FutureExt, Stream};
+    use std::any::Any;
+    use std::sync::Arc;
+    use std::task::{Context, Poll};
+
+    use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+    use crate::physical_plan::{
+        ExecutionPlan, Partitioning, RecordBatchStream, 
SendableRecordBatchStream,
+        Statistics,
+    };
+    use crate::prelude::SessionContext;
+
+    /// some mock data to aggregates
+    fn some_data() -> (Arc<Schema>, Vec<RecordBatch>) {
+        // define a schema.
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::UInt32, false),
+            Field::new("b", DataType::Float64, false),
+        ]));
+
+        // define data.
+        (
+            schema.clone(),
+            vec![
+                RecordBatch::try_new(
+                    schema.clone(),
+                    vec![
+                        Arc::new(UInt32Array::from_slice(&[2, 3, 4, 4])),
+                        Arc::new(Float64Array::from_slice(&[1.0, 2.0, 3.0, 
4.0])),
+                    ],
+                )
+                .unwrap(),
+                RecordBatch::try_new(
+                    schema,
+                    vec![
+                        Arc::new(UInt32Array::from_slice(&[2, 3, 3, 4])),
+                        Arc::new(Float64Array::from_slice(&[1.0, 2.0, 3.0, 
4.0])),
+                    ],
+                )
+                .unwrap(),
+            ],
+        )
+    }
+
+    /// build the aggregates on the data from some_data() and check the results
+    async fn check_aggregates(input: Arc<dyn ExecutionPlan>) -> Result<()> {
+        let input_schema = input.schema();
+
+        let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
+            vec![(col("a", &input_schema)?, "a".to_string())];
+
+        let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
+            col("b", &input_schema)?,
+            "AVG(b)".to_string(),
+            DataType::Float64,
+        ))];
+
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        let partial_aggregate = Arc::new(AggregateExec::try_new(
+            AggregateMode::Partial,
+            groups.clone(),
+            aggregates.clone(),
+            input,
+            input_schema.clone(),
+        )?);
+
+        let result =
+            common::collect(partial_aggregate.execute(0, 
task_ctx.clone()).await?)
+                .await?;
+
+        let expected = vec![
+            "+---+---------------+-------------+",
+            "| a | AVG(b)[count] | AVG(b)[sum] |",
+            "+---+---------------+-------------+",
+            "| 2 | 2             | 2           |",
+            "| 3 | 3             | 7           |",
+            "| 4 | 3             | 11          |",
+            "+---+---------------+-------------+",
+        ];
+        assert_batches_sorted_eq!(expected, &result);
+
+        let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate));
+
+        let final_group: Vec<Arc<dyn PhysicalExpr>> = (0..groups.len())
+            .map(|i| col(&groups[i].1, &input_schema))
+            .collect::<Result<_>>()?;
+
+        let merged_aggregate = Arc::new(AggregateExec::try_new(
+            AggregateMode::Final,
+            final_group
+                .iter()
+                .enumerate()
+                .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
+                .collect(),
+            aggregates,
+            merge,
+            input_schema,
+        )?);
+
+        let result =
+            common::collect(merged_aggregate.execute(0, 
task_ctx.clone()).await?).await?;
+        assert_eq!(result.len(), 1);
+
+        let batch = &result[0];
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.num_rows(), 3);
+
+        let expected = vec![
+            "+---+--------------------+",
+            "| a | AVG(b)             |",
+            "+---+--------------------+",
+            "| 2 | 1                  |",
+            "| 3 | 2.3333333333333335 |", // 3, (2 + 3 + 2) / 3
+            "| 4 | 3.6666666666666665 |", // 4, (3 + 4 + 4) / 3
+            "+---+--------------------+",
+        ];
+
+        assert_batches_sorted_eq!(&expected, &result);
+
+        let metrics = merged_aggregate.metrics().unwrap();
+        let output_rows = metrics.output_rows().unwrap();
+        assert_eq!(3, output_rows);
+
+        Ok(())
+    }
+
+    /// Define a test source that can yield back to runtime before returning its first item ///
+
+    #[derive(Debug)]
+    struct TestYieldingExec {
+        /// True if this exec should yield back to runtime the first time it 
is polled
+        pub yield_first: bool,
+    }
+
+    #[async_trait]
+    impl ExecutionPlan for TestYieldingExec {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+        fn schema(&self) -> SchemaRef {
+            some_data().0
+        }
+
+        fn output_partitioning(&self) -> Partitioning {
+            Partitioning::UnknownPartitioning(1)
+        }
+
+        fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+            None
+        }
+
+        fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+            vec![]
+        }
+
+        fn with_new_children(
+            self: Arc<Self>,
+            _: Vec<Arc<dyn ExecutionPlan>>,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            Err(DataFusionError::Internal(format!(
+                "Children cannot be replaced in {:?}",
+                self
+            )))
+        }
+
+        async fn execute(
+            &self,
+            _partition: usize,
+            _context: Arc<TaskContext>,
+        ) -> Result<SendableRecordBatchStream> {
+            let stream = if self.yield_first {
+                TestYieldingStream::New
+            } else {
+                TestYieldingStream::Yielded
+            };
+
+            Ok(Box::pin(stream))
+        }
+
+        fn statistics(&self) -> Statistics {
+            let (_, batches) = some_data();
+            common::compute_record_batch_statistics(&[batches], 
&self.schema(), None)
+        }
+    }
+
+    /// A stream using the demo data. If inited as new, it will first yield to runtime before returning records
+    enum TestYieldingStream {
+        New,
+        Yielded,
+        ReturnedBatch1,
+        ReturnedBatch2,
+    }
+
+    impl Stream for TestYieldingStream {
+        type Item = ArrowResult<RecordBatch>;
+
+        fn poll_next(
+            mut self: std::pin::Pin<&mut Self>,
+            cx: &mut Context<'_>,
+        ) -> Poll<Option<Self::Item>> {
+            match &*self {
+                TestYieldingStream::New => {
+                    *(self.as_mut()) = TestYieldingStream::Yielded;
+                    cx.waker().wake_by_ref();
+                    Poll::Pending
+                }
+                TestYieldingStream::Yielded => {
+                    *(self.as_mut()) = TestYieldingStream::ReturnedBatch1;
+                    Poll::Ready(Some(Ok(some_data().1[0].clone())))
+                }
+                TestYieldingStream::ReturnedBatch1 => {
+                    *(self.as_mut()) = TestYieldingStream::ReturnedBatch2;
+                    Poll::Ready(Some(Ok(some_data().1[1].clone())))
+                }
+                TestYieldingStream::ReturnedBatch2 => Poll::Ready(None),
+            }
+        }
+    }
+
+    impl RecordBatchStream for TestYieldingStream {
+        fn schema(&self) -> SchemaRef {
+            some_data().0
+        }
+    }
+
+    //// Tests ////
+
+    #[tokio::test]
+    async fn aggregate_source_not_yielding() -> Result<()> {
+        let input: Arc<dyn ExecutionPlan> =
+            Arc::new(TestYieldingExec { yield_first: false });
+
+        check_aggregates(input).await
+    }
+
+    #[tokio::test]
+    async fn aggregate_source_with_yielding() -> Result<()> {
+        let input: Arc<dyn ExecutionPlan> =
+            Arc::new(TestYieldingExec { yield_first: true });
+
+        check_aggregates(input).await
+    }
+
+    #[tokio::test]
+    async fn test_drop_cancel_without_groups() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, 
true)]));
+
+        let groups = vec![];
+
+        let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
+            col("a", &schema)?,
+            "AVG(a)".to_string(),
+            DataType::Float64,
+        ))];
+
+        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 
1));
+        let refs = blocking_exec.refs();
+        let aggregate_exec = Arc::new(AggregateExec::try_new(
+            AggregateMode::Partial,
+            groups.clone(),
+            aggregates.clone(),
+            blocking_exec,
+            schema,
+        )?);
+
+        let fut = crate::physical_plan::collect(aggregate_exec, task_ctx);
+        let mut fut = fut.boxed();
+
+        assert_is_pending(&mut fut);
+        drop(fut);
+        assert_strong_count_converges_to_zero(refs).await;
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_drop_cancel_with_groups() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Float32, true),
+            Field::new("b", DataType::Float32, true),
+        ]));
+
+        let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
+            vec![(col("a", &schema)?, "a".to_string())];
+
+        let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
+            col("b", &schema)?,
+            "AVG(b)".to_string(),
+            DataType::Float64,
+        ))];
+
+        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 
1));
+        let refs = blocking_exec.refs();
+        let aggregate_exec = Arc::new(AggregateExec::try_new(
+            AggregateMode::Partial,
+            groups.clone(),
+            aggregates.clone(),
+            blocking_exec,
+            schema,
+        )?);
+
+        let fut = crate::physical_plan::collect(aggregate_exec, task_ctx);
+        let mut fut = fut.boxed();
+
+        assert_is_pending(&mut fut);
+        drop(fut);
+        assert_strong_count_converges_to_zero(refs).await;
+
+        Ok(())
+    }
+}
diff --git a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
new file mode 100644
index 000000000..f687c982c
--- /dev/null
+++ b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Aggregate without grouping columns
+
+use crate::physical_plan::aggregates::{
+    aggregate_expressions, create_accumulators, finalize_aggregation, 
AccumulatorItem,
+    AggregateMode,
+};
+use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+use arrow::datatypes::SchemaRef;
+use arrow::error::{ArrowError, Result as ArrowResult};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use futures::{
+    ready,
+    stream::{Stream, StreamExt},
+};
+
+/// stream struct for aggregation without grouping columns
+pub(crate) struct AggregateStream {
+    schema: SchemaRef,
+    mode: AggregateMode,
+    input: SendableRecordBatchStream,
+    baseline_metrics: BaselineMetrics,
+    aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+    accumulators: Vec<AccumulatorItem>,
+    finished: bool,
+}
+
+impl AggregateStream {
+    /// Create a new AggregateStream
+    pub fn new(
+        mode: AggregateMode,
+        schema: SchemaRef,
+        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+        input: SendableRecordBatchStream,
+        baseline_metrics: BaselineMetrics,
+    ) -> datafusion_common::Result<Self> {
+        let aggregate_expressions = aggregate_expressions(&aggr_expr, &mode, 
0)?;
+        let accumulators = create_accumulators(&aggr_expr)?;
+
+        Ok(Self {
+            schema,
+            mode,
+            input,
+            baseline_metrics,
+            aggregate_expressions,
+            accumulators,
+            finished: false,
+        })
+    }
+}
+
+impl Stream for AggregateStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let this = &mut *self;
+        if this.finished {
+            return Poll::Ready(None);
+        }
+
+        let elapsed_compute = this.baseline_metrics.elapsed_compute();
+
+        loop {
+            let result = match ready!(this.input.poll_next_unpin(cx)) {
+                Some(Ok(batch)) => {
+                    let timer = elapsed_compute.timer();
+                    let result = aggregate_batch(
+                        &this.mode,
+                        &batch,
+                        &mut this.accumulators,
+                        &this.aggregate_expressions,
+                    );
+
+                    timer.done();
+
+                    match result {
+                        Ok(_) => continue,
+                        Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
+                    }
+                }
+                Some(Err(e)) => Err(e),
+                None => {
+                    this.finished = true;
+                    let timer = 
this.baseline_metrics.elapsed_compute().timer();
+                    let result = finalize_aggregation(&this.accumulators, 
&this.mode)
+                        .map_err(|e| ArrowError::ExternalError(Box::new(e)))
+                        .and_then(|columns| {
+                            RecordBatch::try_new(this.schema.clone(), columns)
+                        })
+                        .record_output(&this.baseline_metrics);
+
+                    timer.done();
+                    result
+                }
+            };
+
+            this.finished = true;
+            return Poll::Ready(Some(result));
+        }
+    }
+}
+
+impl RecordBatchStream for AggregateStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+/// TODO: Make this a member function
+fn aggregate_batch(
+    mode: &AggregateMode,
+    batch: &RecordBatch,
+    accumulators: &mut [AccumulatorItem],
+    expressions: &[Vec<Arc<dyn PhysicalExpr>>],
+) -> Result<()> {
+    // 1.1 iterate accumulators and respective expressions together
+    // 1.2 evaluate expressions
+    // 1.3 update / merge accumulators with the expressions' values
+
+    // 1.1
+    accumulators
+        .iter_mut()
+        .zip(expressions)
+        .try_for_each(|(accum, expr)| {
+            // 1.2
+            let values = &expr
+                .iter()
+                .map(|e| e.evaluate(batch))
+                .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+                .collect::<Result<Vec<_>>>()?;
+
+            // 1.3
+            match mode {
+                AggregateMode::Partial => accum.update_batch(values),
+                AggregateMode::Final | AggregateMode::FinalPartitioned => {
+                    accum.merge_batch(values)
+                }
+            }
+        })
+}
diff --git a/datafusion/core/src/physical_plan/hash_aggregate.rs b/datafusion/core/src/physical_plan/hash_aggregate.rs
deleted file mode 100644
index 643174557..000000000
--- a/datafusion/core/src/physical_plan/hash_aggregate.rs
+++ /dev/null
@@ -1,1299 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Defines the execution plan for the hash aggregate operation
-
-use std::any::Any;
-use std::sync::Arc;
-use std::task::{Context, Poll};
-use std::vec;
-
-use ahash::RandomState;
-use futures::{
-    ready,
-    stream::{Stream, StreamExt},
-};
-
-use crate::error::Result;
-use crate::physical_plan::hash_utils::create_hashes;
-use crate::physical_plan::{
-    Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan,
-    Partitioning, PhysicalExpr,
-};
-use crate::scalar::ScalarValue;
-
-use arrow::{array::ArrayRef, compute, compute::cast};
-use arrow::{
-    array::{Array, UInt32Builder},
-    error::{ArrowError, Result as ArrowResult},
-};
-use arrow::{
-    datatypes::{Field, Schema, SchemaRef},
-    record_batch::RecordBatch,
-};
-use hashbrown::raw::RawTable;
-
-use crate::execution::context::TaskContext;
-use async_trait::async_trait;
-
-use super::expressions::PhysicalSortExpr;
-use super::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
-};
-use super::Statistics;
-use super::{expressions::Column, RecordBatchStream, SendableRecordBatchStream};
-
-/// Hash aggregate modes
-#[derive(Debug, Copy, Clone, PartialEq, Eq)]
-pub enum AggregateMode {
-    /// Partial aggregate that can be applied in parallel across input 
partitions
-    Partial,
-    /// Final aggregate that produces a single partition of output
-    Final,
-    /// Final aggregate that works on pre-partitioned data.
-    ///
-    /// This requires the invariant that all rows with a particular
-    /// grouping key are in the same partitions, such as is the case
-    /// with Hash repartitioning on the group keys. If a group key is
-    /// duplicated, duplicate groups would be produced
-    FinalPartitioned,
-}
-
-/// Hash aggregate execution plan
-#[derive(Debug)]
-pub struct HashAggregateExec {
-    /// Aggregation mode (full, partial)
-    mode: AggregateMode,
-    /// Grouping expressions
-    group_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
-    /// Aggregate expressions
-    aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-    /// Input plan, could be a partial aggregate or the input to the aggregate
-    input: Arc<dyn ExecutionPlan>,
-    /// Schema after the aggregate is applied
-    schema: SchemaRef,
-    /// Input schema before any aggregation is applied. For partial aggregate 
this will be the
-    /// same as input.schema() but for the final aggregate it will be the same 
as the input
-    /// to the partial aggregate
-    input_schema: SchemaRef,
-    /// Execution Metrics
-    metrics: ExecutionPlanMetricsSet,
-}
-
-fn create_schema(
-    input_schema: &Schema,
-    group_expr: &[(Arc<dyn PhysicalExpr>, String)],
-    aggr_expr: &[Arc<dyn AggregateExpr>],
-    mode: AggregateMode,
-) -> Result<Schema> {
-    let mut fields = Vec::with_capacity(group_expr.len() + aggr_expr.len());
-    for (expr, name) in group_expr {
-        fields.push(Field::new(
-            name,
-            expr.data_type(input_schema)?,
-            expr.nullable(input_schema)?,
-        ))
-    }
-
-    match mode {
-        AggregateMode::Partial => {
-            // in partial mode, the fields of the accumulator's state
-            for expr in aggr_expr {
-                fields.extend(expr.state_fields()?.iter().cloned())
-            }
-        }
-        AggregateMode::Final | AggregateMode::FinalPartitioned => {
-            // in final mode, the field with the final result of the 
accumulator
-            for expr in aggr_expr {
-                fields.push(expr.field()?)
-            }
-        }
-    }
-
-    Ok(Schema::new(fields))
-}
-
-impl HashAggregateExec {
-    /// Create a new hash aggregate execution plan
-    pub fn try_new(
-        mode: AggregateMode,
-        group_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
-        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-        input: Arc<dyn ExecutionPlan>,
-        input_schema: SchemaRef,
-    ) -> Result<Self> {
-        let schema = create_schema(&input.schema(), &group_expr, &aggr_expr, 
mode)?;
-
-        let schema = Arc::new(schema);
-
-        Ok(HashAggregateExec {
-            mode,
-            group_expr,
-            aggr_expr,
-            input,
-            schema,
-            input_schema,
-            metrics: ExecutionPlanMetricsSet::new(),
-        })
-    }
-
-    /// Aggregation mode (full, partial)
-    pub fn mode(&self) -> &AggregateMode {
-        &self.mode
-    }
-
-    /// Grouping expressions
-    pub fn group_expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] {
-        &self.group_expr
-    }
-
-    /// Grouping expressions as they occur in the output schema
-    pub fn output_group_expr(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        // Update column indices. Since the group by columns come first in the 
output schema, their
-        // indices are simply 0..self.group_expr(len).
-        self.group_expr
-            .iter()
-            .enumerate()
-            .map(|(index, (_col, name))| {
-                Arc::new(Column::new(name, index)) as Arc<dyn PhysicalExpr>
-            })
-            .collect()
-    }
-
-    /// Aggregate expressions
-    pub fn aggr_expr(&self) -> &[Arc<dyn AggregateExpr>] {
-        &self.aggr_expr
-    }
-
-    /// Input plan
-    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
-        &self.input
-    }
-
-    /// Get the input schema before any aggregates are applied
-    pub fn input_schema(&self) -> SchemaRef {
-        self.input_schema.clone()
-    }
-}
-
-#[async_trait]
-impl ExecutionPlan for HashAggregateExec {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
-    }
-
-    fn required_child_distribution(&self) -> Distribution {
-        match &self.mode {
-            AggregateMode::Partial => Distribution::UnspecifiedDistribution,
-            AggregateMode::FinalPartitioned => Distribution::HashPartitioned(
-                self.group_expr.iter().map(|x| x.0.clone()).collect(),
-            ),
-            AggregateMode::Final => Distribution::SinglePartition,
-        }
-    }
-
-    /// Get the output partitioning of this plan
-    fn output_partitioning(&self) -> Partitioning {
-        self.input.output_partitioning()
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
-
-    fn relies_on_input_order(&self) -> bool {
-        false
-    }
-
-    async fn execute(
-        &self,
-        partition: usize,
-        context: Arc<TaskContext>,
-    ) -> Result<SendableRecordBatchStream> {
-        let input = self.input.execute(partition, context).await?;
-        let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect();
-
-        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
-
-        if self.group_expr.is_empty() {
-            Ok(Box::pin(HashAggregateStream::new(
-                self.mode,
-                self.schema.clone(),
-                self.aggr_expr.clone(),
-                input,
-                baseline_metrics,
-            )?))
-        } else {
-            Ok(Box::pin(GroupedHashAggregateStream::new(
-                self.mode,
-                self.schema.clone(),
-                group_expr,
-                self.aggr_expr.clone(),
-                input,
-                baseline_metrics,
-            )?))
-        }
-    }
-
-    fn with_new_children(
-        self: Arc<Self>,
-        children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(HashAggregateExec::try_new(
-            self.mode,
-            self.group_expr.clone(),
-            self.aggr_expr.clone(),
-            children[0].clone(),
-            self.input_schema.clone(),
-        )?))
-    }
-
-    fn metrics(&self) -> Option<MetricsSet> {
-        Some(self.metrics.clone_inner())
-    }
-
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default => {
-                write!(f, "HashAggregateExec: mode={:?}", self.mode)?;
-                let g: Vec<String> = self
-                    .group_expr
-                    .iter()
-                    .map(|(e, alias)| {
-                        let e = e.to_string();
-                        if &e != alias {
-                            format!("{} as {}", e, alias)
-                        } else {
-                            e
-                        }
-                    })
-                    .collect();
-                write!(f, ", gby=[{}]", g.join(", "))?;
-
-                let a: Vec<String> = self
-                    .aggr_expr
-                    .iter()
-                    .map(|agg| agg.name().to_string())
-                    .collect();
-                write!(f, ", aggr=[{}]", a.join(", "))?;
-            }
-        }
-        Ok(())
-    }
-
-    fn statistics(&self) -> Statistics {
-        // TODO stats: group expressions:
-        // - once expressions will be able to compute their own stats, use it 
here
-        // - case where we group by on a column for which with have the 
`distinct` stat
-        // TODO stats: aggr expression:
-        // - aggregations somtimes also preserve invariants such as min, max...
-        match self.mode {
-            AggregateMode::Final | AggregateMode::FinalPartitioned
-                if self.group_expr.is_empty() =>
-            {
-                Statistics {
-                    num_rows: Some(1),
-                    is_exact: true,
-                    ..Default::default()
-                }
-            }
-            _ => Statistics::default(),
-        }
-    }
-}
-
-/*
-The architecture is the following:
-
-1. An accumulator has state that is updated on each batch.
-2. At the end of the aggregation (e.g. end of batches in a partition), the accumulator converts its state to a RecordBatch of a single row
-3. The RecordBatches of all accumulators are merged (`concatenate` in `rust/arrow`) together to a single RecordBatch.
-4. The state's RecordBatch is `merge`d to a new state
-5. The state is mapped to the final value
-
-Why:
-
-* Accumulators' state can be statically typed, but it is more efficient to transmit data from the accumulators via `Array`
-* The `merge` operation must have access to the state of the aggregators because it uses it to correctly merge
-* It uses Arrow's native dynamically typed object, `Array`.
-* Arrow shines in batch operations and both `merge` and `concatenate` of uniform types are very performant.
-
-Example: average
-
-* the state is `n: u32` and `sum: f64`
-* For every batch, we update them accordingly.
-* At the end of the accumulation (of a partition), we convert `n` and `sum` to a RecordBatch of 1 row and two columns: `[n, sum]`
-* The RecordBatch is (sent back / transmitted over network)
-* Once all N record batches arrive, `merge` is performed, which builds a RecordBatch with N rows and 2 columns.
-* Finally, `get_value` returns an array with one entry computed from the state
-*/
-struct GroupedHashAggregateStream {
-    schema: SchemaRef,
-    input: SendableRecordBatchStream,
-    mode: AggregateMode,
-    accumulators: Accumulators,
-    aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
-
-    aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-    group_expr: Vec<Arc<dyn PhysicalExpr>>,
-
-    baseline_metrics: BaselineMetrics,
-    random_state: RandomState,
-    finished: bool,
-}
-
-impl GroupedHashAggregateStream {
-    /// Create a new HashAggregateStream
-    pub fn new(
-        mode: AggregateMode,
-        schema: SchemaRef,
-        group_expr: Vec<Arc<dyn PhysicalExpr>>,
-        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-        input: SendableRecordBatchStream,
-        baseline_metrics: BaselineMetrics,
-    ) -> Result<Self> {
-        let timer = baseline_metrics.elapsed_compute().timer();
-
-        // The expressions to evaluate the batch, one vec of expressions per 
aggregation.
-        // Assume create_schema() always put group columns in front of aggr 
columns, we set
-        // col_idx_base to group expression count.
-        let aggregate_expressions =
-            aggregate_expressions(&aggr_expr, &mode, group_expr.len())?;
-
-        timer.done();
-
-        Ok(Self {
-            schema,
-            mode,
-            input,
-            aggr_expr,
-            group_expr,
-            baseline_metrics,
-            aggregate_expressions,
-            accumulators: Default::default(),
-            random_state: Default::default(),
-            finished: false,
-        })
-    }
-}
-
-impl Stream for GroupedHashAggregateStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        let this = &mut *self;
-        if this.finished {
-            return Poll::Ready(None);
-        }
-
-        let elapsed_compute = this.baseline_metrics.elapsed_compute();
-
-        loop {
-            let result = match ready!(this.input.poll_next_unpin(cx)) {
-                Some(Ok(batch)) => {
-                    let timer = elapsed_compute.timer();
-                    let result = group_aggregate_batch(
-                        &this.mode,
-                        &this.random_state,
-                        &this.group_expr,
-                        &this.aggr_expr,
-                        batch,
-                        &mut this.accumulators,
-                        &this.aggregate_expressions,
-                    );
-
-                    timer.done();
-
-                    match result {
-                        Ok(_) => continue,
-                        Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
-                    }
-                }
-                Some(Err(e)) => Err(e),
-                None => {
-                    this.finished = true;
-                    let timer = 
this.baseline_metrics.elapsed_compute().timer();
-                    let result = create_batch_from_map(
-                        &this.mode,
-                        &this.accumulators,
-                        this.group_expr.len(),
-                        &this.schema,
-                    )
-                    .record_output(&this.baseline_metrics);
-
-                    timer.done();
-                    result
-                }
-            };
-
-            this.finished = true;
-            return Poll::Ready(Some(result));
-        }
-    }
-}
-
-impl RecordBatchStream for GroupedHashAggregateStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
-/// TODO: Make this a member function of [`GroupedHashAggregateStream`]
-fn group_aggregate_batch(
-    mode: &AggregateMode,
-    random_state: &RandomState,
-    group_expr: &[Arc<dyn PhysicalExpr>],
-    aggr_expr: &[Arc<dyn AggregateExpr>],
-    batch: RecordBatch,
-    accumulators: &mut Accumulators,
-    aggregate_expressions: &[Vec<Arc<dyn PhysicalExpr>>],
-) -> Result<()> {
-    // evaluate the grouping expressions
-    let group_values = evaluate(group_expr, &batch)?;
-
-    // evaluate the aggregation expressions.
-    // We could evaluate them after the `take`, but since we need to evaluate 
all
-    // of them anyways, it is more performant to do it while they are together.
-    let aggr_input_values = evaluate_many(aggregate_expressions, &batch)?;
-
-    // 1.1 construct the key from the group values
-    // 1.2 construct the mapping key if it does not exist
-    // 1.3 add the row' index to `indices`
-
-    // track which entries in `accumulators` have rows in this batch to 
aggregate
-    let mut groups_with_rows = vec![];
-
-    // 1.1 Calculate the group keys for the group values
-    let mut batch_hashes = vec![0; batch.num_rows()];
-    create_hashes(&group_values, random_state, &mut batch_hashes)?;
-
-    for (row, hash) in batch_hashes.into_iter().enumerate() {
-        let Accumulators { map, group_states } = accumulators;
-
-        let entry = map.get_mut(hash, |(_hash, group_idx)| {
-            // verify that a group that we are inserting with hash is
-            // actually the same key value as the group in
-            // existing_idx  (aka group_values @ row)
-            let group_state = &group_states[*group_idx];
-            group_values
-                .iter()
-                .zip(group_state.group_by_values.iter())
-                .all(|(array, scalar)| scalar.eq_array(array, row))
-        });
-
-        match entry {
-            // Existing entry for this group value
-            Some((_hash, group_idx)) => {
-                let group_state = &mut group_states[*group_idx];
-                // 1.3
-                if group_state.indices.is_empty() {
-                    groups_with_rows.push(*group_idx);
-                };
-                group_state.indices.push(row as u32); // remember this row
-            }
-            //  1.2 Need to create new entry
-            None => {
-                let accumulator_set = create_accumulators(aggr_expr)?;
-
-                // Copy group values out of arrays into `ScalarValue`s
-                let group_by_values = group_values
-                    .iter()
-                    .map(|col| ScalarValue::try_from_array(col, row))
-                    .collect::<Result<Vec<_>>>()?;
-
-                // Add new entry to group_states and save newly created index
-                let group_state = GroupState {
-                    group_by_values: group_by_values.into_boxed_slice(),
-                    accumulator_set,
-                    indices: vec![row as u32], // 1.3
-                };
-                let group_idx = group_states.len();
-                group_states.push(group_state);
-                groups_with_rows.push(group_idx);
-
-                // for hasher function, use precomputed hash value
-                map.insert(hash, (hash, group_idx), |(hash, _group_idx)| 
*hash);
-            }
-        };
-    }
-
-    // Collect all indices + offsets based on keys in this vec
-    let mut batch_indices: UInt32Builder = UInt32Builder::new(0);
-    let mut offsets = vec![0];
-    let mut offset_so_far = 0;
-    for group_idx in groups_with_rows.iter() {
-        let indices = &accumulators.group_states[*group_idx].indices;
-        batch_indices.append_slice(indices)?;
-        offset_so_far += indices.len();
-        offsets.push(offset_so_far);
-    }
-    let batch_indices = batch_indices.finish();
-
-    // `Take` all values based on indices into Arrays
-    let values: Vec<Vec<Arc<dyn Array>>> = aggr_input_values
-        .iter()
-        .map(|array| {
-            array
-                .iter()
-                .map(|array| {
-                    compute::take(
-                        array.as_ref(),
-                        &batch_indices,
-                        None, // None: no index check
-                    )
-                    .unwrap()
-                })
-                .collect()
-            // 2.3
-        })
-        .collect();
-
-    // 2.1 for each key in this batch
-    // 2.2 for each aggregation
-    // 2.3 `slice` from each of its arrays the keys' values
-    // 2.4 update / merge the accumulator with the values
-    // 2.5 clear indices
-    groups_with_rows
-        .iter()
-        .zip(offsets.windows(2))
-        .try_for_each(|(group_idx, offsets)| {
-            let group_state = &mut accumulators.group_states[*group_idx];
-            // 2.2
-            group_state
-                .accumulator_set
-                .iter_mut()
-                .zip(values.iter())
-                .map(|(accumulator, aggr_array)| {
-                    (
-                        accumulator,
-                        aggr_array
-                            .iter()
-                            .map(|array| {
-                                // 2.3
-                                array.slice(offsets[0], offsets[1] - 
offsets[0])
-                            })
-                            .collect::<Vec<ArrayRef>>(),
-                    )
-                })
-                .try_for_each(|(accumulator, values)| match mode {
-                    AggregateMode::Partial => 
accumulator.update_batch(&values),
-                    AggregateMode::FinalPartitioned | AggregateMode::Final => {
-                        // note: the aggregation here is over states, not 
values, thus the merge
-                        accumulator.merge_batch(&values)
-                    }
-                })
-                // 2.5
-                .and({
-                    group_state.indices.clear();
-                    Ok(())
-                })
-        })?;
-
-    Ok(())
-}
-
-type AccumulatorItem = Box<dyn Accumulator>;
-
-/// The state that is built for each output group.
-#[derive(Debug)]
-struct GroupState {
-    /// The actual group by values, one for each group column
-    group_by_values: Box<[ScalarValue]>,
-
-    // Accumulator state, one for each aggregate
-    accumulator_set: Vec<AccumulatorItem>,
-
-    /// scratch space used to collect indices for input rows in a
-    /// bach that have values to aggregate. Reset on each batch
-    indices: Vec<u32>,
-}
-
-/// The state of all the groups
-#[derive(Default)]
-struct Accumulators {
-    /// Logically maps group values to an index in `group_states`
-    ///
-    /// Uses the raw API of hashbrown to avoid actually storing the
-    /// keys in the table
-    ///
-    /// keys: u64 hashes of the GroupValue
-    /// values: (hash, index into `group_states`)
-    map: RawTable<(u64, usize)>,
-
-    /// State for each group
-    group_states: Vec<GroupState>,
-}
-
-impl std::fmt::Debug for Accumulators {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        // hashes are not store inline, so could only get values
-        let map_string = "RawTable";
-        f.debug_struct("Accumulators")
-            .field("map", &map_string)
-            .field("group_states", &self.group_states)
-            .finish()
-    }
-}
-
-/// Evaluates expressions against a record batch.
-fn evaluate(
-    expr: &[Arc<dyn PhysicalExpr>],
-    batch: &RecordBatch,
-) -> Result<Vec<ArrayRef>> {
-    expr.iter()
-        .map(|expr| expr.evaluate(batch))
-        .map(|r| r.map(|v| v.into_array(batch.num_rows())))
-        .collect::<Result<Vec<_>>>()
-}
-
-/// Evaluates expressions against a record batch.
-fn evaluate_many(
-    expr: &[Vec<Arc<dyn PhysicalExpr>>],
-    batch: &RecordBatch,
-) -> Result<Vec<Vec<ArrayRef>>> {
-    expr.iter()
-        .map(|expr| evaluate(expr, batch))
-        .collect::<Result<Vec<_>>>()
-}
-
-/// uses `state_fields` to build a vec of physical column expressions required 
to merge the
-/// AggregateExpr' accumulator's state.
-///
-/// `index_base` is the starting physical column index for the next expanded 
state field.
-fn merge_expressions(
-    index_base: usize,
-    expr: &Arc<dyn AggregateExpr>,
-) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
-    Ok(expr
-        .state_fields()?
-        .iter()
-        .enumerate()
-        .map(|(idx, f)| {
-            Arc::new(Column::new(f.name(), index_base + idx)) as Arc<dyn 
PhysicalExpr>
-        })
-        .collect::<Vec<_>>())
-}
-
-/// returns physical expressions to evaluate against a batch
-/// The expressions are different depending on `mode`:
-/// * Partial: AggregateExpr::expressions
-/// * Final: columns of `AggregateExpr::state_fields()`
-fn aggregate_expressions(
-    aggr_expr: &[Arc<dyn AggregateExpr>],
-    mode: &AggregateMode,
-    col_idx_base: usize,
-) -> Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
-    match mode {
-        AggregateMode::Partial => {
-            Ok(aggr_expr.iter().map(|agg| agg.expressions()).collect())
-        }
-        // in this mode, we build the merge expressions of the aggregation
-        AggregateMode::Final | AggregateMode::FinalPartitioned => {
-            let mut col_idx_base = col_idx_base;
-            Ok(aggr_expr
-                .iter()
-                .map(|agg| {
-                    let exprs = merge_expressions(col_idx_base, agg)?;
-                    col_idx_base += exprs.len();
-                    Ok(exprs)
-                })
-                .collect::<Result<Vec<_>>>()?)
-        }
-    }
-}
-
-/// stream struct for hash aggregation
-pub struct HashAggregateStream {
-    schema: SchemaRef,
-    mode: AggregateMode,
-    input: SendableRecordBatchStream,
-    baseline_metrics: BaselineMetrics,
-    aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
-    accumulators: Vec<AccumulatorItem>,
-    finished: bool,
-}
-
-impl HashAggregateStream {
-    /// Create a new HashAggregateStream
-    pub fn new(
-        mode: AggregateMode,
-        schema: SchemaRef,
-        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-        input: SendableRecordBatchStream,
-        baseline_metrics: BaselineMetrics,
-    ) -> Result<Self> {
-        let aggregate_expressions = aggregate_expressions(&aggr_expr, &mode, 
0)?;
-        let accumulators = create_accumulators(&aggr_expr)?;
-
-        Ok(Self {
-            schema,
-            mode,
-            input,
-            baseline_metrics,
-            aggregate_expressions,
-            accumulators,
-            finished: false,
-        })
-    }
-}
-
-/// TODO: Make this a member function
-fn aggregate_batch(
-    mode: &AggregateMode,
-    batch: &RecordBatch,
-    accumulators: &mut [AccumulatorItem],
-    expressions: &[Vec<Arc<dyn PhysicalExpr>>],
-) -> Result<()> {
-    // 1.1 iterate accumulators and respective expressions together
-    // 1.2 evaluate expressions
-    // 1.3 update / merge accumulators with the expressions' values
-
-    // 1.1
-    accumulators
-        .iter_mut()
-        .zip(expressions)
-        .try_for_each(|(accum, expr)| {
-            // 1.2
-            let values = &expr
-                .iter()
-                .map(|e| e.evaluate(batch))
-                .map(|r| r.map(|v| v.into_array(batch.num_rows())))
-                .collect::<Result<Vec<_>>>()?;
-
-            // 1.3
-            match mode {
-                AggregateMode::Partial => accum.update_batch(values),
-                AggregateMode::Final | AggregateMode::FinalPartitioned => {
-                    accum.merge_batch(values)
-                }
-            }
-        })
-}
-
-impl Stream for HashAggregateStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        let this = &mut *self;
-        if this.finished {
-            return Poll::Ready(None);
-        }
-
-        let elapsed_compute = this.baseline_metrics.elapsed_compute();
-
-        loop {
-            let result = match ready!(this.input.poll_next_unpin(cx)) {
-                Some(Ok(batch)) => {
-                    let timer = elapsed_compute.timer();
-                    let result = aggregate_batch(
-                        &this.mode,
-                        &batch,
-                        &mut this.accumulators,
-                        &this.aggregate_expressions,
-                    );
-
-                    timer.done();
-
-                    match result {
-                        Ok(_) => continue,
-                        Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
-                    }
-                }
-                Some(Err(e)) => Err(e),
-                None => {
-                    this.finished = true;
-                    let timer = 
this.baseline_metrics.elapsed_compute().timer();
-                    let result = finalize_aggregation(&this.accumulators, 
&this.mode)
-                        .map_err(|e| ArrowError::ExternalError(Box::new(e)))
-                        .and_then(|columns| {
-                            RecordBatch::try_new(this.schema.clone(), columns)
-                        })
-                        .record_output(&this.baseline_metrics);
-
-                    timer.done();
-                    result
-                }
-            };
-
-            this.finished = true;
-            return Poll::Ready(Some(result));
-        }
-    }
-}
-
-impl RecordBatchStream for HashAggregateStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
-/// Create a RecordBatch with all group keys and accumulator' states or values.
-fn create_batch_from_map(
-    mode: &AggregateMode,
-    accumulators: &Accumulators,
-    num_group_expr: usize,
-    output_schema: &Schema,
-) -> ArrowResult<RecordBatch> {
-    if accumulators.group_states.is_empty() {
-        return Ok(RecordBatch::new_empty(Arc::new(output_schema.to_owned())));
-    }
-    let accs = &accumulators.group_states[0].accumulator_set;
-    let mut acc_data_types: Vec<usize> = vec![];
-
-    // Calculate number/shape of state arrays
-    match mode {
-        AggregateMode::Partial => {
-            for acc in accs.iter() {
-                let state = acc.state()?;
-                acc_data_types.push(state.len());
-            }
-        }
-        AggregateMode::Final | AggregateMode::FinalPartitioned => {
-            acc_data_types = vec![1; accs.len()];
-        }
-    }
-
-    let mut columns = (0..num_group_expr)
-        .map(|i| {
-            ScalarValue::iter_to_array(
-                accumulators
-                    .group_states
-                    .iter()
-                    .map(|group_state| group_state.group_by_values[i].clone()),
-            )
-        })
-        .collect::<Result<Vec<_>>>()?;
-
-    // add state / evaluated arrays
-    for (x, &state_len) in acc_data_types.iter().enumerate() {
-        for y in 0..state_len {
-            match mode {
-                AggregateMode::Partial => {
-                    let res = ScalarValue::iter_to_array(
-                        accumulators.group_states.iter().map(|group_state| {
-                            let x = 
group_state.accumulator_set[x].state().unwrap();
-                            x[y].clone()
-                        }),
-                    )?;
-
-                    columns.push(res);
-                }
-                AggregateMode::Final | AggregateMode::FinalPartitioned => {
-                    let res = ScalarValue::iter_to_array(
-                        accumulators.group_states.iter().map(|group_state| {
-                            group_state.accumulator_set[x].evaluate().unwrap()
-                        }),
-                    )?;
-                    columns.push(res);
-                }
-            }
-        }
-    }
-
-    // cast output if needed (e.g. for types like Dictionary where
-    // the intermediate GroupByScalar type was not the same as the
-    // output
-    let columns = columns
-        .iter()
-        .zip(output_schema.fields().iter())
-        .map(|(col, desired_field)| cast(col, desired_field.data_type()))
-        .collect::<ArrowResult<Vec<_>>>()?;
-
-    RecordBatch::try_new(Arc::new(output_schema.to_owned()), columns)
-}
-
-fn create_accumulators(
-    aggr_expr: &[Arc<dyn AggregateExpr>],
-) -> Result<Vec<AccumulatorItem>> {
-    aggr_expr
-        .iter()
-        .map(|expr| expr.create_accumulator())
-        .collect::<Result<Vec<_>>>()
-}
-
-/// returns a vector of ArrayRefs, where each entry corresponds to either the
-/// final value (mode = Final) or states (mode = Partial)
-fn finalize_aggregation(
-    accumulators: &[AccumulatorItem],
-    mode: &AggregateMode,
-) -> Result<Vec<ArrayRef>> {
-    match mode {
-        AggregateMode::Partial => {
-            // build the vector of states
-            let a = accumulators
-                .iter()
-                .map(|accumulator| accumulator.state())
-                .map(|value| {
-                    value.map(|e| {
-                        e.iter().map(|v| 
v.to_array()).collect::<Vec<ArrayRef>>()
-                    })
-                })
-                .collect::<Result<Vec<_>>>()?;
-            Ok(a.iter().flatten().cloned().collect::<Vec<_>>())
-        }
-        AggregateMode::Final | AggregateMode::FinalPartitioned => {
-            // merge the state to the final value
-            accumulators
-                .iter()
-                .map(|accumulator| accumulator.evaluate().map(|v| 
v.to_array()))
-                .collect::<Result<Vec<ArrayRef>>>()
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-
-    use super::*;
-    use crate::from_slice::FromSlice;
-    use crate::physical_plan::expressions::{col, Avg};
-    use crate::test::assert_is_pending;
-    use crate::test::exec::{assert_strong_count_converges_to_zero, 
BlockingExec};
-    use crate::{assert_batches_sorted_eq, physical_plan::common};
-    use arrow::array::{Float64Array, UInt32Array};
-    use arrow::datatypes::DataType;
-    use datafusion_common::DataFusionError;
-    use futures::FutureExt;
-
-    use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-    use crate::prelude::SessionContext;
-
-    /// some mock data to aggregates
-    fn some_data() -> (Arc<Schema>, Vec<RecordBatch>) {
-        // define a schema.
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("a", DataType::UInt32, false),
-            Field::new("b", DataType::Float64, false),
-        ]));
-
-        // define data.
-        (
-            schema.clone(),
-            vec![
-                RecordBatch::try_new(
-                    schema.clone(),
-                    vec![
-                        Arc::new(UInt32Array::from_slice(&[2, 3, 4, 4])),
-                        Arc::new(Float64Array::from_slice(&[1.0, 2.0, 3.0, 
4.0])),
-                    ],
-                )
-                .unwrap(),
-                RecordBatch::try_new(
-                    schema,
-                    vec![
-                        Arc::new(UInt32Array::from_slice(&[2, 3, 3, 4])),
-                        Arc::new(Float64Array::from_slice(&[1.0, 2.0, 3.0, 
4.0])),
-                    ],
-                )
-                .unwrap(),
-            ],
-        )
-    }
-
-    /// build the aggregates on the data from some_data() and check the results
-    async fn check_aggregates(input: Arc<dyn ExecutionPlan>) -> Result<()> {
-        let input_schema = input.schema();
-
-        let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
-            vec![(col("a", &input_schema)?, "a".to_string())];
-
-        let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
-            col("b", &input_schema)?,
-            "AVG(b)".to_string(),
-            DataType::Float64,
-        ))];
-
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
-
-        let partial_aggregate = Arc::new(HashAggregateExec::try_new(
-            AggregateMode::Partial,
-            groups.clone(),
-            aggregates.clone(),
-            input,
-            input_schema.clone(),
-        )?);
-
-        let result =
-            common::collect(partial_aggregate.execute(0, 
task_ctx.clone()).await?)
-                .await?;
-
-        let expected = vec![
-            "+---+---------------+-------------+",
-            "| a | AVG(b)[count] | AVG(b)[sum] |",
-            "+---+---------------+-------------+",
-            "| 2 | 2             | 2           |",
-            "| 3 | 3             | 7           |",
-            "| 4 | 3             | 11          |",
-            "+---+---------------+-------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &result);
-
-        let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate));
-
-        let final_group: Vec<Arc<dyn PhysicalExpr>> = (0..groups.len())
-            .map(|i| col(&groups[i].1, &input_schema))
-            .collect::<Result<_>>()?;
-
-        let merged_aggregate = Arc::new(HashAggregateExec::try_new(
-            AggregateMode::Final,
-            final_group
-                .iter()
-                .enumerate()
-                .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
-                .collect(),
-            aggregates,
-            merge,
-            input_schema,
-        )?);
-
-        let result =
-            common::collect(merged_aggregate.execute(0, 
task_ctx.clone()).await?).await?;
-        assert_eq!(result.len(), 1);
-
-        let batch = &result[0];
-        assert_eq!(batch.num_columns(), 2);
-        assert_eq!(batch.num_rows(), 3);
-
-        let expected = vec![
-            "+---+--------------------+",
-            "| a | AVG(b)             |",
-            "+---+--------------------+",
-            "| 2 | 1                  |",
-            "| 3 | 2.3333333333333335 |", // 3, (2 + 3 + 2) / 3
-            "| 4 | 3.6666666666666665 |", // 4, (3 + 4 + 4) / 3
-            "+---+--------------------+",
-        ];
-
-        assert_batches_sorted_eq!(&expected, &result);
-
-        let metrics = merged_aggregate.metrics().unwrap();
-        let output_rows = metrics.output_rows().unwrap();
-        assert_eq!(3, output_rows);
-
-        Ok(())
-    }
-
-    /// Define a test source that can yield back to runtime before returning 
its first item ///
-
-    #[derive(Debug)]
-    struct TestYieldingExec {
-        /// True if this exec should yield back to runtime the first time it 
is polled
-        pub yield_first: bool,
-    }
-
-    #[async_trait]
-    impl ExecutionPlan for TestYieldingExec {
-        fn as_any(&self) -> &dyn Any {
-            self
-        }
-        fn schema(&self) -> SchemaRef {
-            some_data().0
-        }
-
-        fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-            vec![]
-        }
-
-        fn output_partitioning(&self) -> Partitioning {
-            Partitioning::UnknownPartitioning(1)
-        }
-
-        fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-            None
-        }
-
-        fn with_new_children(
-            self: Arc<Self>,
-            _: Vec<Arc<dyn ExecutionPlan>>,
-        ) -> Result<Arc<dyn ExecutionPlan>> {
-            Err(DataFusionError::Internal(format!(
-                "Children cannot be replaced in {:?}",
-                self
-            )))
-        }
-
-        async fn execute(
-            &self,
-            _partition: usize,
-            _context: Arc<TaskContext>,
-        ) -> Result<SendableRecordBatchStream> {
-            let stream = if self.yield_first {
-                TestYieldingStream::New
-            } else {
-                TestYieldingStream::Yielded
-            };
-
-            Ok(Box::pin(stream))
-        }
-
-        fn statistics(&self) -> Statistics {
-            let (_, batches) = some_data();
-            common::compute_record_batch_statistics(&[batches], 
&self.schema(), None)
-        }
-    }
-
-    /// A stream using the demo data. If inited as new, it will first yield to 
runtime before returning records
-    enum TestYieldingStream {
-        New,
-        Yielded,
-        ReturnedBatch1,
-        ReturnedBatch2,
-    }
-
-    impl Stream for TestYieldingStream {
-        type Item = ArrowResult<RecordBatch>;
-
-        fn poll_next(
-            mut self: std::pin::Pin<&mut Self>,
-            cx: &mut Context<'_>,
-        ) -> Poll<Option<Self::Item>> {
-            match &*self {
-                TestYieldingStream::New => {
-                    *(self.as_mut()) = TestYieldingStream::Yielded;
-                    cx.waker().wake_by_ref();
-                    Poll::Pending
-                }
-                TestYieldingStream::Yielded => {
-                    *(self.as_mut()) = TestYieldingStream::ReturnedBatch1;
-                    Poll::Ready(Some(Ok(some_data().1[0].clone())))
-                }
-                TestYieldingStream::ReturnedBatch1 => {
-                    *(self.as_mut()) = TestYieldingStream::ReturnedBatch2;
-                    Poll::Ready(Some(Ok(some_data().1[1].clone())))
-                }
-                TestYieldingStream::ReturnedBatch2 => Poll::Ready(None),
-            }
-        }
-    }
-
-    impl RecordBatchStream for TestYieldingStream {
-        fn schema(&self) -> SchemaRef {
-            some_data().0
-        }
-    }
-
-    //// Tests ////
-
-    #[tokio::test]
-    async fn aggregate_source_not_yielding() -> Result<()> {
-        let input: Arc<dyn ExecutionPlan> =
-            Arc::new(TestYieldingExec { yield_first: false });
-
-        check_aggregates(input).await
-    }
-
-    #[tokio::test]
-    async fn aggregate_source_with_yielding() -> Result<()> {
-        let input: Arc<dyn ExecutionPlan> =
-            Arc::new(TestYieldingExec { yield_first: true });
-
-        check_aggregates(input).await
-    }
-
-    #[tokio::test]
-    async fn test_drop_cancel_without_groups() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
-        let schema =
-            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, 
true)]));
-
-        let groups = vec![];
-
-        let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
-            col("a", &schema)?,
-            "AVG(a)".to_string(),
-            DataType::Float64,
-        ))];
-
-        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 
1));
-        let refs = blocking_exec.refs();
-        let hash_aggregate_exec = Arc::new(HashAggregateExec::try_new(
-            AggregateMode::Partial,
-            groups.clone(),
-            aggregates.clone(),
-            blocking_exec,
-            schema,
-        )?);
-
-        let fut = crate::physical_plan::collect(hash_aggregate_exec, task_ctx);
-        let mut fut = fut.boxed();
-
-        assert_is_pending(&mut fut);
-        drop(fut);
-        assert_strong_count_converges_to_zero(refs).await;
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_drop_cancel_with_groups() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("a", DataType::Float32, true),
-            Field::new("b", DataType::Float32, true),
-        ]));
-
-        let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
-            vec![(col("a", &schema)?, "a".to_string())];
-
-        let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
-            col("b", &schema)?,
-            "AVG(b)".to_string(),
-            DataType::Float64,
-        ))];
-
-        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 
1));
-        let refs = blocking_exec.refs();
-        let hash_aggregate_exec = Arc::new(HashAggregateExec::try_new(
-            AggregateMode::Partial,
-            groups.clone(),
-            aggregates.clone(),
-            blocking_exec,
-            schema,
-        )?);
-
-        let fut = crate::physical_plan::collect(hash_aggregate_exec, task_ctx);
-        let mut fut = fut.boxed();
-
-        assert_is_pending(&mut fut);
-        drop(fut);
-        assert_strong_count_converges_to_zero(refs).await;
-
-        Ok(())
-    }
-}
diff --git a/datafusion/core/src/physical_plan/mod.rs 
b/datafusion/core/src/physical_plan/mod.rs
index b7b25a636..dc963c7e1 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -555,7 +555,6 @@ pub use datafusion_physical_expr::expressions;
 pub mod file_format;
 pub mod filter;
 pub mod functions;
-pub mod hash_aggregate;
 pub mod hash_join;
 pub mod hash_utils;
 pub mod join_utils;
diff --git a/datafusion/core/src/physical_plan/planner.rs 
b/datafusion/core/src/physical_plan/planner.rs
index c02e1732a..e3b9bc434 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -34,6 +34,7 @@ use crate::logical_plan::{
 };
 use crate::logical_plan::{Limit, Values};
 use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
 use crate::physical_plan::cross_join::CrossJoinExec;
 use crate::physical_plan::explain::ExplainExec;
 use crate::physical_plan::expressions;
@@ -41,7 +42,6 @@ use crate::physical_plan::expressions::{
     CaseExpr, Column, GetIndexedFieldExpr, Literal, PhysicalSortExpr,
 };
 use crate::physical_plan::filter::FilterExec;
-use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
 use crate::physical_plan::hash_join::HashJoinExec;
 use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use crate::physical_plan::projection::ProjectionExec;
@@ -531,7 +531,7 @@ impl DefaultPhysicalPlanner {
                         })
                         .collect::<Result<Vec<_>>>()?;
 
-                    let initial_aggr = Arc::new(HashAggregateExec::try_new(
+                    let initial_aggr = Arc::new(AggregateExec::try_new(
                         AggregateMode::Partial,
                         groups.clone(),
                         aggregates.clone(),
@@ -573,7 +573,7 @@ impl DefaultPhysicalPlanner {
                         (initial_aggr, AggregateMode::Final)
                     };
 
-                    Ok(Arc::new(HashAggregateExec::try_new(
+                    Ok(Arc::new(AggregateExec::try_new(
                         next_partition_mode,
                         final_group
                             .iter()
@@ -1866,7 +1866,7 @@ mod tests {
         let execution_plan = plan(&logical_plan).await?;
         let final_hash_agg = execution_plan
             .as_any()
-            .downcast_ref::<HashAggregateExec>()
+            .downcast_ref::<AggregateExec>()
             .expect("hash aggregate");
         assert_eq!(
             "SUM(aggregate_test_100.c2)",
@@ -1900,7 +1900,7 @@ mod tests {
         let formatted = format!("{:?}", execution_plan);
 
         // Make sure the plan contains a FinalPartitioned, which means it will 
not use the Final
-        // mode in HashAggregate (which is slower)
+        // mode in Aggregate (which is slower)
         assert!(formatted.contains("FinalPartitioned"));
 
         Ok(())
diff --git a/datafusion/core/tests/sql/avro.rs 
b/datafusion/core/tests/sql/avro.rs
index b5ea5477e..f5c25dbad 100644
--- a/datafusion/core/tests/sql/avro.rs
+++ b/datafusion/core/tests/sql/avro.rs
@@ -150,9 +150,9 @@ async fn avro_explain() {
         vec![
             "physical_plan",
             "ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))]\
-            \n  HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
+            \n  AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
             \n    CoalescePartitionsExec\
-            \n      HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
+            \n      AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
             \n        RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
             \n          AvroExec: files=[ARROW_TEST_DATA/avro/alltypes_plain.avro], limit=None\
             \n",
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index a124311aa..f72e0f8f8 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -51,12 +51,12 @@ async fn explain_analyze_baseline_metrics() {
 
     assert_metrics!(
         &formatted,
-        "HashAggregateExec: mode=Partial, gby=[]",
+        "AggregateExec: mode=Partial, gby=[]",
         "metrics=[output_rows=3, elapsed_compute="
     );
     assert_metrics!(
         &formatted,
-        "HashAggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
+        "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
         "metrics=[output_rows=5, elapsed_compute="
     );
     assert_metrics!(
@@ -110,7 +110,7 @@ async fn explain_analyze_baseline_metrics() {
         use datafusion::physical_plan::sorts;
 
         plan.as_any().downcast_ref::<sorts::sort::SortExec>().is_some()
-            || plan.as_any().downcast_ref::<physical_plan::hash_aggregate::HashAggregateExec>().is_some()
+            || plan.as_any().downcast_ref::<physical_plan::aggregates::AggregateExec>().is_some()
             // CoalescePartitionsExec doesn't do any work so is not included
             || plan.as_any().downcast_ref::<physical_plan::filter::FilterExec>().is_some()
             || plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
@@ -660,10 +660,10 @@ async fn test_physical_plan_display_indent() {
         "  SortExec: [the_min@2 DESC]",
         "    CoalescePartitionsExec",
         "      ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]",
-        "        HashAggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
+        "        AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
         "          CoalesceBatchesExec: target_batch_size=4096",
         "            RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 3)",
-        "              HashAggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
+        "              AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
         "                CoalesceBatchesExec: target_batch_size=4096",
         "                  FilterExec: c12@1 < CAST(10 AS Float64)",
         "                    RepartitionExec: partitioning=RoundRobinBatch(3)",
diff --git a/datafusion/core/tests/sql/json.rs b/datafusion/core/tests/sql/json.rs
index e2209b876..79deaae79 100644
--- a/datafusion/core/tests/sql/json.rs
+++ b/datafusion/core/tests/sql/json.rs
@@ -92,9 +92,9 @@ async fn json_explain() {
         vec![
             "physical_plan",
             "ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))]\
-            \n  HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
+            \n  AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
             \n    CoalescePartitionsExec\
-            \n      HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
+            \n      AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
             \n        RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
             \n          JsonExec: limit=None, files=[tests/jsons/2.json]\n",
         ],

Reply via email to