This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0d283a49f5 Fix and improve `CommonSubexprEliminate` rule (#10396)
0d283a49f5 is described below
commit 0d283a49f52d91a58c4e88e3e5e76fe0d59c1260
Author: Peter Toth <[email protected]>
AuthorDate: Wed May 8 19:21:27 2024 +0200
Fix and improve `CommonSubexprEliminate` rule (#10396)
* Revert "fix(9870): common expression elimination optimization, should
always re-find the correct expression during re-write. (#9871)"
This reverts commit cd7a00b08309f7229073e4bba686d6271726ab1c.
* expr id should always contain the full expr structure, cleaner expr ids,
better JumpMark handling, better variable names, code cleanup, some new todos
* move `Expr` from `expr_set`s to `affected_id`s
* better naming, docs fixes
* introduce `CommonExprs` type alias, minor todo fix
* add test
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/expr/src/logical_plan/plan.rs | 2 +-
.../optimizer/src/common_subexpr_eliminate.rs | 711 ++++++++++++---------
datafusion/sqllogictest/test_files/group_by.slt | 8 +-
datafusion/sqllogictest/test_files/select.slt | 24 +-
datafusion/sqllogictest/test_files/subquery.slt | 8 +-
.../sqllogictest/test_files/tpch/q1.slt.part | 6 +-
datafusion/sqllogictest/test_files/window.slt | 126 ++--
7 files changed, 500 insertions(+), 385 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index c608b51e08..1b54e76a17 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -2245,7 +2245,7 @@ impl DistinctOn {
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
#[non_exhaustive]
pub struct Aggregate {
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index cb3b4accf3..0475edd659 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -17,7 +17,6 @@
//! [`CommonSubexprEliminate`] to avoid redundant computation of common
sub-expressions
-use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
@@ -34,90 +33,63 @@ use datafusion_common::{
use datafusion_expr::expr::Alias;
use datafusion_expr::logical_plan::{Aggregate, LogicalPlan, Projection,
Window};
use datafusion_expr::{col, Expr, ExprSchemable};
+use indexmap::IndexMap;
-/// Set of expressions generated by the [`ExprIdentifierVisitor`]
-/// and consumed by the [`CommonSubexprRewriter`].
-#[derive(Default)]
-struct ExprSet {
- /// A map from expression's identifier (stringified expr) to tuple
including:
- /// - the expression itself (cloned)
- /// - counter
- /// - DataType of this expression.
- /// - symbol used as the identifier in the alias.
- map: HashMap<Identifier, (Expr, usize, DataType, Identifier)>,
-}
-
-impl ExprSet {
- fn expr_identifier(expr: &Expr) -> Identifier {
- format!("{expr}")
- }
-
- fn get(&self, key: &Identifier) -> Option<&(Expr, usize, DataType,
Identifier)> {
- self.map.get(key)
- }
-
- fn entry(
- &mut self,
- key: Identifier,
- ) -> Entry<'_, Identifier, (Expr, usize, DataType, Identifier)> {
- self.map.entry(key)
- }
-
- fn populate_expr_set(
- &mut self,
- expr: &[Expr],
- input_schema: DFSchemaRef,
- expr_mask: ExprMask,
- ) -> Result<()> {
- expr.iter().try_for_each(|e| {
- self.expr_to_identifier(e, Arc::clone(&input_schema), expr_mask)?;
-
- Ok(())
- })
- }
-
- /// Go through an expression tree and generate identifier for every node
in this tree.
- fn expr_to_identifier(
- &mut self,
- expr: &Expr,
- input_schema: DFSchemaRef,
- expr_mask: ExprMask,
- ) -> Result<()> {
- expr.visit(&mut ExprIdentifierVisitor {
- expr_set: self,
- input_schema,
- visit_stack: vec![],
- node_count: 0,
- expr_mask,
- })?;
-
- Ok(())
- }
-}
-
-impl From<Vec<(Identifier, (Expr, usize, DataType, Identifier))>> for ExprSet {
- fn from(entries: Vec<(Identifier, (Expr, usize, DataType, Identifier))>)
-> Self {
- let mut expr_set = Self::default();
- entries.into_iter().for_each(|(k, v)| {
- expr_set.map.insert(k, v);
- });
- expr_set
- }
-}
-
-/// Identifier for each subexpression.
+/// Identifier that represents a subexpression tree.
///
-/// Note that the current implementation uses the `Display` of an expression
-/// (a `String`) as `Identifier`.
+/// Note that the current implementation contains:
+/// - the `Display` of an expression (a `String`) and
+/// - the identifiers of the children of the expression
+/// concatenated.
///
/// An identifier should (ideally) be able to "hash", "accumulate", "equal"
and "have no
/// collision (as low as possible)"
///
/// Since an identifier is likely to be copied many times, it is better that
an identifier
-/// is small or "copy". otherwise some kinds of reference count is needed.
String description
-/// here is not such a good choose.
+/// is small or "copy". otherwise some kinds of reference count is needed.
String
+/// description here is not such a good choice.
type Identifier = String;
+/// A cache that contains the postorder index and the identifier of expression
tree nodes
+/// by the preorder index of the nodes.
+///
+/// This cache is filled by `ExprIdentifierVisitor` during the first traversal
and is used
+/// by `CommonSubexprRewriter` during the second traversal.
+///
+/// The purpose of this cache is to quickly find the identifier of a node
during the
+/// second traversal.
+///
+/// Elements in this array are added during `f_down` so the indexes represent
the preorder
+/// index of expression nodes and thus element 0 belongs to the root of the
expression
+/// tree.
+/// The elements of the array are tuples that contain:
+/// - Postorder index that belongs to the preorder index. Assigned during
`f_up`, start
+/// from 0.
+/// - Identifier of the expression. If empty (`""`), expr should not be
considered for
+/// CSE.
+///
+/// # Example
+/// An expression like `(a + b)` would have the following `IdArray`:
+/// ```text
+/// [
+/// (2, "a + b"),
+/// (1, "a"),
+/// (0, "b")
+/// ]
+/// ```
+type IdArray = Vec<(usize, Identifier)>;
+
+/// A map that contains statistics of expressions by their identifiers.
+/// It contains:
+/// - The number of occurrences and
+/// - The DataType
+/// of an expression.
+type ExprStats = HashMap<Identifier, (usize, DataType)>;
+
+/// A map that contains the common expressions extracted during the second,
rewriting
+/// traversal.
+type CommonExprs = IndexMap<Identifier, Expr>;
+
/// Performs Common Sub-expression Elimination optimization.
///
/// This optimization improves query performance by computing expressions that
@@ -150,22 +122,27 @@ impl CommonSubexprEliminate {
/// Rewrites `exprs_list` with common sub-expressions replaced with a new
/// column.
///
- /// `affected_id` is updated with any sub expressions that were replaced.
+ /// `common_exprs` is updated with any sub expressions that were replaced.
///
/// Returns the rewritten expressions
fn rewrite_exprs_list(
&self,
exprs_list: &[&[Expr]],
- expr_set: &ExprSet,
- affected_id: &mut BTreeSet<Identifier>,
+ arrays_list: &[&[Vec<(usize, String)>]],
+ expr_stats: &ExprStats,
+ common_exprs: &mut CommonExprs,
) -> Result<Vec<Vec<Expr>>> {
exprs_list
.iter()
- .map(|exprs| {
+ .zip(arrays_list.iter())
+ .map(|(exprs, arrays)| {
exprs
.iter()
.cloned()
- .map(|expr| replace_common_expr(expr, expr_set,
affected_id))
+ .zip(arrays.iter())
+ .map(|(expr, id_array)| {
+ replace_common_expr(expr, id_array, expr_stats,
common_exprs)
+ })
.collect::<Result<Vec<_>>>()
})
.collect::<Result<Vec<_>>>()
@@ -182,20 +159,26 @@ impl CommonSubexprEliminate {
fn rewrite_expr(
&self,
exprs_list: &[&[Expr]],
+ arrays_list: &[&[Vec<(usize, String)>]],
input: &LogicalPlan,
- expr_set: &ExprSet,
+ expr_stats: &ExprStats,
config: &dyn OptimizerConfig,
) -> Result<(Vec<Vec<Expr>>, LogicalPlan)> {
- let mut affected_id = BTreeSet::<Identifier>::new();
+ let mut common_exprs = IndexMap::new();
- let rewrite_exprs =
- self.rewrite_exprs_list(exprs_list, expr_set, &mut affected_id)?;
+ let rewrite_exprs = self.rewrite_exprs_list(
+ exprs_list,
+ arrays_list,
+ expr_stats,
+ &mut common_exprs,
+ )?;
let mut new_input = self
.try_optimize(input, config)?
.unwrap_or_else(|| input.clone());
- if !affected_id.is_empty() {
- new_input = build_common_expr_project_plan(new_input, affected_id,
expr_set)?;
+ if !common_exprs.is_empty() {
+ new_input =
+ build_common_expr_project_plan(new_input, common_exprs,
expr_stats)?;
}
Ok((rewrite_exprs, new_input))
@@ -207,7 +190,8 @@ impl CommonSubexprEliminate {
config: &dyn OptimizerConfig,
) -> Result<LogicalPlan> {
let mut window_exprs = vec![];
- let mut expr_set = ExprSet::default();
+ let mut arrays_per_window = vec![];
+ let mut expr_stats = ExprStats::new();
// Get all window expressions inside the consecutive window operators.
// Consecutive window expressions may refer to same complex expression.
@@ -226,18 +210,34 @@ impl CommonSubexprEliminate {
plan = input.as_ref().clone();
let input_schema = Arc::clone(input.schema());
- expr_set.populate_expr_set(&window_expr, input_schema,
ExprMask::Normal)?;
+ let arrays = to_arrays(
+ &window_expr,
+ input_schema,
+ &mut expr_stats,
+ ExprMask::Normal,
+ )?;
window_exprs.push(window_expr);
+ arrays_per_window.push(arrays);
}
let mut window_exprs = window_exprs
.iter()
.map(|expr| expr.as_slice())
.collect::<Vec<_>>();
+ let arrays_per_window = arrays_per_window
+ .iter()
+ .map(|arrays| arrays.as_slice())
+ .collect::<Vec<_>>();
- let (mut new_expr, new_input) =
- self.rewrite_expr(&window_exprs, &plan, &expr_set, config)?;
+ assert_eq!(window_exprs.len(), arrays_per_window.len());
+ let (mut new_expr, new_input) = self.rewrite_expr(
+ &window_exprs,
+ &arrays_per_window,
+ &plan,
+ &expr_stats,
+ config,
+ )?;
assert_eq!(window_exprs.len(), new_expr.len());
// Construct consecutive window operator, with their corresponding new
window expressions.
@@ -274,39 +274,49 @@ impl CommonSubexprEliminate {
input,
..
} = aggregate;
- let mut expr_set = ExprSet::default();
+ let mut expr_stats = ExprStats::new();
- // build expr_set, with groupby and aggr
+ // rewrite inputs
let input_schema = Arc::clone(input.schema());
- expr_set.populate_expr_set(
+ let group_arrays = to_arrays(
group_expr,
Arc::clone(&input_schema),
+ &mut expr_stats,
ExprMask::Normal,
)?;
- expr_set.populate_expr_set(aggr_expr, input_schema, ExprMask::Normal)?;
+ let aggr_arrays =
+ to_arrays(aggr_expr, input_schema, &mut expr_stats,
ExprMask::Normal)?;
- // rewrite inputs
- let (mut new_expr, new_input) =
- self.rewrite_expr(&[group_expr, aggr_expr], input, &expr_set,
config)?;
+ let (mut new_expr, new_input) = self.rewrite_expr(
+ &[group_expr, aggr_expr],
+ &[&group_arrays, &aggr_arrays],
+ input,
+ &expr_stats,
+ config,
+ )?;
// note the reversed pop order.
let new_aggr_expr = pop_expr(&mut new_expr)?;
let new_group_expr = pop_expr(&mut new_expr)?;
// create potential projection on top
- let mut expr_set = ExprSet::default();
+ let mut expr_stats = ExprStats::new();
let new_input_schema = Arc::clone(new_input.schema());
- expr_set.populate_expr_set(
+ let aggr_arrays = to_arrays(
&new_aggr_expr,
new_input_schema.clone(),
+ &mut expr_stats,
ExprMask::NormalAndAggregates,
)?;
-
- let mut affected_id = BTreeSet::<Identifier>::new();
- let mut rewritten =
- self.rewrite_exprs_list(&[&new_aggr_expr], &expr_set, &mut
affected_id)?;
+ let mut common_exprs = IndexMap::new();
+ let mut rewritten = self.rewrite_exprs_list(
+ &[&new_aggr_expr],
+ &[&aggr_arrays],
+ &expr_stats,
+ &mut common_exprs,
+ )?;
let rewritten = pop_expr(&mut rewritten)?;
- if affected_id.is_empty() {
+ if common_exprs.is_empty() {
// Alias aggregation expressions if they have changed
let new_aggr_expr = new_aggr_expr
.iter()
@@ -319,19 +329,13 @@ impl CommonSubexprEliminate {
Aggregate::try_new(Arc::new(new_input), new_group_expr,
new_aggr_expr)
.map(LogicalPlan::Aggregate)
} else {
- let mut agg_exprs = vec![];
-
- for id in affected_id {
- match expr_set.get(&id) {
- Some((expr, _, _, symbol)) => {
- // todo: check `nullable`
- agg_exprs.push(expr.clone().alias(symbol.as_str()));
- }
- _ => {
- return internal_err!("expr_set invalid state");
- }
- }
- }
+ let mut agg_exprs = common_exprs
+ .into_iter()
+ .map(|(expr_id, expr)| {
+ // todo: check `nullable`
+ expr.alias(expr_id)
+ })
+ .collect::<Vec<_>>();
let mut proj_exprs = vec![];
for expr in &new_group_expr {
@@ -343,7 +347,7 @@ impl CommonSubexprEliminate {
agg_exprs.push(expr.alias(&name));
proj_exprs.push(Expr::Column(Column::from_name(name)));
} else {
- let id = ExprSet::expr_identifier(&expr_rewritten);
+ let id = expr_identifier(&expr_rewritten,
"".to_string());
let (qualifier, field) =
expr_rewritten.to_field(&new_input_schema)?;
let out_name = qualified_name(qualifier.as_ref(),
field.name());
@@ -379,13 +383,13 @@ impl CommonSubexprEliminate {
let inputs = plan.inputs();
let input = inputs[0];
let input_schema = Arc::clone(input.schema());
- let mut expr_set = ExprSet::default();
+ let mut expr_stats = ExprStats::new();
- // Visit expr list and build expr identifier to occuring count map
(`expr_set`).
- expr_set.populate_expr_set(&expr, input_schema, ExprMask::Normal)?;
+ // Visit expr list and build expr identifier to occurring count map
(`expr_stats`).
+ let arrays = to_arrays(&expr, input_schema, &mut expr_stats,
ExprMask::Normal)?;
let (mut new_expr, new_input) =
- self.rewrite_expr(&[&expr], input, &expr_set, config)?;
+ self.rewrite_expr(&[&expr], &[&arrays], input, &expr_stats,
config)?;
plan.with_new_exprs(pop_expr(&mut new_expr)?, vec![new_input])
}
@@ -471,37 +475,56 @@ fn pop_expr(new_expr: &mut Vec<Vec<Expr>>) ->
Result<Vec<Expr>> {
.ok_or_else(|| DataFusionError::Internal("Failed to pop
expression".to_string()))
}
+fn to_arrays(
+ expr: &[Expr],
+ input_schema: DFSchemaRef,
+ expr_stats: &mut ExprStats,
+ expr_mask: ExprMask,
+) -> Result<Vec<Vec<(usize, String)>>> {
+ expr.iter()
+ .map(|e| {
+ let mut id_array = vec![];
+ expr_to_identifier(
+ e,
+ expr_stats,
+ &mut id_array,
+ Arc::clone(&input_schema),
+ expr_mask,
+ )?;
+
+ Ok(id_array)
+ })
+ .collect::<Result<Vec<_>>>()
+}
+
/// Build the "intermediate" projection plan that evaluates the extracted
common
/// expressions.
///
/// # Arguments
/// input: the input plan
///
-/// affected_id: which common subexpressions were used (and thus are added to
+/// common_exprs: which common subexpressions were used (and thus are added to
/// intermediate projection)
///
-/// expr_set: the set of common subexpressions
+/// expr_stats: statistics (occurrence count and data type) of expressions, keyed by identifier
fn build_common_expr_project_plan(
input: LogicalPlan,
- affected_id: BTreeSet<Identifier>,
- expr_set: &ExprSet,
+ common_exprs: CommonExprs,
+ expr_stats: &ExprStats,
) -> Result<LogicalPlan> {
- let mut project_exprs = vec![];
let mut fields_set = BTreeSet::new();
-
- for id in affected_id {
- match expr_set.get(&id) {
- Some((expr, _, data_type, symbol)) => {
- // todo: check `nullable`
- let field = Field::new(&id, data_type.clone(), true);
- fields_set.insert(field.name().to_owned());
- project_exprs.push(expr.clone().alias(symbol.as_str()));
- }
- _ => {
- return internal_err!("expr_set invalid state");
- }
- }
- }
+ let mut project_exprs = common_exprs
+ .into_iter()
+ .map(|(expr_id, expr)| {
+ let Some((_, data_type)) = expr_stats.get(&expr_id) else {
+ return internal_err!("expr_stats invalid state");
+ };
+ // todo: check `nullable`
+ let field = Field::new(&expr_id, data_type.clone(), true);
+ fields_set.insert(field.name().to_owned());
+ Ok(expr.alias(expr_id))
+ })
+ .collect::<Result<Vec<_>>>()?;
for (qualifier, field) in input.schema().iter() {
if fields_set.insert(qualified_name(qualifier, field.name())) {
@@ -610,26 +633,29 @@ impl ExprMask {
/// `Expr` without sub-expr (column, literal etc.) will not have identifier
/// because they should not be recognized as common sub-expr.
struct ExprIdentifierVisitor<'a> {
- // param
- expr_set: &'a mut ExprSet,
- /// input schema for the node that we're optimizing, so we can determine
the correct datatype
- /// for each subexpression
+ // statistics of expressions
+ expr_stats: &'a mut ExprStats,
+ // cache to speed up second traversal
+ id_array: &'a mut IdArray,
+ // input schema for the node that we're optimizing, so we can determine
the correct datatype
+ // for each subexpression
input_schema: DFSchemaRef,
// inner states
visit_stack: Vec<VisitRecord>,
- /// increased in fn_down, start from 0.
- node_count: usize,
- /// which expression should be skipped?
+ // preorder index, start from 0.
+ down_index: usize,
+ // postorder index, start from 0.
+ up_index: usize,
+ // which expression should be skipped?
expr_mask: ExprMask,
}
/// Record item that used when traversing a expression tree.
enum VisitRecord {
- /// `usize` is the monotone increasing series number assigned in
pre_visit().
- /// Starts from 0. Is used to index the identifier array `id_array` in
post_visit().
+ /// `usize` preorder index assigned in `f_down()`. Starts from 0.
EnterMark(usize),
/// the node's children were skipped => jump to f_up on same node
- JumpMark(usize),
+ JumpMark,
/// Accumulated identifier of sub expression.
ExprItem(Identifier),
}
@@ -637,17 +663,19 @@ enum VisitRecord {
impl ExprIdentifierVisitor<'_> {
/// Find the first `EnterMark` in the stack, and accumulates every
`ExprItem`
/// before it.
- fn pop_enter_mark(&mut self) -> (usize, Identifier) {
+ fn pop_enter_mark(&mut self) -> Option<(usize, Identifier)> {
let mut desc = String::new();
while let Some(item) = self.visit_stack.pop() {
match item {
- VisitRecord::EnterMark(idx) | VisitRecord::JumpMark(idx) => {
- return (idx, desc);
+ VisitRecord::EnterMark(idx) => {
+ return Some((idx, desc));
}
VisitRecord::ExprItem(id) => {
+ desc.push('|');
desc.push_str(&id);
}
+ VisitRecord::JumpMark => return None,
}
}
unreachable!("Enter mark should paired with node number");
@@ -658,55 +686,89 @@ impl TreeNodeVisitor for ExprIdentifierVisitor<'_> {
type Node = Expr;
fn f_down(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
- // related to https://github.com/apache/datafusion/issues/8814
+ // related to https://github.com/apache/arrow-datafusion/issues/8814
// If the expr contain volatile expression or is a short-circuit
expression, skip it.
+ // TODO: propagate is_volatile state bottom-up + consider non-volatile
sub-expressions for CSE
+ // TODO: consider surely executed children of "short circuited"s for
CSE
if expr.short_circuits() || expr.is_volatile()? {
- self.visit_stack
- .push(VisitRecord::JumpMark(self.node_count));
- return Ok(TreeNodeRecursion::Jump); // go to f_up
+ self.visit_stack.push(VisitRecord::JumpMark);
+
+ return Ok(TreeNodeRecursion::Jump);
}
+ self.id_array.push((0, "".to_string()));
self.visit_stack
- .push(VisitRecord::EnterMark(self.node_count));
- self.node_count += 1;
+ .push(VisitRecord::EnterMark(self.down_index));
+ self.down_index += 1;
Ok(TreeNodeRecursion::Continue)
}
fn f_up(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
- let (_idx, sub_expr_identifier) = self.pop_enter_mark();
-
- // skip exprs should not be recognize.
- if self.expr_mask.ignores(expr) {
- let curr_expr_identifier = ExprSet::expr_identifier(expr);
- self.visit_stack
- .push(VisitRecord::ExprItem(curr_expr_identifier));
+ let Some((down_index, sub_expr_id)) = self.pop_enter_mark() else {
return Ok(TreeNodeRecursion::Continue);
- }
- let curr_expr_identifier = ExprSet::expr_identifier(expr);
- let alias_symbol =
format!("{curr_expr_identifier}{sub_expr_identifier}");
+ };
- self.visit_stack
- .push(VisitRecord::ExprItem(alias_symbol.clone()));
+ let expr_id = expr_identifier(expr, sub_expr_id);
- let data_type = expr.get_type(&self.input_schema)?;
+ self.id_array[down_index].0 = self.up_index;
+ if !self.expr_mask.ignores(expr) {
+ self.id_array[down_index].1.clone_from(&expr_id);
+
+ // TODO: can we capture the data type in the second traversal only
for
+ // replaced expressions?
+ let data_type = expr.get_type(&self.input_schema)?;
+ let (count, _) = self
+ .expr_stats
+ .entry(expr_id.clone())
+ .or_insert((0, data_type));
+ *count += 1;
+ }
+ self.visit_stack.push(VisitRecord::ExprItem(expr_id));
+ self.up_index += 1;
- self.expr_set
- .entry(curr_expr_identifier)
- .or_insert_with(|| (expr.clone(), 0, data_type, alias_symbol))
- .1 += 1;
Ok(TreeNodeRecursion::Continue)
}
}
-/// Rewrite expression by common sub-expression with a corresponding temporary
-/// column name that will compute the subexpression.
-///
-/// `affected_id` is updated with any sub expressions that were replaced
+fn expr_identifier(expr: &Expr, sub_expr_identifier: Identifier) -> Identifier
{
+ format!("{{{expr}{sub_expr_identifier}}}")
+}
+
+/// Go through an expression tree and generate identifier for every node in
this tree.
+fn expr_to_identifier(
+ expr: &Expr,
+ expr_stats: &mut ExprStats,
+ id_array: &mut Vec<(usize, Identifier)>,
+ input_schema: DFSchemaRef,
+ expr_mask: ExprMask,
+) -> Result<()> {
+ expr.visit(&mut ExprIdentifierVisitor {
+ expr_stats,
+ id_array,
+ input_schema,
+ visit_stack: vec![],
+ down_index: 0,
+ up_index: 0,
+ expr_mask,
+ })?;
+
+ Ok(())
+}
+
+/// Rewrite expression by replacing detected common sub-expression with
+/// the corresponding temporary column name. That column contains the
+/// evaluated result of the replaced expression.
struct CommonSubexprRewriter<'a> {
- expr_set: &'a ExprSet,
- /// Which identifier is replaced.
- affected_id: &'a mut BTreeSet<Identifier>,
+ // statistics of expressions
+ expr_stats: &'a ExprStats,
+ // cache to speed up second traversal
+ id_array: &'a IdArray,
+ // common expression, that are replaced during the second traversal, are
collected to
+ // this map
+ common_exprs: &'a mut CommonExprs,
+ // preorder index, starts from 0.
+ down_index: usize,
}
impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
@@ -720,43 +782,53 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
return Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump));
}
- let curr_id = &ExprSet::expr_identifier(&expr);
-
- // lookup previously visited expression
- match self.expr_set.get(curr_id) {
- Some((_, counter, _, symbol)) => {
- // if has a commonly used (a.k.a. 1+ use) expr
- if *counter > 1 {
- self.affected_id.insert(curr_id.clone());
-
- let expr_name = expr.display_name()?;
- // Alias this `Column` expr to it original "expr name",
- // `projection_push_down` optimizer use "expr name" to
eliminate useless
- // projections.
- Ok(Transformed::new(
- col(symbol).alias(expr_name),
- true,
- TreeNodeRecursion::Jump,
- ))
- } else {
- Ok(Transformed::no(expr))
- }
+ let (up_index, expr_id) = &self.id_array[self.down_index];
+ self.down_index += 1;
+
+ // skip `Expr`s without identifier (empty identifier).
+ if expr_id.is_empty() {
+ return Ok(Transformed::no(expr));
+ }
+
+ let (counter, _) = self.expr_stats.get(expr_id).unwrap();
+ if *counter > 1 {
+ // step index to skip all sub-nodes (which have a smaller postorder
index).
+ while self.down_index < self.id_array.len()
+ && self.id_array[self.down_index].0 < *up_index
+ {
+ self.down_index += 1;
}
- None => Ok(Transformed::no(expr)),
+
+ let expr_name = expr.display_name()?;
+ self.common_exprs.insert(expr_id.clone(), expr);
+ // Alias this `Column` expr to its original "expr name",
+ // `projection_push_down` optimizer use "expr name" to eliminate
useless
+ // projections.
+ // TODO: do we really need to alias here?
+ Ok(Transformed::new(
+ col(expr_id).alias(expr_name),
+ true,
+ TreeNodeRecursion::Jump,
+ ))
+ } else {
+ Ok(Transformed::no(expr))
}
}
}
/// Replace common sub-expression in `expr` with the corresponding temporary
-/// column name, updating `affected_id` with any replaced expressions
+/// column name, updating `common_exprs` with any replaced expressions
fn replace_common_expr(
expr: Expr,
- expr_set: &ExprSet,
- affected_id: &mut BTreeSet<Identifier>,
+ id_array: &IdArray,
+ expr_stats: &ExprStats,
+ common_exprs: &mut CommonExprs,
) -> Result<Expr> {
expr.rewrite(&mut CommonSubexprRewriter {
- expr_set,
- affected_id,
+ expr_stats,
+ id_array,
+ common_exprs,
+ down_index: 0,
})
.data()
}
@@ -789,6 +861,74 @@ mod test {
assert_eq!(expected, formatted_plan);
}
+ #[test]
+ fn id_array_visitor() -> Result<()> {
+ let expr = ((sum(col("a") + lit(1))) - avg(col("c"))) * lit(2);
+
+ let schema = Arc::new(DFSchema::from_unqualifed_fields(
+ vec![
+ Field::new("a", DataType::Int64, false),
+ Field::new("c", DataType::Int64, false),
+ ]
+ .into(),
+ Default::default(),
+ )?);
+
+ // skip aggregates
+ let mut id_array = vec![];
+ expr_to_identifier(
+ &expr,
+ &mut HashMap::new(),
+ &mut id_array,
+ Arc::clone(&schema),
+ ExprMask::Normal,
+ )?;
+
+ let expected = vec![
+ (8, "{(SUM(a + Int32(1)) - AVG(c)) * Int32(2)|{Int32(2)}|{SUM(a +
Int32(1)) - AVG(c)|{AVG(c)|{c}}|{SUM(a + Int32(1))|{a +
Int32(1)|{Int32(1)}|{a}}}}}"),
+ (6, "{SUM(a + Int32(1)) - AVG(c)|{AVG(c)|{c}}|{SUM(a +
Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}"),
+ (3, ""),
+ (2, "{a + Int32(1)|{Int32(1)}|{a}}"),
+ (0, ""),
+ (1, ""),
+ (5, ""),
+ (4, ""),
+ (7, "")
+ ]
+ .into_iter()
+ .map(|(number, id)| (number, id.into()))
+ .collect::<Vec<_>>();
+ assert_eq!(expected, id_array);
+
+ // include aggregates
+ let mut id_array = vec![];
+ expr_to_identifier(
+ &expr,
+ &mut HashMap::new(),
+ &mut id_array,
+ Arc::clone(&schema),
+ ExprMask::NormalAndAggregates,
+ )?;
+
+ let expected = vec![
+ (8, "{(SUM(a + Int32(1)) - AVG(c)) * Int32(2)|{Int32(2)}|{SUM(a +
Int32(1)) - AVG(c)|{AVG(c)|{c}}|{SUM(a + Int32(1))|{a +
Int32(1)|{Int32(1)}|{a}}}}}"),
+ (6, "{SUM(a + Int32(1)) - AVG(c)|{AVG(c)|{c}}|{SUM(a +
Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}"),
+ (3, "{SUM(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}"),
+ (2, "{a + Int32(1)|{Int32(1)}|{a}}"),
+ (0, ""),
+ (1, ""),
+ (5, "{AVG(c)|{c}}"),
+ (4, ""),
+ (7, "")
+ ]
+ .into_iter()
+ .map(|(number, id)| (number, id.into()))
+ .collect::<Vec<_>>();
+ assert_eq!(expected, id_array);
+
+ Ok(())
+ }
+
#[test]
fn tpch_q1_simplified() -> Result<()> {
// SQL:
@@ -811,8 +951,8 @@ mod test {
)?
.build()?;
- let expected = "Aggregate: groupBy=[[]], aggr=[[SUM(test.a * (Int32(1)
- test.b)Int32(1) - test.btest.bInt32(1)test.a AS test.a * Int32(1) - test.b),
SUM(test.a * (Int32(1) - test.b)Int32(1) - test.btest.bInt32(1)test.a AS test.a
* Int32(1) - test.b * (Int32(1) + test.c))]]\
- \n Projection: test.a * (Int32(1) - test.b) AS test.a * (Int32(1) -
test.b)Int32(1) - test.btest.bInt32(1)test.a, test.a, test.b, test.c\
+ let expected = "Aggregate: groupBy=[[]], aggr=[[SUM({test.a *
(Int32(1) - test.b)|{Int32(1) - test.b|{test.b}|{Int32(1)}}|{test.a}} AS test.a
* Int32(1) - test.b), SUM({test.a * (Int32(1) - test.b)|{Int32(1) -
test.b|{test.b}|{Int32(1)}}|{test.a}} AS test.a * Int32(1) - test.b * (Int32(1)
+ test.c))]]\
+ \n Projection: test.a * (Int32(1) - test.b) AS {test.a * (Int32(1) -
test.b)|{Int32(1) - test.b|{test.b}|{Int32(1)}}|{test.a}}, test.a, test.b,
test.c\
\n TableScan: test";
assert_optimized_plan_eq(expected, &plan);
@@ -864,8 +1004,8 @@ mod test {
)?
.build()?;
- let expected = "Projection: AVG(test.a)test.a AS AVG(test.a) AS col1,
AVG(test.a)test.a AS AVG(test.a) AS col2, col3, AVG(test.c) AS AVG(test.c),
my_agg(test.a)test.a AS my_agg(test.a) AS col4, my_agg(test.a)test.a AS
my_agg(test.a) AS col5, col6, my_agg(test.c) AS my_agg(test.c)\
- \n Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS AVG(test.a)test.a,
my_agg(test.a) AS my_agg(test.a)test.a, AVG(test.b) AS col3, AVG(test.c) AS
AVG(test.c), my_agg(test.b) AS col6, my_agg(test.c) AS my_agg(test.c)]]\
+ let expected = "Projection: {AVG(test.a)|{test.a}} AS AVG(test.a) AS
col1, {AVG(test.a)|{test.a}} AS AVG(test.a) AS col2, col3, {AVG(test.c)} AS
AVG(test.c), {my_agg(test.a)|{test.a}} AS my_agg(test.a) AS col4,
{my_agg(test.a)|{test.a}} AS my_agg(test.a) AS col5, col6, {my_agg(test.c)} AS
my_agg(test.c)\
+ \n Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS
{AVG(test.a)|{test.a}}, my_agg(test.a) AS {my_agg(test.a)|{test.a}},
AVG(test.b) AS col3, AVG(test.c) AS {AVG(test.c)}, my_agg(test.b) AS col6,
my_agg(test.c) AS {my_agg(test.c)}]]\
\n TableScan: test";
assert_optimized_plan_eq(expected, &plan);
@@ -883,8 +1023,8 @@ mod test {
)?
.build()?;
- let expected = "Projection: Int32(1) + AVG(test.a)test.a AS
AVG(test.a), Int32(1) - AVG(test.a)test.a AS AVG(test.a), Int32(1) +
my_agg(test.a)test.a AS my_agg(test.a), Int32(1) - my_agg(test.a)test.a AS
my_agg(test.a)\
- \n Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS AVG(test.a)test.a,
my_agg(test.a) AS my_agg(test.a)test.a]]\
+ let expected = "Projection: Int32(1) + {AVG(test.a)|{test.a}} AS
AVG(test.a), Int32(1) - {AVG(test.a)|{test.a}} AS AVG(test.a), Int32(1) +
{my_agg(test.a)|{test.a}} AS my_agg(test.a), Int32(1) -
{my_agg(test.a)|{test.a}} AS my_agg(test.a)\
+ \n Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS
{AVG(test.a)|{test.a}}, my_agg(test.a) AS {my_agg(test.a)|{test.a}}]]\
\n TableScan: test";
assert_optimized_plan_eq(expected, &plan);
@@ -900,9 +1040,7 @@ mod test {
)?
.build()?;
- let expected = "Aggregate: groupBy=[[]], aggr=[[AVG(UInt32(1) +
test.atest.aUInt32(1) AS UInt32(1) + test.a) AS col1, my_agg(UInt32(1) +
test.atest.aUInt32(1) AS UInt32(1) + test.a) AS col2]]\
- \n Projection: UInt32(1) + test.a AS UInt32(1) +
test.atest.aUInt32(1), test.a, test.b, test.c\
- \n TableScan: test";
+ let expected = "Aggregate: groupBy=[[]], aggr=[[AVG({UInt32(1) +
test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS col1, my_agg({UInt32(1)
+ test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS col2]]\n Projection:
UInt32(1) + test.a AS {UInt32(1) + test.a|{test.a}|{UInt32(1)}}, test.a,
test.b, test.c\n TableScan: test";
assert_optimized_plan_eq(expected, &plan);
@@ -917,8 +1055,8 @@ mod test {
)?
.build()?;
- let expected = "Aggregate: groupBy=[[UInt32(1) + test.atest.aUInt32(1)
AS UInt32(1) + test.a]], aggr=[[AVG(UInt32(1) + test.atest.aUInt32(1) AS
UInt32(1) + test.a) AS col1, my_agg(UInt32(1) + test.atest.aUInt32(1) AS
UInt32(1) + test.a) AS col2]]\
- \n Projection: UInt32(1) + test.a AS UInt32(1) +
test.atest.aUInt32(1), test.a, test.b, test.c\
+ let expected = "Aggregate: groupBy=[[{UInt32(1) +
test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a]], aggr=[[AVG({UInt32(1) +
test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS col1, my_agg({UInt32(1)
+ test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS col2]]\
+ \n Projection: UInt32(1) + test.a AS {UInt32(1) +
test.a|{test.a}|{UInt32(1)}}, test.a, test.b, test.c\
\n TableScan: test";
assert_optimized_plan_eq(expected, &plan);
@@ -938,9 +1076,9 @@ mod test {
)?
.build()?;
- let expected = "Projection: UInt32(1) + test.a, UInt32(1) +
AVG(UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) + test.a)UInt32(1) +
test.atest.aUInt32(1) AS UInt32(1) + test.a AS AVG(UInt32(1) + test.a) AS col1,
UInt32(1) - AVG(UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) +
test.a)UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) + test.a AS AVG(UInt32(1)
+ test.a) AS col2, AVG(UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) +
test.a)UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) + [...]
- \n Aggregate: groupBy=[[UInt32(1) + test.atest.aUInt32(1) AS
UInt32(1) + test.a]], aggr=[[AVG(UInt32(1) + test.atest.aUInt32(1) AS UInt32(1)
+ test.a) AS AVG(UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) +
test.a)UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) + test.a,
my_agg(UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) + test.a) AS
my_agg(UInt32(1) + test.atest.aUInt32(1) AS UInt32(1) + test.a)UInt32(1) +
test.atest.aUInt32(1) AS UInt32(1) + test.a]]\
- \n Projection: UInt32(1) + test.a AS UInt32(1) +
test.atest.aUInt32(1), test.a, test.b, test.c\
+ let expected = "Projection: UInt32(1) + test.a, UInt32(1) +
{AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS UInt32(1) +
test.a)|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS UInt32(1) +
test.a|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}}}}} AS AVG(UInt32(1) + test.a)
AS col1, UInt32(1) - {AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS
UInt32(1) + test.a)|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS UInt32(1) +
test.a|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}}}}} AS AVG(UInt32( [...]
+ \n Aggregate: groupBy=[[{UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS
UInt32(1) + test.a]], aggr=[[AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS
UInt32(1) + test.a) AS {AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS
UInt32(1) + test.a)|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS UInt32(1) +
test.a|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}}}}}, my_agg({UInt32(1) +
test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS {my_agg({UInt32(1) +
test.a|{test.a}|{UInt32(1)}} AS UIn [...]
+ \n Projection: UInt32(1) + test.a AS {UInt32(1) +
test.a|{test.a}|{UInt32(1)}}, test.a, test.b, test.c\
\n TableScan: test";
assert_optimized_plan_eq(expected, &plan);
@@ -965,9 +1103,9 @@ mod test {
)?
.build()?;
- let expected = "Projection: table.test.col.a, UInt32(1) +
AVG(UInt32(1) + table.test.col.atable.test.col.aUInt32(1) AS UInt32(1) +
table.test.col.a)UInt32(1) + table.test.col.atable.test.col.aUInt32(1) AS
UInt32(1) + table.test.col.a AS AVG(UInt32(1) + table.test.col.a),
AVG(UInt32(1) + table.test.col.atable.test.col.aUInt32(1) AS UInt32(1) +
table.test.col.a)UInt32(1) + table.test.col.atable.test.col.aUInt32(1) AS
UInt32(1) + table.test.col.a AS AVG(UInt32(1) + table.test.col.a)\
- \n Aggregate: groupBy=[[table.test.col.a]], aggr=[[AVG(UInt32(1) +
table.test.col.atable.test.col.aUInt32(1) AS UInt32(1) + table.test.col.a) AS
AVG(UInt32(1) + table.test.col.atable.test.col.aUInt32(1) AS UInt32(1) +
table.test.col.a)UInt32(1) + table.test.col.atable.test.col.aUInt32(1) AS
UInt32(1) + table.test.col.a]]\
- \n Projection: UInt32(1) + table.test.col.a AS UInt32(1) +
table.test.col.atable.test.col.aUInt32(1), table.test.col.a\
+ let expected = "Projection: table.test.col.a, UInt32(1) +
{AVG({UInt32(1) + table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1)
+ table.test.col.a)|{{UInt32(1) +
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) +
table.test.col.a|{{UInt32(1) +
table.test.col.a|{table.test.col.a}|{UInt32(1)}}}}} AS AVG(UInt32(1) +
table.test.col.a), {AVG({UInt32(1) +
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) +
table.test.col.a)|{{UInt32(1) + table.test.co [...]
+ \n Aggregate: groupBy=[[table.test.col.a]], aggr=[[AVG({UInt32(1) +
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) +
table.test.col.a) AS {AVG({UInt32(1) +
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) +
table.test.col.a)|{{UInt32(1) +
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) +
table.test.col.a|{{UInt32(1) +
table.test.col.a|{table.test.col.a}|{UInt32(1)}}}}}]]\
+ \n Projection: UInt32(1) + table.test.col.a AS {UInt32(1) +
table.test.col.a|{table.test.col.a}|{UInt32(1)}}, table.test.col.a\
\n TableScan: table.test";
assert_optimized_plan_eq(expected, &plan);
@@ -986,8 +1124,8 @@ mod test {
])?
.build()?;
- let expected = "Projection: Int32(1) + test.atest.aInt32(1) AS
Int32(1) + test.a AS first, Int32(1) + test.atest.aInt32(1) AS Int32(1) +
test.a AS second\
- \n Projection: Int32(1) + test.a AS Int32(1) + test.atest.aInt32(1),
test.a, test.b, test.c\
+ let expected = "Projection: {Int32(1) + test.a|{test.a}|{Int32(1)}} AS
Int32(1) + test.a AS first, {Int32(1) + test.a|{test.a}|{Int32(1)}} AS Int32(1)
+ test.a AS second\
+ \n Projection: Int32(1) + test.a AS {Int32(1) +
test.a|{test.a}|{Int32(1)}}, test.a, test.b, test.c\
\n TableScan: test";
assert_optimized_plan_eq(expected, &plan);
@@ -1031,35 +1169,28 @@ mod test {
#[test]
fn redundant_project_fields() {
let table_scan = test_table_scan().unwrap();
- let affected_id: BTreeSet<Identifier> =
- ["c+a".to_string(), "b+a".to_string()].into_iter().collect();
- let expr_set_1 = vec![
- (
- "c+a".to_string(),
- (col("c") + col("a"), 1, DataType::UInt32, "c+a".to_string()),
- ),
- (
- "b+a".to_string(),
- (col("b") + col("a"), 1, DataType::UInt32, "b+a".to_string()),
- ),
- ]
- .into();
- let expr_set_2 = vec![
- (
- "c+a".to_string(),
- (col("c+a"), 1, DataType::UInt32, "c+a".to_string()),
- ),
- (
- "b+a".to_string(),
- (col("b+a"), 1, DataType::UInt32, "b+a".to_string()),
- ),
- ]
- .into();
+ let expr_stats_1 = ExprStats::from([
+ ("c+a".to_string(), (1, DataType::UInt32)),
+ ("b+a".to_string(), (1, DataType::UInt32)),
+ ]);
+ let common_exprs_1 = IndexMap::from([
+ ("c+a".to_string(), col("c") + col("a")),
+ ("b+a".to_string(), col("b") + col("a")),
+ ]);
+ let exprs_stats_2 = ExprStats::from([
+ ("c+a".to_string(), (1, DataType::UInt32)),
+ ("b+a".to_string(), (1, DataType::UInt32)),
+ ]);
+ let common_exprs_2 = IndexMap::from([
+ ("c+a".to_string(), col("c+a")),
+ ("b+a".to_string(), col("b+a")),
+ ]);
let project =
- build_common_expr_project_plan(table_scan, affected_id.clone(),
&expr_set_1)
+ build_common_expr_project_plan(table_scan, common_exprs_1,
&expr_stats_1)
.unwrap();
let project_2 =
- build_common_expr_project_plan(project, affected_id,
&expr_set_2).unwrap();
+ build_common_expr_project_plan(project, common_exprs_2,
&exprs_stats_2)
+ .unwrap();
let mut field_set = BTreeSet::new();
for name in project_2.schema().field_names() {
@@ -1076,57 +1207,33 @@ mod test {
.unwrap()
.build()
.unwrap();
- let affected_id: BTreeSet<Identifier> =
- ["test1.c+test1.a".to_string(), "test1.b+test1.a".to_string()]
- .into_iter()
- .collect();
- let expr_set_1 = vec![
- (
- "test1.c+test1.a".to_string(),
- (
- col("test1.c") + col("test1.a"),
- 1,
- DataType::UInt32,
- "test1.c+test1.a".to_string(),
- ),
- ),
- (
- "test1.b+test1.a".to_string(),
- (
- col("test1.b") + col("test1.a"),
- 1,
- DataType::UInt32,
- "test1.b+test1.a".to_string(),
- ),
- ),
- ]
- .into();
- let expr_set_2 = vec![
+ let expr_stats_1 = ExprStats::from([
+ ("test1.c+test1.a".to_string(), (1, DataType::UInt32)),
+ ("test1.b+test1.a".to_string(), (1, DataType::UInt32)),
+ ]);
+ let common_exprs_1 = IndexMap::from([
(
"test1.c+test1.a".to_string(),
- (
- col("test1.c+test1.a"),
- 1,
- DataType::UInt32,
- "test1.c+test1.a".to_string(),
- ),
+ col("test1.c") + col("test1.a"),
),
(
"test1.b+test1.a".to_string(),
- (
- col("test1.b+test1.a"),
- 1,
- DataType::UInt32,
- "test1.b+test1.a".to_string(),
- ),
+ col("test1.b") + col("test1.a"),
),
- ]
- .into();
+ ]);
+ let expr_stats_2 = ExprStats::from([
+ ("test1.c+test1.a".to_string(), (1, DataType::UInt32)),
+ ("test1.b+test1.a".to_string(), (1, DataType::UInt32)),
+ ]);
+ let common_exprs_2 = IndexMap::from([
+ ("test1.c+test1.a".to_string(), col("test1.c+test1.a")),
+ ("test1.b+test1.a".to_string(), col("test1.b+test1.a")),
+ ]);
let project =
- build_common_expr_project_plan(join, affected_id.clone(),
&expr_set_1)
- .unwrap();
+ build_common_expr_project_plan(join, common_exprs_1,
&expr_stats_1).unwrap();
let project_2 =
- build_common_expr_project_plan(project, affected_id,
&expr_set_2).unwrap();
+ build_common_expr_project_plan(project, common_exprs_2,
&expr_stats_2)
+ .unwrap();
let mut field_set = BTreeSet::new();
for name in project_2.schema().field_names() {
@@ -1193,8 +1300,8 @@ mod test {
.build()?;
let expected = "Projection: test.a, test.b, test.c\
- \n Filter: Int32(1) + test.atest.aInt32(1) - Int32(10) > Int32(1) +
test.atest.aInt32(1)\
- \n Projection: Int32(1) + test.a AS Int32(1) +
test.atest.aInt32(1), test.a, test.b, test.c\
+ \n Filter: {Int32(1) + test.a|{test.a}|{Int32(1)}} - Int32(10) >
{Int32(1) + test.a|{test.a}|{Int32(1)}}\
+ \n Projection: Int32(1) + test.a AS {Int32(1) +
test.a|{test.a}|{Int32(1)}}, test.a, test.b, test.c\
\n TableScan: test";
assert_optimized_plan_eq(expected, &plan);
diff --git a/datafusion/sqllogictest/test_files/group_by.slt
b/datafusion/sqllogictest/test_files/group_by.slt
index 986a36944f..5a605ea58b 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -4187,8 +4187,8 @@ EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)),
MAX(DISTINCT CAST(x AS DOUBLE))
logical_plan
01)Projection: SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT
t1.x)
02)--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1), MAX(alias1)]]
-03)----Aggregate: groupBy=[[t1.y, CAST(t1.x AS Float64)t1.x AS t1.x AS
alias1]], aggr=[[]]
-04)------Projection: CAST(t1.x AS Float64) AS CAST(t1.x AS Float64)t1.x, t1.y
+03)----Aggregate: groupBy=[[t1.y, {CAST(t1.x AS Float64)|{t1.x}} AS t1.x AS
alias1]], aggr=[[]]
+04)------Projection: CAST(t1.x AS Float64) AS {CAST(t1.x AS Float64)|{t1.x}},
t1.y
05)--------TableScan: t1 projection=[x, y]
physical_plan
01)ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as
MAX(DISTINCT t1.x)]
@@ -4200,8 +4200,8 @@ physical_plan
07)------------CoalesceBatchesExec: target_batch_size=2
08)--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8),
input_partitions=8
09)----------------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
-10)------------------AggregateExec: mode=Partial, gby=[y@1 as y, CAST(t1.x AS
Float64)t1.x@0 as alias1], aggr=[]
-11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x
AS Float64)t1.x, y@1 as y]
+10)------------------AggregateExec: mode=Partial, gby=[y@1 as y, {CAST(t1.x AS
Float64)|{t1.x}}@0 as alias1], aggr=[]
+11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as
{CAST(t1.x AS Float64)|{t1.x}}, y@1 as y]
12)----------------------MemoryExec: partitions=1, partition_sizes=[1]
# create an unbounded table that contains ordered timestamp.
diff --git a/datafusion/sqllogictest/test_files/select.slt
b/datafusion/sqllogictest/test_files/select.slt
index f91cfd6be6..24163e37de 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1426,12 +1426,12 @@ query TT
EXPLAIN SELECT x/2, x/2+1 FROM t;
----
logical_plan
-01)Projection: t.x / Int64(2)Int64(2)t.x AS t.x / Int64(2), t.x /
Int64(2)Int64(2)t.x AS t.x / Int64(2) + Int64(1)
-02)--Projection: t.x / Int64(2) AS t.x / Int64(2)Int64(2)t.x
+01)Projection: {t.x / Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2), {t.x /
Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2) + Int64(1)
+02)--Projection: t.x / Int64(2) AS {t.x / Int64(2)|{Int64(2)}|{t.x}}
03)----TableScan: t projection=[x]
physical_plan
-01)ProjectionExec: expr=[t.x / Int64(2)Int64(2)t.x@0 as t.x / Int64(2), t.x /
Int64(2)Int64(2)t.x@0 + 1 as t.x / Int64(2) + Int64(1)]
-02)--ProjectionExec: expr=[x@0 / 2 as t.x / Int64(2)Int64(2)t.x]
+01)ProjectionExec: expr=[{t.x / Int64(2)|{Int64(2)}|{t.x}}@0 as t.x /
Int64(2), {t.x / Int64(2)|{Int64(2)}|{t.x}}@0 + 1 as t.x / Int64(2) + Int64(1)]
+02)--ProjectionExec: expr=[x@0 / 2 as {t.x / Int64(2)|{Int64(2)}|{t.x}}]
03)----MemoryExec: partitions=1, partition_sizes=[1]
query II
@@ -1444,12 +1444,12 @@ query TT
EXPLAIN SELECT abs(x), abs(x) + abs(y) FROM t;
----
logical_plan
-01)Projection: abs(t.x)t.x AS abs(t.x), abs(t.x)t.x AS abs(t.x) + abs(t.y)
-02)--Projection: abs(t.x) AS abs(t.x)t.x, t.y
+01)Projection: {abs(t.x)|{t.x}} AS abs(t.x), {abs(t.x)|{t.x}} AS abs(t.x) +
abs(t.y)
+02)--Projection: abs(t.x) AS {abs(t.x)|{t.x}}, t.y
03)----TableScan: t projection=[x, y]
physical_plan
-01)ProjectionExec: expr=[abs(t.x)t.x@0 as abs(t.x), abs(t.x)t.x@0 + abs(y@1)
as abs(t.x) + abs(t.y)]
-02)--ProjectionExec: expr=[abs(x@0) as abs(t.x)t.x, y@1 as y]
+01)ProjectionExec: expr=[{abs(t.x)|{t.x}}@0 as abs(t.x), {abs(t.x)|{t.x}}@0 +
abs(y@1) as abs(t.x) + abs(t.y)]
+02)--ProjectionExec: expr=[abs(x@0) as {abs(t.x)|{t.x}}, y@1 as y]
03)----MemoryExec: partitions=1, partition_sizes=[1]
query II
@@ -1613,6 +1613,14 @@ select count(1) from v;
----
1
+# Ensure CSE resolves columns correctly.
+# NOTE: the correct result is 3; the expected value 1 below records the current (incorrect) output.
+# Tracked in https://github.com/apache/datafusion/issues/10413
+query I
+select a + b from (select 1 as a, 2 as b, 1 as "a + b");
+----
+1
+
# run below query without logical optimizations
statement ok
set datafusion.optimizer.max_passes=0;
diff --git a/datafusion/sqllogictest/test_files/subquery.slt
b/datafusion/sqllogictest/test_files/subquery.slt
index 7196418af1..085f192a5d 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -1072,8 +1072,8 @@ query TT
explain select a/2, a/2 + 1 from t
----
logical_plan
-01)Projection: t.a / Int64(2)Int64(2)t.a AS t.a / Int64(2), t.a /
Int64(2)Int64(2)t.a AS t.a / Int64(2) + Int64(1)
-02)--Projection: t.a / Int64(2) AS t.a / Int64(2)Int64(2)t.a
+01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a /
Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
+02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
03)----TableScan: t projection=[a]
statement ok
@@ -1083,8 +1083,8 @@ query TT
explain select a/2, a/2 + 1 from t
----
logical_plan
-01)Projection: t.a / Int64(2)Int64(2)t.a AS t.a / Int64(2), t.a /
Int64(2)Int64(2)t.a AS t.a / Int64(2) + Int64(1)
-02)--Projection: t.a / Int64(2) AS t.a / Int64(2)Int64(2)t.a
+01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a /
Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
+02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
03)----TableScan: t projection=[a]
###
diff --git a/datafusion/sqllogictest/test_files/tpch/q1.slt.part
b/datafusion/sqllogictest/test_files/tpch/q1.slt.part
index cc71167537..f2c14f2628 100644
--- a/datafusion/sqllogictest/test_files/tpch/q1.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q1.slt.part
@@ -42,8 +42,8 @@ explain select
logical_plan
01)Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS
LAST
02)--Projection: lineitem.l_returnflag, lineitem.l_linestatus,
SUM(lineitem.l_quantity) AS sum_qty, SUM(lineitem.l_extendedprice) AS
sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
AS sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge,
AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS
avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(*) AS count_order
-03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]],
aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice),
SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) -
lineitem.l_discount)Decimal128(Some(1),20,0) -
lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice
AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount)
AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount),
SUM(lineitem.l [...]
-04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) -
lineitem.l_discount) AS lineitem.l_extendedprice * (Decimal128(Some(1),20,0) -
lineitem.l_discount)Decimal128(Some(1),20,0) -
lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice,
lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount,
lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus
+03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]],
aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice),
SUM({lineitem.l_extendedprice * (Decimal128(Some(1),20,0) -
lineitem.l_discount)|{Decimal128(Some(1),20,0) -
lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}}
AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount)
AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), [...]
+04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) -
lineitem.l_discount) AS {lineitem.l_extendedprice * (Decimal128(Some(1),20,0) -
lineitem.l_discount)|{Decimal128(Some(1),20,0) -
lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}},
lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount,
lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus
05)--------Filter: lineitem.l_shipdate <= Date32("10471")
06)----------TableScan: lineitem projection=[l_quantity, l_extendedprice,
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate],
partial_filters=[lineitem.l_shipdate <= Date32("10471")]
physical_plan
@@ -54,7 +54,7 @@ physical_plan
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([l_returnflag@0,
l_linestatus@1], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[l_returnflag@5 as
l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity),
SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity),
AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(*)]
-08)--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 -
l_discount@2) as lineitem.l_extendedprice * (Decimal128(Some(1),20,0) -
lineitem.l_discount)Decimal128(Some(1),20,0) -
lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice,
l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2
as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5
as l_linestatus]
+08)--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 -
l_discount@2) as {lineitem.l_extendedprice * (Decimal128(Some(1),20,0) -
lineitem.l_discount)|{Decimal128(Some(1),20,0) -
lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}},
l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2
as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5
as l_linestatus]
09)----------------CoalesceBatchesExec: target_batch_size=8192
10)------------------FilterExec: l_shipdate@6 <= 10471
11)--------------------CsvExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
projection=[l_quantity, l_extendedprice, l_discount, l_tax, l [...]
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index 924bc16348..af09e644c9 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1687,20 +1687,20 @@ EXPLAIN SELECT c3,
logical_plan
01)Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1,
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
02)--Limit: skip=0, fetch=5
-03)----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 +
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3 AS
aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURR [...]
-04)------Projection: aggregate_test_100.c3 +
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3,
aggregate_test_100.c3, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-05)--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 +
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3 AS
aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS [...]
-06)----------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS
aggregate_test_100.c3 +
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3,
aggregate_test_100.c2, aggregate_test_100.c3, aggregate_test_100.c9
+03)----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[{aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS
aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING [...]
+04)------Projection: {aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}},
aggregate_test_100.c3, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+05)--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[{aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS
aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DE [...]
+06)----------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS
{aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}},
aggregate_test_100.c2, aggregate_test_100.c3, aggregate_test_100.c9
07)------------TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]
physical_plan
01)ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1,
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]
02)--GlobalLimitExec: skip=0, fetch=5
03)----WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered:
false, [...]
-04)------ProjectionExec: expr=[aggregate_test_100.c3 +
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3@0 as
aggregate_test_100.c3 +
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3, c3@2 as c3,
c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST,
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@4 as SUM(aggregate_test_10 [...]
+04)------ProjectionExec: expr=[{aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 as
{aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c3@2 as
c3, c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST,
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@4 as SUM(a [...]
05)--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST,
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRE [...]
-06)----------SortPreservingMergeExec: [aggregate_test_100.c3 +
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3@0 DESC,c9@3
DESC,c2@1 ASC NULLS LAST]
-07)------------SortExec: expr=[aggregate_test_100.c3 +
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3@0 DESC,c9@3
DESC,c2@1 ASC NULLS LAST], preserve_partitioning=[true]
-08)--------------ProjectionExec: expr=[c3@1 + c4@2 as aggregate_test_100.c3 +
aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3, c2@0 as c2,
c3@1 as c3, c9@3 as c9]
+06)----------SortPreservingMergeExec: [{aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0
DESC,c9@3 DESC,c2@1 ASC NULLS LAST]
+07)------------SortExec: expr=[{aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0
DESC,c9@3 DESC,c2@1 ASC NULLS LAST], preserve_partitioning=[true]
+08)--------------ProjectionExec: expr=[c3@1 + c4@2 as {aggregate_test_100.c3 +
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c2@0 as
c2, c3@1 as c3, c9@3 as c9]
09)----------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
10)------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2,
c3, c4, c9], has_header=true
@@ -2544,11 +2544,11 @@ logical_plan
02)--Limit: skip=0, fetch=5
03)----Sort: annotated_data_finite.inc_col DESC NULLS FIRST, fetch=5
04)------Projection: SUM(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING AS sum1, SUM(annotated_data_finite.desc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1
FOLLOWING AS sum2, SUM(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10
FOLLOWING AS sum3, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_dat
[...]
-05)--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.desc_col AS
Int64)annotated_data_finite.desc_col AS annotated_data_finite.desc_col) ROWS
BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) ROWS BETWEEN 8 PRECEDING
AND 1 FOLLOWING AS COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
-06)----------Projection: CAST(annotated_data_finite.desc_col AS
Int64)annotated_data_finite.desc_col, annotated_data_finite.inc_col,
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING,
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING,
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] ROWS BE [...]
-07)------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col
AS Int64)annotated_data_finite.inc_col AS annotated_data_finite.inc_col) ORDER
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING, SUM(CAST(annotated_data_finite.desc_col AS
Int64)annotated_data_finite.desc_col AS annotated_data_finite.desc_col) ORDER
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1
FOLLOWING, SUM(CAST(annotated_data_finite.inc_c [...]
-08)--------------WindowAggr:
windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS
Int64)annotated_data_finite.inc_col AS annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4
FOLLOWING, SUM(CAST(annotated_data_finite.desc_col AS
Int64)annotated_data_finite.desc_col AS annotated_data_finite.desc_col) ORDER
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8
FOLLOWING, SUM(CAST(annotated_data_finite. [...]
-09)----------------Projection: CAST(annotated_data_finite.desc_col AS Int64)
AS CAST(annotated_data_finite.desc_col AS Int64)annotated_data_finite.desc_col,
CAST(annotated_data_finite.inc_col AS Int64) AS
CAST(annotated_data_finite.inc_col AS Int64)annotated_data_finite.inc_col,
annotated_data_finite.ts, annotated_data_finite.inc_col,
annotated_data_finite.desc_col
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite.desc_col
AS Int64)|{annotated_data_finite.desc_col}} AS annotated_data_finite.desc_col)
ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) ROWS BETWEEN 8
PRECEDING AND 1 FOLLOWING AS COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
+06)----------Projection: {CAST(annotated_data_finite.desc_col AS
Int64)|{annotated_data_finite.desc_col}}, annotated_data_finite.inc_col,
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING,
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING,
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RO [...]
+07)------------WindowAggr:
windowExpr=[[SUM({CAST(annotated_data_finite.inc_col AS
Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) ORDER
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING, SUM({CAST(annotated_data_finite.desc_col AS
Int64)|{annotated_data_finite.desc_col}} AS annotated_data_finite.desc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING
AND 1 FOLLOWING, SUM({CAST(annotated_data_f [...]
+08)--------------WindowAggr:
windowExpr=[[SUM({CAST(annotated_data_finite.inc_col AS
Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) ORDER
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4
FOLLOWING, SUM({CAST(annotated_data_finite.desc_col AS
Int64)|{annotated_data_finite.desc_col}} AS annotated_data_finite.desc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING
AND 8 FOLLOWING, SUM({CAST(annotated_d [...]
+09)----------------Projection: CAST(annotated_data_finite.desc_col AS Int64)
AS {CAST(annotated_data_finite.desc_col AS
Int64)|{annotated_data_finite.desc_col}}, CAST(annotated_data_finite.inc_col AS
Int64) AS {CAST(annotated_data_finite.inc_col AS
Int64)|{annotated_data_finite.inc_col}}, annotated_data_finite.ts,
annotated_data_finite.inc_col, annotated_data_finite.desc_col
10)------------------TableScan: annotated_data_finite projection=[ts, inc_col,
desc_col]
physical_plan
01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3,
min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2,
max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as
sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as
minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as
cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]
@@ -2556,10 +2556,10 @@ physical_plan
03)----SortExec: TopK(fetch=5), expr=[inc_col@24 DESC],
preserve_partitioning=[false]
04)------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING@13 as sum1, SUM(annotated_data_finite.desc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1
FOLLOWING@14 as sum2, SUM(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10
FOLLOWING@15 as sum3, MIN(annotated_data_finite.inc_col) ORDE [...]
05)--------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.desc_col) ROWS
BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)),
end_bound: Following(UInt64(1)), is_causal: false }, COUNT(*) ROWS BETWEEN 8
PRECEDING AND 1 FOLLOWING: Ok(Field { name: "C [...]
-06)----------ProjectionExec: expr=[CAST(annotated_data_finite.desc_col AS
Int64)annotated_data_finite.desc_col@0 as CAST(annotated_data_finite.desc_col
AS Int64)annotated_data_finite.desc_col, inc_col@3 as inc_col,
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@5 as
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotat [...]
+06)----------ProjectionExec: expr=[{CAST(annotated_data_finite.desc_col AS
Int64)|{annotated_data_finite.desc_col}}@0 as
{CAST(annotated_data_finite.desc_col AS
Int64)|{annotated_data_finite.desc_col}}, inc_col@3 as inc_col,
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@5 as
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, S [...]
07)------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(Int32(10)), end_bound: Fol [...]
08)--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING
AND 4 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(Int32(4)), end_bound: [...]
-09)----------------ProjectionExec: expr=[CAST(desc_col@2 AS Int64) as
CAST(annotated_data_finite.desc_col AS Int64)annotated_data_finite.desc_col,
CAST(inc_col@1 AS Int64) as CAST(annotated_data_finite.inc_col AS
Int64)annotated_data_finite.inc_col, ts@0 as ts, inc_col@1 as inc_col,
desc_col@2 as desc_col]
+09)----------------ProjectionExec: expr=[CAST(desc_col@2 AS Int64) as
{CAST(annotated_data_finite.desc_col AS
Int64)|{annotated_data_finite.desc_col}}, CAST(inc_col@1 AS Int64) as
{CAST(annotated_data_finite.inc_col AS Int64)|{annotated_data_finite.inc_col}},
ts@0 as ts, inc_col@1 as inc_col, desc_col@2 as desc_col]
10)------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts,
inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
query IIIIIIIIIIIIIIIIIIIIIIII
@@ -2708,9 +2708,9 @@ logical_plan
02)--Limit: skip=0, fetch=5
03)----Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5
04)------Projection: SUM(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING AS sum1, SUM(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING AS sum2, MIN(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING AS min1, MIN(annotated_data_finite.inc_col) [...]
-05)--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col AS
Int64)annotated_data_finite.inc_col AS annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOU [...]
-06)----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite.inc_col
AS Int64)annotated_data_finite.inc_col AS annotated_data_finite.inc_col) ORDER
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWE [...]
-07)------------Projection: CAST(annotated_data_finite.inc_col AS Float64) AS
CAST(annotated_data_finite.inc_col AS Float64)annotated_data_finite.inc_col,
CAST(annotated_data_finite.inc_col AS Int64) AS
CAST(annotated_data_finite.inc_col AS Int64)annotated_data_finite.inc_col,
annotated_data_finite.ts, annotated_data_finite.inc_col
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite.inc_col AS
Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) ORDER
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN [...]
+06)----------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite.inc_col
AS Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE [...]
+07)------------Projection: CAST(annotated_data_finite.inc_col AS Int64) AS
{CAST(annotated_data_finite.inc_col AS Int64)|{annotated_data_finite.inc_col}},
CAST(annotated_data_finite.inc_col AS Float64) AS
{CAST(annotated_data_finite.inc_col AS
Float64)|{annotated_data_finite.inc_col}}, annotated_data_finite.ts,
annotated_data_finite.inc_col
08)--------------TableScan: annotated_data_finite projection=[ts, inc_col]
physical_plan
01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1,
min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as
count2, avg1@8 as avg1, avg2@9 as avg2]
@@ -2719,7 +2719,7 @@ physical_plan
04)------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING@9 as sum1, SUM(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING@4 as sum2, MIN(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING@10 as min1, MIN(annotated_dat [...]
05)--------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) ORDER
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND 5 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(Int32(NULL)), en [...]
06)----------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(NULL [...]
-07)------------ProjectionExec: expr=[CAST(inc_col@1 AS Float64) as
CAST(annotated_data_finite.inc_col AS Float64)annotated_data_finite.inc_col,
CAST(inc_col@1 AS Int64) as CAST(annotated_data_finite.inc_col AS
Int64)annotated_data_finite.inc_col, ts@0 as ts, inc_col@1 as inc_col]
+07)------------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as
{CAST(annotated_data_finite.inc_col AS Int64)|{annotated_data_finite.inc_col}},
CAST(inc_col@1 AS Float64) as {CAST(annotated_data_finite.inc_col AS
Float64)|{annotated_data_finite.inc_col}}, ts@0 as ts, inc_col@1 as inc_col]
08)--------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts,
inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
query IIIIIIIIRR
@@ -2809,9 +2809,9 @@ logical_plan
02)--Limit: skip=0, fetch=5
03)----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
04)------Projection: SUM(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING AS count1, COUNT(annotated_data_ [...]
-05)--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col
AS Int64)annotated_data_infinite.inc_col AS annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED
PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING]]
-06)----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col
AS Int64)annotated_data_infinite.inc_col AS annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING]]
-07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS
CAST(annotated_data_infinite.inc_col AS Int64)annotated_data_infinite.inc_col,
annotated_data_infinite.ts, annotated_data_infinite.inc_col
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite.inc_col
AS Int64)|{annotated_data_infinite.inc_col}} AS
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING,
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC
NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+06)----------WindowAggr:
windowExpr=[[SUM({CAST(annotated_data_infinite.inc_col AS
Int64)|{annotated_data_infinite.inc_col}} AS annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING]]
+07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS
{CAST(annotated_data_infinite.inc_col AS
Int64)|{annotated_data_infinite.inc_col}}, annotated_data_infinite.ts,
annotated_data_infinite.inc_col
08)--------------TableScan: annotated_data_infinite projection=[ts, inc_col]
physical_plan
01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1,
count2@3 as count2]
@@ -2819,7 +2819,7 @@ physical_plan
03)----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@5 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING@3 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@6 as count1, COUNT(a [...]
04)------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)) [...]
05)--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64( [...]
-06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as
CAST(annotated_data_infinite.inc_col AS Int64)annotated_data_infinite.inc_col,
ts@0 as ts, inc_col@1 as inc_col]
+06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as
{CAST(annotated_data_infinite.inc_col AS
Int64)|{annotated_data_infinite.inc_col}}, ts@0 as ts, inc_col@1 as inc_col]
07)------------StreamingTableExec: partition_sizes=1, projection=[ts,
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
query IIII
@@ -2856,9 +2856,9 @@ logical_plan
02)--Limit: skip=0, fetch=5
03)----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
04)------Projection: SUM(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING AS count1, COUNT(annotated_data_ [...]
-05)--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col
AS Int64)annotated_data_infinite.inc_col AS annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED
PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING]]
-06)----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite.inc_col
AS Int64)annotated_data_infinite.inc_col AS annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING]]
-07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS
CAST(annotated_data_infinite.inc_col AS Int64)annotated_data_infinite.inc_col,
annotated_data_infinite.ts, annotated_data_infinite.inc_col
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite.inc_col
AS Int64)|{annotated_data_infinite.inc_col}} AS
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING,
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC
NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+06)----------WindowAggr:
windowExpr=[[SUM({CAST(annotated_data_infinite.inc_col AS
Int64)|{annotated_data_infinite.inc_col}} AS annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING]]
+07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS
{CAST(annotated_data_infinite.inc_col AS
Int64)|{annotated_data_infinite.inc_col}}, annotated_data_infinite.ts,
annotated_data_infinite.inc_col
08)--------------TableScan: annotated_data_infinite projection=[ts, inc_col]
physical_plan
01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1,
count2@3 as count2]
@@ -2866,7 +2866,7 @@ physical_plan
03)----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@5 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING@3 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@6 as count1, COUNT(a [...]
04)------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)) [...]
05)--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64( [...]
-06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as
CAST(annotated_data_infinite.inc_col AS Int64)annotated_data_infinite.inc_col,
ts@0 as ts, inc_col@1 as inc_col]
+06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as
{CAST(annotated_data_infinite.inc_col AS
Int64)|{annotated_data_infinite.inc_col}}, ts@0 as ts, inc_col@1 as inc_col]
07)------------StreamingTableExec: partition_sizes=1, projection=[ts,
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
@@ -2953,13 +2953,13 @@ EXPLAIN SELECT a, b, c,
logical_plan
01)Projection: annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.c, SUM(annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1,
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NU [...]
02)--Limit: skip=0, fetch=5
-03)----WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_in [...]
-04)------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b,
annotated_data_in [...]
-05)--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, SUM(CAST(annotated_data_infinite2.c AS
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annot [...]
-06)----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a [...]
-07)------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c AS
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
SUM(CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a,
annotated_d [...]
-08)--------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.c
AS Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, SUM(CAST(annotated_data_infinite2.c AS
Int64)annotated_data_infinite2.c AS annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [...]
-09)----------------Projection: CAST(annotated_data_infinite2.c AS Int64) AS
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c,
annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.c, annotated_data_infinite2.d
+03)----WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c AS
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
SUM({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS
annotated_data_infinite2.c) PARTITION BY [annotat [...]
+04)------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c AS
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION
BY [annotated_data_infinite2.b, annotated_data_infinite2.a,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
SUM({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotat
[...]
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c AS
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, SUM({CAST(annotated_data_infinite2.c AS
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER [...]
+06)----------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c AS
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
SUM({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_i [...]
+07)------------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c
AS Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
SUM({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, a [...]
+08)--------------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.c
AS Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, SUM({CAST(annotated_data_infinite2.c AS
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] [...]
+09)----------------Projection: CAST(annotated_data_infinite2.c AS Int64) AS
{CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}},
annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.c, annotated_data_infinite2.d
10)------------------TableScan: annotated_data_infinite2 projection=[a, b, c,
d]
physical_plan
01)ProjectionExec: expr=[a@1 as a, b@2 as b, c@3 as c,
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS
LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING@9 as sum1, SUM(annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC N
[...]
@@ -2970,7 +2970,7 @@ physical_plan
06)----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS
LAST, annotated_data_infinite2.c ASC NULLS LA [...]
07)------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING [...]
08)--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, d [...]
-09)----------------ProjectionExec: expr=[CAST(c@2 AS Int64) as
CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, a@0 as a,
b@1 as b, c@2 as c, d@3 as d]
+09)----------------ProjectionExec: expr=[CAST(c@2 AS Int64) as
{CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}}, a@0
as a, b@1 as b, c@2 as c, d@3 as d]
10)------------------StreamingTableExec: partition_sizes=1, projection=[a, b,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS
LAST, c@2 ASC NULLS LAST]
query IIIIIIIIIIIIIII
@@ -3022,13 +3022,13 @@ logical_plan
01)Limit: skip=0, fetch=5
02)--Sort: annotated_data_finite2.c ASC NULLS LAST, fetch=5
03)----Projection: annotated_data_finite2.a, annotated_data_finite2.b,
annotated_data_finite2.c, SUM(annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1,
SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a,
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST,
annotated_ [...]
-04)------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST,
annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY
[...]
-05)--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d]
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND
1 FOLLOWING, SUM(CAST(annotated_data_finite2.c AS
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_d [...]
-06)----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c
AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b,
annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c [...]
-07)------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c AS
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a,
annotated_data_fin [...]
-08)--------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c AS
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d]
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND
1 FOLLOWING, SUM(CAST(annotated_data_finite2.c AS
Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.b, annot [...]
-09)----------------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_finite2.c
AS Int64)annotated_data_finite2.c AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, SUM(CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c
AS annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a,
annotated_data_finite2.b] ORDER BY [annotated_data_fini [...]
-10)------------------Projection: CAST(annotated_data_finite2.c AS Int64) AS
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c,
annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c,
annotated_data_finite2.d
+04)------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c AS
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST,
annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
SUM({CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}} AS
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] [...]
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c AS
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d]
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND
1 FOLLOWING, SUM({CAST(annotated_data_finite2.c AS
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.b, annotated_data_finite2.a, a [...]
+06)----------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c AS
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, SUM({CAST(annotated_data_finite2.c AS
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_
[...]
+07)------------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c AS
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING,
SUM({CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}} AS
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotate [...]
+08)--------------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c
AS Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d]
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND
1 FOLLOWING, SUM({CAST(annotated_data_finite2.c AS
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite [...]
+09)----------------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_finite2.c
AS Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING, SUM({CAST(annotated_data_finite2.c AS
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated [...]
+10)------------------Projection: CAST(annotated_data_finite2.c AS Int64) AS
{CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}},
annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c,
annotated_data_finite2.d
11)--------------------TableScan: annotated_data_finite2 projection=[a, b, c,
d]
physical_plan
01)GlobalLimitExec: skip=0, fetch=5
@@ -3045,7 +3045,7 @@ physical_plan
12)----------------------BoundedWindowAggExec:
wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a,
annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1
FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d]
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND
1 FOL [...]
13)------------------------SortExec: expr=[a@1 ASC NULLS LAST,b@2 ASC NULLS
LAST,d@4 ASC NULLS LAST,c@3 ASC NULLS LAST], preserve_partitioning=[false]
14)--------------------------BoundedWindowAggExec:
wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a,
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST]
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a,
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST]
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true,
dict_ [...]
-15)----------------------------ProjectionExec: expr=[CAST(c@2 AS Int64) as
CAST(annotated_data_finite2.c AS Int64)annotated_data_finite2.c, a@0 as a, b@1
as b, c@2 as c, d@3 as d]
+15)----------------------------ProjectionExec: expr=[CAST(c@2 AS Int64) as
{CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}}, a@0 as a,
b@1 as b, c@2 as c, d@3 as d]
16)------------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b,
c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS
LAST], has_header=true
query IIIIIIIIIIIIIII
@@ -3211,21 +3211,21 @@ FROM annotated_data_infinite2;
----
logical_plan
01)Projection: SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW AS sum1, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW AS sum2, SUM(annotated_data_infinite2.a) PARTITION BY [ann [...]
-02)--WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-03)----Projection: CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a, annotated_data_infinite2.a,
annotated_data_infinite2.d, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC [...]
-04)------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
-05)--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
-06)----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
-07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS
CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a,
annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.c, annotated_data_infinite2.d
+02)--WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+03)----Projection: {CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}, annotated_data_infinite2.a,
annotated_data_infinite2.d, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2. [...]
+04)------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
+06)----------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
+07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}},
annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.c, annotated_data_infinite2.d
08)--------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
physical_plan
01)ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@3 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@5 as sum2, SUM(annotated_data_infinite2.a) PAR [...]
02)--BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d]
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }) [...]
-03)----ProjectionExec: expr=[CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a@0 as CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a, a@1 as a, d@4 as d,
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_in [...]
+03)----ProjectionExec: expr=[{CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}@0 as {CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}, a@1 as a, d@4 as d,
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotat [...]
04)------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int64, nullabl [...]
05)--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int64, nulla [...]
06)----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int64, nul [...]
-07)------------ProjectionExec: expr=[CAST(a@0 AS Int64) as
CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a, a@0 as a,
b@1 as b, c@2 as c, d@3 as d]
+07)------------ProjectionExec: expr=[CAST(a@0 AS Int64) as
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}}, a@0
as a, b@1 as b, c@2 as c, d@3 as d]
08)--------------StreamingTableExec: partition_sizes=1, projection=[a, b, c,
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS
LAST, c@2 ASC NULLS LAST]
statement ok
@@ -3242,29 +3242,29 @@ FROM annotated_data_infinite2;
----
logical_plan
01)Projection: SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW AS sum1, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW AS sum2, SUM(annotated_data_infinite2.a) PARTITION BY [ann [...]
-02)--WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-03)----Projection: CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a, annotated_data_infinite2.a,
annotated_data_infinite2.d, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC [...]
-04)------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
-05)--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
-06)----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a AS annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
-07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS
CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a,
annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.c, annotated_data_infinite2.d
+02)--WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+03)----Projection: {CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}, annotated_data_infinite2.a,
annotated_data_infinite2.d, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2. [...]
+04)------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
+05)--------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
+06)----------WindowAggr: windowExpr=[[SUM({CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
+07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}},
annotated_data_infinite2.a, annotated_data_infinite2.b,
annotated_data_infinite2.c, annotated_data_infinite2.d
08)--------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
physical_plan
01)ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@3 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@5 as sum2, SUM(annotated_data_infinite2.a) PAR [...]
02)--BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d]
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }) [...]
03)----CoalesceBatchesExec: target_batch_size=4096
-04)------RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2,
preserve_order=true, sort_exprs=CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a@0 ASC NULLS LAST,a@1 ASC NULLS LAST
-05)--------ProjectionExec: expr=[CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a@0 as CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a, a@1 as a, d@4 as d,
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_dat [...]
+04)------RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2,
preserve_order=true, sort_exprs={CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST,a@1 ASC NULLS LAST
+05)--------ProjectionExec: expr=[{CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}@0 as {CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}, a@1 as a, d@4 as d,
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, ann
[...]
06)----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a)
PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int64, nul [...]
07)------------CoalesceBatchesExec: target_batch_size=4096
-08)--------------RepartitionExec: partitioning=Hash([b@2, a@1], 2),
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC
NULLS LAST,c@3 ASC NULLS LAST,CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a@0 ASC NULLS LAST
+08)--------------RepartitionExec: partitioning=Hash([b@2, a@1], 2),
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC
NULLS LAST,c@3 ASC NULLS LAST,{CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST
09)----------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int6 [...]
10)------------------CoalesceBatchesExec: target_batch_size=4096
-11)--------------------RepartitionExec: partitioning=Hash([a@1, d@4], 2),
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC
NULLS LAST,c@3 ASC NULLS LAST,CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a@0 ASC NULLS LAST
+11)--------------------RepartitionExec: partitioning=Hash([a@1, d@4], 2),
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC
NULLS LAST,c@3 ASC NULLS LAST,{CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST
12)----------------------BoundedWindowAggExec:
wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type [...]
13)------------------------CoalesceBatchesExec: target_batch_size=4096
-14)--------------------------RepartitionExec: partitioning=Hash([a@1, b@2],
2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2
ASC NULLS LAST,c@3 ASC NULLS LAST,CAST(annotated_data_infinite2.a AS
Int64)annotated_data_infinite2.a@0 ASC NULLS LAST
-15)----------------------------ProjectionExec: expr=[CAST(a@0 AS Int64) as
CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a, a@0 as a,
b@1 as b, c@2 as c, d@3 as d]
+14)--------------------------RepartitionExec: partitioning=Hash([a@1, b@2],
2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2
ASC NULLS LAST,c@3 ASC NULLS LAST,{CAST(annotated_data_infinite2.a AS
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST
+15)----------------------------ProjectionExec: expr=[CAST(a@0 AS Int64) as
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}}, a@0
as a, b@1 as b, c@2 as c, d@3 as d]
16)------------------------------RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=1
17)--------------------------------StreamingTableExec: partition_sizes=1,
projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]