alamb commented on code in PR #10396:
URL: https://github.com/apache/datafusion/pull/10396#discussion_r1592864219


##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -34,90 +33,63 @@ use datafusion_common::{
 use datafusion_expr::expr::Alias;
 use datafusion_expr::logical_plan::{Aggregate, LogicalPlan, Projection, 
Window};
 use datafusion_expr::{col, Expr, ExprSchemable};
+use indexmap::IndexMap;
 
-/// Set of expressions generated by the [`ExprIdentifierVisitor`]
-/// and consumed by the [`CommonSubexprRewriter`].
-#[derive(Default)]
-struct ExprSet {
-    /// A map from expression's identifier (stringified expr) to tuple 
including:
-    /// - the expression itself (cloned)
-    /// - counter
-    /// - DataType of this expression.
-    /// - symbol used as the identifier in the alias.
-    map: HashMap<Identifier, (Expr, usize, DataType, Identifier)>,
-}
-
-impl ExprSet {
-    fn expr_identifier(expr: &Expr) -> Identifier {
-        format!("{expr}")
-    }
-
-    fn get(&self, key: &Identifier) -> Option<&(Expr, usize, DataType, 
Identifier)> {
-        self.map.get(key)
-    }
-
-    fn entry(
-        &mut self,
-        key: Identifier,
-    ) -> Entry<'_, Identifier, (Expr, usize, DataType, Identifier)> {
-        self.map.entry(key)
-    }
-
-    fn populate_expr_set(
-        &mut self,
-        expr: &[Expr],
-        input_schema: DFSchemaRef,
-        expr_mask: ExprMask,
-    ) -> Result<()> {
-        expr.iter().try_for_each(|e| {
-            self.expr_to_identifier(e, Arc::clone(&input_schema), expr_mask)?;
-
-            Ok(())
-        })
-    }
-
-    /// Go through an expression tree and generate identifier for every node 
in this tree.
-    fn expr_to_identifier(
-        &mut self,
-        expr: &Expr,
-        input_schema: DFSchemaRef,
-        expr_mask: ExprMask,
-    ) -> Result<()> {
-        expr.visit(&mut ExprIdentifierVisitor {
-            expr_set: self,
-            input_schema,
-            visit_stack: vec![],
-            node_count: 0,
-            expr_mask,
-        })?;
-
-        Ok(())
-    }
-}
-
-impl From<Vec<(Identifier, (Expr, usize, DataType, Identifier))>> for ExprSet {
-    fn from(entries: Vec<(Identifier, (Expr, usize, DataType, Identifier))>) 
-> Self {
-        let mut expr_set = Self::default();
-        entries.into_iter().for_each(|(k, v)| {
-            expr_set.map.insert(k, v);
-        });
-        expr_set
-    }
-}
-
-/// Identifier for each subexpression.
+/// Identifier that represents a subexpression tree.
 ///
-/// Note that the current implementation uses the `Display` of an expression
-/// (a `String`) as `Identifier`.
+/// Note that the current implementation contains:
+/// - the `Display` of an expression (a `String`) and
+/// - the identifiers of the childrens of the expression

Review Comment:
   Do we need to form such complicated identifiers? What if we simply use 
`#{}`, as in https://github.com/apache/datafusion/pull/10333 and as suggested by 
@MohamedAbdeen21 in 
https://github.com/apache/datafusion/pull/10396#issuecomment-2096811278?
   
   



##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -789,6 +861,74 @@ mod test {
         assert_eq!(expected, formatted_plan);
     }
 
