This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new fcd94fb5b7 Enhance Enforce Dist capabilities to fix, sub optimal bad
plans (#7671)
fcd94fb5b7 is described below
commit fcd94fb5b7a49597bde4b27742f1d54b97f149c6
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Sep 29 16:27:21 2023 +0300
Enhance Enforce Dist capabilities to fix, sub optimal bad plans (#7671)
* Extend capabilities of enforcedist
* Simplifications
* Fix test
* Do not use hard coded partition number
* Add comments, Fix with_new_children of CustomPlan
* Use sub-rule as separate rule.
* Add unbounded method
* Final review
* Move util code to exectree file
* Update variables and comments
* Apply suggestions from code review
Update comments
Co-authored-by: Andrew Lamb <[email protected]>
* Address reviews
* Add new tests, do not satisfy requirement if not absolutely necessary
enforce dist
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/src/datasource/listing/table.rs | 60 +--
.../src/physical_optimizer/enforce_distribution.rs | 417 ++++++++++++++++-----
.../core/src/physical_optimizer/enforce_sorting.rs | 1 -
datafusion/core/src/physical_optimizer/mod.rs | 1 +
.../core/src/physical_optimizer/optimizer.rs | 7 +
.../src/physical_optimizer/output_requirements.rs | 275 ++++++++++++++
datafusion/core/src/physical_optimizer/utils.rs | 24 ++
.../provider_filter_pushdown.rs | 19 +-
.../physical-plan/src/coalesce_partitions.rs | 4 +
datafusion/physical-plan/src/memory.rs | 9 +-
datafusion/physical-plan/src/streaming.rs | 8 +-
datafusion/sqllogictest/test_files/explain.slt | 4 +
datafusion/sqllogictest/test_files/groupby.slt | 6 +-
datafusion/sqllogictest/test_files/window.slt | 66 ++++
14 files changed, 758 insertions(+), 143 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 8360847e1b..797562e92d 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -20,16 +20,8 @@
use std::str::FromStr;
use std::{any::Any, sync::Arc};
-use arrow::compute::SortOptions;
-use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
-use arrow_schema::Schema;
-use async_trait::async_trait;
-use datafusion_common::FileTypeWriterOptions;
-use datafusion_common::{internal_err, plan_err, project_schema, SchemaExt,
ToDFSchema};
-use datafusion_expr::expr::Sort;
-use datafusion_optimizer::utils::conjunction;
-use datafusion_physical_expr::{create_physical_expr, LexOrdering,
PhysicalSortExpr};
-use futures::{future, stream, StreamExt, TryStreamExt};
+use super::helpers::{expr_applicable_for_cols, pruned_partition_list,
split_files};
+use super::PartitionedFile;
use crate::datasource::file_format::file_compression_type::{
FileCompressionType, FileTypeExt,
@@ -54,13 +46,21 @@ use crate::{
logical_expr::Expr,
physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
};
-use datafusion_common::FileType;
+use arrow::compute::SortOptions;
+use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
+use arrow_schema::Schema;
+use datafusion_common::{
+ internal_err, plan_err, project_schema, FileType, FileTypeWriterOptions,
SchemaExt,
+ ToDFSchema,
+};
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
+use datafusion_expr::expr::Sort;
+use datafusion_optimizer::utils::conjunction;
+use datafusion_physical_expr::{create_physical_expr, LexOrdering,
PhysicalSortExpr};
-use super::PartitionedFile;
-
-use super::helpers::{expr_applicable_for_cols, pruned_partition_list,
split_files};
+use async_trait::async_trait;
+use futures::{future, stream, StreamExt, TryStreamExt};
/// Configuration for creating a [`ListingTable`]
#[derive(Debug, Clone)]
@@ -996,6 +996,9 @@ impl ListingTable {
#[cfg(test)]
mod tests {
+ use std::collections::HashMap;
+ use std::fs::File;
+
use super::*;
use crate::datasource::{provider_as_source, MemTable};
use crate::execution::options::ArrowReadOptions;
@@ -1010,14 +1013,13 @@ mod tests {
logical_expr::{col, lit},
test::{columns, object_store::register_test_store},
};
+
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
- use datafusion_common::assert_contains;
- use datafusion_common::GetExt;
- use datafusion_expr::LogicalPlanBuilder;
+ use datafusion_common::{assert_contains, GetExt, ScalarValue};
+ use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
+
use rstest::*;
- use std::collections::HashMap;
- use std::fs::File;
use tempfile::TempDir;
/// It creates dummy file and checks if it can create unbounded input
executors.
@@ -2048,6 +2050,7 @@ mod tests {
}
None => SessionContext::new(),
};
+ let target_partition_number =
session_ctx.state().config().target_partitions();
// Create a new schema with one field called "a" of type Int32
let schema = Arc::new(Schema::new(vec![Field::new(
@@ -2056,6 +2059,12 @@ mod tests {
false,
)]));
+ let filter_predicate = Expr::BinaryExpr(BinaryExpr::new(
+ Box::new(Expr::Column("column1".into())),
+ Operator::GtEq,
+ Box::new(Expr::Literal(ScalarValue::Int32(Some(0)))),
+ ));
+
// Create a new batch of data to insert into the table
let batch = RecordBatch::try_new(
schema.clone(),
@@ -2136,8 +2145,10 @@ mod tests {
let source = provider_as_source(source_table);
// Create a table scan logical plan to read from the source table
let scan_plan = LogicalPlanBuilder::scan("source", source, None)?
-
.repartition(Partitioning::Hash(vec![Expr::Column("column1".into())], 6))?
+ .filter(filter_predicate)?
.build()?;
+ // Since logical plan contains a filter, increasing parallelism is
helpful.
+ // Therefore, we will have 8 partitions in the final plan.
// Create an insert plan to insert the source data into the initial
table
let insert_into_table =
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema,
false)?.build()?;
@@ -2146,7 +2157,6 @@ mod tests {
.state()
.create_physical_plan(&insert_into_table)
.await?;
-
// Execute the physical plan and collect the results
let res = collect(plan, session_ctx.task_ctx()).await?;
// Insert returns the number of rows written, in our case this would
be 6.
@@ -2178,9 +2188,9 @@ mod tests {
// Assert that the batches read from the file match the expected
result.
assert_batches_eq!(expected, &batches);
- // Assert that 6 files were added to the table
+ // Assert that `target_partition_number` many files were added to the
table.
let num_files = tmp_dir.path().read_dir()?.count();
- assert_eq!(num_files, 6);
+ assert_eq!(num_files, target_partition_number);
// Create a physical plan from the insert plan
let plan = session_ctx
@@ -2221,9 +2231,9 @@ mod tests {
// Assert that the batches read from the file after the second append
match the expected result.
assert_batches_eq!(expected, &batches);
- // Assert that another 6 files were added to the table
+ // Assert that another `target_partition_number` many files were added
to the table.
let num_files = tmp_dir.path().read_dir()?.count();
- assert_eq!(num_files, 12);
+ assert_eq!(num_files, 2 * target_partition_number);
// Return Ok if the function
Ok(())
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index b2a1a03383..b3fb41ea10 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -27,8 +27,11 @@ use std::sync::Arc;
use crate::config::ConfigOptions;
use crate::datasource::physical_plan::{CsvExec, ParquetExec};
-use crate::error::{DataFusionError, Result};
-use crate::physical_optimizer::utils::{add_sort_above, get_plan_string,
ExecTree};
+use crate::error::Result;
+use crate::physical_optimizer::utils::{
+ add_sort_above, get_children_exectrees, get_plan_string,
is_coalesce_partitions,
+ is_repartition, is_sort_preserving_merge, ExecTree,
+};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode,
PhysicalGroupBy};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -44,7 +47,6 @@ use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
use crate::physical_plan::{with_new_children_if_necessary, Distribution,
ExecutionPlan};
-use datafusion_common::internal_err;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::equivalence::EquivalenceProperties;
@@ -57,6 +59,7 @@ use datafusion_physical_expr::{
};
use datafusion_physical_plan::unbounded_output;
+use datafusion_physical_plan::windows::{get_best_fitting_window,
BoundedWindowAggExec};
use itertools::izip;
/// The `EnforceDistribution` rule ensures that distribution requirements are
@@ -213,9 +216,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {
distribution_context.transform_up(&|distribution_context| {
ensure_distribution(distribution_context, config)
})?;
-
- // If output ordering is not necessary, removes it
- update_plan_to_remove_unnecessary_final_order(distribution_context)
+ Ok(distribution_context.plan)
}
fn name(&self) -> &str {
@@ -979,12 +980,6 @@ fn add_roundrobin_on_top(
RepartitionExec::try_new(input,
Partitioning::RoundRobinBatch(n_target))?
.with_preserve_order(should_preserve_ordering),
) as Arc<dyn ExecutionPlan>;
- if let Some(exec_tree) = dist_onward {
- return internal_err!(
- "ExecTree should have been empty, but got:{:?}",
- exec_tree
- );
- }
// update distribution onward with new operator
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
@@ -1112,8 +1107,9 @@ fn add_spm_on_top(
}
}
-/// Updates the physical plan inside `distribution_context` if having a
-/// `RepartitionExec(RoundRobin)` is not helpful.
+/// Updates the physical plan inside `distribution_context` so that
distribution
+/// changing operators are removed from the top. If they are necessary, they
will
+/// be added in subsequent stages.
///
/// Assume that following plan is given:
/// ```text
@@ -1122,14 +1118,13 @@ fn add_spm_on_top(
/// " ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a,
b, c, d, e], output_ordering=\[a@0 ASC]",
/// ```
///
-/// `RepartitionExec` at the top is unnecessary. Since it doesn't help with
increasing parallelism.
-/// This function removes top repartition, and returns following plan.
+/// Since `RepartitionExec`s change the distribution, this function removes
+/// them and returns following plan:
///
/// ```text
-/// "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
-/// " ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b,
c, d, e], output_ordering=\[a@0 ASC]",
+/// "ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c,
d, e], output_ordering=\[a@0 ASC]",
/// ```
-fn remove_unnecessary_repartition(
+fn remove_dist_changing_operators(
distribution_context: DistributionContext,
) -> Result<DistributionContext> {
let DistributionContext {
@@ -1137,22 +1132,17 @@ fn remove_unnecessary_repartition(
mut distribution_onwards,
} = distribution_context;
- // Remove any redundant RoundRobin at the start:
- if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>()
{
- if let Partitioning::RoundRobinBatch(n_out) =
repartition.partitioning() {
- // Repartition is useless:
- if *n_out <=
repartition.input().output_partitioning().partition_count() {
- let mut new_distribution_onwards =
- vec![None; repartition.input().children().len()];
- if let Some(exec_tree) = &distribution_onwards[0] {
- for child in &exec_tree.children {
- new_distribution_onwards[child.idx] =
Some(child.clone());
- }
- }
- plan = repartition.input().clone();
- distribution_onwards = new_distribution_onwards;
- }
- }
+ // Remove any distribution changing operators at the beginning:
+ // Note that they will be re-inserted later on if necessary or helpful.
+ while is_repartition(&plan)
+ || is_coalesce_partitions(&plan)
+ || is_sort_preserving_merge(&plan)
+ {
+ // All of above operators have a single child. When we remove the top
+ // operator, we take the first child.
+ plan = plan.children()[0].clone();
+ distribution_onwards =
+ get_children_exectrees(plan.children().len(),
&distribution_onwards[0]);
}
// Create a plan with the updated children:
@@ -1162,29 +1152,6 @@ fn remove_unnecessary_repartition(
})
}
-/// Changes each child of the `dist_context.plan` such that they no longer
-/// use order preserving variants, if no ordering is required at the output
-/// of the physical plan (there is no global ordering requirement by the
query).
-fn update_plan_to_remove_unnecessary_final_order(
- dist_context: DistributionContext,
-) -> Result<Arc<dyn ExecutionPlan>> {
- let DistributionContext {
- plan,
- distribution_onwards,
- } = dist_context;
- let new_children = izip!(plan.children(), distribution_onwards)
- .map(|(mut child, mut dist_onward)| {
- replace_order_preserving_variants(&mut child, &mut dist_onward)?;
- Ok(child)
- })
- .collect::<Result<Vec<_>>>()?;
- if !new_children.is_empty() {
- plan.with_new_children(new_children)
- } else {
- Ok(plan)
- }
-}
-
/// Updates the physical plan `input` by using `dist_onward` replace order
preserving operator variants
/// with their corresponding operators that do not preserve order. It is a
wrapper for `replace_order_preserving_variants_helper`
fn replace_order_preserving_variants(
@@ -1224,18 +1191,16 @@ fn replace_order_preserving_variants_helper(
for child in &exec_tree.children {
updated_children[child.idx] =
replace_order_preserving_variants_helper(child)?;
}
- if let Some(spm) = exec_tree
- .plan
- .as_any()
- .downcast_ref::<SortPreservingMergeExec>()
- {
- return Ok(Arc::new(CoalescePartitionsExec::new(spm.input().clone())));
+ if is_sort_preserving_merge(&exec_tree.plan) {
+ return Ok(Arc::new(CoalescePartitionsExec::new(
+ updated_children[0].clone(),
+ )));
}
if let Some(repartition) =
exec_tree.plan.as_any().downcast_ref::<RepartitionExec>() {
if repartition.preserve_order() {
return Ok(Arc::new(
RepartitionExec::try_new(
- repartition.input().clone(),
+ updated_children[0].clone(),
repartition.partitioning().clone(),
)?
.with_preserve_order(false),
@@ -1275,12 +1240,29 @@ fn ensure_distribution(
repartition_beneficial_stat =
stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true);
}
-
// Remove unnecessary repartition from the physical plan if any
let DistributionContext {
- plan,
+ mut plan,
mut distribution_onwards,
- } = remove_unnecessary_repartition(dist_context)?;
+ } = remove_dist_changing_operators(dist_context)?;
+
+ if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
+ if let Some(updated_window) = get_best_fitting_window(
+ exec.window_expr(),
+ exec.input(),
+ &exec.partition_keys,
+ )? {
+ plan = updated_window;
+ }
+ } else if let Some(exec) =
plan.as_any().downcast_ref::<BoundedWindowAggExec>() {
+ if let Some(updated_window) = get_best_fitting_window(
+ exec.window_expr(),
+ exec.input(),
+ &exec.partition_keys,
+ )? {
+ plan = updated_window;
+ }
+ };
let n_children = plan.children().len();
// This loop iterates over all the children to:
@@ -1369,19 +1351,23 @@ fn ensure_distribution(
// Either:
// - Ordering requirement cannot be satisfied by preserving
ordering through repartitions, or
// - using order preserving variant is not desirable.
- if !ordering_satisfy_requirement_concrete(
+ let ordering_satisfied = ordering_satisfy_requirement_concrete(
existing_ordering,
required_input_ordering,
|| child.equivalence_properties(),
|| child.ordering_equivalence_properties(),
- ) || !order_preserving_variants_desirable
- {
+ );
+ if !ordering_satisfied || !order_preserving_variants_desirable
{
replace_order_preserving_variants(&mut child,
dist_onward)?;
- let sort_expr = PhysicalSortRequirement::to_sort_exprs(
- required_input_ordering.clone(),
- );
- // Make sure to satisfy ordering requirement
- add_sort_above(&mut child, sort_expr, None)?;
+ // If ordering requirements were satisfied before
repartitioning,
+ // make sure ordering requirements are still satisfied
after.
+ if ordering_satisfied {
+ // Make sure to satisfy ordering requirement:
+ let sort_expr = PhysicalSortRequirement::to_sort_exprs(
+ required_input_ordering.clone(),
+ );
+ add_sort_above(&mut child, sort_expr, None)?;
+ }
}
// Stop tracking distribution changing operators
*dist_onward = None;
@@ -1690,6 +1676,7 @@ mod tests {
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
use crate::physical_optimizer::enforce_sorting::EnforceSorting;
+ use crate::physical_optimizer::output_requirements::OutputRequirements;
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
@@ -1703,9 +1690,12 @@ mod tests {
use
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType,
Statistics};
- use crate::physical_optimizer::test_utils::repartition_exec;
+ use crate::physical_optimizer::test_utils::{
+ coalesce_partitions_exec, repartition_exec,
+ };
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::sorts::sort::SortExec;
+
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::ScalarValue;
@@ -1714,7 +1704,7 @@ mod tests {
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use datafusion_physical_expr::{
expressions, expressions::binary, expressions::lit,
expressions::Column,
- PhysicalExpr, PhysicalSortExpr,
+ LexOrdering, PhysicalExpr, PhysicalSortExpr,
};
/// Models operators like BoundedWindowExec that require an input
@@ -1722,11 +1712,23 @@ mod tests {
#[derive(Debug)]
struct SortRequiredExec {
input: Arc<dyn ExecutionPlan>,
+ expr: LexOrdering,
}
impl SortRequiredExec {
fn new(input: Arc<dyn ExecutionPlan>) -> Self {
- Self { input }
+ let expr = input.output_ordering().unwrap_or(&[]).to_vec();
+ Self { input, expr }
+ }
+
+ fn new_with_requirement(
+ input: Arc<dyn ExecutionPlan>,
+ requirement: Vec<PhysicalSortExpr>,
+ ) -> Self {
+ Self {
+ input,
+ expr: requirement,
+ }
}
}
@@ -1736,7 +1738,8 @@ mod tests {
_t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
- write!(f, "SortRequiredExec")
+ let expr: Vec<String> = self.expr.iter().map(|e|
e.to_string()).collect();
+ write!(f, "SortRequiredExec: [{}]", expr.join(","))
}
}
@@ -1778,7 +1781,10 @@ mod tests {
) -> Result<Arc<dyn ExecutionPlan>> {
assert_eq!(children.len(), 1);
let child = children.pop().unwrap();
- Ok(Arc::new(Self::new(child)))
+ Ok(Arc::new(Self::new_with_requirement(
+ child,
+ self.expr.clone(),
+ )))
}
fn execute(
@@ -2054,6 +2060,13 @@ mod tests {
Arc::new(SortRequiredExec::new(input))
}
+ fn sort_required_exec_with_req(
+ input: Arc<dyn ExecutionPlan>,
+ sort_exprs: LexOrdering,
+ ) -> Arc<dyn ExecutionPlan> {
+ Arc::new(SortRequiredExec::new_with_requirement(input, sort_exprs))
+ }
+
fn trim_plan_display(plan: &str) -> Vec<&str> {
plan.split('\n')
.map(|s| s.trim())
@@ -2118,10 +2131,15 @@ mod tests {
// `EnforceSorting` and `EnforceDistribution`.
// TODO: Orthogonalize the tests here just to verify
`EnforceDistribution` and create
// new tests for the cascade.
+
+ // Add the ancillary output requirements operator at the start:
+ let optimizer = OutputRequirements::new_add_mode();
+ let optimized = optimizer.optimize($PLAN.clone(), &config)?;
+
let optimized = if $FIRST_ENFORCE_DIST {
// Run enforce distribution rule first:
let optimizer = EnforceDistribution::new();
- let optimized = optimizer.optimize($PLAN.clone(), &config)?;
+ let optimized = optimizer.optimize(optimized, &config)?;
// The rule should be idempotent.
// Re-running this rule shouldn't introduce unnecessary
operators.
let optimizer = EnforceDistribution::new();
@@ -2133,7 +2151,7 @@ mod tests {
} else {
// Run the enforce sorting rule first:
let optimizer = EnforceSorting::new();
- let optimized = optimizer.optimize($PLAN.clone(), &config)?;
+ let optimized = optimizer.optimize(optimized, &config)?;
// Run enforce distribution rule:
let optimizer = EnforceDistribution::new();
let optimized = optimizer.optimize(optimized, &config)?;
@@ -2144,6 +2162,10 @@ mod tests {
optimized
};
+ // Remove the ancillary output requirements operator when done:
+ let optimizer = OutputRequirements::new_remove_mode();
+ let optimized = optimizer.optimize(optimized, &config)?;
+
// Now format correctly
let plan =
displayable(optimized.as_ref()).indent(true).to_string();
let actual_lines = trim_plan_display(&plan);
@@ -2978,7 +3000,7 @@ mod tests {
format!("SortMergeJoin: join_type={join_type}, on=[(a@0,
c@2)]");
let expected = match join_type {
- // Should include 6 RepartitionExecs 3 SortExecs
+ // Should include 6 RepartitionExecs (3 hash, 3 round-robin),
3 SortExecs
JoinType::Inner | JoinType::Left | JoinType::LeftSemi |
JoinType::LeftAnti =>
vec![
top_join_plan.as_str(),
@@ -2997,9 +3019,18 @@ mod tests {
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e]",
],
- // Should include 7 RepartitionExecs
+ // Should include 7 RepartitionExecs (4 hash, 3 round-robin),
4 SortExecs
+ // Since ordering of the left child is not preserved after
SortMergeJoin
+ // when mode is Right, RightSemi, RightAnti, Full
+ // - We need to add one additional SortExec after
SortMergeJoin in contrast to the test cases
+ // when mode is Inner, Left, LeftSemi, LeftAnti
+ // Similarly, since partitioning of the left side is not
preserved
+ // when mode is Right, RightSemi, RightAnti, Full
+ // - We need to add one additional Hash Repartition after
SortMergeJoin in contrast to the test
+ // cases when mode is Inner, Left, LeftSemi, LeftAnti
_ => vec![
top_join_plan.as_str(),
+ // Below 2 operators are differences introduced, when
join mode is changed
"SortExec: expr=[a@0 ASC]",
"RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
join_plan.as_str(),
@@ -3021,7 +3052,7 @@ mod tests {
assert_optimized!(expected, top_join.clone(), true, true);
let expected_first_sort_enforcement = match join_type {
- // Should include 3 RepartitionExecs 3 SortExecs
+ // Should include 6 RepartitionExecs (3 hash, 3 round-robin),
3 SortExecs
JoinType::Inner | JoinType::Left | JoinType::LeftSemi |
JoinType::LeftAnti =>
vec![
top_join_plan.as_str(),
@@ -3040,9 +3071,18 @@ mod tests {
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e]",
],
- // Should include 8 RepartitionExecs (4 of them preserves
ordering)
+ // Should include 8 RepartitionExecs (4 hash, 4 round-robin),
4 SortExecs
+ // Since ordering of the left child is not preserved after
SortMergeJoin
+ // when mode is Right, RightSemi, RightAnti, Full
+ // - We need to add one additional SortExec after
SortMergeJoin in contrast to the test cases
+ // when mode is Inner, Left, LeftSemi, LeftAnti
+ // Similarly, since partitioning of the left side is not
preserved
+ // when mode is Right, RightSemi, RightAnti, Full
+ // - We need to add one additional Hash Repartition and
round-robin repartition after
+ // SortMergeJoin in contrast to the test cases when mode is
Inner, Left, LeftSemi, LeftAnti
_ => vec![
top_join_plan.as_str(),
+ // Below 4 operators are differences introduced, when join
mode is changed
"SortPreservingRepartitionExec: partitioning=Hash([a@0],
10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"SortExec: expr=[a@0 ASC]",
@@ -3083,7 +3123,7 @@ mod tests {
format!("SortMergeJoin: join_type={join_type},
on=[(b1@6, c@2)]");
let expected = match join_type {
- // Should include 3 RepartitionExecs and 3 SortExecs
+ // Should include 6 RepartitionExecs(3 hash, 3
round-robin) and 3 SortExecs
JoinType::Inner | JoinType::Right => vec![
top_join_plan.as_str(),
join_plan.as_str(),
@@ -3101,8 +3141,8 @@ mod tests {
"RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e]",
],
- // Should include 4 RepartitionExecs and 4 SortExecs
- _ => vec![
+ // Should include 7 RepartitionExecs (4 hash, 3
round-robin) and 4 SortExecs
+ JoinType::Left | JoinType::Full => vec![
top_join_plan.as_str(),
"SortExec: expr=[b1@6 ASC]",
"RepartitionExec: partitioning=Hash([b1@6], 10),
input_partitions=10",
@@ -3121,6 +3161,8 @@ mod tests {
"RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e]",
],
+ // this match arm cannot be reached
+ _ => unreachable!()
};
assert_optimized!(expected, top_join.clone(), true, true);
@@ -3144,7 +3186,7 @@ mod tests {
"ParquetExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e]",
],
// Should include 8 RepartitionExecs (4 of them
preserves order) and 4 SortExecs
- _ => vec![
+ JoinType::Left | JoinType::Full => vec![
top_join_plan.as_str(),
"SortPreservingRepartitionExec:
partitioning=Hash([b1@6], 10), input_partitions=10",
"RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
@@ -3165,6 +3207,8 @@ mod tests {
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e]",
],
+ // this match arm cannot be reached
+ _ => unreachable!()
};
assert_optimized!(
expected_first_sort_enforcement,
@@ -3301,6 +3345,16 @@ mod tests {
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a,
b, c, d, e], output_ordering=[a@0 ASC]",
];
assert_optimized!(expected, exec, true);
+ // In this case preserving ordering through order preserving operators
is not desirable
+ // (according to flag: bounded_order_preserving_variants)
+ // hence in this case ordering is lost during CoalescePartitionsExec and
re-introduced with
+ // SortExec at the top.
+ let expected = &[
+ "SortExec: expr=[a@0 ASC]",
+ "CoalescePartitionsExec",
+ "CoalesceBatchesExec: target_batch_size=4096",
+ "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a,
b, c, d, e], output_ordering=[a@0 ASC]",
+ ];
assert_optimized!(expected, exec, false);
Ok(())
}
@@ -3435,7 +3489,7 @@ mod tests {
sort_required_exec(filter_exec(sort_exec(sort_key, parquet_exec(),
false)));
let expected = &[
- "SortRequiredExec",
+ "SortRequiredExec: [c@2 ASC]",
"FilterExec: c@2 = 0",
// We can use repartition here, ordering requirement by
SortRequiredExec
// is still satisfied.
@@ -3541,6 +3595,12 @@ mod tests {
];
assert_optimized!(expected, plan.clone(), true);
+
+ let expected = &[
+ "SortExec: expr=[c@2 ASC]",
+ "CoalescePartitionsExec",
+ "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a,
b, c, d, e], output_ordering=[c@2 ASC]",
+ ];
assert_optimized!(expected, plan, false);
Ok(())
}
@@ -3565,6 +3625,14 @@ mod tests {
];
assert_optimized!(expected, plan.clone(), true);
+
+ let expected = &[
+ "SortExec: expr=[c@2 ASC]",
+ "CoalescePartitionsExec",
+ "UnionExec",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[c@2 ASC]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[c@2 ASC]",
+ ];
assert_optimized!(expected, plan, false);
Ok(())
}
@@ -3583,7 +3651,7 @@ mod tests {
// during repartitioning ordering is preserved
let expected = &[
- "SortRequiredExec",
+ "SortRequiredExec: [c@2 ASC]",
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[c@2 ASC]",
@@ -3619,7 +3687,7 @@ mod tests {
let expected = &[
"UnionExec",
// union input 1: no repartitioning
- "SortRequiredExec",
+ "SortRequiredExec: [c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[c@2 ASC]",
// union input 2: should repartition
"FilterExec: c@2 = 0",
@@ -3687,16 +3755,13 @@ mod tests {
("c".to_string(), "c".to_string()),
];
// sorted input
- let plan = sort_preserving_merge_exec(
- sort_key.clone(),
- projection_exec_with_alias(
- parquet_exec_multiple_sorted(vec![sort_key]),
- alias,
- ),
- );
+ let plan = sort_required_exec(projection_exec_with_alias(
+ parquet_exec_multiple_sorted(vec![sort_key]),
+ alias,
+ ));
let expected = &[
- "SortPreservingMergeExec: [c@2 ASC]",
+ "SortRequiredExec: [c@2 ASC]",
// Since this projection is trivial, increasing parallelism is not
beneficial
"ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a,
b, c, d, e], output_ordering=[c@2 ASC]",
@@ -4186,11 +4251,11 @@ mod tests {
// no parallelization, because SortRequiredExec doesn't benefit from
increased parallelism
let expected_parquet = &[
- "SortRequiredExec",
+ "SortRequiredExec: [c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[c@2 ASC]",
];
let expected_csv = &[
- "SortRequiredExec",
+ "SortRequiredExec: [c@2 ASC]",
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], has_header=false",
];
@@ -4348,6 +4413,14 @@ mod tests {
];
assert_optimized!(expected, physical_plan.clone(), true);
+
+ let expected = &[
+ "SortExec: expr=[c@2 ASC]",
+ "CoalescePartitionsExec",
+ "FilterExec: c@2 = 0",
+ "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
+ "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a,
b, c, d, e], output_ordering=[c@2 ASC]",
+ ];
assert_optimized!(expected, physical_plan, false);
Ok(())
@@ -4377,6 +4450,15 @@ mod tests {
];
assert_optimized!(expected, physical_plan.clone(), true);
+
+ let expected = &[
+ "SortExec: expr=[a@0 ASC]",
+ "CoalescePartitionsExec",
+ "SortExec: expr=[a@0 ASC]",
+ "FilterExec: c@2 = 0",
+ "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
+ "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a,
b, c, d, e], output_ordering=[c@2 ASC]",
+ ];
assert_optimized!(expected, physical_plan, false);
Ok(())
@@ -4404,6 +4486,86 @@ mod tests {
Ok(())
}
+ #[test]
+ fn do_not_put_sort_when_input_is_invalid() -> Result<()> {
+ let schema = schema();
+ let sort_key = vec![PhysicalSortExpr {
+ expr: col("a", &schema).unwrap(),
+ options: SortOptions::default(),
+ }];
+ let input = parquet_exec();
+ let physical_plan = sort_required_exec_with_req(filter_exec(input),
sort_key);
+ let expected = &[
+ // Ordering requirement of sort required exec is NOT satisfied
+ // by existing ordering at the source.
+ "SortRequiredExec: [a@0 ASC]",
+ "FilterExec: c@2 = 0",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e]",
+ ];
+ assert_plan_txt!(expected, physical_plan);
+
+ let expected = &[
+ "SortRequiredExec: [a@0 ASC]",
+ // Since at the start of the rule ordering requirement is not
satisfied
+ // EnforceDistribution rule doesn't satisfy this requirement
either.
+ "FilterExec: c@2 = 0",
+ "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e]",
+ ];
+
+ let mut config = ConfigOptions::new();
+ config.execution.target_partitions = 10;
+ config.optimizer.enable_round_robin_repartition = true;
+ config.optimizer.bounded_order_preserving_variants = false;
+ let distribution_plan =
+ EnforceDistribution::new().optimize(physical_plan, &config)?;
+ assert_plan_txt!(expected, distribution_plan);
+
+ Ok(())
+ }
+
+ #[test]
+ fn put_sort_when_input_is_valid() -> Result<()> {
+ let schema = schema();
+ let sort_key = vec![PhysicalSortExpr {
+ expr: col("a", &schema).unwrap(),
+ options: SortOptions::default(),
+ }];
+ let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
+ let physical_plan = sort_required_exec_with_req(filter_exec(input),
sort_key);
+
+ let expected = &[
+ // Ordering requirement of sort required exec is satisfied
+ // by existing ordering at the source.
+ "SortRequiredExec: [a@0 ASC]",
+ "FilterExec: c@2 = 0",
+ "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a,
b, c, d, e], output_ordering=[a@0 ASC]",
+ ];
+ assert_plan_txt!(expected, physical_plan);
+
+ let expected = &[
+ "SortRequiredExec: [a@0 ASC]",
+ // Since at the start of the rule ordering requirement is satisfied
+ // EnforceDistribution rule satisfies this requirement as well.
+ // ordering is re-satisfied by introduction of SortExec.
+ "SortExec: expr=[a@0 ASC]",
+ "FilterExec: c@2 = 0",
+ // ordering is lost here
+ "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
+ "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a,
b, c, d, e], output_ordering=[a@0 ASC]",
+ ];
+
+ let mut config = ConfigOptions::new();
+ config.execution.target_partitions = 10;
+ config.optimizer.enable_round_robin_repartition = true;
+ config.optimizer.bounded_order_preserving_variants = false;
+ let distribution_plan =
+ EnforceDistribution::new().optimize(physical_plan, &config)?;
+ assert_plan_txt!(expected, distribution_plan);
+
+ Ok(())
+ }
+
#[test]
fn do_not_add_unnecessary_hash() -> Result<()> {
let schema = schema();
@@ -4458,4 +4620,51 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn optimize_away_unnecessary_repartition() -> Result<()> {
+ let physical_plan =
coalesce_partitions_exec(repartition_exec(parquet_exec()));
+ let expected = &[
+ "CoalescePartitionsExec",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e]",
+ ];
+ plans_matches_expected!(expected, physical_plan.clone());
+
+ let expected =
+ &["ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e]"];
+
+ assert_optimized!(expected, physical_plan.clone(), true);
+ assert_optimized!(expected, physical_plan, false);
+
+ Ok(())
+ }
+
+ #[test]
+ fn optimize_away_unnecessary_repartition2() -> Result<()> {
+ let physical_plan =
filter_exec(repartition_exec(coalesce_partitions_exec(
+ filter_exec(repartition_exec(parquet_exec())),
+ )));
+ let expected = &[
+ "FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " CoalescePartitionsExec",
+ " FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " ParquetExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e]",
+ ];
+ plans_matches_expected!(expected, physical_plan.clone());
+
+ let expected = &[
+ "FilterExec: c@2 = 0",
+ "FilterExec: c@2 = 0",
+ "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e]",
+ ];
+
+ assert_optimized!(expected, physical_plan.clone(), true);
+ assert_optimized!(expected, physical_plan, false);
+
+ Ok(())
+ }
}
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index a149330181..c4b72a7cb3 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -2035,7 +2035,6 @@ mod tests {
let orig_plan =
Arc::new(SortExec::new(sort_exprs, repartition)) as Arc<dyn
ExecutionPlan>;
let actual = get_plan_string(&orig_plan);
- println!("{:?}", actual);
let expected_input = vec![
"SortExec: expr=[nullable_col@0 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
diff --git a/datafusion/core/src/physical_optimizer/mod.rs
b/datafusion/core/src/physical_optimizer/mod.rs
index 0801a9bc59..9e22bff340 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -28,6 +28,7 @@ pub mod enforce_distribution;
pub mod enforce_sorting;
pub mod join_selection;
pub mod optimizer;
+pub mod output_requirements;
pub mod pipeline_checker;
pub mod pruning;
pub mod replace_with_order_preserving_variants;
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs
b/datafusion/core/src/physical_optimizer/optimizer.rs
index 5de70efe3c..95035e5f81 100644
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ b/datafusion/core/src/physical_optimizer/optimizer.rs
@@ -26,6 +26,7 @@ use
crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAgg
use crate::physical_optimizer::enforce_distribution::EnforceDistribution;
use crate::physical_optimizer::enforce_sorting::EnforceSorting;
use crate::physical_optimizer::join_selection::JoinSelection;
+use crate::physical_optimizer::output_requirements::OutputRequirements;
use crate::physical_optimizer::pipeline_checker::PipelineChecker;
use crate::physical_optimizer::topk_aggregation::TopKAggregation;
use crate::{error::Result, physical_plan::ExecutionPlan};
@@ -68,6 +69,9 @@ impl PhysicalOptimizer {
/// Create a new optimizer using the recommended list of rules
pub fn new() -> Self {
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
+ // If there is an output requirement of the query, make sure that
+ // this information is not lost across different rules during
optimization.
+ Arc::new(OutputRequirements::new_add_mode()),
Arc::new(AggregateStatistics::new()),
// Statistics-based join selection will change the Auto mode to a
real join implementation,
// like collect left, or hash join, or future sort merge join,
which will influence the
@@ -90,6 +94,9 @@ impl PhysicalOptimizer {
// The CoalesceBatches rule will not influence the distribution
and ordering of the
// whole plan tree. Therefore, to avoid influencing other rules,
it should run last.
Arc::new(CoalesceBatches::new()),
+ // Remove the ancillary output requirement operator since we are
done with the planning
+ // phase.
+ Arc::new(OutputRequirements::new_remove_mode()),
// The PipelineChecker rule will reject non-runnable query plans
that use
// pipeline-breaking operators on infinite input(s). The rule
generates a
// diagnostic error message when this happens. It makes no changes
to the
diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs
b/datafusion/core/src/physical_optimizer/output_requirements.rs
new file mode 100644
index 0000000000..4b687d7f35
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/output_requirements.rs
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The OutputRequirements optimizer rule either:
+//! - Adds an auxiliary `OutputRequirementExec` operator to keep track of
global
+//! ordering and distribution requirement across rules, or
+//! - Removes the auxiliary `OutputRequirementExec` operator from the physical
plan.
+//! Since the `OutputRequirementExec` operator is only a helper operator, it
+//! shouldn't occur in the final plan (i.e. the executed plan).
+
+use std::sync::Arc;
+
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+
+use arrow_schema::SchemaRef;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::{Result, Statistics};
+use datafusion_physical_expr::{
+ Distribution, LexOrderingReq, PhysicalSortExpr, PhysicalSortRequirement,
+};
+use
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+
+/// This rule either adds or removes [`OutputRequirements`]s to/from the
physical
+/// plan according to its `mode` attribute, which is set by the constructors
+/// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of
+/// the global requirements (ordering and distribution) across rules.
+///
+/// The primary use case of this node and rule is to specify and preserve the
desired output
+/// ordering and distribution of the entire plan. When sending to a single
client, a single partition may
+/// be desirable, but when sending to a multi-partitioned writer, keeping
multiple partitions may be
+/// better.
+#[derive(Debug)]
+pub struct OutputRequirements {
+ mode: RuleMode,
+}
+
+impl OutputRequirements {
+ /// Create a new rule which works in `Add` mode; i.e. it simply adds a
+ /// top-level [`OutputRequirementExec`] into the physical plan to keep
track
+ /// of global ordering and distribution requirements if there are any.
+ /// Note that this rule should run at the beginning.
+ pub fn new_add_mode() -> Self {
+ Self {
+ mode: RuleMode::Add,
+ }
+ }
+
+ /// Create a new rule which works in `Remove` mode; i.e. it simply removes
+ /// the top-level [`OutputRequirementExec`] from the physical plan if
there is
+ /// any. We do this because a `OutputRequirementExec` is an ancillary,
+ /// non-executable operator whose sole purpose is to track global
+ /// requirements during optimization. Therefore, a
+ /// `OutputRequirementExec` should not appear in the final plan.
+ pub fn new_remove_mode() -> Self {
+ Self {
+ mode: RuleMode::Remove,
+ }
+ }
+}
+
+#[derive(Debug, Ord, PartialOrd, PartialEq, Eq, Hash)]
+enum RuleMode {
+ Add,
+ Remove,
+}
+
+/// An ancillary, non-executable operator whose sole purpose is to track global
+/// requirements during optimization. It imposes
+/// - the ordering requirement in its `order_requirement` attribute.
+/// - the distribution requirement in its `dist_requirement` attribute.
+///
+/// See [`OutputRequirements`] for more details
+#[derive(Debug)]
+struct OutputRequirementExec {
+ input: Arc<dyn ExecutionPlan>,
+ order_requirement: Option<LexOrderingReq>,
+ dist_requirement: Distribution,
+}
+
+impl OutputRequirementExec {
+ fn new(
+ input: Arc<dyn ExecutionPlan>,
+ requirements: Option<LexOrderingReq>,
+ dist_requirement: Distribution,
+ ) -> Self {
+ Self {
+ input,
+ order_requirement: requirements,
+ dist_requirement,
+ }
+ }
+
+ fn input(&self) -> Arc<dyn ExecutionPlan> {
+ self.input.clone()
+ }
+}
+
+impl DisplayAs for OutputRequirementExec {
+ fn fmt_as(
+ &self,
+ _t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ write!(f, "OutputRequirementExec")
+ }
+}
+
+impl ExecutionPlan for OutputRequirementExec {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.input.schema()
+ }
+
+ fn output_partitioning(&self) -> crate::physical_plan::Partitioning {
+ self.input.output_partitioning()
+ }
+
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false]
+ }
+
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ vec![self.dist_requirement.clone()]
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ self.input.output_ordering()
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![self.input.clone()]
+ }
+
+ fn required_input_ordering(&self) ->
Vec<Option<Vec<PhysicalSortRequirement>>> {
+ vec![self.order_requirement.clone()]
+ }
+
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ // Has a single child
+ Ok(children[0])
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(Self::new(
+ children.remove(0), // has a single child
+ self.order_requirement.clone(),
+ self.dist_requirement.clone(),
+ )))
+ }
+
+ fn execute(
+ &self,
+ _partition: usize,
+ _context: Arc<crate::execution::context::TaskContext>,
+ ) -> Result<crate::physical_plan::SendableRecordBatchStream> {
+ unreachable!();
+ }
+
+ fn statistics(&self) -> Statistics {
+ self.input.statistics()
+ }
+}
+
+impl PhysicalOptimizerRule for OutputRequirements {
+ fn optimize(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ _config: &ConfigOptions,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ match self.mode {
+ RuleMode::Add => require_top_ordering(plan),
+ RuleMode::Remove => plan.transform_up(&|plan| {
+ if let Some(sort_req) =
+ plan.as_any().downcast_ref::<OutputRequirementExec>()
+ {
+ Ok(Transformed::Yes(sort_req.input()))
+ } else {
+ Ok(Transformed::No(plan))
+ }
+ }),
+ }
+ }
+
+ fn name(&self) -> &str {
+ "OutputRequirements"
+ }
+
+ fn schema_check(&self) -> bool {
+ true
+ }
+}
+
+/// This function adds an ancillary `OutputRequirementExec` to the physical
plan, so that
+/// global requirements are not lost during optimization.
+fn require_top_ordering(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn
ExecutionPlan>> {
+ let (new_plan, is_changed) = require_top_ordering_helper(plan)?;
+ if is_changed {
+ Ok(new_plan)
+ } else {
+ // Add `OutputRequirementExec` to the top, with no specified ordering
and distribution requirement.
+ Ok(Arc::new(OutputRequirementExec::new(
+ new_plan,
+ // there is no ordering requirement
+ None,
+ Distribution::UnspecifiedDistribution,
+ )) as _)
+ }
+}
+
+/// Helper function that adds an ancillary `OutputRequirementExec` to the
given plan.
+/// The first entry in the tuple is the resulting plan; the second indicates
whether any
+/// `OutputRequirementExec` is added to the plan.
+fn require_top_ordering_helper(
+ plan: Arc<dyn ExecutionPlan>,
+) -> Result<(Arc<dyn ExecutionPlan>, bool)> {
+ let mut children = plan.children();
+ // Global ordering defines desired ordering in the final result.
+ if children.len() != 1 {
+ Ok((plan, false))
+ } else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+ let req_ordering = sort_exec.output_ordering().unwrap_or(&[]);
+ let req_dist = sort_exec.required_input_distribution()[0].clone();
+ let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering);
+ Ok((
+ Arc::new(OutputRequirementExec::new(plan, Some(reqs), req_dist))
as _,
+ true,
+ ))
+ } else if let Some(spm) =
plan.as_any().downcast_ref::<SortPreservingMergeExec>() {
+ let reqs = PhysicalSortRequirement::from_sort_exprs(spm.expr());
+ Ok((
+ Arc::new(OutputRequirementExec::new(
+ plan,
+ Some(reqs),
+ Distribution::SinglePartition,
+ )) as _,
+ true,
+ ))
+ } else if plan.maintains_input_order()[0]
+ && plan.required_input_ordering()[0].is_none()
+ {
+ // Keep searching for a `SortExec` as long as ordering is maintained,
+ // and on-the-way operators do not themselves require an ordering.
+ // When an operator requires an ordering, any `SortExec` below can not
+ // be responsible for (i.e. the originator of) the global ordering.
+ let (new_child, is_changed) =
+ require_top_ordering_helper(children.swap_remove(0))?;
+ Ok((plan.with_new_children(vec![new_child])?, is_changed))
+ } else {
+ // Stop searching, there is no global ordering desired for the query.
+ Ok((plan, false))
+ }
+}
diff --git a/datafusion/core/src/physical_optimizer/utils.rs
b/datafusion/core/src/physical_optimizer/utils.rs
index 21c976e07a..0d6c85f9f2 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -73,6 +73,30 @@ impl ExecTree {
}
}
+/// Get `ExecTree` for each child of the plan if they are tracked.
+/// # Arguments
+///
+/// * `n_children` - Children count of the plan of interest
+/// * `onward` - Contains `Some(ExecTree)` of the plan tracked.
+/// - Contains `None` if the plan is not tracked.
+///
+/// # Returns
+///
+/// A `Vec<Option<ExecTree>>` that contains tracking information of each child.
+/// If a child is `None`, it is not tracked; if it is `Some(ExecTree)`, the
child is tracked as well.
+pub(crate) fn get_children_exectrees(
+ n_children: usize,
+ onward: &Option<ExecTree>,
+) -> Vec<Option<ExecTree>> {
+ let mut children_onward = vec![None; n_children];
+ if let Some(exec_tree) = &onward {
+ for child in &exec_tree.children {
+ children_onward[child.idx] = Some(child.clone());
+ }
+ }
+ children_onward
+}
+
/// This utility function adds a `SortExec` above an operator according to the
/// given ordering requirements while preserving the original partitioning.
pub fn add_sort_above(
diff --git
a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
index 79214092fa..73085937cb 100644
--- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
@@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+use std::ops::Deref;
+use std::sync::Arc;
+
use arrow::array::{Int32Builder, Int64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
-use async_trait::async_trait;
use datafusion::datasource::provider::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::{SessionContext, SessionState,
TaskContext};
@@ -32,10 +34,10 @@ use datafusion::physical_plan::{
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
use datafusion_common::cast::as_primitive_array;
-use datafusion_common::{not_impl_err, DataFusionError};
+use datafusion_common::{internal_err, not_impl_err, DataFusionError};
use datafusion_expr::expr::{BinaryExpr, Cast};
-use std::ops::Deref;
-use std::sync::Arc;
+
+use async_trait::async_trait;
fn create_batch(value: i32, num_rows: usize) -> Result<RecordBatch> {
let mut builder = Int32Builder::with_capacity(num_rows);
@@ -96,9 +98,14 @@ impl ExecutionPlan for CustomPlan {
fn with_new_children(
self: Arc<Self>,
- _: Vec<Arc<dyn ExecutionPlan>>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- unreachable!()
+ // CustomPlan has no children
+ if children.is_empty() {
+ Ok(self)
+ } else {
+ internal_err!("Children cannot be replaced in {self:?}")
+ }
}
fn execute(
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs
b/datafusion/physical-plan/src/coalesce_partitions.rs
index 8eddf57ae5..646d42795b 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -104,6 +104,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
self.input.equivalence_properties()
}
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false]
+ }
+
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/physical-plan/src/memory.rs
b/datafusion/physical-plan/src/memory.rs
index b29c8e9c7b..1dcdae56cf 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -126,9 +126,14 @@ impl ExecutionPlan for MemoryExec {
fn with_new_children(
self: Arc<Self>,
- _: Vec<Arc<dyn ExecutionPlan>>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- internal_err!("Children cannot be replaced in {self:?}")
+ // MemoryExec has no children
+ if children.is_empty() {
+ Ok(self)
+ } else {
+ internal_err!("Children cannot be replaced in {self:?}")
+ }
}
fn execute(
diff --git a/datafusion/physical-plan/src/streaming.rs
b/datafusion/physical-plan/src/streaming.rs
index 00809b71e4..cb972fa41e 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -163,9 +163,13 @@ impl ExecutionPlan for StreamingTableExec {
fn with_new_children(
self: Arc<Self>,
- _children: Vec<Arc<dyn ExecutionPlan>>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- internal_err!("Children cannot be replaced in {self:?}")
+ if children.is_empty() {
+ Ok(self)
+ } else {
+ internal_err!("Children cannot be replaced in {self:?}")
+ }
}
fn execute(
diff --git a/datafusion/sqllogictest/test_files/explain.slt
b/datafusion/sqllogictest/test_files/explain.slt
index b1ba1eb36d..27ab8671e9 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -242,12 +242,16 @@ logical_plan after eliminate_projection SAME TEXT AS ABOVE
logical_plan after push_down_limit SAME TEXT AS ABOVE
logical_plan TableScan: simple_explain_test projection=[a, b, c]
initial_physical_plan CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b,
c], has_header=true
+physical_plan after OutputRequirements
+OutputRequirementExec
+--CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b,
c], has_header=true
physical_plan after aggregate_statistics SAME TEXT AS ABOVE
physical_plan after join_selection SAME TEXT AS ABOVE
physical_plan after EnforceDistribution SAME TEXT AS ABOVE
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
physical_plan after EnforceSorting SAME TEXT AS ABOVE
physical_plan after coalesce_batches SAME TEXT AS ABOVE
+physical_plan after OutputRequirements CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b,
c], has_header=true
physical_plan after PipelineChecker SAME TEXT AS ABOVE
physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b,
c], has_header=true
diff --git a/datafusion/sqllogictest/test_files/groupby.slt
b/datafusion/sqllogictest/test_files/groupby.slt
index ffef93837b..5bb0f31ed5 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -2014,9 +2014,9 @@ Sort: l.col0 ASC NULLS LAST
----------TableScan: tab0 projection=[col0, col1]
physical_plan
SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
---ProjectionExec: expr=[col0@0 as col0, LAST_VALUE(r.col1) ORDER BY [r.col0
ASC NULLS LAST]@3 as last_col1]
-----AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1,
col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallyOrdered
-------SortExec: expr=[col0@0 ASC NULLS LAST]
+--SortExec: expr=[col0@0 ASC NULLS LAST]
+----ProjectionExec: expr=[col0@0 as col0, LAST_VALUE(r.col1) ORDER BY [r.col0
ASC NULLS LAST]@3 as last_col1]
+------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as
col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4),
input_partitions=4
------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1,
col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallyOrdered
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index 3d9f7511be..b6325fd889 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3197,6 +3197,72 @@ SELECT a_new, d, rn1 FROM (SELECT d, a as a_new,
0 0 4
0 1 5
+query TT
+EXPLAIN SELECT SUM(a) OVER(partition by a, b order by c) as sum1,
+SUM(a) OVER(partition by b, a order by c) as sum2,
+ SUM(a) OVER(partition by a, d order by b) as sum3,
+ SUM(a) OVER(partition by d order by a) as sum4
+FROM annotated_data_infinite2;
+----
+logical_plan
+Projection: SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW AS sum1, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW AS sum2, SUM(annotated_data_infinite2.a) PARTITION BY [annota
[...]
+--WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64))
PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----Projection: annotated_data_infinite2.a, annotated_data_infinite2.d,
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(anno [...]
+------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64))
PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS
Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d]
ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW]]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS
Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b]
ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW]]
+------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
+physical_plan
+ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@2 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@4 as sum2, SUM(annotated_data_infinite2.a) PARTIT [...]
+--BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d]
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), f [...]
+----ProjectionExec: expr=[a@0 as a, d@3 as d, SUM(annotated_data_infinite2.a)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@4 as SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW, SUM(annotated_data_infinite2.a [...]
+------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int64, nullable: [...]
+--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int64, nullable [...]
+----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int64, nullab [...]
+------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS
LAST, c@2 ASC NULLS LAST], has_header=true
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+# Re-execute the same query with multiple partitions.
+# The final plan should still be streamable.
+query TT
+EXPLAIN SELECT SUM(a) OVER(partition by a, b order by c) as sum1,
+ SUM(a) OVER(partition by b, a order by c) as sum2,
+ SUM(a) OVER(partition by a, d order by b) as sum3,
+ SUM(a) OVER(partition by d order by a) as sum4
+FROM annotated_data_infinite2;
+----
+logical_plan
+Projection: SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW AS sum1, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW AS sum2, SUM(annotated_data_infinite2.a) PARTITION BY [annota
[...]
+--WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64))
PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----Projection: annotated_data_infinite2.a, annotated_data_infinite2.d,
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(anno [...]
+------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64))
PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW]]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS
Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d]
ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW]]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS
Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b]
ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW]]
+------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
+physical_plan
+ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@2 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@4 as sum2, SUM(annotated_data_infinite2.a) PARTIT [...]
+--BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d]
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), f [...]
+----CoalesceBatchesExec: target_batch_size=4096
+------SortPreservingRepartitionExec: partitioning=Hash([d@1], 2),
input_partitions=2
+--------ProjectionExec: expr=[a@0 as a, d@3 as d,
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,
SUM(annotated_data_infinit [...]
+----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int64, nullab [...]
+------------CoalesceBatchesExec: target_batch_size=4096
+--------------SortPreservingRepartitionExec: partitioning=Hash([b@1, a@0], 2),
input_partitions=2
+----------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int64, [...]
+------------------CoalesceBatchesExec: target_batch_size=4096
+--------------------SortPreservingRepartitionExec: partitioning=Hash([a@0,
d@3], 2), input_partitions=2
+----------------------BoundedWindowAggExec:
wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: I [...]
+------------------------CoalesceBatchesExec: target_batch_size=4096
+--------------------------SortPreservingRepartitionExec:
partitioning=Hash([a@0, b@1], 2), input_partitions=2
+----------------------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+------------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS
LAST, c@2 ASC NULLS LAST], has_header=true
+
+# reset the partition number back to 1
+statement ok
+set datafusion.execution.target_partitions = 1;
+
statement ok
drop table annotated_data_finite2