alamb commented on code in PR #10396: URL: https://github.com/apache/datafusion/pull/10396#discussion_r1592864219
########## datafusion/optimizer/src/common_subexpr_eliminate.rs: ##########
@@ -34,90 +33,63 @@ use datafusion_common::{
use datafusion_expr::expr::Alias;
use datafusion_expr::logical_plan::{Aggregate, LogicalPlan, Projection, Window};
use datafusion_expr::{col, Expr, ExprSchemable};
+use indexmap::IndexMap;

-/// Set of expressions generated by the [`ExprIdentifierVisitor`]
-/// and consumed by the [`CommonSubexprRewriter`].
-#[derive(Default)]
-struct ExprSet {
-    /// A map from expression's identifier (stringified expr) to tuple including:
-    /// - the expression itself (cloned)
-    /// - counter
-    /// - DataType of this expression.
-    /// - symbol used as the identifier in the alias.
-    map: HashMap<Identifier, (Expr, usize, DataType, Identifier)>,
-}
-
-impl ExprSet {
-    fn expr_identifier(expr: &Expr) -> Identifier {
-        format!("{expr}")
-    }
-
-    fn get(&self, key: &Identifier) -> Option<&(Expr, usize, DataType, Identifier)> {
-        self.map.get(key)
-    }
-
-    fn entry(
-        &mut self,
-        key: Identifier,
-    ) -> Entry<'_, Identifier, (Expr, usize, DataType, Identifier)> {
-        self.map.entry(key)
-    }
-
-    fn populate_expr_set(
-        &mut self,
-        expr: &[Expr],
-        input_schema: DFSchemaRef,
-        expr_mask: ExprMask,
-    ) -> Result<()> {
-        expr.iter().try_for_each(|e| {
-            self.expr_to_identifier(e, Arc::clone(&input_schema), expr_mask)?;
-
-            Ok(())
-        })
-    }
-
-    /// Go through an expression tree and generate identifier for every node in this tree.
-    fn expr_to_identifier(
-        &mut self,
-        expr: &Expr,
-        input_schema: DFSchemaRef,
-        expr_mask: ExprMask,
-    ) -> Result<()> {
-        expr.visit(&mut ExprIdentifierVisitor {
-            expr_set: self,
-            input_schema,
-            visit_stack: vec![],
-            node_count: 0,
-            expr_mask,
-        })?;
-
-        Ok(())
-    }
-}
-
-impl From<Vec<(Identifier, (Expr, usize, DataType, Identifier))>> for ExprSet {
-    fn from(entries: Vec<(Identifier, (Expr, usize, DataType, Identifier))>) -> Self {
-        let mut expr_set = Self::default();
-        entries.into_iter().for_each(|(k, v)| {
-            expr_set.map.insert(k, v);
-        });
-        expr_set
-    }
-}
-
-/// Identifier for each subexpression.
+/// Identifier that represents a subexpression tree.
///
-/// Note that the current implementation uses the `Display` of an expression
-/// (a `String`) as `Identifier`.
+/// Note that the current implementation contains:
+/// - the `Display` of an expression (a `String`) and
+/// - the identifiers of the children of the expression

Review Comment:
   Do we need to form such complicated identifiers -- what if we simply use `#{}` like in https://github.com/apache/datafusion/pull/10333, as suggested by @MohamedAbdeen21 in https://github.com/apache/datafusion/pull/10396#issuecomment-2096811278?
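For illustration, a minimal sketch of the index-based alternative being suggested: rather than deriving an identifier from the expression's `Display` output (and the identifiers of its children), number the detected common expressions and use a short `#{n}` alias. The `CommonExprCounter` name is hypothetical, not code from either PR:

```rust
/// Hypothetical index-based identifier scheme: each distinct common
/// expression gets a short `#{n}` name instead of its stringified form.
#[derive(Default)]
struct CommonExprCounter {
    next_index: usize,
}

impl CommonExprCounter {
    /// Produce a short, collision-free alias such as `#1`, `#2`, ...
    fn next_id(&mut self) -> String {
        self.next_index += 1;
        format!("#{}", self.next_index)
    }
}

fn main() {
    let mut counter = CommonExprCounter::default();
    // Two common subexpressions found during the first traversal would be
    // aliased as `#1` and `#2` rather than as their full Display strings.
    assert_eq!(counter.next_id(), "#1");
    assert_eq!(counter.next_id(), "#2");
}
```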
########## datafusion/optimizer/src/common_subexpr_eliminate.rs: ##########
@@ -789,6 +861,74 @@ mod test {
        assert_eq!(expected, formatted_plan);
    }

+    #[test]
+    fn id_array_visitor() -> Result<()> {

Review Comment:
   It's really nice to see more of how this code works 👍

########## datafusion/optimizer/src/common_subexpr_eliminate.rs: ##########
@@ -34,90 +33,63 @@ use datafusion_common::{
-/// Identifier for each subexpression.
+/// Identifier that represents a subexpression tree.
///
-/// Note that the current implementation uses the `Display` of an expression
-/// (a `String`) as `Identifier`.
+/// Note that the current implementation contains:
+/// - the `Display` of an expression (a `String`) and
+/// - the identifiers of the children of the expression
+/// concatenated.
///
/// An identifier should (ideally) be able to "hash", "accumulate", "equal" and "have no
/// collision (as low as possible)"
///
/// Since an identifier is likely to be copied many times, it is better that an identifier
-/// is small or "copy". otherwise some kind of reference counting is needed. String description
-/// here is not such a good choice.
+/// is small or "copy". otherwise some kind of reference counting is needed. String
+/// description here is not such a good choice.
type Identifier = String;

+/// A cache that contains the postorder index and the identifier of expression tree nodes
+/// by the preorder index of the nodes.
+///
+/// This cache is filled by `ExprIdentifierVisitor` during the first traversal and is used
+/// by `CommonSubexprRewriter` during the second traversal.
+///
+/// The purpose of this cache is to quickly find the identifier of a node during the
+/// second traversal.
+///
+/// Elements in this array are added during `f_down` so the indexes represent the preorder
+/// index of expression nodes and thus element 0 belongs to the root of the expression
+/// tree.
+/// The elements of the array are tuples that contain:
+/// - Postorder index that belongs to the preorder index. Assigned during `f_up`, starts
+///   from 0.
+/// - Identifier of the expression. If empty (`""`), the expr should not be considered for
+///   CSE.
+///
+/// # Example
+/// An expression like `(a + b)` would have the following `IdArray`:
+/// ```text
+/// [
+///     (2, "a + b"),
+///     (1, "a"),
+///     (0, "b")
+/// ]
+/// ```
+type IdArray = Vec<(usize, Identifier)>;
+
+/// A map that contains statistics of expressions by their identifiers.
+/// It contains:
+/// - The number of occurrences and
+/// - The DataType
+/// of an expression.
+type ExprStats = HashMap<Identifier, (usize, DataType)>;

Review Comment:
   Nit: the code might be easier to follow if this were a proper struct, with the manipulation functions (like `to_arrays`, for example) moved into methods. Maybe as a follow-on PR.
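For illustration, a minimal sketch of what that follow-on could look like, wrapping the `(usize, DataType)` payload from the type alias above in a struct; the method names here are hypothetical:

```rust
use std::collections::HashMap;

use arrow::datatypes::DataType;

type Identifier = String;

/// Hypothetical struct form of the `ExprStats` type alias above, so that
/// occurrence counting happens behind methods instead of raw map access.
#[derive(Default)]
struct ExprStats {
    map: HashMap<Identifier, (usize, DataType)>,
}

impl ExprStats {
    /// Record one more occurrence of the expression with this identifier.
    fn count_occurrence(&mut self, id: Identifier, data_type: DataType) {
        let (count, _) = self.map.entry(id).or_insert((0, data_type));
        *count += 1;
    }

    /// True if the expression occurs more than once, i.e. is worth extracting.
    fn is_common(&self, id: &Identifier) -> bool {
        self.map.get(id).map_or(false, |(count, _)| *count > 1)
    }
}
```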
########## datafusion/optimizer/src/common_subexpr_eliminate.rs: ##########
@@ -658,55 +686,89 @@ impl TreeNodeVisitor for ExprIdentifierVisitor<'_> {
    type Node = Expr;

    fn f_down(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
-        // related to https://github.com/apache/datafusion/issues/8814
+        // related to https://github.com/apache/arrow-datafusion/issues/8814
        // If the expr contains a volatile expression or is a short-circuit expression, skip it.
+        // TODO: propagate is_volatile state bottom-up + consider non-volatile sub-expressions for CSE
+        // TODO: consider surely executed children of "short circuited"s for CSE
        if expr.short_circuits() || expr.is_volatile()? {
-            self.visit_stack
-                .push(VisitRecord::JumpMark(self.node_count));
-            return Ok(TreeNodeRecursion::Jump); // go to f_up
+            self.visit_stack.push(VisitRecord::JumpMark);
+
+            return Ok(TreeNodeRecursion::Jump);
        }

+        self.id_array.push((0, "".to_string()));
        self.visit_stack
-            .push(VisitRecord::EnterMark(self.node_count));
-        self.node_count += 1;
+            .push(VisitRecord::EnterMark(self.down_index));
+        self.down_index += 1;

        Ok(TreeNodeRecursion::Continue)
    }

    fn f_up(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
-        let (_idx, sub_expr_identifier) = self.pop_enter_mark();
-
-        // skip exprs that should not be recognized.
-        if self.expr_mask.ignores(expr) {
-            let curr_expr_identifier = ExprSet::expr_identifier(expr);
-            self.visit_stack
-                .push(VisitRecord::ExprItem(curr_expr_identifier));
+        let Some((down_index, sub_expr_id)) = self.pop_enter_mark() else {
            return Ok(TreeNodeRecursion::Continue);
-        }
-        let curr_expr_identifier = ExprSet::expr_identifier(expr);
-        let alias_symbol = format!("{curr_expr_identifier}{sub_expr_identifier}");
+        };

-        self.visit_stack
-            .push(VisitRecord::ExprItem(alias_symbol.clone()));
+        let expr_id = expr_identifier(expr, sub_expr_id);

-        let data_type = expr.get_type(&self.input_schema)?;
+        self.id_array[down_index].0 = self.up_index;
+        if !self.expr_mask.ignores(expr) {
+            self.id_array[down_index].1.clone_from(&expr_id);
+
+            // TODO: can we capture the data type in the second traversal only for
+            // replaced expressions?
+            let data_type = expr.get_type(&self.input_schema)?;
+            let (count, _) = self
+                .expr_stats
+                .entry(expr_id.clone())
+                .or_insert((0, data_type));
+            *count += 1;
+        }
+        self.visit_stack.push(VisitRecord::ExprItem(expr_id));
+        self.up_index += 1;

-        self.expr_set
-            .entry(curr_expr_identifier)
-            .or_insert_with(|| (expr.clone(), 0, data_type, alias_symbol))
-            .1 += 1;
        Ok(TreeNodeRecursion::Continue)
    }
}

-/// Rewrite expression by common sub-expression with a corresponding temporary
-/// column name that will compute the subexpression.
-///
-/// `affected_id` is updated with any sub expressions that were replaced
+fn expr_identifier(expr: &Expr, sub_expr_identifier: Identifier) -> Identifier {
+    format!("{{{expr}{sub_expr_identifier}}}")

Review Comment:
   See comments about identifiers elsewhere.
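To make the identifier scheme concrete: `expr_identifier` wraps the node's `Display` output plus its children's accumulated identifiers in `{}`. Judging by the identifiers visible in the test output further down (e.g. `{t.a / Int64(2)|{Int64(2)}|{t.a}}`), the children's identifiers arrive in `sub_expr_identifier` joined with `|`; the exact accumulation happens in `pop_enter_mark`, which is not part of this hunk. A toy reconstruction:

```rust
/// Toy model of the scheme above: an identifier is the node's Display
/// string followed by its children's identifiers, wrapped in `{}`.
fn expr_identifier(display: &str, sub_expr_id: &str) -> String {
    format!("{{{display}{sub_expr_id}}}")
}

fn main() {
    // Leaf expressions have no child identifiers.
    let a = expr_identifier("t.a", "");
    let two = expr_identifier("Int64(2)", "");
    // Assumed: the visitor pops the children's ids off its stack and joins
    // them with `|` before passing them up as `sub_expr_id`.
    let children = format!("|{two}|{a}");
    assert_eq!(
        expr_identifier("t.a / Int64(2)", &children),
        "{t.a / Int64(2)|{Int64(2)}|{t.a}}"
    );
}
```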
########## datafusion/optimizer/src/common_subexpr_eliminate.rs: ##########
@@ -720,43 +782,53 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
            return Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump));
        }

-        let curr_id = &ExprSet::expr_identifier(&expr);
-
-        // lookup previously visited expression
-        match self.expr_set.get(curr_id) {
-            Some((_, counter, _, symbol)) => {
-                // if has a commonly used (a.k.a. 1+ use) expr
-                if *counter > 1 {
-                    self.affected_id.insert(curr_id.clone());
-
-                    let expr_name = expr.display_name()?;
-                    // Alias this `Column` expr to its original "expr name",
-                    // `projection_push_down` optimizer uses "expr name" to eliminate useless
-                    // projections.
-                    Ok(Transformed::new(
-                        col(symbol).alias(expr_name),
-                        true,
-                        TreeNodeRecursion::Jump,
-                    ))
-                } else {
-                    Ok(Transformed::no(expr))
-                }
+        let (up_index, expr_id) = &self.id_array[self.down_index];
+        self.down_index += 1;
+
+        // skip `Expr`s without identifier (empty identifier).
+        if expr_id.is_empty() {
+            return Ok(Transformed::no(expr));
+        }
+
+        let (counter, _) = self.expr_stats.get(expr_id).unwrap();
+        if *counter > 1 {
+            // step index to skip all sub-nodes (which have smaller series numbers).
+            while self.down_index < self.id_array.len()
+                && self.id_array[self.down_index].0 < *up_index
+            {
+                self.down_index += 1;
            }
-            None => Ok(Transformed::no(expr)),
+
+            let expr_name = expr.display_name()?;
+            self.common_exprs.insert(expr_id.clone(), expr);
+            // Alias this `Column` expr to its original "expr name",
+            // `projection_push_down` optimizer uses "expr name" to eliminate useless
+            // projections.
+            // TODO: do we really need to alias here?

Review Comment:
   FWIW I removed the alias and several tests failed:

```
$ cargo test --test sqllogictests
   Compiling datafusion-optimizer v37.1.0 (/Users/andrewlamb/Software/datafusion2/datafusion/optimizer)
warning: unused variable: `expr_name`
   --> datafusion/optimizer/src/common_subexpr_eliminate.rs:802:17
    |
802 |                 let expr_name = expr.display_name()?;
    |                     ^^^^^^^^^ help: if this is intentional, prefix it with an underscore: `_expr_name`
    |
    = note: `#[warn(unused_variables)]` on by default
   Compiling datafusion v37.1.0 (/Users/andrewlamb/Software/datafusion2/datafusion/core)
warning: `datafusion-optimizer` (lib) generated 1 warning
...
Running "map.slt"

External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "log(unsigned_integers.b)". Valid fields are a, "log({CAST(unsigned_integers.b AS Float32)|{unsigned_integers.b}})", "log(Int64(10),unsigned_integers.b)".
[SQL] select log(a, 64) a, log(b), log(10, b) from unsigned_integers;
at test_files/scalar.slt:592

External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "aggregate_test_100.c2 % Int64(2) = Int64(0)".
Valid fields are "{CAST(aggregate_test_100.c2 AS Int64) % Int64(2) = Int64(0)|{Int64(0)}|{CAST(aggregate_test_100.c2 AS Int64) % Int64(2)|{Int64(2)}|{CAST(aggregate_test_100.c2 AS Int64)|{aggregate_test_100.c2}}}}", "FIRST_VALUE(aggregate_test_100.c2) ORDER BY [CAST(aggregate_test_100.c2 AS Int64) % Int64(2) = Int64(0) ASC NULLS LAST, aggregate_test_100.c3 DESC NULLS FIRST]", "FIRST_VALUE(aggregate_test_100.c3 - Int64(100)) ORDER BY [CAST(aggregate_test_100.c2 AS Int64) % Int64(2) = Int64(0) ASC NULLS LAST, aggregate_test_100.c3 DESC NULLS FIRST]".
[SQL] SELECT DISTINCT ON (c2 % 2 = 0) c2, c3 - 100 FROM aggregate_test_100 ORDER BY c2 % 2 = 0, c3 DESC;
at test_files/distinct_on.slt:116

External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "acos(round(Float64(1) / doubles.f64))". Valid fields are doubles.f64, i64_1, "acos({round(Float64(1) / doubles.f64)|{Float64(1) / doubles.f64|{doubles.f64}|{Float64(1)}}})".
[SQL] select f64, round(1.0 / f64) as i64_1, acos(round(1.0 / f64)) from doubles;
at test_files/expr.slt:2272

External error: query result mismatch:
[SQL] explain select a/2, a/2 + 1 from t
[Diff] (-expected|+actual)
- logical_plan
- 01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
- 02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
- 03)----TableScan: t projection=[a]
at test_files/subquery.slt:1071

External error: query result mismatch:
[SQL] EXPLAIN SELECT x/2, x/2+1 FROM t;
[Diff] (-expected|+actual)
- logical_plan
- 01)Projection: {t.x / Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2), {t.x / Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2) + Int64(1)
- 02)--Projection: t.x / Int64(2) AS {t.x / Int64(2)|{Int64(2)}|{t.x}}
- 03)----TableScan: t projection=[x]
- physical_plan
- 01)ProjectionExec: expr=[{t.x / Int64(2)|{Int64(2)}|{t.x}}@0 as t.x / Int64(2), {t.x / Int64(2)|{Int64(2)}|{t.x}}@0 + 1 as t.x / Int64(2) + Int64(1)]
- 02)--ProjectionExec: expr=[x@0 / 2 as {t.x / Int64(2)|{Int64(2)}|{t.x}}]
- 03)----MemoryExec: partitions=1, partition_sizes=[1]
at test_files/select.slt:1425

External error: query result mismatch:
[SQL] EXPLAIN SELECT c3, SUM(c9) OVER(ORDER BY c3+c4 DESC, c9 DESC, c2 ASC) as sum1, SUM(c9) OVER(ORDER BY c3+c4 ASC, c9 ASC ) as sum2 FROM aggregate_test_100 LIMIT 5
[Diff] (-expected|+actual)
logical_plan
01)Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
02)--Limit: skip=0, fetch=5
- 03)----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+ 03)----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
04)------Projection: {aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, aggregate_test_100.c3, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
- 05)--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+ 05)--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
06)----------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS {aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, aggregate_test_100.c2, aggregate_test_100.c3, aggregate_test_100.c9
07)------------TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]
physical_plan
01)ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]
02)--GlobalLimitExec: skip=0, fetch=5
03)----WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int16(NULL)), is_causal: false }]
04)------ProjectionExec: expr=[{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 as
{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c3@2 as c3, c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]
05)--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]
06)----------SortPreservingMergeExec: [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 DESC,c9@3 DESC,c2@1 ASC NULLS LAST]
07)------------SortExec: expr=[{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 DESC,c9@3 DESC,c2@1 ASC NULLS LAST], preserve_partitioning=[true]
08)--------------ProjectionExec: expr=[c3@1 + c4@2 as {aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c2@0 as c2, c3@1 as c3, c9@3 as c9]
09)----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
10)------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c4, c9], has_header=true
at test_files/window.slt:1680

External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "hits.ClientIP - Int64(1)". Valid fields are hits."ClientIP", "{CAST(hits.ClientIP AS Int64)|{hits.ClientIP}} - Int64(1)", "{CAST(hits.ClientIP AS Int64)|{hits.ClientIP}} - Int64(2)", "{CAST(hits.ClientIP AS Int64)|{hits.ClientIP}} - Int64(3)", "COUNT(*)".
[SQL] SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10;
at test_files/clickbench.slt:241

External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "value_dict.x_dict % Int64(2)". Valid fields are "{CAST(value_dict.x_dict AS Int64)|{value_dict.x_dict}} % Int64(2)", "SUM(value_dict.x_dict)".
[SQL] select sum(x_dict) from value_dict group by x_dict % 2 order by sum(x_dict);
at test_files/aggregate.slt:2696

External error: query result mismatch:
[SQL] EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT CAST(x AS DOUBLE)) FROM t1 GROUP BY y;
[Diff] (-expected|+actual)
logical_plan
- 01)Projection: SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT t1.x)
- 02)--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1), MAX(alias1)]]
- 03)----Aggregate: groupBy=[[t1.y, {CAST(t1.x AS Float64)|{t1.x}} AS t1.x AS alias1]], aggr=[[]]
- 04)------Projection: CAST(t1.x AS Float64) AS {CAST(t1.x AS Float64)|{t1.x}}, t1.y
- 05)--------TableScan: t1 projection=[x, y]
+ 01)Projection: SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)
+ 02)--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(DISTINCT {CAST(t1.x AS Float64)|{t1.x}}) AS SUM(DISTINCT t1.x), MAX(DISTINCT {CAST(t1.x AS Float64)|{t1.x}}) AS MAX(DISTINCT t1.x)]]
+ 03)----Projection: CAST(t1.x AS Float64) AS {CAST(t1.x AS Float64)|{t1.x}}, t1.y
+ 04)------TableScan: t1 projection=[x, y]
physical_plan
- 01)ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as MAX(DISTINCT t1.x)]
- 02)--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)]
+ 01)ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)@2 as MAX(DISTINCT t1.x)]
+ 02)--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
03)----CoalesceBatchesExec: target_batch_size=2
04)------RepartitionExec: partitioning=Hash([y@0], 8), input_partitions=8
- 05)--------AggregateExec: mode=Partial, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)]
- 06)----------AggregateExec: mode=FinalPartitioned, gby=[y@0 as y, alias1@1 as alias1], aggr=[]
- 07)------------CoalesceBatchesExec: target_batch_size=2
- 08)--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8), input_partitions=8
- 09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
- 10)------------------AggregateExec: mode=Partial, gby=[y@1 as y, {CAST(t1.x AS Float64)|{t1.x}}@0 as alias1], aggr=[]
- 11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as {CAST(t1.x AS Float64)|{t1.x}}, y@1 as y]
- 12)----------------------MemoryExec: partitions=1, partition_sizes=[1]
+ 05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+ 06)----------AggregateExec: mode=Partial, gby=[y@1 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
+ 07)------------ProjectionExec: expr=[CAST(x@0 AS Float64) as {CAST(t1.x AS Float64)|{t1.x}}, y@1 as y]
+ 08)--------------MemoryExec: partitions=1, partition_sizes=[1]
at test_files/group_by.slt:4184

Error: Execution("9 failures")
error: test failed, to rerun pass `-p datafusion-sqllogictest --test sqllogictests`

Caused by:
  process didn't exit successfully: `/Users/andrewlamb/Software/datafusion2/target/debug/deps/sqllogictests-ce3a36cfeab74789` (exit status: 1)
```
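For context on why removing the alias breaks these queries: parent plan nodes (and the `projection_push_down` optimizer) resolve fields by the original "expr name", so the column that references the extracted computation has to be aliased back to that name. A minimal sketch of the rewrite's shape, using the `col`/`alias`/`display_name` calls from the hunk above (the standalone function is illustrative, not the PR's API):

```rust
use datafusion_common::Result;
use datafusion_expr::{col, Expr};

/// Replace a common subexpression with a reference to the column that
/// pre-computes it, aliased back to the original display name so that the
/// schema seen by parent plan nodes does not change.
fn replace_common_expr(expr: Expr, extracted_col_name: &str) -> Result<Expr> {
    let expr_name = expr.display_name()?;
    Ok(col(extracted_col_name).alias(expr_name))
}
```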