+    #[test]
+    fn id_array_visitor() -> Result<()> {

Review Comment:
   It is really nice to see more of how this code works 👍



##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -34,90 +33,63 @@ use datafusion_common::{
 use datafusion_expr::expr::Alias;
 use datafusion_expr::logical_plan::{Aggregate, LogicalPlan, Projection, 
Window};
 use datafusion_expr::{col, Expr, ExprSchemable};
+use indexmap::IndexMap;
 
-/// Set of expressions generated by the [`ExprIdentifierVisitor`]
-/// and consumed by the [`CommonSubexprRewriter`].
-#[derive(Default)]
-struct ExprSet {
-    /// A map from expression's identifier (stringified expr) to tuple 
including:
-    /// - the expression itself (cloned)
-    /// - counter
-    /// - DataType of this expression.
-    /// - symbol used as the identifier in the alias.
-    map: HashMap<Identifier, (Expr, usize, DataType, Identifier)>,
-}
-
-impl ExprSet {
-    fn expr_identifier(expr: &Expr) -> Identifier {
-        format!("{expr}")
-    }
-
-    fn get(&self, key: &Identifier) -> Option<&(Expr, usize, DataType, 
Identifier)> {
-        self.map.get(key)
-    }
-
-    fn entry(
-        &mut self,
-        key: Identifier,
-    ) -> Entry<'_, Identifier, (Expr, usize, DataType, Identifier)> {
-        self.map.entry(key)
-    }
-
-    fn populate_expr_set(
-        &mut self,
-        expr: &[Expr],
-        input_schema: DFSchemaRef,
-        expr_mask: ExprMask,
-    ) -> Result<()> {
-        expr.iter().try_for_each(|e| {
-            self.expr_to_identifier(e, Arc::clone(&input_schema), expr_mask)?;
-
-            Ok(())
-        })
-    }
-
-    /// Go through an expression tree and generate identifier for every node 
in this tree.
-    fn expr_to_identifier(
-        &mut self,
-        expr: &Expr,
-        input_schema: DFSchemaRef,
-        expr_mask: ExprMask,
-    ) -> Result<()> {
-        expr.visit(&mut ExprIdentifierVisitor {
-            expr_set: self,
-            input_schema,
-            visit_stack: vec![],
-            node_count: 0,
-            expr_mask,
-        })?;
-
-        Ok(())
-    }
-}
-
-impl From<Vec<(Identifier, (Expr, usize, DataType, Identifier))>> for ExprSet {
-    fn from(entries: Vec<(Identifier, (Expr, usize, DataType, Identifier))>) 
-> Self {
-        let mut expr_set = Self::default();
-        entries.into_iter().for_each(|(k, v)| {
-            expr_set.map.insert(k, v);
-        });
-        expr_set
-    }
-}
-
-/// Identifier for each subexpression.
+/// Identifier that represents a subexpression tree.
 ///
-/// Note that the current implementation uses the `Display` of an expression
-/// (a `String`) as `Identifier`.
+/// Note that the current implementation contains:
+/// - the `Display` of an expression (a `String`) and
+/// - the identifiers of the childrens of the expression
+/// concatenated.
 ///
 /// An identifier should (ideally) be able to "hash", "accumulate", "equal" 
and "have no
 /// collision (as low as possible)"
 ///
 /// Since an identifier is likely to be copied many times, it is better that 
an identifier
-/// is small or "copy". otherwise some kinds of reference count is needed. 
String description
-/// here is not such a good choose.
+/// is small or "copy". otherwise some kinds of reference count is needed. 
String
+/// description here is not such a good choose.
 type Identifier = String;
 
+/// A cache that contains the postorder index and the identifier of expression 
tree nodes
+/// by the preorder index of the nodes.
+///
+/// This cache is filled by `ExprIdentifierVisitor` during the first traversal 
and is used
+/// by `CommonSubexprRewriter` during the second traversal.
+///
+/// The purpose of this cache is to quickly find the identifier of a node 
during the
+/// second traversal.
+///
+/// Elements in this array are added during `f_down` so the indexes represent 
the preorder
+/// index of expression nodes and thus element 0 belongs to the root of the 
expression
+/// tree.
+/// The elements of the array are tuples that contain:
+/// - Postorder index that belongs to the preorder index. Assigned during 
`f_up`, start
+///   from 0.
+/// - Identifier of the expression. If empty (`""`), expr should not be 
considered for
+///   CSE.
+///
+/// # Example
+/// An expression like `(a + b)` would have the following `IdArray`:
+/// ```text
+/// [
+///   (2, "a + b"),
+///   (1, "a"),
+///   (0, "b")
+/// ]
+/// ```
+type IdArray = Vec<(usize, Identifier)>;
+
+/// A map that contains statistics of expressions by their identifiers.
+/// It contains:
+/// - The number of occurrences and
+/// - The DataType
+/// of an expression.
+type ExprStats = HashMap<Identifier, (usize, DataType)>;

