This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ab09536280 fix: split "union" and "interleave" (#6045)
ab09536280 is described below

commit ab09536280a00444c1d2882b4e1ef579cab992dd
Author: Marco Neumann <[email protected]>
AuthorDate: Fri Apr 21 13:15:34 2023 +0200

    fix: split "union" and "interleave" (#6045)
    
    * fix: split "union" and "interleave"
    
    Silently interleaving partitions is confusing and causes bugs.
    
    Fixes #5970.
    
    * refactor: add interleave during dist enforcement
    
    This avoids a top-level repartition in the following query:
    
    ```sql
    SELECT c1, c9 FROM aggregate_test_100
    UNION ALL
    SELECT c1, c3 FROM aggregate_test_100
    ORDER BY c9 DESC LIMIT 5
    ```
    
    * fix: docs
---
 .../src/physical_optimizer/dist_enforcement.rs     |  68 +++++
 datafusion/core/src/physical_plan/union.rs         | 314 +++++++++++++++------
 datafusion/core/tests/sql/mod.rs                   |   6 +-
 datafusion/core/tests/sql/order.rs                 |  77 +++++
 datafusion/core/tests/sql/union.rs                 |  40 +++
 5 files changed, 424 insertions(+), 81 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs 
b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index affe432830..f74a7f2e93 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -29,6 +29,7 @@ use crate::physical_plan::joins::{
 use crate::physical_plan::projection::ProjectionExec;
 use crate::physical_plan::repartition::RepartitionExec;
 use crate::physical_plan::sorts::sort::SortOptions;
+use crate::physical_plan::union::{can_interleave, InterleaveExec, UnionExec};
 use crate::physical_plan::windows::WindowAggExec;
 use crate::physical_plan::Partitioning;
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, 
ExecutionPlan};
@@ -825,6 +826,35 @@ fn ensure_distribution(
         return Ok(Transformed::No(plan));
     }
 
+    // special case for UnionExec: We want to "bubble up" hash-partitioned 
data. So instead of:
+    //
+    // Agg:
+    //   Repartition (hash):
+    //     Union:
+    //       - Agg:
+    //           Repartition (hash):
+    //             Data
+    //       - Agg:
+    //           Repartition (hash):
+    //             Data
+    //
+    // We can use:
+    //
+    // Agg:
+    //   Interleave:
+    //     - Agg:
+    //         Repartition (hash):
+    //           Data
+    //     - Agg:
+    //         Repartition (hash):
+    //           Data
+    if let Some(union_exec) = plan.as_any().downcast_ref::<UnionExec>() {
+        if can_interleave(union_exec.inputs()) {
+            let plan = InterleaveExec::try_new(union_exec.inputs().clone())?;
+            return Ok(Transformed::Yes(Arc::new(plan)));
+        }
+    }
+
     let required_input_distributions = plan.required_input_distribution();
     let children: Vec<Arc<dyn ExecutionPlan>> = plan.children();
     assert_eq!(children.len(), required_input_distributions.len());
@@ -2134,4 +2164,42 @@ mod tests {
         assert_optimized!(expected, exec);
         Ok(())
     }
+
+    #[test]
+    fn union_to_interleave() -> Result<()> {
+        // group by (a as a1)
+        let left = aggregate_exec_with_alias(
+            parquet_exec(),
+            vec![("a".to_string(), "a1".to_string())],
+        );
+        // group by (a as a2)
+        let right = aggregate_exec_with_alias(
+            parquet_exec(),
+            vec![("a".to_string(), "a1".to_string())],
+        );
+
+        //  Union
+        let plan = Arc::new(UnionExec::new(vec![left, right]));
+
+        // final agg
+        let plan =
+            aggregate_exec_with_alias(plan, vec![("a1".to_string(), 
"a2".to_string())]);
+
+        // Only two RepartitionExecs added, no final RepartionExec required
+        let expected = &[
+            "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
+            "AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
+            "InterleaveExec",
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+            "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 
0 }], 10), input_partitions=1",
+            "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+            "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 
0 }], 10), input_partitions=1",
+            "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
+        ];
+        assert_optimized!(expected, plan);
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/physical_plan/union.rs 
b/datafusion/core/src/physical_plan/union.rs
index 3d4522272a..0448813240 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -94,8 +94,6 @@ pub struct UnionExec {
     metrics: ExecutionPlanMetricsSet,
     /// Schema of Union
     schema: SchemaRef,
-    /// Partition aware Union
-    partition_aware: bool,
 }
 
 impl UnionExec {
@@ -133,45 +131,12 @@ impl UnionExec {
 
     /// Create a new UnionExec
     pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self {
-        let fields: Vec<Field> = (0..inputs[0].schema().fields().len())
-            .map(|i| {
-                inputs
-                    .iter()
-                    .filter_map(|input| {
-                        if input.schema().fields().len() > i {
-                            Some(input.schema().field(i).clone())
-                        } else {
-                            None
-                        }
-                    })
-                    .find_or_first(|f| f.is_nullable())
-                    .unwrap()
-            })
-            .collect();
-
-        let schema = Arc::new(Schema::new_with_metadata(
-            fields,
-            inputs[0].schema().metadata().clone(),
-        ));
-
-        // If all the input partitions have the same Hash partition spec with 
the first_input_partition
-        // The UnionExec is partition aware.
-        //
-        // It might be too strict here in the case that the input partition 
specs are compatible but not exactly the same.
-        // For example one input partition has the partition spec 
Hash('a','b','c') and
-        // other has the partition spec Hash('a'), It is safe to derive the 
out partition with the spec Hash('a','b','c').
-        let first_input_partition = inputs[0].output_partitioning();
-        let partition_aware = matches!(first_input_partition, 
Partitioning::Hash(_, _))
-            && inputs
-                .iter()
-                .map(|plan| plan.output_partitioning())
-                .all(|partition| partition == first_input_partition);
+        let schema = union_schema(&inputs);
 
         UnionExec {
             inputs,
             metrics: ExecutionPlanMetricsSet::new(),
             schema,
-            partition_aware,
         }
     }
 
@@ -204,28 +169,20 @@ impl ExecutionPlan for UnionExec {
 
     /// Output of the union is the combination of all output partitions of the 
inputs
     fn output_partitioning(&self) -> Partitioning {
-        if self.partition_aware {
-            self.inputs[0].output_partitioning()
-        } else {
-            // Output the combination of all output partitions of the inputs 
if the Union is not partition aware
-            let num_partitions = self
-                .inputs
-                .iter()
-                .map(|plan| plan.output_partitioning().partition_count())
-                .sum();
+        // Output the combination of all output partitions of the inputs if 
the Union is not partition aware
+        let num_partitions = self
+            .inputs
+            .iter()
+            .map(|plan| plan.output_partitioning().partition_count())
+            .sum();
 
-            Partitioning::UnknownPartitioning(num_partitions)
-        }
+        Partitioning::UnknownPartitioning(num_partitions)
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        // If the Union is partition aware, there is no output ordering.
-        // Otherwise, the output ordering is the "meet" of its input orderings.
+        // The output ordering is the "meet" of its input orderings.
         // The meet is the finest ordering that satisfied by all the input
         // orderings, see https://en.wikipedia.org/wiki/Join_and_meet.
-        if self.partition_aware {
-            return None;
-        }
         get_meet_of_orderings(&self.inputs)
     }
 
@@ -273,34 +230,15 @@ impl ExecutionPlan for UnionExec {
         let elapsed_compute = baseline_metrics.elapsed_compute().clone();
         let _timer = elapsed_compute.timer(); // record on drop
 
-        if self.partition_aware {
-            let mut input_stream_vec = vec![];
-            for input in self.inputs.iter() {
-                if partition < input.output_partitioning().partition_count() {
-                    input_stream_vec.push(input.execute(partition, 
context.clone())?);
-                } else {
-                    // Do not find a partition to execute
-                    break;
-                }
-            }
-            if input_stream_vec.len() == self.inputs.len() {
-                let stream = Box::pin(CombinedRecordBatchStream::new(
-                    self.schema(),
-                    input_stream_vec,
-                ));
+        // find partition to execute
+        for input in self.inputs.iter() {
+            // Calculate whether partition belongs to the current partition
+            if partition < input.output_partitioning().partition_count() {
+                let stream = input.execute(partition, context)?;
+                debug!("Found a Union partition to execute");
                 return Ok(Box::pin(ObservedStream::new(stream, 
baseline_metrics)));
-            }
-        } else {
-            // find partition to execute
-            for input in self.inputs.iter() {
-                // Calculate whether partition belongs to the current partition
-                if partition < input.output_partitioning().partition_count() {
-                    let stream = input.execute(partition, context)?;
-                    debug!("Found a Union partition to execute");
-                    return Ok(Box::pin(ObservedStream::new(stream, 
baseline_metrics)));
-                } else {
-                    partition -= input.output_partitioning().partition_count();
-                }
+            } else {
+                partition -= input.output_partitioning().partition_count();
             }
         }
 
@@ -340,8 +278,224 @@ impl ExecutionPlan for UnionExec {
     }
 }
 
+/// Combines multiple input streams by interleaving them.
+///
+/// This only works if all inputs have the same hash-partitioning.
+///
+/// # Data Flow
+/// ```text
+/// +---------+
+/// |         |---+
+/// | Input 1 |   |
+/// |         |-------------+
+/// +---------+   |         |     
+///               |         |         +---------+
+///               +------------------>|         |
+///                 +---------------->| Combine |-->
+///                 | +-------------->|         |
+///                 | |     |         +---------+
+/// +---------+     | |     |       
+/// |         |-----+ |     |
+/// | Input 2 |       |     |
+/// |         |---------------+
+/// +---------+       |     | |    
+///                   |     | |       +---------+
+///                   |     +-------->|         |
+///                   |       +------>| Combine |-->
+///                   |         +---->|         |
+///                   |         |     +---------+
+/// +---------+       |         |     
+/// |         |-------+         |
+/// | Input 3 |                 |
+/// |         |-----------------+
+/// +---------+
+/// ```
+#[derive(Debug)]
+pub struct InterleaveExec {
+    /// Input execution plan
+    inputs: Vec<Arc<dyn ExecutionPlan>>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+    /// Schema of Interleave
+    schema: SchemaRef,
+}
+
+impl InterleaveExec {
+    /// Create a new InterleaveExec
+    pub fn try_new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Result<Self> {
+        let schema = union_schema(&inputs);
+
+        if !can_interleave(&inputs) {
+            return Err(DataFusionError::Internal(String::from(
+                "Not all InterleaveExec children have a consistent hash 
partitioning",
+            )));
+        }
+
+        Ok(InterleaveExec {
+            inputs,
+            metrics: ExecutionPlanMetricsSet::new(),
+            schema,
+        })
+    }
+
+    /// Get inputs of the execution plan
+    pub fn inputs(&self) -> &Vec<Arc<dyn ExecutionPlan>> {
+        &self.inputs
+    }
+}
+
+impl ExecutionPlan for InterleaveExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but it its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        Ok(children.iter().any(|x| *x))
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        self.inputs.clone()
+    }
+
+    /// All inputs must have the same partitioning. The output partioning of 
InterleaveExec is the same as the inputs
+    /// (NOT combined). E.g. if there are 10 inputs where each is 
`Hash(3)`-partitioned, InterleaveExec is also
+    /// `Hash(3)`-partitioned.
+    fn output_partitioning(&self) -> Partitioning {
+        self.inputs[0].output_partitioning()
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![false; self.inputs().len()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(InterleaveExec::try_new(children)?))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        debug!("Start InterleaveExec::execute for partition {} of context 
session_id {} and task_id {:?}", partition, context.session_id(), 
context.task_id());
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+        // record the tiny amount of work done in this function so
+        // elapsed_compute is reported as non zero
+        let elapsed_compute = baseline_metrics.elapsed_compute().clone();
+        let _timer = elapsed_compute.timer(); // record on drop
+
+        let mut input_stream_vec = vec![];
+        for input in self.inputs.iter() {
+            if partition < input.output_partitioning().partition_count() {
+                input_stream_vec.push(input.execute(partition, 
context.clone())?);
+            } else {
+                // Do not find a partition to execute
+                break;
+            }
+        }
+        if input_stream_vec.len() == self.inputs.len() {
+            let stream = Box::pin(CombinedRecordBatchStream::new(
+                self.schema(),
+                input_stream_vec,
+            ));
+            return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+        }
+
+        warn!("Error in InterleaveExec: Partition {} not found", partition);
+
+        Err(crate::error::DataFusionError::Execution(format!(
+            "Partition {partition} not found in InterleaveExec"
+        )))
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "InterleaveExec")
+            }
+        }
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.inputs
+            .iter()
+            .map(|ep| ep.statistics())
+            .reduce(stats_union)
+            .unwrap_or_default()
+    }
+
+    fn benefits_from_input_partitioning(&self) -> bool {
+        false
+    }
+}
+
+/// If all the input partitions have the same Hash partition spec with the 
first_input_partition
+/// The InterleaveExec is partition aware.
+///
+/// It might be too strict here in the case that the input partition specs are 
compatible but not exactly the same.
+/// For example one input partition has the partition spec Hash('a','b','c') 
and
+/// other has the partition spec Hash('a'), It is safe to derive the out 
partition with the spec Hash('a','b','c').
+pub fn can_interleave(inputs: &[Arc<dyn ExecutionPlan>]) -> bool {
+    if inputs.is_empty() {
+        return false;
+    }
+
+    let first_input_partition = inputs[0].output_partitioning();
+    matches!(first_input_partition, Partitioning::Hash(_, _))
+        && inputs
+            .iter()
+            .map(|plan| plan.output_partitioning())
+            .all(|partition| partition == first_input_partition)
+}
+
+fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
+    let fields: Vec<Field> = (0..inputs[0].schema().fields().len())
+        .map(|i| {
+            inputs
+                .iter()
+                .filter_map(|input| {
+                    if input.schema().fields().len() > i {
+                        Some(input.schema().field(i).clone())
+                    } else {
+                        None
+                    }
+                })
+                .find_or_first(|f| f.is_nullable())
+                .unwrap()
+        })
+        .collect();
+
+    Arc::new(Schema::new_with_metadata(
+        fields,
+        inputs[0].schema().metadata().clone(),
+    ))
+}
+
 /// CombinedRecordBatchStream can be used to combine a Vec of 
