This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 65b997bc46 [MINOR]: Parametrize sort-preservation tests to exercise
all situations (unbounded/bounded sources and flag behavior) (#8575)
65b997bc46 is described below
commit 65b997bc465fe6b9dc6692deebbd2d72da189702
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Dec 18 22:11:56 2023 +0300
[MINOR]: Parametrize sort-preservation tests to exercise all situations
(unbounded/bounded sources and flag behavior) (#8575)
* Re-introduce unbounded tests with new executor
* Remove unnecessary test
---
.../core/src/physical_optimizer/enforce_sorting.rs | 19 +-
.../replace_with_order_preserving_variants.rs | 275 ++++++++++++---------
datafusion/core/src/test/mod.rs | 36 +++
3 files changed, 208 insertions(+), 122 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index c0e9b834e6..2b650a4269 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -769,7 +769,7 @@ mod tests {
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::{displayable, get_plan_string, Partitioning};
use crate::prelude::{SessionConfig, SessionContext};
- use crate::test::csv_exec_sorted;
+ use crate::test::{csv_exec_sorted, stream_exec_ordered};
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -2141,11 +2141,11 @@ mod tests {
}
#[tokio::test]
- #[ignore]
async fn test_with_lost_ordering_unbounded() -> Result<()> {
let schema = create_test_schema3()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs);
+ // create an unbounded source
+ let source = stream_exec_ordered(&schema, sort_exprs);
let repartition_rr = repartition_exec(source);
let repartition_hash = Arc::new(RepartitionExec::try_new(
repartition_rr,
@@ -2159,25 +2159,24 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"
+ " StreamingTableExec: partition_sizes=1, projection=[a, b,
c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
+ " StreamingTableExec: partition_sizes=1, projection=[a, b, c,
d, e], infinite_source=true, output_ordering=[a@0 ASC]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
}
#[tokio::test]
- #[ignore]
async fn test_with_lost_ordering_unbounded_parallelize_off() -> Result<()>
{
let schema = create_test_schema3()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- // Make source unbounded
- let source = csv_exec_sorted(&schema, sort_exprs);
+ // create an unbounded source
+ let source = stream_exec_ordered(&schema, sort_exprs);
let repartition_rr = repartition_exec(source);
let repartition_hash = Arc::new(RepartitionExec::try_new(
repartition_rr,
@@ -2190,13 +2189,13 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"
+ " StreamingTableExec: partition_sizes=1, projection=[a, b,
c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
+ " StreamingTableExec: partition_sizes=1, projection=[a, b, c,
d, e], infinite_source=true, output_ordering=[a@0 ASC]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan,
false);
Ok(())
diff --git
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 41f2b39978..671891be43 100644
---
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -276,9 +276,6 @@ pub(crate) fn replace_with_order_preserving_variants(
mod tests {
use super::*;
- use
crate::datasource::file_format::file_compression_type::FileCompressionType;
- use crate::datasource::listing::PartitionedFile;
- use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::filter::FilterExec;
@@ -289,14 +286,16 @@ mod tests {
use crate::physical_plan::{displayable, get_plan_string, Partitioning};
use crate::prelude::SessionConfig;
+ use crate::test::TestStreamPartition;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::tree_node::TreeNode;
- use datafusion_common::{Result, Statistics};
- use datafusion_execution::object_store::ObjectStoreUrl;
+ use datafusion_common::Result;
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::expressions::{self, col, Column};
use datafusion_physical_expr::PhysicalSortExpr;
+ use datafusion_physical_plan::streaming::StreamingTableExec;
+ use rstest::rstest;
/// Runs the `replace_with_order_preserving_variants` sub-rule and asserts
the plan
/// against the original and expected plans.
@@ -345,12 +344,15 @@ mod tests {
};
}
+ #[rstest]
#[tokio::test]
// Searches for a simple sort and a repartition just after it, the second
repartition with 1 input partition should not be affected
- async fn test_replace_multiple_input_repartition_1() -> Result<()> {
+ async fn test_replace_multiple_input_repartition_1(
+ #[values(false, true)] prefer_existing_sort: bool,
+ ) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs);
+ let source = stream_exec_ordered(&schema, sort_exprs);
let repartition =
repartition_exec_hash(repartition_exec_round_robin(source));
let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
@@ -362,23 +364,31 @@ mod tests {
" SortExec: expr=[a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a, c,
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a, c,
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
+ assert_optimized!(
+ expected_input,
+ expected_optimized,
+ physical_plan,
+ prefer_existing_sort
+ );
Ok(())
}
+ #[rstest]
#[tokio::test]
- async fn test_with_inter_children_change_only() -> Result<()> {
+ async fn test_with_inter_children_change_only(
+ #[values(false, true)] prefer_existing_sort: bool,
+ ) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr_default("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs);
+ let source = stream_exec_ordered(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
@@ -408,7 +418,7 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
+ " StreamingTableExec: partition_sizes=1,
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]",
];
let expected_optimized = [
@@ -419,17 +429,25 @@ mod tests {
" SortPreservingMergeExec: [a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
+ " StreamingTableExec: partition_sizes=1,
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
+ assert_optimized!(
+ expected_input,
+ expected_optimized,
+ physical_plan,
+ prefer_existing_sort
+ );
Ok(())
}
+ #[rstest]
#[tokio::test]
- async fn test_replace_multiple_input_repartition_2() -> Result<()> {
+ async fn test_replace_multiple_input_repartition_2(
+ #[values(false, true)] prefer_existing_sort: bool,
+ ) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs);
+ let source = stream_exec_ordered(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let filter = filter_exec(repartition_rr);
let repartition_hash = repartition_exec_hash(filter);
@@ -444,24 +462,32 @@ mod tests {
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a, c,
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
+ assert_optimized!(
+ expected_input,
+ expected_optimized,
+ physical_plan,
+ prefer_existing_sort
+ );
Ok(())
}
+ #[rstest]
#[tokio::test]
- async fn test_replace_multiple_input_repartition_with_extra_steps() ->
Result<()> {
+ async fn test_replace_multiple_input_repartition_with_extra_steps(
+ #[values(false, true)] prefer_existing_sort: bool,
+ ) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs);
+ let source = stream_exec_ordered(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let filter = filter_exec(repartition_hash);
@@ -478,7 +504,7 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -486,17 +512,25 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
+ assert_optimized!(
+ expected_input,
+ expected_optimized,
+ physical_plan,
+ prefer_existing_sort
+ );
Ok(())
}
+ #[rstest]
#[tokio::test]
- async fn test_replace_multiple_input_repartition_with_extra_steps_2() ->
Result<()> {
+ async fn test_replace_multiple_input_repartition_with_extra_steps_2(
+ #[values(false, true)] prefer_existing_sort: bool,
+ ) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs);
+ let source = stream_exec_ordered(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr);
let repartition_hash = repartition_exec_hash(coalesce_batches_exec_1);
@@ -516,7 +550,7 @@ mod tests {
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1,
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST]",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -525,17 +559,25 @@ mod tests {
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
+ assert_optimized!(
+ expected_input,
+ expected_optimized,
+ physical_plan,
+ prefer_existing_sort
+ );
Ok(())
}
+ #[rstest]
#[tokio::test]
- async fn test_not_replacing_when_no_need_to_preserve_sorting() ->
Result<()> {
+ async fn test_not_replacing_when_no_need_to_preserve_sorting(
+ #[values(false, true)] prefer_existing_sort: bool,
+ ) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs);
+ let source = stream_exec_ordered(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let filter = filter_exec(repartition_hash);
@@ -550,7 +592,7 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized = [
"CoalescePartitionsExec",
@@ -558,17 +600,25 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan);
+ assert_optimized!(
+ expected_input,
+ expected_optimized,
+ physical_plan,
+ prefer_existing_sort
+ );
Ok(())
}
+ #[rstest]
#[tokio::test]
- async fn test_with_multiple_replacable_repartitions() -> Result<()> {
+ async fn test_with_multiple_replacable_repartitions(
+ #[values(false, true)] prefer_existing_sort: bool,
+ ) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs);
+ let source = stream_exec_ordered(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let filter = filter_exec(repartition_hash);
@@ -587,7 +637,7 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1,
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST]",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -596,17 +646,25 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
+ assert_optimized!(
+ expected_input,
+ expected_optimized,
+ physical_plan,
+ prefer_existing_sort
+ );
Ok(())
}
+ #[rstest]
#[tokio::test]
- async fn test_not_replace_with_different_orderings() -> Result<()> {
+ async fn test_not_replace_with_different_orderings(
+ #[values(false, true)] prefer_existing_sort: bool,
+ ) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs);
+ let source = stream_exec_ordered(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let sort = sort_exec(
@@ -625,24 +683,32 @@ mod tests {
" SortExec: expr=[c@1 ASC]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a, c,
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized = [
"SortPreservingMergeExec: [c@1 ASC]",
" SortExec: expr=[c@1 ASC]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a, c,
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan);
+ assert_optimized!(
+ expected_input,
+ expected_optimized,
+ physical_plan,
+ prefer_existing_sort
+ );
Ok(())
}
+ #[rstest]
#[tokio::test]
- async fn test_with_lost_ordering() -> Result<()> {
+ async fn test_with_lost_ordering(
+ #[values(false, true)] prefer_existing_sort: bool,
+ ) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs);
+ let source = stream_exec_ordered(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
@@ -654,23 +720,31 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a, c,
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a, c,
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
+ assert_optimized!(
+ expected_input,
+ expected_optimized,
+ physical_plan,
+ prefer_existing_sort
+ );
Ok(())
}
+ #[rstest]
#[tokio::test]
- async fn test_with_lost_and_kept_ordering() -> Result<()> {
+ async fn test_with_lost_and_kept_ordering(
+ #[values(false, true)] prefer_existing_sort: bool,
+ ) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs);
+ let source = stream_exec_ordered(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
@@ -700,7 +774,7 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1,
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST]",
];
let expected_optimized = [
@@ -712,25 +786,33 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1,
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST]",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
+ assert_optimized!(
+ expected_input,
+ expected_optimized,
+ physical_plan,
+ prefer_existing_sort
+ );
Ok(())
}
+ #[rstest]
#[tokio::test]
- async fn test_with_multiple_child_trees() -> Result<()> {
+ async fn test_with_multiple_child_trees(
+ #[values(false, true)] prefer_existing_sort: bool,
+ ) -> Result<()> {
let schema = create_test_schema()?;
let left_sort_exprs = vec![sort_expr("a", &schema)];
- let left_source = csv_exec_sorted(&schema, left_sort_exprs);
+ let left_source = stream_exec_ordered(&schema, left_sort_exprs);
let left_repartition_rr = repartition_exec_round_robin(left_source);
let left_repartition_hash = repartition_exec_hash(left_repartition_rr);
let left_coalesce_partitions =
Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096));
let right_sort_exprs = vec![sort_expr("a", &schema)];
- let right_source = csv_exec_sorted(&schema, right_sort_exprs);
+ let right_source = stream_exec_ordered(&schema, right_sort_exprs);
let right_repartition_rr = repartition_exec_round_robin(right_source);
let right_repartition_hash =
repartition_exec_hash(right_repartition_rr);
let right_coalesce_partitions =
@@ -756,11 +838,11 @@ mod tests {
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized = [
@@ -770,41 +852,18 @@ mod tests {
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " StreamingTableExec: partition_sizes=1, projection=[a,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan);
- Ok(())
- }
-
- #[tokio::test]
- async fn test_with_bounded_input() -> Result<()> {
- let schema = create_test_schema()?;
- let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs);
- let repartition =
repartition_exec_hash(repartition_exec_round_robin(source));
- let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
-
- let physical_plan =
- sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
-
- let expected_input = [
- "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
- " SortExec: expr=[a@0 ASC NULLS LAST]",
- " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
- " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
- ];
- let expected_optimized = [
- "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
- " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
- " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
- ];
- assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
+ assert_optimized!(
+ expected_input,
+ expected_optimized,
+ physical_plan,
+ prefer_existing_sort
+ );
Ok(())
}
@@ -928,32 +987,24 @@ mod tests {
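- // creates a csv exec source for the test purposes
- // projection and has_header parameters are given static due to testing
needs
+ // creates an unbounded stream exec source for the test purposes
+ // projection parameter is given static due to testing needs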
- fn csv_exec_sorted(
+ fn stream_exec_ordered(
schema: &SchemaRef,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
let projection: Vec<usize> = vec![0, 2, 3];
- Arc::new(CsvExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema: schema.clone(),
- file_groups: vec![vec![PartitionedFile::new(
- "file_path".to_string(),
- 100,
- )]],
- statistics: Statistics::new_unknown(schema),
- projection: Some(projection),
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![sort_exprs],
- },
- true,
- 0,
- b'"',
- None,
- FileCompressionType::UNCOMPRESSED,
- ))
+ Arc::new(
+ StreamingTableExec::try_new(
+ schema.clone(),
+ vec![Arc::new(TestStreamPartition {
+ schema: schema.clone(),
+ }) as _],
+ Some(&projection),
+ vec![sort_exprs],
+ true,
+ )
+ .unwrap(),
+ )
}
}
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 8770c0c423..7a63466a39 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -49,6 +49,7 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
use bzip2::write::BzEncoder;
#[cfg(feature = "compression")]
use bzip2::Compression as BzCompression;
+use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
#[cfg(feature = "compression")]
use flate2::write::GzEncoder;
#[cfg(feature = "compression")]
@@ -298,6 +299,41 @@ pub fn csv_exec_sorted(
))
}
+/// Stream partition for tests: reports a schema only; `execute` is `unreachable!()`, so plans using it must never be executed
+pub(crate) struct TestStreamPartition {
+ pub schema: SchemaRef,
+}
+
+impl PartitionStream for TestStreamPartition {
+ fn schema(&self) -> &SchemaRef {
+ &self.schema
+ }
+ fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+ unreachable!()
+ }
+}
+
+/// Create an unbounded `StreamingTableExec` with one test partition and the given output ordering
+pub fn stream_exec_ordered(
+ schema: &SchemaRef,
+ sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+) -> Arc<dyn ExecutionPlan> {
+ let sort_exprs = sort_exprs.into_iter().collect();
+
+ Arc::new(
+ StreamingTableExec::try_new(
+ schema.clone(),
+ vec![Arc::new(TestStreamPartition {
+ schema: schema.clone(),
+ }) as _],
+ None,
+ vec![sort_exprs],
+ true,
+ )
+ .unwrap(),
+ )
+}
+
/// A mock execution plan that simply returns the provided statistics
#[derive(Debug, Clone)]
pub struct StatisticsExec {