Review Comment:
   Nit: the code might be easier to follow if this were a proper struct and the 
manipulation functions (like `to_arrays`, for example) were moved into methods.
   
   Maybe as a follow-on PR.



##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -658,55 +686,89 @@ impl TreeNodeVisitor for ExprIdentifierVisitor<'_> {
     type Node = Expr;
 
     fn f_down(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
-        // related to https://github.com/apache/datafusion/issues/8814
+        // related to https://github.com/apache/arrow-datafusion/issues/8814
         // If the expr contain volatile expression or is a short-circuit 
expression, skip it.
+        // TODO: propagate is_volatile state bottom-up + consider non-volatile 
sub-expressions for CSE
+        // TODO: consider surely executed children of "short circuited"s for 
CSE
         if expr.short_circuits() || expr.is_volatile()? {
-            self.visit_stack
-                .push(VisitRecord::JumpMark(self.node_count));
-            return Ok(TreeNodeRecursion::Jump); // go to f_up
+            self.visit_stack.push(VisitRecord::JumpMark);
+
+            return Ok(TreeNodeRecursion::Jump);
         }
 
+        self.id_array.push((0, "".to_string()));
         self.visit_stack
-            .push(VisitRecord::EnterMark(self.node_count));
-        self.node_count += 1;
+            .push(VisitRecord::EnterMark(self.down_index));
+        self.down_index += 1;
 
         Ok(TreeNodeRecursion::Continue)
     }
 
     fn f_up(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
-        let (_idx, sub_expr_identifier) = self.pop_enter_mark();
-
-        // skip exprs should not be recognize.
-        if self.expr_mask.ignores(expr) {
-            let curr_expr_identifier = ExprSet::expr_identifier(expr);
-            self.visit_stack
-                .push(VisitRecord::ExprItem(curr_expr_identifier));
+        let Some((down_index, sub_expr_id)) = self.pop_enter_mark() else {
             return Ok(TreeNodeRecursion::Continue);
-        }
-        let curr_expr_identifier = ExprSet::expr_identifier(expr);
-        let alias_symbol = 
format!("{curr_expr_identifier}{sub_expr_identifier}");
+        };
 
-        self.visit_stack
-            .push(VisitRecord::ExprItem(alias_symbol.clone()));
+        let expr_id = expr_identifier(expr, sub_expr_id);
 
-        let data_type = expr.get_type(&self.input_schema)?;
+        self.id_array[down_index].0 = self.up_index;
+        if !self.expr_mask.ignores(expr) {
+            self.id_array[down_index].1.clone_from(&expr_id);
+
+            // TODO: can we capture the data type in the second traversal only 
for
+            //  replaced expressions?
+            let data_type = expr.get_type(&self.input_schema)?;
+            let (count, _) = self
+                .expr_stats
+                .entry(expr_id.clone())
+                .or_insert((0, data_type));
+            *count += 1;
+        }
+        self.visit_stack.push(VisitRecord::ExprItem(expr_id));
+        self.up_index += 1;
 
-        self.expr_set
-            .entry(curr_expr_identifier)
-            .or_insert_with(|| (expr.clone(), 0, data_type, alias_symbol))
-            .1 += 1;
         Ok(TreeNodeRecursion::Continue)
     }
 }
 
-/// Rewrite expression by common sub-expression with a corresponding temporary
-/// column name that will compute the subexpression.
-///
-/// `affected_id` is updated with any sub expressions that were replaced
+fn expr_identifier(expr: &Expr, sub_expr_identifier: Identifier) -> Identifier 
{
+    format!("{{{expr}{sub_expr_identifier}}}")

Review Comment:
   see comments about identifiers elsewhere



##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -720,43 +782,53 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
             return Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump));
         }
 
