This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 509ad090b4 Improvement: keep order-preserving repartitions for
streaming aggregates (#21107)
509ad090b4 is described below
commit 509ad090b4805d835156adae57f9a74d60cbdd42
Author: xudong.w <[email protected]>
AuthorDate: Thu Mar 26 11:46:49 2026 +0800
Improvement: keep order-preserving repartitions for streaming aggregates
(#21107)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Closes #.
## Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
## What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
This PR updates `EnforceDistribution` to keep order-preserving
repartition variants when preserving input ordering allows the parent
operator to remain incremental/streaming.
Previously, order-preserving variants could be removed when
`prefer_existing_sort = false` or when there was no explicit ordering
requirement, even if dropping the ordering would force a parent operator
such as `AggregateExec` to fall back to blocking execution. This change
adds a targeted `preserving_order_enables_streaming` check and uses it
to avoid replacing `RepartitionExec(..., preserve_order=true)` /
`SortPreservingMergeExec` when that preserved ordering is what enables
streaming behavior.
As a result, the optimizer now prefers keeping order-preserving
repartitioning in these cases, and the updated sqllogictests reflect the
new physical plans: instead of inserting a `SortExec` above a plain
repartition, plans now retain `RepartitionExec(... preserve_order=true)`
so sorted or partially sorted aggregates can continue running
incrementally.
## Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
## Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
Yes: no extra sort is needed for these cases, so the resulting physical plans change (see the updated sqllogictests).
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../physical_optimizer/enforce_distribution.rs | 132 +++++++++++++++++++++
.../physical-optimizer/src/enforce_distribution.rs | 51 +++++++-
.../test_files/agg_func_substitute.slt | 27 ++---
datafusion/sqllogictest/test_files/group_by.slt | 18 ++-
.../test_files/preserve_file_partitioning.slt | 26 ++--
.../test_files/repartition_subset_satisfaction.slt | 24 ++--
datafusion/sqllogictest/test_files/unnest.slt | 17 ++-
7 files changed, 233 insertions(+), 62 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index e14dc389d1..3a6106c453 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -46,6 +46,8 @@ use datafusion_common::tree_node::{
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_expr::{JoinType, Operator};
+use datafusion_functions_aggregate::count::count_udaf;
+use datafusion_physical_expr::aggregate::AggregateExprBuilder;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal,
binary, lit};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{
@@ -462,6 +464,71 @@ fn aggregate_exec_with_alias(
)
}
+fn partitioned_count_aggregate_exec(
+ input: Arc<dyn ExecutionPlan>,
+ group_alias_pairs: Vec<(String, String)>,
+ count_column: &str,
+) -> Arc<dyn ExecutionPlan> {
+ let input_schema = input.schema();
+ let group_by_expr = group_alias_pairs
+ .iter()
+ .map(|(column, alias)| {
+ (
+ col(column, &input_schema).unwrap() as Arc<dyn PhysicalExpr>,
+ alias.clone(),
+ )
+ })
+ .collect::<Vec<_>>();
+ let partial_group_by = PhysicalGroupBy::new_single(group_by_expr.clone());
+ let final_group_by = PhysicalGroupBy::new_single(
+ group_by_expr
+ .iter()
+ .enumerate()
+ .map(|(idx, (_expr, alias))| {
+ (
+ Arc::new(Column::new(alias, idx)) as Arc<dyn PhysicalExpr>,
+ alias.clone(),
+ )
+ })
+ .collect::<Vec<_>>(),
+ );
+
+ let aggr_expr = vec![Arc::new(
+ AggregateExprBuilder::new(
+ count_udaf(),
+ vec![col(count_column, &input_schema).unwrap()],
+ )
+ .schema(Arc::clone(&input_schema))
+ .alias(format!("COUNT({count_column})"))
+ .build()
+ .unwrap(),
+ )];
+
+ let partial = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Partial,
+ partial_group_by,
+ aggr_expr.clone(),
+ vec![None],
+ input,
+ Arc::clone(&input_schema),
+ )
+ .unwrap(),
+ );
+
+ Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::FinalPartitioned,
+ final_group_by,
+ aggr_expr,
+ vec![None],
+ Arc::clone(&partial) as _,
+ partial.schema(),
+ )
+ .unwrap(),
+ )
+}
+
fn hash_join_exec(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
@@ -3322,6 +3389,71 @@ fn preserve_ordering_through_repartition() -> Result<()>
{
Ok(())
}
+#[test]
+fn preserve_ordering_for_streaming_sorted_aggregate() -> Result<()> {
+ let schema = schema();
+ let sort_key: LexOrdering = [PhysicalSortExpr {
+ expr: col("a", &schema)?,
+ options: SortOptions::default(),
+ }]
+ .into();
+ let input = parquet_exec_multiple_sorted(vec![sort_key]);
+ let physical_plan = partitioned_count_aggregate_exec(
+ input,
+ vec![("a".to_string(), "a".to_string())],
+ "b",
+ );
+
+ let test_config = TestConfig::default().with_query_execution_partitions(2);
+
+ let plan_distrib = test_config.to_plan(physical_plan.clone(),
&DISTRIB_DISTRIB_SORT);
+ assert_plan!(plan_distrib, @r"
+ AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[COUNT(b)],
ordering_mode=Sorted
+ RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2,
preserve_order=true, sort_exprs=a@0 ASC
+ AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[COUNT(b)],
ordering_mode=Sorted
+ DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a,
b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ ");
+
+ let plan_sort = test_config.to_plan(physical_plan, &SORT_DISTRIB_DISTRIB);
+ assert_plan!(plan_distrib, plan_sort);
+
+ Ok(())
+}
+
+#[test]
+fn preserve_ordering_for_streaming_partially_sorted_aggregate() -> Result<()> {
+ let schema = schema();
+ let sort_key: LexOrdering = [PhysicalSortExpr {
+ expr: col("a", &schema)?,
+ options: SortOptions::default(),
+ }]
+ .into();
+ let input = parquet_exec_multiple_sorted(vec![sort_key]);
+ let physical_plan = partitioned_count_aggregate_exec(
+ input,
+ vec![
+ ("a".to_string(), "a".to_string()),
+ ("b".to_string(), "b".to_string()),
+ ],
+ "c",
+ );
+
+ let test_config = TestConfig::default().with_query_execution_partitions(2);
+
+ let plan_distrib = test_config.to_plan(physical_plan.clone(),
&DISTRIB_DISTRIB_SORT);
+ assert_plan!(plan_distrib, @r"
+ AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b],
aggr=[COUNT(c)], ordering_mode=PartiallySorted([0])
+ RepartitionExec: partitioning=Hash([a@0, b@1], 2), input_partitions=2,
preserve_order=true, sort_exprs=a@0 ASC
+ AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b],
aggr=[COUNT(c)], ordering_mode=PartiallySorted([0])
+ DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a,
b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+ ");
+
+ let plan_sort = test_config.to_plan(physical_plan, &SORT_DISTRIB_DISTRIB);
+ assert_plan!(plan_distrib, plan_sort);
+
+ Ok(())
+}
+
#[test]
fn do_not_preserve_ordering_through_repartition() -> Result<()> {
let schema = schema();
diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs
b/datafusion/physical-optimizer/src/enforce_distribution.rs
index 0ea1b766cd..504197a2de 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -928,6 +928,43 @@ fn add_hash_on_top(
///
/// * `input`: Current node.
///
+/// Checks whether preserving the child's ordering enables the parent to
+/// run in streaming mode. Compares the parent's pipeline behavior with
+/// the ordered child vs. an unordered (coalesced) child. If removing the
+/// ordering would cause the parent to switch from streaming to blocking,
+/// keeping the order-preserving variant is beneficial.
+///
+/// Only applicable to single-child operators; returns `Ok(false)` for
+/// multi-child operators (e.g. joins) where child substitution semantics are
+/// ambiguous.
+fn preserving_order_enables_streaming(
+ parent: &Arc<dyn ExecutionPlan>,
+ ordered_child: &Arc<dyn ExecutionPlan>,
+) -> Result<bool> {
+ // Only applicable to single-child operators that maintain input order
+ // (e.g. AggregateExec in PartiallySorted mode). Operators that don't
+ // maintain input order (e.g. SortExec) handle ordering themselves —
+ // preserving SPM for them is unnecessary.
+ if parent.children().len() != 1 {
+ return Ok(false);
+ }
+ if !parent.maintains_input_order()[0] {
+ return Ok(false);
+ }
+ // Build parent with the ordered child
+ let with_ordered =
+ Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)])?;
+ if with_ordered.pipeline_behavior() == EmissionType::Final {
+ // Parent is blocking even with ordering — no benefit
+ return Ok(false);
+ }
+ // Build parent with an unordered child via CoalescePartitionsExec.
+ let unordered_child: Arc<dyn ExecutionPlan> =
+ Arc::new(CoalescePartitionsExec::new(Arc::clone(ordered_child)));
+ let without_ordered =
Arc::clone(parent).with_new_children(vec![unordered_child])?;
+ Ok(without_ordered.pipeline_behavior() == EmissionType::Final)
+}
+
/// # Returns
///
/// Updated node with an execution plan, where the desired single distribution
@@ -1340,6 +1377,12 @@ pub fn ensure_distribution(
}
};
+ let streaming_benefit = if child.data {
+ preserving_order_enables_streaming(&plan, &child.plan)?
+ } else {
+ false
+ };
+
// There is an ordering requirement of the operator:
if let Some(required_input_ordering) = required_input_ordering {
// Either:
@@ -1352,6 +1395,7 @@ pub fn ensure_distribution(
.ordering_satisfy_requirement(sort_req.clone())?;
if (!ordering_satisfied ||
!order_preserving_variants_desirable)
+ && !streaming_benefit
&& child.data
{
child = replace_order_preserving_variants(child)?;
@@ -1372,6 +1416,11 @@ pub fn ensure_distribution(
// Stop tracking distribution changing operators
child.data = false;
} else {
+ let streaming_benefit = if child.data {
+ preserving_order_enables_streaming(&plan, &child.plan)?
+ } else {
+ false
+ };
// no ordering requirement
match requirement {
// Operator requires specific distribution.
@@ -1380,7 +1429,7 @@ pub fn ensure_distribution(
// ordering is pointless. However, if it does maintain
// input order, we keep order-preserving variants so
// ordering can flow through to ancestors that need it.
- if !maintains {
+ if !maintains && !streaming_benefit {
child = replace_order_preserving_variants(child)?;
}
}
diff --git a/datafusion/sqllogictest/test_files/agg_func_substitute.slt
b/datafusion/sqllogictest/test_files/agg_func_substitute.slt
index 2b33452184..e0199c8250 100644
--- a/datafusion/sqllogictest/test_files/agg_func_substitute.slt
+++ b/datafusion/sqllogictest/test_files/agg_func_substitute.slt
@@ -45,11 +45,10 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[a@0 as a,
nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c
ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a],
aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
-03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
-05)--------AggregateExec: mode=Partial, gby=[a@0 as a],
aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
-06)----------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1, maintains_sort_order=true
-07)------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c],
output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv,
has_header=true
+03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4,
preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
+04)------AggregateExec: mode=Partial, gby=[a@0 as a],
aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1, maintains_sort_order=true
+06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c],
output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv,
has_header=true
query TT
@@ -64,11 +63,10 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[a@0 as a,
nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c
ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a],
aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
-03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
-05)--------AggregateExec: mode=Partial, gby=[a@0 as a],
aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
-06)----------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1, maintains_sort_order=true
-07)------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c],
output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv,
has_header=true
+03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4,
preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
+04)------AggregateExec: mode=Partial, gby=[a@0 as a],
aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1, maintains_sort_order=true
+06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c],
output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv,
has_header=true
query TT
EXPLAIN SELECT a, ARRAY_AGG(c ORDER BY c)[1 + 100] as result
@@ -82,11 +80,10 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)
+ Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a],
aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
-03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
-05)--------AggregateExec: mode=Partial, gby=[a@0 as a],
aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
-06)----------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1, maintains_sort_order=true
-07)------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c],
output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv,
has_header=true
+03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4,
preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
+04)------AggregateExec: mode=Partial, gby=[a@0 as a],
aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1, maintains_sort_order=true
+06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c],
output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv,
has_header=true
query II
SELECT a, ARRAY_AGG(c ORDER BY c)[1] as result
diff --git a/datafusion/sqllogictest/test_files/group_by.slt
b/datafusion/sqllogictest/test_files/group_by.slt
index 4b1f663bb8..366326479d 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -3971,11 +3971,10 @@ logical_plan
02)--TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
physical_plan
01)AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b],
aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
-02)--SortExec: expr=[c@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8
-04)------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b],
aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
-05)--------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1, maintains_sort_order=true
-06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c,
d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])],
file_type=csv, has_header=true
+02)--RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8,
preserve_order=true, sort_exprs=c@0 ASC NULLS LAST
+03)----AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b],
aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
+04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1,
maintains_sort_order=true
+05)--------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c,
d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])],
file_type=csv, has_header=true
# drop table multiple_ordered_table_with_pk
statement ok
@@ -4011,11 +4010,10 @@ logical_plan
02)--TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
physical_plan
01)AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b],
aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
-02)--SortExec: expr=[c@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8
-04)------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b],
aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
-05)--------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1, maintains_sort_order=true
-06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c,
d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])],
file_type=csv, has_header=true
+02)--RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8,
preserve_order=true, sort_exprs=c@0 ASC NULLS LAST
+03)----AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b],
aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
+04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1,
maintains_sort_order=true
+05)--------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c,
d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])],
file_type=csv, has_header=true
statement ok
set datafusion.execution.target_partitions = 1;
diff --git a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt
b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt
index cfa71ddfcc..7c24a6bd2a 100644
--- a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt
+++ b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt
@@ -288,10 +288,9 @@ physical_plan
01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*),
avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
-04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
-05)--------RepartitionExec: partitioning=Hash([f_dkey@0], 3),
input_partitions=3
-06)----------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
-07)------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST],
file_type=parquet
+04)------RepartitionExec: partitioning=Hash([f_dkey@0], 3),
input_partitions=3, preserve_order=true, sort_exprs=f_dkey@0 ASC NULLS LAST
+05)--------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+06)----------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST],
file_type=parquet
# Verify results without optimization
query TIR
@@ -361,16 +360,15 @@ physical_plan
01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, max(d.env)@1 as max(d.env),
max(d.service)@2 as max(d.service), count(Int64(1))@3 as count(*),
sum(f.value)@4 as sum(f.value)]
03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey],
aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)],
ordering_mode=Sorted
-04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
-05)--------RepartitionExec: partitioning=Hash([f_dkey@0], 3),
input_partitions=3
-06)----------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey],
aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)],
ordering_mode=Sorted
-07)------------ProjectionExec: expr=[value@2 as value, f_dkey@3 as f_dkey,
env@0 as env, service@1 as service]
-08)--------------HashJoinExec: mode=CollectLeft, join_type=Inner,
on=[(d_dkey@0, f_dkey@1)], projection=[env@1, service@2, value@3, f_dkey@4]
-09)----------------CoalescePartitionsExec
-10)------------------FilterExec: service@2 = log
-11)--------------------RepartitionExec: partitioning=RoundRobinBatch(3),
input_partitions=1
-12)----------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]},
projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 =
log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <=
log AND log <= service_max@1, required_guarantees=[service in (log)]
-13)----------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST],
file_type=parqu [...]
+04)------RepartitionExec: partitioning=Hash([f_dkey@0], 3),
input_partitions=3, preserve_order=true, sort_exprs=f_dkey@0 ASC NULLS LAST
+05)--------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey],
aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)],
ordering_mode=Sorted
+06)----------ProjectionExec: expr=[value@2 as value, f_dkey@3 as f_dkey, env@0
as env, service@1 as service]
+07)------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@0,
f_dkey@1)], projection=[env@1, service@2, value@3, f_dkey@4]
+08)--------------CoalescePartitionsExec
+09)----------------FilterExec: service@2 = log
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(3),
input_partitions=1
+11)--------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]},
projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 =
log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <=
log AND log <= service_max@1, required_guarantees=[service in (log)]
+12)--------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]},
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST],
file_type=parquet [...]
# Verify results without optimization
query TTTIR rowsort
diff --git
a/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt
b/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt
index ac2463237b..fd49fd9004 100644
--- a/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt
+++ b/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt
@@ -162,10 +162,9 @@ physical_plan
01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, time_bin@1 ASC NULLS
LAST]
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as time_bin,
count(Int64(1))@2 as count(*), avg(fact_table_ordered.value)@3 as
avg(fact_table_ordered.value)]
03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
-04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 ASC NULLS LAST],
preserve_partitioning=[true]
-05)--------RepartitionExec: partitioning=Hash([f_dkey@0,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1], 3),
input_partitions=3
-06)----------AggregateExec: mode=Partial, gby=[f_dkey@2 as f_dkey,
date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 },
timestamp@0) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months:
0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
-07)------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]},
projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS
[...]
+04)------RepartitionExec: partitioning=Hash([f_dkey@0,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1], 3),
input_partitions=3, preserve_order=true, sort_exprs=f_dkey@0 ASC NULLS LAST,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 ASC NULLS LAST
+05)--------AggregateExec: mode=Partial, gby=[f_dkey@2 as f_dkey,
date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 },
timestamp@0) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months:
0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)],
aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
+06)----------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]},
projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LA
[...]
# Verify results without subset satisfaction
query TPIR rowsort
@@ -375,15 +374,14 @@ physical_plan
06)----------AggregateExec: mode=Partial, gby=[env@1 as env, time_bin@0 as
time_bin], aggr=[avg(a.max_bin_value)]
07)------------ProjectionExec:
expr=[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),j.timestamp)@1 as time_bin, env@2 as env,
max(j.value)@3 as max_bin_value]
08)--------------AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as
f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days:
0, nanoseconds: 30000000000 }"),j.timestamp)@1 as
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),j.timestamp), env@2 as env], aggr=[max(j.value)],
ordering_mode=PartiallySorted([0, 1])
-09)----------------SortExec: expr=[f_dkey@0 ASC NULLS LAST,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),j.timestamp)@1 ASC NULLS LAST],
preserve_partitioning=[true]
-10)------------------RepartitionExec: partitioning=Hash([f_dkey@0,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),j.timestamp)@1, env@2], 3), input_partitions=3
-11)--------------------AggregateExec: mode=Partial, gby=[f_dkey@0 as f_dkey,
date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 },
timestamp@2) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months:
0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@1 as env],
aggr=[max(j.value)], ordering_mode=PartiallySorted([0, 1])
-12)----------------------ProjectionExec: expr=[f_dkey@3 as f_dkey, env@0 as
env, timestamp@1 as timestamp, value@2 as value]
-13)------------------------HashJoinExec: mode=CollectLeft, join_type=Inner,
on=[(d_dkey@1, f_dkey@2)], projection=[env@0, timestamp@2, value@3, f_dkey@4]
-14)--------------------------CoalescePartitionsExec
-15)----------------------------FilterExec: service@1 = log, projection=[env@0,
d_dkey@2]
-16)------------------------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_f [...]
-17)--------------------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]},
projection=[timestamp, value, f_dkey], output_ordering=[f_dke [...]
+09)----------------RepartitionExec: partitioning=Hash([f_dkey@0,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),j.timestamp)@1, env@2], 3), input_partitions=3,
preserve_order=true, sort_exprs=f_dkey@0 ASC NULLS LAST,
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0,
nanoseconds: 30000000000 }"),j.timestamp)@1 ASC NULLS LAST
+10)------------------AggregateExec: mode=Partial, gby=[f_dkey@0 as f_dkey,
date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 },
timestamp@2) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months:
0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@1 as env],
aggr=[max(j.value)], ordering_mode=PartiallySorted([0, 1])
+11)--------------------ProjectionExec: expr=[f_dkey@3 as f_dkey, env@0 as env,
timestamp@1 as timestamp, value@2 as value]
+12)----------------------HashJoinExec: mode=CollectLeft, join_type=Inner,
on=[(d_dkey@1, f_dkey@2)], projection=[env@0, timestamp@2, value@3, f_dkey@4]
+13)------------------------CoalescePartitionsExec
+14)--------------------------FilterExec: service@1 = log, projection=[env@0,
d_dkey@2]
+15)----------------------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet,
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_fil [...]
+16)------------------------DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]},
projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@ [...]
# Verify results without subset satisfaction
query TPR rowsort
diff --git a/datafusion/sqllogictest/test_files/unnest.slt
b/datafusion/sqllogictest/test_files/unnest.slt
index ba499679a9..8cfc01380d 100644
--- a/datafusion/sqllogictest/test_files/unnest.slt
+++ b/datafusion/sqllogictest/test_files/unnest.slt
@@ -987,15 +987,14 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as
generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
-03)----SortExec: expr=[generated_id@0 ASC NULLS LAST],
preserve_partitioning=[true]
-04)------RepartitionExec: partitioning=Hash([generated_id@0], 4),
input_partitions=4
-05)--------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id],
aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
-06)----------ProjectionExec: expr=[generated_id@0 as generated_id,
__unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
-07)------------UnnestExec
-08)--------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as
__unnest_placeholder(make_array(range().value))]
-09)----------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1, maintains_sort_order=true
-10)------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
-11)--------------------LazyMemoryExec: partitions=1, batch_generators=[range:
start=1, end=5, batch_size=8192]
+03)----RepartitionExec: partitioning=Hash([generated_id@0], 4),
input_partitions=4, preserve_order=true, sort_exprs=generated_id@0 ASC NULLS
LAST
+04)------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id],
aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
+05)--------ProjectionExec: expr=[generated_id@0 as generated_id,
__unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
+06)----------UnnestExec
+07)------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as
__unnest_placeholder(make_array(range().value))]
+08)--------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1, maintains_sort_order=true
+09)----------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
+10)------------------LazyMemoryExec: partitions=1, batch_generators=[range:
start=1, end=5, batch_size=8192]
# Unnest array where data is already ordered by column2 (100, 200, 300, 400)
statement ok
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]