This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9bc61b31ae Increase test coverage for unbounded and bounded cases
(#8581)
9bc61b31ae is described below
commit 9bc61b31ae4f67c55c03214c9b807079e4fe0f44
Author: Mustafa Akur <[email protected]>
AuthorDate: Tue Dec 19 18:22:33 2023 +0300
Increase test coverage for unbounded and bounded cases (#8581)
* Re-introduce unbounded tests with new executor
* Remove unnecessary test
* Enhance test coverage
* Review
* Test passes
* Change argument order
* Parametrize enforce sorting test
* Imports
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
.../core/src/physical_optimizer/enforce_sorting.rs | 92 ++-
.../replace_with_order_preserving_variants.rs | 714 ++++++++++++++++++---
datafusion/core/src/test/mod.rs | 28 +-
3 files changed, 697 insertions(+), 137 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index 2b650a4269..2ecc1e11b9 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -60,8 +60,8 @@ use crate::physical_plan::{
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
-
use datafusion_physical_plan::repartition::RepartitionExec;
+
use itertools::izip;
/// This rule inspects [`SortExec`]'s in the given physical plan and removes
the
@@ -769,7 +769,7 @@ mod tests {
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::{displayable, get_plan_string, Partitioning};
use crate::prelude::{SessionConfig, SessionContext};
- use crate::test::{csv_exec_sorted, stream_exec_ordered};
+ use crate::test::{csv_exec_ordered, csv_exec_sorted, stream_exec_ordered};
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -777,6 +777,8 @@ mod tests {
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::{col, Column, NotExpr};
+ use rstest::rstest;
+
fn create_test_schema() -> Result<SchemaRef> {
let nullable_column = Field::new("nullable_col", DataType::Int32,
true);
let non_nullable_column = Field::new("non_nullable_col",
DataType::Int32, false);
@@ -2140,12 +2142,19 @@ mod tests {
Ok(())
}
+ #[rstest]
#[tokio::test]
- async fn test_with_lost_ordering_unbounded() -> Result<()> {
+ async fn test_with_lost_ordering_unbounded_bounded(
+ #[values(false, true)] source_unbounded: bool,
+ ) -> Result<()> {
let schema = create_test_schema3()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- // create an unbounded source
- let source = stream_exec_ordered(&schema, sort_exprs);
+ // create either bounded or unbounded source
+ let source = if source_unbounded {
+ stream_exec_ordered(&schema, sort_exprs)
+ } else {
+ csv_exec_ordered(&schema, sort_exprs)
+ };
let repartition_rr = repartition_exec(source);
let repartition_hash = Arc::new(RepartitionExec::try_new(
repartition_rr,
@@ -2154,50 +2163,71 @@ mod tests {
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
let physical_plan = sort_exec(vec![sort_expr("a", &schema)],
coalesce_partitions);
- let expected_input = [
+ // Expected inputs unbounded and bounded
+ let expected_input_unbounded = vec![
"SortExec: expr=[a@0 ASC]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, b,
c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
];
- let expected_optimized = [
+ let expected_input_bounded = vec![
+ "SortExec: expr=[a@0 ASC]",
+ " CoalescePartitionsExec",
+ " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
+ ];
+
+ // Expected unbounded result (same for with and without flag)
+ let expected_optimized_unbounded = vec![
"SortPreservingMergeExec: [a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c,
d, e], infinite_source=true, output_ordering=[a@0 ASC]",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
- Ok(())
- }
- #[tokio::test]
- async fn test_with_lost_ordering_unbounded_parallelize_off() -> Result<()>
{
- let schema = create_test_schema3()?;
- let sort_exprs = vec![sort_expr("a", &schema)];
- // create an unbounded source
- let source = stream_exec_ordered(&schema, sort_exprs);
- let repartition_rr = repartition_exec(source);
- let repartition_hash = Arc::new(RepartitionExec::try_new(
- repartition_rr,
- Partitioning::Hash(vec![col("c", &schema).unwrap()], 10),
- )?) as _;
- let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
- let physical_plan = sort_exec(vec![sort_expr("a", &schema)],
coalesce_partitions);
-
- let expected_input = ["SortExec: expr=[a@0 ASC]",
+ // Expected bounded results with and without flag
+ let expected_optimized_bounded = vec![
+ "SortExec: expr=[a@0 ASC]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " StreamingTableExec: partition_sizes=1, projection=[a, b,
c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
];
- let expected_optimized = [
+ let expected_optimized_bounded_parallelize_sort = vec![
"SortPreservingMergeExec: [a@0 ASC]",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " StreamingTableExec: partition_sizes=1, projection=[a, b, c,
d, e], infinite_source=true, output_ordering=[a@0 ASC]",
+ " SortExec: expr=[a@0 ASC]",
+ " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan,
false);
+ let (expected_input, expected_optimized,
expected_optimized_sort_parallelize) =
+ if source_unbounded {
+ (
+ expected_input_unbounded,
+ expected_optimized_unbounded.clone(),
+ expected_optimized_unbounded,
+ )
+ } else {
+ (
+ expected_input_bounded,
+ expected_optimized_bounded,
+ expected_optimized_bounded_parallelize_sort,
+ )
+ };
+ assert_optimized!(
+ expected_input,
+ expected_optimized,
+ physical_plan.clone(),
+ false
+ );
+ assert_optimized!(
+ expected_input,
+ expected_optimized_sort_parallelize,
+ physical_plan,
+ true
+ );
Ok(())
}
diff --git
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 671891be43..0ff7e9f48e 100644
---
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -276,6 +276,9 @@ pub(crate) fn replace_with_order_preserving_variants(
mod tests {
use super::*;
+ use
crate::datasource::file_format::file_compression_type::FileCompressionType;
+ use crate::datasource::listing::PartitionedFile;
+ use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::filter::FilterExec;
@@ -285,35 +288,95 @@ mod tests {
use
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::{displayable, get_plan_string, Partitioning};
use crate::prelude::SessionConfig;
-
use crate::test::TestStreamPartition;
+
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::tree_node::TreeNode;
- use datafusion_common::Result;
+ use datafusion_common::{Result, Statistics};
+ use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::expressions::{self, col, Column};
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::streaming::StreamingTableExec;
+
use rstest::rstest;
- /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts
the plan
- /// against the original and expected plans.
+ /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts
+ /// the plan against the original and expected plans for both bounded and
+ /// unbounded cases.
///
- /// `$EXPECTED_PLAN_LINES`: input plan
- /// `$EXPECTED_OPTIMIZED_PLAN_LINES`: optimized plan
- /// `$PLAN`: the plan to optimized
- /// `$ALLOW_BOUNDED`: whether to allow the plan to be optimized for
bounded cases
- macro_rules! assert_optimized {
- ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr,
$PLAN: expr) => {
+ /// # Parameters
+ ///
+ /// * `$EXPECTED_UNBOUNDED_PLAN_LINES`: Expected input unbounded plan.
+ /// * `$EXPECTED_BOUNDED_PLAN_LINES`: Expected input bounded plan.
+ /// * `$EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES`: Optimized plan, which is
+ /// the same regardless of the value of the `prefer_existing_sort` flag.
+ /// * `$EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES`: Optimized plan when the flag
+ /// `prefer_existing_sort` is `false` for bounded cases.
+ /// * `$EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: Optimized plan
+ /// when the flag `prefer_existing_sort` is `true` for bounded cases.
+ /// * `$PLAN`: The plan to optimize.
+ /// * `$SOURCE_UNBOUNDED`: Whether the given plan contains an unbounded
source.
+ macro_rules! assert_optimized_in_all_boundedness_situations {
+ ($EXPECTED_UNBOUNDED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PLAN_LINES:
expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr,
$EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr,
$EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr,
$SOURCE_UNBOUNDED: expr) => {
+ if $SOURCE_UNBOUNDED {
+ assert_optimized_prefer_sort_on_off!(
+ $EXPECTED_UNBOUNDED_PLAN_LINES,
+ $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES,
+ $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES,
+ $PLAN
+ );
+ } else {
+ assert_optimized_prefer_sort_on_off!(
+ $EXPECTED_BOUNDED_PLAN_LINES,
+ $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES,
+ $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES,
+ $PLAN
+ );
+ }
+ };
+ }
+
+ /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts
+ /// the plan against the original and expected plans.
+ ///
+ /// # Parameters
+ ///
+ /// * `$EXPECTED_PLAN_LINES`: Expected input plan.
+ /// * `$EXPECTED_OPTIMIZED_PLAN_LINES`: Optimized plan when the flag
+ /// `prefer_existing_sort` is `false`.
+ /// * `$EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: Optimized plan when
+ /// the flag `prefer_existing_sort` is `true`.
+ /// * `$PLAN`: The plan to optimize.
+ macro_rules! assert_optimized_prefer_sort_on_off {
+ ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr,
$EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => {
assert_optimized!(
$EXPECTED_PLAN_LINES,
$EXPECTED_OPTIMIZED_PLAN_LINES,
- $PLAN,
+ $PLAN.clone(),
false
);
+ assert_optimized!(
+ $EXPECTED_PLAN_LINES,
+ $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES,
+ $PLAN,
+ true
+ );
};
- ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr,
$PLAN: expr, $ALLOW_BOUNDED: expr) => {
+ }
+
+ /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts
+ /// the plan against the original and expected plans.
+ ///
+ /// # Parameters
+ ///
+ /// * `$EXPECTED_PLAN_LINES`: Expected input plan.
+ /// * `$EXPECTED_OPTIMIZED_PLAN_LINES`: Expected optimized plan.
+ /// * `$PLAN`: The plan to optimize.
+ /// * `$PREFER_EXISTING_SORT`: Value of the `prefer_existing_sort` flag.
+ macro_rules! assert_optimized {
+ ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr,
$PLAN: expr, $PREFER_EXISTING_SORT: expr) => {
let physical_plan = $PLAN;
let formatted =
displayable(physical_plan.as_ref()).indent(true).to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
@@ -329,8 +392,7 @@ mod tests {
let expected_optimized_lines: Vec<&str> =
$EXPECTED_OPTIMIZED_PLAN_LINES.iter().map(|s| *s).collect();
// Run the rule top-down
- // let optimized_physical_plan =
physical_plan.transform_down(&replace_repartition_execs)?;
- let config =
SessionConfig::new().with_prefer_existing_sort($ALLOW_BOUNDED);
+ let config =
SessionConfig::new().with_prefer_existing_sort($PREFER_EXISTING_SORT);
let plan_with_pipeline_fixer =
OrderPreservationContext::new(physical_plan);
let parallel =
plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer|
replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false,
config.options()))?;
let optimized_physical_plan = parallel.plan;
@@ -348,35 +410,67 @@ mod tests {
#[tokio::test]
// Searches for a simple sort and a repartition just after it, the second
repartition with 1 input partition should not be affected
async fn test_replace_multiple_input_repartition_1(
- #[values(false, true)] prefer_existing_sort: bool,
+ #[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = stream_exec_ordered(&schema, sort_exprs);
+ let source = if source_unbounded {
+ stream_exec_ordered(&schema, sort_exprs)
+ } else {
+ csv_exec_sorted(&schema, sort_exprs)
+ };
let repartition =
repartition_exec_hash(repartition_exec_round_robin(source));
let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
- let expected_input = [
+ // Expected inputs unbounded and bounded
+ let expected_input_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c,
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- let expected_optimized = [
+ let expected_input_bounded = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " SortExec: expr=[a@0 ASC NULLS LAST]",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+
+ // Expected unbounded result (same for with and without flag)
+ let expected_optimized_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c,
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(
- expected_input,
- expected_optimized,
+
+ // Expected bounded results with and without flag
+ let expected_optimized_bounded = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " SortExec: expr=[a@0 ASC NULLS LAST]",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ let expected_optimized_bounded_sort_preserve = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ assert_optimized_in_all_boundedness_situations!(
+ expected_input_unbounded,
+ expected_input_bounded,
+ expected_optimized_unbounded,
+ expected_optimized_bounded,
+ expected_optimized_bounded_sort_preserve,
physical_plan,
- prefer_existing_sort
+ source_unbounded
);
Ok(())
}
@@ -384,11 +478,15 @@ mod tests {
#[rstest]
#[tokio::test]
async fn test_with_inter_children_change_only(
- #[values(false, true)] prefer_existing_sort: bool,
+ #[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr_default("a", &schema)];
- let source = stream_exec_ordered(&schema, sort_exprs);
+ let source = if source_unbounded {
+ stream_exec_ordered(&schema, sort_exprs)
+ } else {
+ csv_exec_sorted(&schema, sort_exprs)
+ };
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
@@ -408,7 +506,8 @@ mod tests {
sort2,
);
- let expected_input = [
+ // Expected inputs unbounded and bounded
+ let expected_input_unbounded = [
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC]",
" FilterExec: c@1 > 3",
@@ -420,8 +519,21 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1,
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]",
];
+ let expected_input_bounded = [
+ "SortPreservingMergeExec: [a@0 ASC]",
+ " SortExec: expr=[a@0 ASC]",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " SortExec: expr=[a@0 ASC]",
+ " CoalescePartitionsExec",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
+ ];
- let expected_optimized = [
+ // Expected unbounded result (same for with and without flag)
+ let expected_optimized_unbounded = [
"SortPreservingMergeExec: [a@0 ASC]",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC",
@@ -431,11 +543,38 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1,
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]",
];
- assert_optimized!(
- expected_input,
- expected_optimized,
+
+ // Expected bounded results with and without flag
+ let expected_optimized_bounded = [
+ "SortPreservingMergeExec: [a@0 ASC]",
+ " SortExec: expr=[a@0 ASC]",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " SortExec: expr=[a@0 ASC]",
+ " CoalescePartitionsExec",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
+ ];
+ let expected_optimized_bounded_sort_preserve = [
+ "SortPreservingMergeExec: [a@0 ASC]",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " SortPreservingMergeExec: [a@0 ASC]",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
+ ];
+ assert_optimized_in_all_boundedness_situations!(
+ expected_input_unbounded,
+ expected_input_bounded,
+ expected_optimized_unbounded,
+ expected_optimized_bounded,
+ expected_optimized_bounded_sort_preserve,
physical_plan,
- prefer_existing_sort
+ source_unbounded
);
Ok(())
}
@@ -443,11 +582,15 @@ mod tests {
#[rstest]
#[tokio::test]
async fn test_replace_multiple_input_repartition_2(
- #[values(false, true)] prefer_existing_sort: bool,
+ #[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = stream_exec_ordered(&schema, sort_exprs);
+ let source = if source_unbounded {
+ stream_exec_ordered(&schema, sort_exprs)
+ } else {
+ csv_exec_sorted(&schema, sort_exprs)
+ };
let repartition_rr = repartition_exec_round_robin(source);
let filter = filter_exec(repartition_rr);
let repartition_hash = repartition_exec_hash(filter);
@@ -456,7 +599,8 @@ mod tests {
let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
- let expected_input = [
+ // Expected inputs unbounded and bounded
+ let expected_input_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
@@ -464,18 +608,48 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- let expected_optimized = [
+ let expected_input_bounded = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " SortExec: expr=[a@0 ASC NULLS LAST]",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+
+ // Expected unbounded result (same for with and without flag)
+ let expected_optimized_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c,
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(
- expected_input,
- expected_optimized,
+
+ // Expected bounded results with and without flag
+ let expected_optimized_bounded = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " SortExec: expr=[a@0 ASC NULLS LAST]",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ let expected_optimized_bounded_sort_preserve = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ assert_optimized_in_all_boundedness_situations!(
+ expected_input_unbounded,
+ expected_input_bounded,
+ expected_optimized_unbounded,
+ expected_optimized_bounded,
+ expected_optimized_bounded_sort_preserve,
physical_plan,
- prefer_existing_sort
+ source_unbounded
);
Ok(())
}
@@ -483,11 +657,15 @@ mod tests {
#[rstest]
#[tokio::test]
async fn test_replace_multiple_input_repartition_with_extra_steps(
- #[values(false, true)] prefer_existing_sort: bool,
+ #[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = stream_exec_ordered(&schema, sort_exprs);
+ let source = if source_unbounded {
+ stream_exec_ordered(&schema, sort_exprs)
+ } else {
+ csv_exec_sorted(&schema, sort_exprs)
+ };
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let filter = filter_exec(repartition_hash);
@@ -497,7 +675,8 @@ mod tests {
let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
- let expected_input = [
+ // Expected inputs unbounded and bounded
+ let expected_input_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
@@ -506,7 +685,18 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- let expected_optimized = [
+ let expected_input_bounded = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " SortExec: expr=[a@0 ASC NULLS LAST]",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+
+ // Expected unbounded result (same for with and without flag)
+ let expected_optimized_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
@@ -514,11 +704,33 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(
- expected_input,
- expected_optimized,
+
+ // Expected bounded results with and without flag
+ let expected_optimized_bounded = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " SortExec: expr=[a@0 ASC NULLS LAST]",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ let expected_optimized_bounded_sort_preserve = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ assert_optimized_in_all_boundedness_situations!(
+ expected_input_unbounded,
+ expected_input_bounded,
+ expected_optimized_unbounded,
+ expected_optimized_bounded,
+ expected_optimized_bounded_sort_preserve,
physical_plan,
- prefer_existing_sort
+ source_unbounded
);
Ok(())
}
@@ -526,11 +738,15 @@ mod tests {
#[rstest]
#[tokio::test]
async fn test_replace_multiple_input_repartition_with_extra_steps_2(
- #[values(false, true)] prefer_existing_sort: bool,
+ #[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = stream_exec_ordered(&schema, sort_exprs);
+ let source = if source_unbounded {
+ stream_exec_ordered(&schema, sort_exprs)
+ } else {
+ csv_exec_sorted(&schema, sort_exprs)
+ };
let repartition_rr = repartition_exec_round_robin(source);
let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr);
let repartition_hash = repartition_exec_hash(coalesce_batches_exec_1);
@@ -542,7 +758,8 @@ mod tests {
let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
- let expected_input = [
+ // Expected inputs unbounded and bounded
+ let expected_input_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
@@ -552,7 +769,19 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1,
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST]",
];
- let expected_optimized = [
+ let expected_input_bounded = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " SortExec: expr=[a@0 ASC NULLS LAST]",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+
+ // Expected unbounded result (same for with and without flag)
+ let expected_optimized_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
@@ -561,11 +790,35 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(
- expected_input,
- expected_optimized,
+
+ // Expected bounded results with and without flag
+ let expected_optimized_bounded = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " SortExec: expr=[a@0 ASC NULLS LAST]",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ let expected_optimized_bounded_sort_preserve = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ assert_optimized_in_all_boundedness_situations!(
+ expected_input_unbounded,
+ expected_input_bounded,
+ expected_optimized_unbounded,
+ expected_optimized_bounded,
+ expected_optimized_bounded_sort_preserve,
physical_plan,
- prefer_existing_sort
+ source_unbounded
);
Ok(())
}
@@ -573,11 +826,15 @@ mod tests {
#[rstest]
#[tokio::test]
async fn test_not_replacing_when_no_need_to_preserve_sorting(
- #[values(false, true)] prefer_existing_sort: bool,
+ #[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = stream_exec_ordered(&schema, sort_exprs);
+ let source = if source_unbounded {
+ stream_exec_ordered(&schema, sort_exprs)
+ } else {
+ csv_exec_sorted(&schema, sort_exprs)
+ };
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let filter = filter_exec(repartition_hash);
@@ -586,7 +843,8 @@ mod tests {
let physical_plan: Arc<dyn ExecutionPlan> =
coalesce_partitions_exec(coalesce_batches_exec);
- let expected_input = [
+ // Expected inputs unbounded and bounded
+ let expected_input_unbounded = [
"CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
@@ -594,7 +852,17 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- let expected_optimized = [
+ let expected_input_bounded = [
+ "CoalescePartitionsExec",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+
+ // Expected unbounded result (the same with and without the flag)
+ let expected_optimized_unbounded = [
"CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
@@ -602,11 +870,26 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(
- expected_input,
- expected_optimized,
+
+ // Expected bounded results are the same with and without the flag, because no executor has an ordering requirement.
+ let expected_optimized_bounded = [
+ "CoalescePartitionsExec",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ let expected_optimized_bounded_sort_preserve =
expected_optimized_bounded;
+
+ assert_optimized_in_all_boundedness_situations!(
+ expected_input_unbounded,
+ expected_input_bounded,
+ expected_optimized_unbounded,
+ expected_optimized_bounded,
+ expected_optimized_bounded_sort_preserve,
physical_plan,
- prefer_existing_sort
+ source_unbounded
);
Ok(())
}
@@ -614,11 +897,15 @@ mod tests {
#[rstest]
#[tokio::test]
async fn test_with_multiple_replacable_repartitions(
- #[values(false, true)] prefer_existing_sort: bool,
+ #[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = stream_exec_ordered(&schema, sort_exprs);
+ let source = if source_unbounded {
+ stream_exec_ordered(&schema, sort_exprs)
+ } else {
+ csv_exec_sorted(&schema, sort_exprs)
+ };
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let filter = filter_exec(repartition_hash);
@@ -629,7 +916,8 @@ mod tests {
let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
- let expected_input = [
+ // Expected inputs unbounded and bounded
+ let expected_input_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
@@ -639,7 +927,19 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1,
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST]",
];
- let expected_optimized = [
+ let expected_input_bounded = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " SortExec: expr=[a@0 ASC NULLS LAST]",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+
+ // Expected unbounded result (the same with and without the flag)
+ let expected_optimized_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
@@ -648,11 +948,35 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(
- expected_input,
- expected_optimized,
+
+ // Expected bounded results with and without flag
+ let expected_optimized_bounded = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " SortExec: expr=[a@0 ASC NULLS LAST]",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ let expected_optimized_bounded_sort_preserve = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ assert_optimized_in_all_boundedness_situations!(
+ expected_input_unbounded,
+ expected_input_bounded,
+ expected_optimized_unbounded,
+ expected_optimized_bounded,
+ expected_optimized_bounded_sort_preserve,
physical_plan,
- prefer_existing_sort
+ source_unbounded
);
Ok(())
}
@@ -660,11 +984,15 @@ mod tests {
#[rstest]
#[tokio::test]
async fn test_not_replace_with_different_orderings(
- #[values(false, true)] prefer_existing_sort: bool,
+ #[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = stream_exec_ordered(&schema, sort_exprs);
+ let source = if source_unbounded {
+ stream_exec_ordered(&schema, sort_exprs)
+ } else {
+ csv_exec_sorted(&schema, sort_exprs)
+ };
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let sort = sort_exec(
@@ -678,25 +1006,49 @@ mod tests {
sort,
);
- let expected_input = [
+ // Expected inputs unbounded and bounded
+ let expected_input_unbounded = [
"SortPreservingMergeExec: [c@1 ASC]",
" SortExec: expr=[c@1 ASC]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c,
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- let expected_optimized = [
+ let expected_input_bounded = [
+ "SortPreservingMergeExec: [c@1 ASC]",
+ " SortExec: expr=[c@1 ASC]",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+
+ // Expected unbounded result (the same with and without the flag)
+ let expected_optimized_unbounded = [
"SortPreservingMergeExec: [c@1 ASC]",
" SortExec: expr=[c@1 ASC]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c,
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(
- expected_input,
- expected_optimized,
+
+ // Expected bounded results are the same with and without the flag, because the ordering requirement of the executor differs from the existing ordering.
+ let expected_optimized_bounded = [
+ "SortPreservingMergeExec: [c@1 ASC]",
+ " SortExec: expr=[c@1 ASC]",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ let expected_optimized_bounded_sort_preserve =
expected_optimized_bounded;
+
+ assert_optimized_in_all_boundedness_situations!(
+ expected_input_unbounded,
+ expected_input_bounded,
+ expected_optimized_unbounded,
+ expected_optimized_bounded,
+ expected_optimized_bounded_sort_preserve,
physical_plan,
- prefer_existing_sort
+ source_unbounded
);
Ok(())
}
@@ -704,35 +1056,67 @@ mod tests {
#[rstest]
#[tokio::test]
async fn test_with_lost_ordering(
- #[values(false, true)] prefer_existing_sort: bool,
+ #[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = stream_exec_ordered(&schema, sort_exprs);
+ let source = if source_unbounded {
+ stream_exec_ordered(&schema, sort_exprs)
+ } else {
+ csv_exec_sorted(&schema, sort_exprs)
+ };
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
let physical_plan =
sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions,
false);
- let expected_input = [
+ // Expected inputs unbounded and bounded
+ let expected_input_unbounded = [
"SortExec: expr=[a@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c,
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- let expected_optimized = [
+ let expected_input_bounded = [
+ "SortExec: expr=[a@0 ASC NULLS LAST]",
+ " CoalescePartitionsExec",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+
+ // Expected unbounded result (the same with and without the flag)
+ let expected_optimized_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c,
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(
- expected_input,
- expected_optimized,
+
+ // Expected bounded results with and without flag
+ let expected_optimized_bounded = [
+ "SortExec: expr=[a@0 ASC NULLS LAST]",
+ " CoalescePartitionsExec",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ let expected_optimized_bounded_sort_preserve = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ assert_optimized_in_all_boundedness_situations!(
+ expected_input_unbounded,
+ expected_input_bounded,
+ expected_optimized_unbounded,
+ expected_optimized_bounded,
+ expected_optimized_bounded_sort_preserve,
physical_plan,
- prefer_existing_sort
+ source_unbounded
);
Ok(())
}
@@ -740,11 +1124,15 @@ mod tests {
#[rstest]
#[tokio::test]
async fn test_with_lost_and_kept_ordering(
- #[values(false, true)] prefer_existing_sort: bool,
+ #[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = stream_exec_ordered(&schema, sort_exprs);
+ let source = if source_unbounded {
+ stream_exec_ordered(&schema, sort_exprs)
+ } else {
+ csv_exec_sorted(&schema, sort_exprs)
+ };
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
@@ -764,7 +1152,8 @@ mod tests {
sort2,
);
- let expected_input = [
+ // Expected inputs unbounded and bounded
+ let expected_input_unbounded = [
"SortPreservingMergeExec: [c@1 ASC]",
" SortExec: expr=[c@1 ASC]",
" FilterExec: c@1 > 3",
@@ -776,8 +1165,21 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1,
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST]",
];
+ let expected_input_bounded = [
+ "SortPreservingMergeExec: [c@1 ASC]",
+ " SortExec: expr=[c@1 ASC]",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " SortExec: expr=[c@1 ASC]",
+ " CoalescePartitionsExec",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
- let expected_optimized = [
+ // Expected unbounded result (the same with and without the flag)
+ let expected_optimized_unbounded = [
"SortPreservingMergeExec: [c@1 ASC]",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC",
@@ -788,11 +1190,39 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1,
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST]",
];
- assert_optimized!(
- expected_input,
- expected_optimized,
+
+ // Expected bounded results with and without flag
+ let expected_optimized_bounded = [
+ "SortPreservingMergeExec: [c@1 ASC]",
+ " SortExec: expr=[c@1 ASC]",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " SortExec: expr=[c@1 ASC]",
+ " CoalescePartitionsExec",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ let expected_optimized_bounded_sort_preserve = [
+ "SortPreservingMergeExec: [c@1 ASC]",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " SortExec: expr=[c@1 ASC]",
+ " CoalescePartitionsExec",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ assert_optimized_in_all_boundedness_situations!(
+ expected_input_unbounded,
+ expected_input_bounded,
+ expected_optimized_unbounded,
+ expected_optimized_bounded,
+ expected_optimized_bounded_sort_preserve,
physical_plan,
- prefer_existing_sort
+ source_unbounded
);
Ok(())
}
@@ -800,19 +1230,27 @@ mod tests {
#[rstest]
#[tokio::test]
async fn test_with_multiple_child_trees(
- #[values(false, true)] prefer_existing_sort: bool,
+ #[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let left_sort_exprs = vec![sort_expr("a", &schema)];
- let left_source = stream_exec_ordered(&schema, left_sort_exprs);
+ let left_source = if source_unbounded {
+ stream_exec_ordered(&schema, left_sort_exprs)
+ } else {
+ csv_exec_sorted(&schema, left_sort_exprs)
+ };
let left_repartition_rr = repartition_exec_round_robin(left_source);
let left_repartition_hash = repartition_exec_hash(left_repartition_rr);
let left_coalesce_partitions =
Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096));
let right_sort_exprs = vec![sort_expr("a", &schema)];
- let right_source = stream_exec_ordered(&schema, right_sort_exprs);
+ let right_source = if source_unbounded {
+ stream_exec_ordered(&schema, right_sort_exprs)
+ } else {
+ csv_exec_sorted(&schema, right_sort_exprs)
+ };
let right_repartition_rr = repartition_exec_round_robin(right_source);
let right_repartition_hash =
repartition_exec_hash(right_repartition_rr);
let right_coalesce_partitions =
@@ -831,7 +1269,8 @@ mod tests {
sort,
);
- let expected_input = [
+ // Expected inputs unbounded and bounded
+ let expected_input_unbounded = [
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC]",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1,
c@1)]",
@@ -844,8 +1283,22 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
+ let expected_input_bounded = [
+ "SortPreservingMergeExec: [a@0 ASC]",
+ " SortExec: expr=[a@0 ASC]",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1,
c@1)]",
+ " CoalesceBatchesExec: target_batch_size=4096",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " CoalesceBatchesExec: target_batch_size=4096",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
- let expected_optimized = [
+ // Expected unbounded result (the same with and without the flag)
+ let expected_optimized_unbounded = [
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC]",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1,
c@1)]",
@@ -858,11 +1311,32 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(
- expected_input,
- expected_optimized,
+
+ // Expected bounded results are the same with and without the flag, because the ordering gets lost in an intermediate executor anyway. Hence, there is no need to preserve
+ // the existing ordering.
+ let expected_optimized_bounded = [
+ "SortPreservingMergeExec: [a@0 ASC]",
+ " SortExec: expr=[a@0 ASC]",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1,
c@1)]",
+ " CoalesceBatchesExec: target_batch_size=4096",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " CoalesceBatchesExec: target_batch_size=4096",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ ];
+ let expected_optimized_bounded_sort_preserve =
expected_optimized_bounded;
+
+ assert_optimized_in_all_boundedness_situations!(
+ expected_input_unbounded,
+ expected_input_bounded,
+ expected_optimized_unbounded,
+ expected_optimized_bounded,
+ expected_optimized_bounded_sort_preserve,
physical_plan,
- prefer_existing_sort
+ source_unbounded
);
Ok(())
}
@@ -985,8 +1459,7 @@ mod tests {
Ok(schema)
}
- // creates a csv exec source for the test purposes
- // projection and has_header parameters are given static due to testing
needs
+ // Creates a stream exec source for test purposes.
fn stream_exec_ordered(
schema: &SchemaRef,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
@@ -1007,4 +1480,35 @@ mod tests {
.unwrap(),
)
}
+
+ // Creates a CSV exec source for test purposes.
+ // The projection and has_header parameters are hard-coded to suit testing needs.
+ fn csv_exec_sorted(
+ schema: &SchemaRef,
+ sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+ ) -> Arc<dyn ExecutionPlan> {
+ let sort_exprs = sort_exprs.into_iter().collect();
+ let projection: Vec<usize> = vec![0, 2, 3];
+
+ Arc::new(CsvExec::new(
+ FileScanConfig {
+ object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+ file_schema: schema.clone(),
+ file_groups: vec![vec![PartitionedFile::new(
+ "file_path".to_string(),
+ 100,
+ )]],
+ statistics: Statistics::new_unknown(schema),
+ projection: Some(projection),
+ limit: None,
+ table_partition_cols: vec![],
+ output_ordering: vec![sort_exprs],
+ },
+ true,
+ 0,
+ b'"',
+ None,
+ FileCompressionType::UNCOMPRESSED,
+ ))
+ }
}
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 7a63466a39..ed5aa15e29 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -43,13 +43,13 @@ use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, FileType, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{Partitioning, PhysicalSortExpr};
+use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
#[cfg(feature = "compression")]
use bzip2::write::BzEncoder;
#[cfg(feature = "compression")]
use bzip2::Compression as BzCompression;
-use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
#[cfg(feature = "compression")]
use flate2::write::GzEncoder;
#[cfg(feature = "compression")]
@@ -334,6 +334,32 @@ pub fn stream_exec_ordered(
)
}
+/// Creates a CSV exec for tests that reports the given sort expressions as its output ordering.
+pub fn csv_exec_ordered(
+ schema: &SchemaRef,
+ sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+) -> Arc<dyn ExecutionPlan> {
+ let sort_exprs = sort_exprs.into_iter().collect();
+
+ Arc::new(CsvExec::new(
+ FileScanConfig {
+ object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+ file_schema: schema.clone(),
+ file_groups:
vec![vec![PartitionedFile::new("file_path".to_string(), 100)]],
+ statistics: Statistics::new_unknown(schema),
+ projection: None,
+ limit: None,
+ table_partition_cols: vec![],
+ output_ordering: vec![sort_exprs],
+ },
+ true,
+ 0,
+ b'"',
+ None,
+ FileCompressionType::UNCOMPRESSED,
+ ))
+}
+
/// A mock execution plan that simply returns the provided statistics
#[derive(Debug, Clone)]
pub struct StatisticsExec {