This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 40e44a64f Fix output_partitioning(), output_ordering(), 
equivalence_properties() in WindowAggExec, shift the Column indexes (#4455)
40e44a64f is described below

commit 40e44a64f51438ea8eae7112828ef116409a15f0
Author: mingmwang <[email protected]>
AuthorDate: Sat Dec 3 04:50:54 2022 +0800

    Fix output_partitioning(), output_ordering(), equivalence_properties() in 
WindowAggExec, shift the Column indexes (#4455)
    
    * Fix output_partitioning(), output_ordering(), equivalence_properties() in 
WindowAggExec, shift the Column indexes
    
    * resolve review comments
---
 .../src/physical_plan/windows/window_agg_exec.rs   | 94 ++++++++++++++++++++--
 datafusion/core/tests/sql/window.rs                | 37 +++++++++
 2 files changed, 124 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs 
b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 2e1b6a70b..5b4dc79ea 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -24,7 +24,7 @@ use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
 };
 use crate::physical_plan::{
-    ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
+    Column, ColumnStatistics, DisplayFormatType, Distribution, 
EquivalenceProperties,
     ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
     SendableRecordBatchStream, Statistics, WindowExpr,
 };
@@ -35,6 +35,8 @@ use arrow::{
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
+use datafusion_physical_expr::rewrite::TreeNodeRewritable;
+use datafusion_physical_expr::EquivalentClass;
 use futures::stream::Stream;
 use futures::{ready, StreamExt};
 use log::debug;
@@ -58,6 +60,8 @@ pub struct WindowAggExec {
     pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
     /// Sort Keys
     pub sort_keys: Option<Vec<PhysicalSortExpr>>,
+    /// The output ordering
+    output_ordering: Option<Vec<PhysicalSortExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -73,6 +77,34 @@ impl WindowAggExec {
     ) -> Result<Self> {
         let schema = create_schema(&input_schema, &window_expr)?;
         let schema = Arc::new(schema);
+        let window_expr_len = window_expr.len();
+        // Although WindowAggExec does not change the output ordering from the 
input, but can not return the output ordering
+        // from the input directly, need to adjust the column index to align 
with the new schema.
+        let output_ordering = input
+            .output_ordering()
+            .map(|sort_exprs| {
+                let new_sort_exprs: Result<Vec<PhysicalSortExpr>> = sort_exprs
+                    .iter()
+                    .map(|e| {
+                        let new_expr = e.expr.clone().transform_down(&|e| {
+                            Ok(e.as_any().downcast_ref::<Column>().map(|col| {
+                                Arc::new(Column::new(
+                                    col.name(),
+                                    window_expr_len + col.index(),
+                                ))
+                                    as Arc<dyn PhysicalExpr>
+                            }))
+                        })?;
+                        Ok(PhysicalSortExpr {
+                            expr: new_expr,
+                            options: e.options,
+                        })
+                    })
+                    .collect();
+                new_sort_exprs
+            })
+            .map_or(Ok(None), |v| v.map(Some))?;
+
         Ok(Self {
             input,
             window_expr,
@@ -80,6 +112,7 @@ impl WindowAggExec {
             input_schema,
             partition_keys,
             sort_keys,
+            output_ordering,
             metrics: ExecutionPlanMetricsSet::new(),
         })
     }
@@ -116,14 +149,38 @@ impl ExecutionPlan for WindowAggExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        // because we can have repartitioning using the partition keys
-        // this would be either 1 or more than 1 depending on the presense of
-        // repartitioning
-        self.input.output_partitioning()
+        // Although WindowAggExec does not change the output partitioning from 
the input, but can not return the output partitioning
+        // from the input directly, need to adjust the column index to align 
with the new schema.
+        let window_expr_len = self.window_expr.len();
+        let input_partitioning = self.input.output_partitioning();
+        match input_partitioning {
+            Partitioning::RoundRobinBatch(size) => 
Partitioning::RoundRobinBatch(size),
+            Partitioning::UnknownPartitioning(size) => {
+                Partitioning::UnknownPartitioning(size)
+            }
+            Partitioning::Hash(exprs, size) => {
+                let new_exprs = exprs
+                    .into_iter()
+                    .map(|expr| {
+                        expr.transform_down(&|e| {
+                            Ok(e.as_any().downcast_ref::<Column>().map(|col| {
+                                Arc::new(Column::new(
+                                    col.name(),
+                                    window_expr_len + col.index(),
+                                ))
+                                    as Arc<dyn PhysicalExpr>
+                            }))
+                        })
+                        .unwrap()
+                    })
+                    .collect::<Vec<_>>();
+                Partitioning::Hash(new_exprs, size)
+            }
+        }
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.input.output_ordering()
+        self.output_ordering.as_deref()
     }
 
     fn maintains_input_order(&self) -> bool {
@@ -146,7 +203,30 @@ impl ExecutionPlan for WindowAggExec {
     }
 
     fn equivalence_properties(&self) -> EquivalenceProperties {
-        self.input.equivalence_properties()
+        // Although WindowAggExec does not change the equivalence properties 
from the input, but can not return the equivalence properties
+        // from the input directly, need to adjust the column index to align 
with the new schema.
+        let window_expr_len = self.window_expr.len();
+        let mut new_properties = EquivalenceProperties::new(self.schema());
+        let new_eq_classes = self
+            .input
+            .equivalence_properties()
+            .classes()
+            .iter()
+            .map(|prop| {
+                let new_head = Column::new(
+                    prop.head().name(),
+                    window_expr_len + prop.head().index(),
+                );
+                let new_others = prop
+                    .others()
+                    .iter()
+                    .map(|col| Column::new(col.name(), window_expr_len + 
col.index()))
+                    .collect::<Vec<_>>();
+                EquivalentClass::new(new_head, new_others)
+            })
+            .collect::<Vec<_>>();
+        new_properties.extend(new_eq_classes);
+        new_properties
     }
 
     fn with_new_children(
diff --git a/datafusion/core/tests/sql/window.rs 
b/datafusion/core/tests/sql/window.rs
index 95d0ed929..6d30d53f5 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -1607,3 +1607,40 @@ async fn test_window_frame_nth_value_aggregate() -> 
Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn test_window_agg_sort() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT
+      c9,
+      SUM(c9) OVER(ORDER BY c9) as sum1,
+      SUM(c9) OVER(ORDER BY c9, c8) as sum2
+      FROM aggregate_test_100";
+
+    let msg = format!("Creating logical plan for '{}'", sql);
+    let plan = ctx.create_logical_plan(sql).expect(&msg);
+    let state = ctx.state();
+    let logical_plan = state.optimize(&plan)?;
+    let physical_plan = state.create_physical_plan(&logical_plan).await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST]@0 as sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, 
aggregate_test_100.c8 ASC NULLS LAST]@1 as sum2]",
+            "  WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { 
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} })]",
+            "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { 
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} })]",
+            "      SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+        expected, actual
+    );
+    Ok(())
+}

Reply via email to