-        let curr_id = &ExprSet::expr_identifier(&expr);
-
-        // lookup previously visited expression
-        match self.expr_set.get(curr_id) {
-            Some((_, counter, _, symbol)) => {
-                // if has a commonly used (a.k.a. 1+ use) expr
-                if *counter > 1 {
-                    self.affected_id.insert(curr_id.clone());
-
-                    let expr_name = expr.display_name()?;
-                    // Alias this `Column` expr to it original "expr name",
-                    // `projection_push_down` optimizer use "expr name" to 
eliminate useless
-                    // projections.
-                    Ok(Transformed::new(
-                        col(symbol).alias(expr_name),
-                        true,
-                        TreeNodeRecursion::Jump,
-                    ))
-                } else {
-                    Ok(Transformed::no(expr))
-                }
+        let (up_index, expr_id) = &self.id_array[self.down_index];
+        self.down_index += 1;
+
+        // skip `Expr`s without identifier (empty identifier).
+        if expr_id.is_empty() {
+            return Ok(Transformed::no(expr));
+        }
+
+        let (counter, _) = self.expr_stats.get(expr_id).unwrap();
+        if *counter > 1 {
+            // step index to skip all sub-node (which has smaller series 
number).
+            while self.down_index < self.id_array.len()
+                && self.id_array[self.down_index].0 < *up_index
+            {
+                self.down_index += 1;
             }
-            None => Ok(Transformed::no(expr)),
+
+            let expr_name = expr.display_name()?;
+            self.common_exprs.insert(expr_id.clone(), expr);
+            // Alias this `Column` expr to it original "expr name",
+            // `projection_push_down` optimizer use "expr name" to eliminate 
useless
+            // projections.
+            // TODO: do we really need to alias here?

Review Comment:
   FWIW I removed the alias and several tests failed:
   
   ```
   $ cargo test --test sqllogictests
      Compiling datafusion-optimizer v37.1.0 
(/Users/andrewlamb/Software/datafusion2/datafusion/optimizer)
   warning: unused variable: `expr_name`
      --> datafusion/optimizer/src/common_subexpr_eliminate.rs:802:17
       |
   802 |             let expr_name = expr.display_name()?;
       |                 ^^^^^^^^^ help: if this is intentional, prefix it with 
an underscore: `_expr_name`
       |
       = note: `#[warn(unused_variables)]` on by default
   
      Compiling datafusion v37.1.0 
(/Users/andrewlamb/Software/datafusion2/datafusion/core)
   warning: `datafusion-optimizer` (lib) generated 1 warning
   ...
   Running "map.slt"
   External error: query failed: DataFusion error: Optimizer rule 
'common_sub_expression_eliminate' failed
   caused by
   Schema error: No field named "log(unsigned_integers.b)". Valid fields are a, 
"log({CAST(unsigned_integers.b AS Float32)|{unsigned_integers.b}})", 
"log(Int64(10),unsigned_integers.b)".
   [SQL] select log(a, 64) a, log(b), log(10, b) from unsigned_integers;
   at test_files/scalar.slt:592
   
   External error: query failed: DataFusion error: Optimizer rule 
'common_sub_expression_eliminate' failed
   caused by
   Schema error: No field named "aggregate_test_100.c2 % Int64(2) = Int64(0)". 
Valid fields are "{CAST(aggregate_test_100.c2 AS Int64) % Int64(2) = 
Int64(0)|{Int64(0)}|{CAST(aggregate_test_100.c2 AS Int64) % 
Int64(2)|{Int64(2)}|{CAST(aggregate_test_100.c2 AS 
Int64)|{aggregate_test_100.c2}}}}", "FIRST_VALUE(aggregate_test_100.c2) ORDER 
BY [CAST(aggregate_test_100.c2 AS Int64) % Int64(2) = Int64(0) ASC NULLS LAST, 
aggregate_test_100.c3 DESC NULLS FIRST]", "FIRST_VALUE(aggregate_test_100.c3 - 
Int64(100)) ORDER BY [CAST(aggregate_test_100.c2 AS Int64) % Int64(2) = 
Int64(0) ASC NULLS LAST, aggregate_test_100.c3 DESC NULLS FIRST]".
   [SQL] SELECT DISTINCT ON (c2 % 2 = 0) c2, c3 - 100 FROM aggregate_test_100 
ORDER BY c2 % 2 = 0, c3 DESC;
   at test_files/distinct_on.slt:116
   
   External error: query failed: DataFusion error: Optimizer rule 
'common_sub_expression_eliminate' failed
   caused by
   Schema error: No field named "acos(round(Float64(1) / doubles.f64))". Valid 
fields are doubles.f64, i64_1, "acos({round(Float64(1) / 
doubles.f64)|{Float64(1) / doubles.f64|{doubles.f64}|{Float64(1)}}})".
   [SQL] select f64, round(1.0 / f64) as i64_1, acos(round(1.0 / f64)) from 
