alamb commented on code in PR #10396:
URL: https://github.com/apache/datafusion/pull/10396#discussion_r1592864219
##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -34,90 +33,63 @@ use datafusion_common::{
use datafusion_expr::expr::Alias;
use datafusion_expr::logical_plan::{Aggregate, LogicalPlan, Projection, Window};
use datafusion_expr::{col, Expr, ExprSchemable};
+use indexmap::IndexMap;
-/// Set of expressions generated by the [`ExprIdentifierVisitor`]
-/// and consumed by the [`CommonSubexprRewriter`].
-#[derive(Default)]
-struct ExprSet {
-    /// A map from expression's identifier (stringified expr) to tuple including:
- /// - the expression itself (cloned)
- /// - counter
- /// - DataType of this expression.
- /// - symbol used as the identifier in the alias.
- map: HashMap<Identifier, (Expr, usize, DataType, Identifier)>,
-}
-
-impl ExprSet {
- fn expr_identifier(expr: &Expr) -> Identifier {
- format!("{expr}")
- }
-
-    fn get(&self, key: &Identifier) -> Option<&(Expr, usize, DataType, Identifier)> {
- self.map.get(key)
- }
-
- fn entry(
- &mut self,
- key: Identifier,
- ) -> Entry<'_, Identifier, (Expr, usize, DataType, Identifier)> {
- self.map.entry(key)
- }
-
- fn populate_expr_set(
- &mut self,
- expr: &[Expr],
- input_schema: DFSchemaRef,
- expr_mask: ExprMask,
- ) -> Result<()> {
- expr.iter().try_for_each(|e| {
- self.expr_to_identifier(e, Arc::clone(&input_schema), expr_mask)?;
-
- Ok(())
- })
- }
-
-    /// Go through an expression tree and generate identifier for every node in this tree.
- fn expr_to_identifier(
- &mut self,
- expr: &Expr,
- input_schema: DFSchemaRef,
- expr_mask: ExprMask,
- ) -> Result<()> {
- expr.visit(&mut ExprIdentifierVisitor {
- expr_set: self,
- input_schema,
- visit_stack: vec![],
- node_count: 0,
- expr_mask,
- })?;
-
- Ok(())
- }
-}
-
-impl From<Vec<(Identifier, (Expr, usize, DataType, Identifier))>> for ExprSet {
-    fn from(entries: Vec<(Identifier, (Expr, usize, DataType, Identifier))>) -> Self {
- let mut expr_set = Self::default();
- entries.into_iter().for_each(|(k, v)| {
- expr_set.map.insert(k, v);
- });
- expr_set
- }
-}
-
-/// Identifier for each subexpression.
+/// Identifier that represents a subexpression tree.
///
-/// Note that the current implementation uses the `Display` of an expression
-/// (a `String`) as `Identifier`.
+/// Note that the current implementation contains:
+/// - the `Display` of an expression (a `String`) and
+/// - the identifiers of the children of the expression
Review Comment:
Do we need to form such complicated identifiers -- what if we simply use
`#{}` like in https://github.com/apache/datafusion/pull/10333 as suggested by
@MohamedAbdeen21 in
https://github.com/apache/datafusion/pull/10396#issuecomment-2096811278?
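   To sketch the idea (hypothetical code written for this comment, not taken from this PR or from #10333): interning each distinct subexpression once and handing out short index-based identifiers would keep the identifier size constant instead of growing with the subtree:
   ```rust
   use std::collections::HashMap;

   /// Hypothetical sketch: intern each distinct subexpression string once and
   /// hand out short `#N` identifiers, so an identifier's size does not grow
   /// with the size of the subexpression tree.
   #[derive(Default)]
   struct IdInterner {
       // Maps the stringified expression to its compact identifier.
       ids: HashMap<String, String>,
   }

   impl IdInterner {
       /// Returns the compact identifier for `expr_str`, allocating `#N` on
       /// first sight.
       fn intern(&mut self, expr_str: &str) -> String {
           let next = self.ids.len();
           self.ids
               .entry(expr_str.to_string())
               .or_insert_with(|| format!("#{next}"))
               .clone()
       }
   }

   fn main() {
       let mut interner = IdInterner::default();
       assert_eq!(interner.intern("t.a / Int64(2)"), "#0");
       assert_eq!(interner.intern("t.a / Int64(2)"), "#0"); // same expr, same id
       assert_eq!(interner.intern("t.a"), "#1");
   }
   ```
   With a scheme like that, the alias written into the plan would be a short `#2` instead of a long nested `{...|{...}}` string.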
##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -789,6 +861,74 @@ mod test {
assert_eq!(expected, formatted_plan);
}
+ #[test]
+ fn id_array_visitor() -> Result<()> {
Review Comment:
This is really nice to see more of how this code works 👍
##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -34,90 +33,63 @@ use datafusion_common::{
use datafusion_expr::expr::Alias;
use datafusion_expr::logical_plan::{Aggregate, LogicalPlan, Projection, Window};
use datafusion_expr::{col, Expr, ExprSchemable};
+use indexmap::IndexMap;
-/// Set of expressions generated by the [`ExprIdentifierVisitor`]
-/// and consumed by the [`CommonSubexprRewriter`].
-#[derive(Default)]
-struct ExprSet {
-    /// A map from expression's identifier (stringified expr) to tuple including:
- /// - the expression itself (cloned)
- /// - counter
- /// - DataType of this expression.
- /// - symbol used as the identifier in the alias.
- map: HashMap<Identifier, (Expr, usize, DataType, Identifier)>,
-}
-
-impl ExprSet {
- fn expr_identifier(expr: &Expr) -> Identifier {
- format!("{expr}")
- }
-
-    fn get(&self, key: &Identifier) -> Option<&(Expr, usize, DataType, Identifier)> {
- self.map.get(key)
- }
-
- fn entry(
- &mut self,
- key: Identifier,
- ) -> Entry<'_, Identifier, (Expr, usize, DataType, Identifier)> {
- self.map.entry(key)
- }
-
- fn populate_expr_set(
- &mut self,
- expr: &[Expr],
- input_schema: DFSchemaRef,
- expr_mask: ExprMask,
- ) -> Result<()> {
- expr.iter().try_for_each(|e| {
- self.expr_to_identifier(e, Arc::clone(&input_schema), expr_mask)?;
-
- Ok(())
- })
- }
-
-    /// Go through an expression tree and generate identifier for every node in this tree.
- fn expr_to_identifier(
- &mut self,
- expr: &Expr,
- input_schema: DFSchemaRef,
- expr_mask: ExprMask,
- ) -> Result<()> {
- expr.visit(&mut ExprIdentifierVisitor {
- expr_set: self,
- input_schema,
- visit_stack: vec![],
- node_count: 0,
- expr_mask,
- })?;
-
- Ok(())
- }
-}
-
-impl From<Vec<(Identifier, (Expr, usize, DataType, Identifier))>> for ExprSet {
-    fn from(entries: Vec<(Identifier, (Expr, usize, DataType, Identifier))>) -> Self {
- let mut expr_set = Self::default();
- entries.into_iter().for_each(|(k, v)| {
- expr_set.map.insert(k, v);
- });
- expr_set
- }
-}
-
-/// Identifier for each subexpression.
+/// Identifier that represents a subexpression tree.
///
-/// Note that the current implementation uses the `Display` of an expression
-/// (a `String`) as `Identifier`.
+/// Note that the current implementation contains:
+/// - the `Display` of an expression (a `String`) and
+/// - the identifiers of the children of the expression
+/// concatenated.
///
/// An identifier should (ideally) be able to "hash", "accumulate", "equal" and "have no
/// collision (as low as possible)"
///
/// Since an identifier is likely to be copied many times, it is better that an identifier
-/// is small or "copy". otherwise some kinds of reference count is needed. String description
-/// here is not such a good choose.
+/// is small or "copy", otherwise some kind of reference counting is needed. A String
+/// description here is not such a good choice.
type Identifier = String;
+/// A cache that contains the postorder index and the identifier of expression tree nodes
+/// by the preorder index of the nodes.
+///
+/// This cache is filled by `ExprIdentifierVisitor` during the first traversal and is used
+/// by `CommonSubexprRewriter` during the second traversal.
+///
+/// The purpose of this cache is to quickly find the identifier of a node during the
+/// second traversal.
+///
+/// Elements in this array are added during `f_down` so the indexes represent the preorder
+/// index of expression nodes and thus element 0 belongs to the root of the expression
+/// tree.
+/// The elements of the array are tuples that contain:
+/// - Postorder index that belongs to the preorder index. Assigned during `f_up`, starting from 0.
+/// - Identifier of the expression. If empty (`""`), the expr should not be considered for CSE.
+///
+/// # Example
+/// An expression like `(a + b)` would have the following `IdArray`:
+/// ```text
+/// [
+///   (2, "a + b"),
+///   (0, "a"),
+///   (1, "b")
+/// ]
+/// ```
+type IdArray = Vec<(usize, Identifier)>;
+
+/// A map that contains statistics of expressions by their identifiers.
+/// It contains:
+/// - The number of occurrences and
+/// - The DataType
+/// of an expression.
+type ExprStats = HashMap<Identifier, (usize, DataType)>;
Review Comment:
   Nit: the code might be easier to follow if this were a proper struct, with the manipulation functions (like `to_arrays`, for example) moved into methods.
   Maybe as a follow-on PR.
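   Something along these lines (a rough sketch of the suggestion only; the method names are made up here, and it assumes arrow's `DataType` and this file's `Identifier` alias):
   ```rust
   use std::collections::HashMap;

   use arrow::datatypes::DataType;

   type Identifier = String;

   /// Sketch: wrap the bare map in a struct so the occurrence bookkeeping
   /// lives next to the data it manipulates.
   struct ExprStats {
       stats: HashMap<Identifier, (usize, DataType)>,
   }

   impl ExprStats {
       fn new() -> Self {
           Self { stats: HashMap::new() }
       }

       /// Records one more occurrence of the expression with this identifier.
       fn count_occurrence(&mut self, id: Identifier, data_type: DataType) {
           self.stats.entry(id).or_insert((0, data_type)).0 += 1;
       }

       /// True if the expression occurs more than once, i.e. is worth extracting.
       fn is_common(&self, id: &Identifier) -> bool {
           matches!(self.stats.get(id), Some((count, _)) if *count > 1)
       }
   }

   fn main() {
       let mut stats = ExprStats::new();
       let id = "{t.a / Int64(2)|{Int64(2)}|{t.a}}".to_string();
       stats.count_occurrence(id.clone(), DataType::Int64);
       stats.count_occurrence(id.clone(), DataType::Int64);
       assert!(stats.is_common(&id));
   }
   ```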
##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -658,55 +686,89 @@ impl TreeNodeVisitor for ExprIdentifierVisitor<'_> {
type Node = Expr;
fn f_down(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
- // related to https://github.com/apache/datafusion/issues/8814
+ // related to https://github.com/apache/arrow-datafusion/issues/8814
         // If the expr contain volatile expression or is a short-circuit expression, skip it.
+        // TODO: propagate is_volatile state bottom-up + consider non-volatile sub-expressions for CSE
+        // TODO: consider surely executed children of "short circuited"s for CSE
if expr.short_circuits() || expr.is_volatile()? {
- self.visit_stack
- .push(VisitRecord::JumpMark(self.node_count));
- return Ok(TreeNodeRecursion::Jump); // go to f_up
+ self.visit_stack.push(VisitRecord::JumpMark);
+
+ return Ok(TreeNodeRecursion::Jump);
}
+ self.id_array.push((0, "".to_string()));
self.visit_stack
- .push(VisitRecord::EnterMark(self.node_count));
- self.node_count += 1;
+ .push(VisitRecord::EnterMark(self.down_index));
+ self.down_index += 1;
Ok(TreeNodeRecursion::Continue)
}
fn f_up(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
- let (_idx, sub_expr_identifier) = self.pop_enter_mark();
-
- // skip exprs should not be recognize.
- if self.expr_mask.ignores(expr) {
- let curr_expr_identifier = ExprSet::expr_identifier(expr);
- self.visit_stack
- .push(VisitRecord::ExprItem(curr_expr_identifier));
+ let Some((down_index, sub_expr_id)) = self.pop_enter_mark() else {
return Ok(TreeNodeRecursion::Continue);
- }
- let curr_expr_identifier = ExprSet::expr_identifier(expr);
-        let alias_symbol = format!("{curr_expr_identifier}{sub_expr_identifier}");
+        };
- self.visit_stack
- .push(VisitRecord::ExprItem(alias_symbol.clone()));
+ let expr_id = expr_identifier(expr, sub_expr_id);
- let data_type = expr.get_type(&self.input_schema)?;
+ self.id_array[down_index].0 = self.up_index;
+ if !self.expr_mask.ignores(expr) {
+ self.id_array[down_index].1.clone_from(&expr_id);
+
+            // TODO: can we capture the data type in the second traversal only for
+ // replaced expressions?
+ let data_type = expr.get_type(&self.input_schema)?;
+ let (count, _) = self
+ .expr_stats
+ .entry(expr_id.clone())
+ .or_insert((0, data_type));
+ *count += 1;
+ }
+ self.visit_stack.push(VisitRecord::ExprItem(expr_id));
+ self.up_index += 1;
- self.expr_set
- .entry(curr_expr_identifier)
- .or_insert_with(|| (expr.clone(), 0, data_type, alias_symbol))
- .1 += 1;
Ok(TreeNodeRecursion::Continue)
}
}
-/// Rewrite expression by common sub-expression with a corresponding temporary
-/// column name that will compute the subexpression.
-///
-/// `affected_id` is updated with any sub expressions that were replaced
+fn expr_identifier(expr: &Expr, sub_expr_identifier: Identifier) -> Identifier {
+ format!("{{{expr}{sub_expr_identifier}}}")
Review Comment:
see comments about identifiers elsewhere
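   For a concrete sense of scale, tracing this scheme over `t.a / 2` (identifier spellings taken from the sqllogictest output quoted in the comment below) produces one nested `{...}` per node of the subtree:
   ```text
   t.a             ->  {t.a}
   Int64(2)        ->  {Int64(2)}
   t.a / Int64(2)  ->  {t.a / Int64(2)|{Int64(2)}|{t.a}}
   ```
   so the identifier, and therefore the alias written into the plan, grows with the size of the subexpression.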
##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -720,43 +782,53 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
return Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump));
}
- let curr_id = &ExprSet::expr_identifier(&expr);
-
- // lookup previously visited expression
- match self.expr_set.get(curr_id) {
- Some((_, counter, _, symbol)) => {
- // if has a commonly used (a.k.a. 1+ use) expr
- if *counter > 1 {
- self.affected_id.insert(curr_id.clone());
-
- let expr_name = expr.display_name()?;
- // Alias this `Column` expr to it original "expr name",
-                // `projection_push_down` optimizer use "expr name" to eliminate useless
- // projections.
- Ok(Transformed::new(
- col(symbol).alias(expr_name),
- true,
- TreeNodeRecursion::Jump,
- ))
- } else {
- Ok(Transformed::no(expr))
- }
+ let (up_index, expr_id) = &self.id_array[self.down_index];
+ self.down_index += 1;
+
+ // skip `Expr`s without identifier (empty identifier).
+ if expr_id.is_empty() {
+ return Ok(Transformed::no(expr));
+ }
+
+ let (counter, _) = self.expr_stats.get(expr_id).unwrap();
+ if *counter > 1 {
+            // step index to skip all sub-nodes (which have a smaller series number).
+ while self.down_index < self.id_array.len()
+ && self.id_array[self.down_index].0 < *up_index
+ {
+ self.down_index += 1;
}
- None => Ok(Transformed::no(expr)),
+
+ let expr_name = expr.display_name()?;
+ self.common_exprs.insert(expr_id.clone(), expr);
+            // Alias this `Column` expr to its original "expr name",
+            // `projection_push_down` optimizer uses "expr name" to eliminate useless
+            // projections.
+ // TODO: do we really need to alias here?
Review Comment:
FWIW I removed the alias and several tests failed:
```
$ cargo test --test sqllogictests
   Compiling datafusion-optimizer v37.1.0 (/Users/andrewlamb/Software/datafusion2/datafusion/optimizer)
warning: unused variable: `expr_name`
   --> datafusion/optimizer/src/common_subexpr_eliminate.rs:802:17
    |
802 |             let expr_name = expr.display_name()?;
    |                 ^^^^^^^^^ help: if this is intentional, prefix it with an underscore: `_expr_name`
    |
    = note: `#[warn(unused_variables)]` on by default
   Compiling datafusion v37.1.0 (/Users/andrewlamb/Software/datafusion2/datafusion/core)
warning: `datafusion-optimizer` (lib) generated 1 warning
...
Running "map.slt"
External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "log(unsigned_integers.b)". Valid fields are a, "log({CAST(unsigned_integers.b AS Float32)|{unsigned_integers.b}})", "log(Int64(10),unsigned_integers.b)".
[SQL] select log(a, 64) a, log(b), log(10, b) from unsigned_integers;
at test_files/scalar.slt:592
External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "aggregate_test_100.c2 % Int64(2) = Int64(0)". Valid fields are "{CAST(aggregate_test_100.c2 AS Int64) % Int64(2) = Int64(0)|{Int64(0)}|{CAST(aggregate_test_100.c2 AS Int64) % Int64(2)|{Int64(2)}|{CAST(aggregate_test_100.c2 AS Int64)|{aggregate_test_100.c2}}}}", "FIRST_VALUE(aggregate_test_100.c2) ORDER BY [CAST(aggregate_test_100.c2 AS Int64) % Int64(2) = Int64(0) ASC NULLS LAST, aggregate_test_100.c3 DESC NULLS FIRST]", "FIRST_VALUE(aggregate_test_100.c3 - Int64(100)) ORDER BY [CAST(aggregate_test_100.c2 AS Int64) % Int64(2) = Int64(0) ASC NULLS LAST, aggregate_test_100.c3 DESC NULLS FIRST]".
[SQL] SELECT DISTINCT ON (c2 % 2 = 0) c2, c3 - 100 FROM aggregate_test_100 ORDER BY c2 % 2 = 0, c3 DESC;
at test_files/distinct_on.slt:116
External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "acos(round(Float64(1) / doubles.f64))". Valid fields are doubles.f64, i64_1, "acos({round(Float64(1) / doubles.f64)|{Float64(1) / doubles.f64|{doubles.f64}|{Float64(1)}}})".
[SQL] select f64, round(1.0 / f64) as i64_1, acos(round(1.0 / f64)) from doubles;
at test_files/expr.slt:2272
External error: query result mismatch:
[SQL] explain select a/2, a/2 + 1 from t
[Diff] (-expected|+actual)
-   logical_plan
-   01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
-   02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
-   03)----TableScan: t projection=[a]
at test_files/subquery.slt:1071
External error: query result mismatch:
[SQL] EXPLAIN SELECT x/2, x/2+1 FROM t;
[Diff] (-expected|+actual)
-   logical_plan
-   01)Projection: {t.x / Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2), {t.x / Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2) + Int64(1)
-   02)--Projection: t.x / Int64(2) AS {t.x / Int64(2)|{Int64(2)}|{t.x}}
-   03)----TableScan: t projection=[x]
-   physical_plan
-   01)ProjectionExec: expr=[{t.x / Int64(2)|{Int64(2)}|{t.x}}@0 as t.x / Int64(2), {t.x / Int64(2)|{Int64(2)}|{t.x}}@0 + 1 as t.x / Int64(2) + Int64(1)]
-   02)--ProjectionExec: expr=[x@0 / 2 as {t.x / Int64(2)|{Int64(2)}|{t.x}}]
-   03)----MemoryExec: partitions=1, partition_sizes=[1]
at test_files/select.slt:1425
External error: query result mismatch:
[SQL] EXPLAIN SELECT c3,
SUM(c9) OVER(ORDER BY c3+c4 DESC, c9 DESC, c2 ASC) as sum1,
SUM(c9) OVER(ORDER BY c3+c4 ASC, c9 ASC ) as sum2
FROM aggregate_test_100
LIMIT 5
[Diff] (-expected|+actual)
    logical_plan
    01)Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
    02)--Limit: skip=0, fetch=5
-   03)----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+   03)----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
    04)------Projection: {aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, aggregate_test_100.c3, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-   05)--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+   05)--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
    06)----------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS {aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, aggregate_test_100.c2, aggregate_test_100.c3, aggregate_test_100.c9
    07)------------TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]
    physical_plan
    01)ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]
    02)--GlobalLimitExec: skip=0, fetch=5
    03)----WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int16(NULL)), is_causal: false }]
    04)------ProjectionExec: expr=[{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 as {aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c3@2 as c3, c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]
    05)--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]
    06)----------SortPreservingMergeExec: [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 DESC,c9@3 DESC,c2@1 ASC NULLS LAST]
    07)------------SortExec: expr=[{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 DESC,c9@3 DESC,c2@1 ASC NULLS LAST], preserve_partitioning=[true]
    08)--------------ProjectionExec: expr=[c3@1 + c4@2 as {aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c2@0 as c2, c3@1 as c3, c9@3 as c9]
    09)----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
    10)------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c4, c9], has_header=true
at test_files/window.slt:1680
External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "hits.ClientIP - Int64(1)". Valid fields are hits."ClientIP", "{CAST(hits.ClientIP AS Int64)|{hits.ClientIP}} - Int64(1)", "{CAST(hits.ClientIP AS Int64)|{hits.ClientIP}} - Int64(2)", "{CAST(hits.ClientIP AS Int64)|{hits.ClientIP}} - Int64(3)", "COUNT(*)".
[SQL] SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10;
at test_files/clickbench.slt:241
External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "value_dict.x_dict % Int64(2)". Valid fields are "{CAST(value_dict.x_dict AS Int64)|{value_dict.x_dict}} % Int64(2)", "SUM(value_dict.x_dict)".
[SQL] select sum(x_dict) from value_dict group by x_dict % 2 order by sum(x_dict);
at test_files/aggregate.slt:2696
External error: query result mismatch:
[SQL] EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT CAST(x AS DOUBLE)) FROM t1 GROUP BY y;
[Diff] (-expected|+actual)
    logical_plan
-   01)Projection: SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT t1.x)
-   02)--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1), MAX(alias1)]]
-   03)----Aggregate: groupBy=[[t1.y, {CAST(t1.x AS Float64)|{t1.x}} AS t1.x AS alias1]], aggr=[[]]
-   04)------Projection: CAST(t1.x AS Float64) AS {CAST(t1.x AS Float64)|{t1.x}}, t1.y
-   05)--------TableScan: t1 projection=[x, y]
+   01)Projection: SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)
+   02)--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(DISTINCT {CAST(t1.x AS Float64)|{t1.x}}) AS SUM(DISTINCT t1.x), MAX(DISTINCT {CAST(t1.x AS Float64)|{t1.x}}) AS MAX(DISTINCT t1.x)]]
+   03)----Projection: CAST(t1.x AS Float64) AS {CAST(t1.x AS Float64)|{t1.x}}, t1.y
+   04)------TableScan: t1 projection=[x, y]
    physical_plan
-   01)ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as MAX(DISTINCT t1.x)]
-   02)--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)]
+   01)ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)@2 as MAX(DISTINCT t1.x)]
+   02)--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
    03)----CoalesceBatchesExec: target_batch_size=2
    04)------RepartitionExec: partitioning=Hash([y@0], 8), input_partitions=8
-   05)--------AggregateExec: mode=Partial, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)]
-   06)----------AggregateExec: mode=FinalPartitioned, gby=[y@0 as y, alias1@1 as alias1], aggr=[]
-   07)------------CoalesceBatchesExec: target_batch_size=2
-   08)--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8), input_partitions=8
-   09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
-   10)------------------AggregateExec: mode=Partial, gby=[y@1 as y, {CAST(t1.x AS Float64)|{t1.x}}@0 as alias1], aggr=[]
-   11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as {CAST(t1.x AS Float64)|{t1.x}}, y@1 as y]
-   12)----------------------MemoryExec: partitions=1, partition_sizes=[1]
+   05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+   06)----------AggregateExec: mode=Partial, gby=[y@1 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
+   07)------------ProjectionExec: expr=[CAST(x@0 AS Float64) as {CAST(t1.x AS Float64)|{t1.x}}, y@1 as y]
+   08)--------------MemoryExec: partitions=1, partition_sizes=[1]
at test_files/group_by.slt:4184
Error: Execution("9 failures")
error: test failed, to rerun pass `-p datafusion-sqllogictest --test sqllogictests`

Caused by:
  process didn't exit successfully: `/Users/andrewlamb/Software/datafusion2/target/debug/deps/sqllogictests-ce3a36cfeab74789` (exit status: 1)
```
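   Reading those errors, the alias does look load-bearing: parent plan nodes and the final output schema still refer to the original display name (e.g. `log(unsigned_integers.b)`), while without the alias the projection only exposes the internal identifier column such as `log({CAST(unsigned_integers.b AS Float32)|{unsigned_integers.b}})`. So the answer to the TODO seems to be yes, we still need the alias, at least as long as downstream field resolution goes through display names.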
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.