SendableRecordBatchStreams into one
-pub struct CombinedRecordBatchStream {
+struct CombinedRecordBatchStream {
     /// Schema wrapped by Arc
     schema: SchemaRef,
     /// Stream entries
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 192e2f65f8..8c7f50c9a6 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -751,7 +751,11 @@ fn create_sort_merge_join_datatype_context() -> 
Result<SessionContext> {
 }
 
 fn create_union_context() -> Result<SessionContext> {
-    let ctx = SessionContext::new();
+    let ctx = SessionContext::with_config(
+        SessionConfig::new()
+            .with_target_partitions(4)
+            .with_batch_size(4096),
+    );
     let t1_schema = Arc::new(Schema::new(vec![
         Field::new("id", DataType::Int32, true),
         Field::new("name", DataType::UInt8, true),
diff --git a/datafusion/core/tests/sql/order.rs 
b/datafusion/core/tests/sql/order.rs
index da7663cb8b..5691a7cfd6 100644
--- a/datafusion/core/tests/sql/order.rs
+++ b/datafusion/core/tests/sql/order.rs
@@ -175,3 +175,80 @@ async fn sort_with_duplicate_sort_exprs() -> Result<()> {
 
     Ok(())
 }
+
+/// Minimal test case for 
https://github.com/apache/arrow-datafusion/issues/5970
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_issue5970_mini() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_target_partitions(2)
+        .with_repartition_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    let sql = "
+WITH
+    m0(t) AS (
+        VALUES (0), (1), (2)),
+    m1(t) AS (
+        VALUES (0), (1)),
+    u AS (
+        SELECT 0 as m, t FROM m0 GROUP BY 1, 2), 
+    v AS (
+        SELECT 1 as m, t FROM m1 GROUP BY 1, 2)
+SELECT * FROM u
+UNION ALL
+SELECT * FROM v
+ORDER BY 1, 2;
+    ";
+
+    // check phys. plan
+    let dataframe = ctx.sql(sql).await.unwrap();
+    let plan = dataframe.into_optimized_plan().unwrap();
+    let plan = ctx.state().create_physical_plan(&plan).await.unwrap();
+    let expected = vec![
+        "SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
+        "  SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
+        "    InterleaveExec",
+        "      ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]",
+        "        AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as 
Int64(0), t@1 as t], aggr=[]",
+        "          CoalesceBatchesExec: target_batch_size=8192",
+        "            RepartitionExec: partitioning=Hash([Column { name: 
\"Int64(0)\", index: 0 }, Column { name: \"t\", index: 1 }], 2), 
input_partitions=2",
+        "              AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as 
t], aggr=[]",
+        "                RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
+        "                  ProjectionExec: expr=[column1@0 as t]",
+        "                    ValuesExec",
+        "      ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]",
+        "        AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as 
Int64(1), t@1 as t], aggr=[]",
+        "          CoalesceBatchesExec: target_batch_size=8192",
+        "            RepartitionExec: partitioning=Hash([Column { name: 
\"Int64(1)\", index: 0 }, Column { name: \"t\", index: 1 }], 2), 
input_partitions=2",
+        "              AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as 
t], aggr=[]",
+        "                RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
+        "                  ProjectionExec: expr=[column1@0 as t]",
+        "                    ValuesExec",
+    ];
+    let formatted = displayable(plan.as_ref()).indent().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    // sometimes it "just works"
+    for i in 0..10 {
+        println!("run: {i}");
+        let actual = execute_to_batches(&ctx, sql).await;
+
+        // in https://github.com/apache/arrow-datafusion/issues/5970 the order 
of the output was sometimes not right
+        let expected = vec![
+            "+---+---+",
+            "| m | t |",
+            "+---+---+",
+            "| 0 | 0 |",
+            "| 0 | 1 |",
+            "| 0 | 2 |",
+            "| 1 | 0 |",
+            "| 1 | 1 |",
+            "+---+---+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+    Ok(())
+}
diff --git a/datafusion/core/tests/sql/union.rs 
b/datafusion/core/tests/sql/union.rs
index d56a0a39ff..cca9c23e67 100644
--- a/datafusion/core/tests/sql/union.rs
+++ b/datafusion/core/tests/sql/union.rs
@@ -140,3 +140,43 @@ async fn test_union_upcast_types() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn union_with_hash_aggregate() -> Result<()> {
+    let ctx = create_union_context()?;
+    let sql = "select count(*) from (
+        select distinct name from t1
+        union all
+        select distinct name from t2
+        ) group by name";
+
+    let dataframe = ctx.sql(sql).await.unwrap();
+    let plan = dataframe.into_optimized_plan().unwrap();
+    let plan = ctx.state().create_physical_plan(&plan).await.unwrap();
+    let formatted = displayable(plan.as_ref()).indent().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+
+    let expected = vec![
+        "ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1))]",
+        "  AggregateExec: mode=Single, gby=[name@0 as name], 
aggr=[COUNT(UInt8(1))]",
+        "    InterleaveExec",
+        "      AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], 
aggr=[]",
+        "        CoalesceBatchesExec: target_batch_size=4096",
+        "          RepartitionExec: partitioning=Hash([Column { name: 
\"name\", index: 0 }], 4), input_partitions=4",
+        "            RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1",
+        "              AggregateExec: mode=Partial, gby=[name@0 as name], 
aggr=[]",
+        "                MemoryExec: partitions=1, partition_sizes=[1]",
+        "      AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], 
aggr=[]",
+        "        CoalesceBatchesExec: target_batch_size=4096",
+        "          RepartitionExec: partitioning=Hash([Column { name: 
\"name\", index: 0 }], 4), input_partitions=4",
+        "            RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1",
+        "              AggregateExec: mode=Partial, gby=[name@0 as name], 
aggr=[]",
+        "                MemoryExec: partitions=1, partition_sizes=[1]",
+    ];
+
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+    Ok(())
+}

Reply via email to