doubles;
   at test_files/expr.slt:2272
   
   External error: query result mismatch:
   [SQL] explain select a/2, a/2 + 1 from t
   [Diff] (-expected|+actual)
   -   logical_plan
   -   01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a 
/ Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
   -   02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
   -   03)----TableScan: t projection=[a]
   at test_files/subquery.slt:1071
   
   External error: query result mismatch:
   [SQL] EXPLAIN SELECT x/2, x/2+1 FROM t;
   [Diff] (-expected|+actual)
   -   logical_plan
   -   01)Projection: {t.x / Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2), {t.x 
/ Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2) + Int64(1)
   -   02)--Projection: t.x / Int64(2) AS {t.x / Int64(2)|{Int64(2)}|{t.x}}
   -   03)----TableScan: t projection=[x]
   -   physical_plan
   -   01)ProjectionExec: expr=[{t.x / Int64(2)|{Int64(2)}|{t.x}}@0 as t.x / 
Int64(2), {t.x / Int64(2)|{Int64(2)}|{t.x}}@0 + 1 as t.x / Int64(2) + Int64(1)]
   -   02)--ProjectionExec: expr=[x@0 / 2 as {t.x / Int64(2)|{Int64(2)}|{t.x}}]
   -   03)----MemoryExec: partitions=1, partition_sizes=[1]
   at test_files/select.slt:1425
   
   External error: query result mismatch:
   [SQL] EXPLAIN SELECT c3,
       SUM(c9) OVER(ORDER BY c3+c4 DESC, c9 DESC, c2 ASC) as sum1,
       SUM(c9) OVER(ORDER BY c3+c4 ASC, c9 ASC ) as sum2
       FROM aggregate_test_100
       LIMIT 5
   [Diff] (-expected|+actual)
       logical_plan
       01)Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER 
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
       02)--Limit: skip=0, fetch=5
   -   03)----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS 
aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
   +   03)----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} ASC 
NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]]
       04)------Projection: {aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, 
aggregate_test_100.c3, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER 
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
   -   05)--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS 
aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
   +   05)--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} DESC 
NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC 
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]]
       06)----------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 
AS {aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, 
aggregate_test_100.c2, aggregate_test_100.c3, aggregate_test_100.c9
       07)------------TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]
       physical_plan
       01)ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]
       02)--GlobalLimitExec: skip=0, fetch=5
       03)----WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
CurrentRow, end_bound: Following(Int16(NULL)), is_causal: false }]
       04)------ProjectionExec: expr=[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 as 
{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c3@2 as 
c3, c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@4 as SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]
       05)--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER 
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int16(NULL)), end_bound: CurrentRow, is_causal: false }], 
mode=[Sorted]
       06)----------SortPreservingMergeExec: [{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 
DESC,c9@3 DESC,c2@1 ASC NULLS LAST]
       07)------------SortExec: expr=[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 
DESC,c9@3 DESC,c2@1 ASC NULLS LAST], preserve_partitioning=[true]
       08)--------------ProjectionExec: expr=[c3@1 + c4@2 as 
{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c2@0 as 
c2, c3@1 as c3, c9@3 as c9]
       09)----------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
       10)------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, 
c3, c4, c9], has_header=true
   at test_files/window.slt:1680
   
   External error: query failed: DataFusion error: Optimizer rule 
'common_sub_expression_eliminate' failed
   caused by
   Schema error: No field named "hits.ClientIP - Int64(1)". Valid fields are 
hits."ClientIP", "{CAST(hits.ClientIP AS Int64)|{hits.ClientIP}} - Int64(1)", 
"{CAST(hits.ClientIP AS Int64)|{hits.ClientIP}} - Int64(2)", 
"{CAST(hits.ClientIP AS Int64)|{hits.ClientIP}} - Int64(3)", "COUNT(*)".
   [SQL] SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, 
COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, 
"ClientIP" - 3 ORDER BY c DESC LIMIT 10;
   at test_files/clickbench.slt:241
   
   External error: query failed: DataFusion error: Optimizer rule 
'common_sub_expression_eliminate' failed
   caused by
   Schema error: No field named "value_dict.x_dict % Int64(2)". Valid fields 
are "{CAST(value_dict.x_dict AS Int64)|{value_dict.x_dict}} % Int64(2)", 
"SUM(value_dict.x_dict)".
   [SQL] select sum(x_dict) from value_dict group by x_dict % 2 order by 
sum(x_dict);
   at test_files/aggregate.slt:2696
   
   External error: query result mismatch:
   [SQL] EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT CAST(x AS 
DOUBLE)) FROM t1 GROUP BY y;
   [Diff] (-expected|+actual)
       logical_plan
   -   01)Projection: SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS 
MAX(DISTINCT t1.x)
   -   02)--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1), MAX(alias1)]]
   -   03)----Aggregate: groupBy=[[t1.y, {CAST(t1.x AS Float64)|{t1.x}} AS t1.x 
AS alias1]], aggr=[[]]
   -   04)------Projection: CAST(t1.x AS Float64) AS {CAST(t1.x AS 
Float64)|{t1.x}}, t1.y
   -   05)--------TableScan: t1 projection=[x, y]
   +   01)Projection: SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)
   +   02)--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(DISTINCT {CAST(t1.x AS 
Float64)|{t1.x}}) AS SUM(DISTINCT t1.x), MAX(DISTINCT {CAST(t1.x AS 
Float64)|{t1.x}}) AS MAX(DISTINCT t1.x)]]
   +   03)----Projection: CAST(t1.x AS Float64) AS {CAST(t1.x AS 
Float64)|{t1.x}}, t1.y
   +   04)------TableScan: t1 projection=[x, y]
       physical_plan
   -   01)ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), 
MAX(alias1)@2 as MAX(DISTINCT t1.x)]
   -   02)--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], 
aggr=[SUM(alias1), MAX(alias1)]
   +   01)ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as SUM(DISTINCT t1.x), 
MAX(DISTINCT t1.x)@2 as MAX(DISTINCT t1.x)]
   +   02)--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], 
aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
       03)----CoalesceBatchesExec: target_batch_size=2
       04)------RepartitionExec: partitioning=Hash([y@0], 8), input_partitions=8
   -   05)--------AggregateExec: mode=Partial, gby=[y@0 as y], 
aggr=[SUM(alias1), MAX(alias1)]
   -   06)----------AggregateExec: mode=FinalPartitioned, gby=[y@0 as y, 
alias1@1 as alias1], aggr=[]
   -   07)------------CoalesceBatchesExec: target_batch_size=2
   -   08)--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8), 
input_partitions=8
   -   09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
   -   10)------------------AggregateExec: mode=Partial, gby=[y@1 as y, 
{CAST(t1.x AS Float64)|{t1.x}}@0 as alias1], aggr=[]
   -   11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as 
{CAST(t1.x AS Float64)|{t1.x}}, y@1 as y]
   -   12)----------------------MemoryExec: partitions=1, partition_sizes=[1]
   +   05)--------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
   +   06)----------AggregateExec: mode=Partial, gby=[y@1 as y], 
aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
   +   07)------------ProjectionExec: expr=[CAST(x@0 AS Float64) as {CAST(t1.x 
AS Float64)|{t1.x}}, y@1 as y]
   +   08)--------------MemoryExec: partitions=1, partition_sizes=[1]
   at test_files/group_by.slt:4184
   
   Error: Execution("9 failures")
   error: test failed, to rerun pass `-p datafusion-sqllogictest --test 
sqllogictests`
   
   Caused by:
     process didn't exit successfully: 
`/Users/andrewlamb/Software/datafusion2/target/debug/deps/sqllogictests-ce3a36cfeab74789`
 (exit status: 1)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to