This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 65b997bc46 [MINOR]: Parametrize sort-preservation tests to exercise 
all situations (unbounded/bounded sources and flag behavior) (#8575)
65b997bc46 is described below

commit 65b997bc465fe6b9dc6692deebbd2d72da189702
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Dec 18 22:11:56 2023 +0300

    [MINOR]: Parametrize sort-preservation tests to exercise all situations 
(unbounded/bounded sources and flag behavior) (#8575)
    
    * Re-introduce unbounded tests with new executor
    
    * Remove unnecessary test
---
 .../core/src/physical_optimizer/enforce_sorting.rs |  19 +-
 .../replace_with_order_preserving_variants.rs      | 275 ++++++++++++---------
 datafusion/core/src/test/mod.rs                    |  36 +++
 3 files changed, 208 insertions(+), 122 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs 
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index c0e9b834e6..2b650a4269 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -769,7 +769,7 @@ mod tests {
     use crate::physical_plan::repartition::RepartitionExec;
     use crate::physical_plan::{displayable, get_plan_string, Partitioning};
     use crate::prelude::{SessionConfig, SessionContext};
-    use crate::test::csv_exec_sorted;
+    use crate::test::{csv_exec_sorted, stream_exec_ordered};
 
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -2141,11 +2141,11 @@ mod tests {
     }
 
     #[tokio::test]
-    #[ignore]
     async fn test_with_lost_ordering_unbounded() -> Result<()> {
         let schema = create_test_schema3()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs);
+        // create an unbounded source
+        let source = stream_exec_ordered(&schema, sort_exprs);
         let repartition_rr = repartition_exec(source);
         let repartition_hash = Arc::new(RepartitionExec::try_new(
             repartition_rr,
@@ -2159,25 +2159,24 @@ mod tests {
             "  CoalescePartitionsExec",
             "    RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"
+            "        StreamingTableExec: partition_sizes=1, projection=[a, b, 
c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC]",
             "  RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
             "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "      CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
+            "      StreamingTableExec: partition_sizes=1, projection=[a, b, c, 
d, e], infinite_source=true, output_ordering=[a@0 ASC]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
         Ok(())
     }
 
     #[tokio::test]
-    #[ignore]
     async fn test_with_lost_ordering_unbounded_parallelize_off() -> Result<()> 
{
         let schema = create_test_schema3()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        // Make source unbounded
-        let source = csv_exec_sorted(&schema, sort_exprs);
+        // create an unbounded source
+        let source = stream_exec_ordered(&schema, sort_exprs);
         let repartition_rr = repartition_exec(source);
         let repartition_hash = Arc::new(RepartitionExec::try_new(
             repartition_rr,
@@ -2190,13 +2189,13 @@ mod tests {
             "  CoalescePartitionsExec",
             "    RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"
+            "        StreamingTableExec: partition_sizes=1, projection=[a, b, 
c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC]",
             "  RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
             "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "      CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
+            "      StreamingTableExec: partition_sizes=1, projection=[a, b, c, 
d, e], infinite_source=true, output_ordering=[a@0 ASC]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
false);
         Ok(())
diff --git 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 41f2b39978..671891be43 100644
--- 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -276,9 +276,6 @@ pub(crate) fn replace_with_order_preserving_variants(
 mod tests {
     use super::*;
 
-    use 
crate::datasource::file_format::file_compression_type::FileCompressionType;
-    use crate::datasource::listing::PartitionedFile;
-    use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
     use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::filter::FilterExec;
@@ -289,14 +286,16 @@ mod tests {
     use crate::physical_plan::{displayable, get_plan_string, Partitioning};
     use crate::prelude::SessionConfig;
 
+    use crate::test::TestStreamPartition;
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use datafusion_common::tree_node::TreeNode;
-    use datafusion_common::{Result, Statistics};
-    use datafusion_execution::object_store::ObjectStoreUrl;
+    use datafusion_common::Result;
     use datafusion_expr::{JoinType, Operator};
     use datafusion_physical_expr::expressions::{self, col, Column};
     use datafusion_physical_expr::PhysicalSortExpr;
+    use datafusion_physical_plan::streaming::StreamingTableExec;
+    use rstest::rstest;
 
     /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts 
the plan
     /// against the original and expected plans.
@@ -345,12 +344,15 @@ mod tests {
         };
     }
 
+    #[rstest]
     #[tokio::test]
     // Searches for a simple sort and a repartition just after it, the second 
repartition with 1 input partition should not be affected
-    async fn test_replace_multiple_input_repartition_1() -> Result<()> {
+    async fn test_replace_multiple_input_repartition_1(
+        #[values(false, true)] prefer_existing_sort: bool,
+    ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs);
+        let source = stream_exec_ordered(&schema, sort_exprs);
         let repartition = 
repartition_exec_hash(repartition_exec_round_robin(source));
         let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
 
@@ -362,23 +364,31 @@ mod tests {
             "  SortExec: expr=[a@0 ASC NULLS LAST]",
             "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        StreamingTableExec: partition_sizes=1, projection=[a, c, 
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "      CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "      StreamingTableExec: partition_sizes=1, projection=[a, c, 
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
+        assert_optimized!(
+            expected_input,
+            expected_optimized,
+            physical_plan,
+            prefer_existing_sort
+        );
         Ok(())
     }
 
+    #[rstest]
     #[tokio::test]
-    async fn test_with_inter_children_change_only() -> Result<()> {
+    async fn test_with_inter_children_change_only(
+        #[values(false, true)] prefer_existing_sort: bool,
+    ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr_default("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs);
+        let source = stream_exec_ordered(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
@@ -408,7 +418,7 @@ mod tests {
             "            CoalescePartitionsExec",
             "              RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "                RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "                  CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
+            "                  StreamingTableExec: partition_sizes=1, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]",
         ];
 
         let expected_optimized = [
@@ -419,17 +429,25 @@ mod tests {
             "        SortPreservingMergeExec: [a@0 ASC]",
             "          RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC",
             "            RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "              CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
+            "              StreamingTableExec: partition_sizes=1, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
+        assert_optimized!(
+            expected_input,
+            expected_optimized,
+            physical_plan,
+            prefer_existing_sort
+        );
         Ok(())
     }
 
+    #[rstest]
     #[tokio::test]
-    async fn test_replace_multiple_input_repartition_2() -> Result<()> {
+    async fn test_replace_multiple_input_repartition_2(
+        #[values(false, true)] prefer_existing_sort: bool,
+    ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs);
+        let source = stream_exec_ordered(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let filter = filter_exec(repartition_rr);
         let repartition_hash = repartition_exec_hash(filter);
@@ -444,24 +462,32 @@ mod tests {
             "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      FilterExec: c@1 > 3",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "          StreamingTableExec: partition_sizes=1, projection=[a, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "    FilterExec: c@1 > 3",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        StreamingTableExec: partition_sizes=1, projection=[a, c, 
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
+        assert_optimized!(
+            expected_input,
+            expected_optimized,
+            physical_plan,
+            prefer_existing_sort
+        );
         Ok(())
     }
 
+    #[rstest]
     #[tokio::test]
-    async fn test_replace_multiple_input_repartition_with_extra_steps() -> 
Result<()> {
+    async fn test_replace_multiple_input_repartition_with_extra_steps(
+        #[values(false, true)] prefer_existing_sort: bool,
+    ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs);
+        let source = stream_exec_ordered(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let filter = filter_exec(repartition_hash);
@@ -478,7 +504,7 @@ mod tests {
             "      FilterExec: c@1 > 3",
             "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            StreamingTableExec: partition_sizes=1, projection=[a, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -486,17 +512,25 @@ mod tests {
             "    FilterExec: c@1 > 3",
             "      RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "          StreamingTableExec: partition_sizes=1, projection=[a, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
+        assert_optimized!(
+            expected_input,
+            expected_optimized,
+            physical_plan,
+            prefer_existing_sort
+        );
         Ok(())
     }
 
+    #[rstest]
     #[tokio::test]
-    async fn test_replace_multiple_input_repartition_with_extra_steps_2() -> 
Result<()> {
+    async fn test_replace_multiple_input_repartition_with_extra_steps_2(
+        #[values(false, true)] prefer_existing_sort: bool,
+    ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs);
+        let source = stream_exec_ordered(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr);
         let repartition_hash = repartition_exec_hash(coalesce_batches_exec_1);
@@ -516,7 +550,7 @@ mod tests {
             "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "          CoalesceBatchesExec: target_batch_size=8192",
             "            RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "              CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "              StreamingTableExec: partition_sizes=1, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST]",
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -525,17 +559,25 @@ mod tests {
             "      RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "        CoalesceBatchesExec: target_batch_size=8192",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            StreamingTableExec: partition_sizes=1, projection=[a, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
+        assert_optimized!(
+            expected_input,
+            expected_optimized,
+            physical_plan,
+            prefer_existing_sort
+        );
         Ok(())
     }
 
+    #[rstest]
     #[tokio::test]
-    async fn test_not_replacing_when_no_need_to_preserve_sorting() -> 
Result<()> {
+    async fn test_not_replacing_when_no_need_to_preserve_sorting(
+        #[values(false, true)] prefer_existing_sort: bool,
+    ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs);
+        let source = stream_exec_ordered(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let filter = filter_exec(repartition_hash);
@@ -550,7 +592,7 @@ mod tests {
             "    FilterExec: c@1 > 3",
             "      RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "          StreamingTableExec: partition_sizes=1, projection=[a, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
         let expected_optimized = [
             "CoalescePartitionsExec",
@@ -558,17 +600,25 @@ mod tests {
             "    FilterExec: c@1 > 3",
             "      RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "          StreamingTableExec: partition_sizes=1, projection=[a, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        assert_optimized!(
+            expected_input,
+            expected_optimized,
+            physical_plan,
+            prefer_existing_sort
+        );
         Ok(())
     }
 
+    #[rstest]
     #[tokio::test]
-    async fn test_with_multiple_replacable_repartitions() -> Result<()> {
+    async fn test_with_multiple_replacable_repartitions(
+        #[values(false, true)] prefer_existing_sort: bool,
+    ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs);
+        let source = stream_exec_ordered(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let filter = filter_exec(repartition_hash);
@@ -587,7 +637,7 @@ mod tests {
             "        FilterExec: c@1 > 3",
             "          RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "            RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "              CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "              StreamingTableExec: partition_sizes=1, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST]",
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -596,17 +646,25 @@ mod tests {
             "      FilterExec: c@1 > 3",
             "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            StreamingTableExec: partition_sizes=1, projection=[a, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
+        assert_optimized!(
+            expected_input,
+            expected_optimized,
+            physical_plan,
+            prefer_existing_sort
+        );
         Ok(())
     }
 
+    #[rstest]
     #[tokio::test]
-    async fn test_not_replace_with_different_orderings() -> Result<()> {
+    async fn test_not_replace_with_different_orderings(
+        #[values(false, true)] prefer_existing_sort: bool,
+    ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs);
+        let source = stream_exec_ordered(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let sort = sort_exec(
@@ -625,24 +683,32 @@ mod tests {
             "  SortExec: expr=[c@1 ASC]",
             "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        StreamingTableExec: partition_sizes=1, projection=[a, c, 
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [c@1 ASC]",
             "  SortExec: expr=[c@1 ASC]",
             "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        StreamingTableExec: partition_sizes=1, projection=[a, c, 
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        assert_optimized!(
+            expected_input,
+            expected_optimized,
+            physical_plan,
+            prefer_existing_sort
+        );
         Ok(())
     }
 
+    #[rstest]
     #[tokio::test]
-    async fn test_with_lost_ordering() -> Result<()> {
+    async fn test_with_lost_ordering(
+        #[values(false, true)] prefer_existing_sort: bool,
+    ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs);
+        let source = stream_exec_ordered(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
@@ -654,23 +720,31 @@ mod tests {
             "  CoalescePartitionsExec",
             "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        StreamingTableExec: partition_sizes=1, projection=[a, c, 
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "      CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "      StreamingTableExec: partition_sizes=1, projection=[a, c, 
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
+        assert_optimized!(
+            expected_input,
+            expected_optimized,
+            physical_plan,
+            prefer_existing_sort
+        );
         Ok(())
     }
 
+    #[rstest]
     #[tokio::test]
-    async fn test_with_lost_and_kept_ordering() -> Result<()> {
+    async fn test_with_lost_and_kept_ordering(
+        #[values(false, true)] prefer_existing_sort: bool,
+    ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs);
+        let source = stream_exec_ordered(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
@@ -700,7 +774,7 @@ mod tests {
             "            CoalescePartitionsExec",
             "              RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "                RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "                  CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "                  StreamingTableExec: partition_sizes=1, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST]",
         ];
 
         let expected_optimized = [
@@ -712,25 +786,33 @@ mod tests {
             "          CoalescePartitionsExec",
             "            RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "              RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "                CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "                StreamingTableExec: partition_sizes=1, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST]",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
+        assert_optimized!(
+            expected_input,
+            expected_optimized,
+            physical_plan,
+            prefer_existing_sort
+        );
         Ok(())
     }
 
+    #[rstest]
     #[tokio::test]
-    async fn test_with_multiple_child_trees() -> Result<()> {
+    async fn test_with_multiple_child_trees(
+        #[values(false, true)] prefer_existing_sort: bool,
+    ) -> Result<()> {
         let schema = create_test_schema()?;
 
         let left_sort_exprs = vec![sort_expr("a", &schema)];
-        let left_source = csv_exec_sorted(&schema, left_sort_exprs);
+        let left_source = stream_exec_ordered(&schema, left_sort_exprs);
         let left_repartition_rr = repartition_exec_round_robin(left_source);
         let left_repartition_hash = repartition_exec_hash(left_repartition_rr);
         let left_coalesce_partitions =
             Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096));
 
         let right_sort_exprs = vec![sort_expr("a", &schema)];
-        let right_source = csv_exec_sorted(&schema, right_sort_exprs);
+        let right_source = stream_exec_ordered(&schema, right_sort_exprs);
         let right_repartition_rr = repartition_exec_round_robin(right_source);
         let right_repartition_hash = 
repartition_exec_hash(right_repartition_rr);
         let right_coalesce_partitions =
@@ -756,11 +838,11 @@ mod tests {
             "      CoalesceBatchesExec: target_batch_size=4096",
             "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            StreamingTableExec: partition_sizes=1, projection=[a, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
             "      CoalesceBatchesExec: target_batch_size=4096",
             "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            StreamingTableExec: partition_sizes=1, projection=[a, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
 
         let expected_optimized = [
@@ -770,41 +852,18 @@ mod tests {
             "      CoalesceBatchesExec: target_batch_size=4096",
             "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            StreamingTableExec: partition_sizes=1, projection=[a, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
             "      CoalesceBatchesExec: target_batch_size=4096",
             "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            StreamingTableExec: partition_sizes=1, projection=[a, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_with_bounded_input() -> Result<()> {
-        let schema = create_test_schema()?;
-        let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs);
-        let repartition = 
repartition_exec_hash(repartition_exec_round_robin(source));
-        let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
-
-        let physical_plan =
-            sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
-
-        let expected_input = [
-            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
-            "  SortExec: expr=[a@0 ASC NULLS LAST]",
-            "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
-            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
-        ];
-        let expected_optimized = [
-            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
-            "  RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
-            "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "      CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
-        ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
+        assert_optimized!(
+            expected_input,
+            expected_optimized,
+            physical_plan,
+            prefer_existing_sort
+        );
         Ok(())
     }
 
@@ -928,32 +987,24 @@ mod tests {
 
     // creates a csv exec source for the test purposes
     // projection and has_header parameters are given static due to testing 
needs
-    fn csv_exec_sorted(
+    fn stream_exec_ordered(
         schema: &SchemaRef,
         sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
     ) -> Arc<dyn ExecutionPlan> {
         let sort_exprs = sort_exprs.into_iter().collect();
         let projection: Vec<usize> = vec![0, 2, 3];
 
-        Arc::new(CsvExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema.clone(),
-                file_groups: vec![vec![PartitionedFile::new(
-                    "file_path".to_string(),
-                    100,
-                )]],
-                statistics: Statistics::new_unknown(schema),
-                projection: Some(projection),
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![sort_exprs],
-            },
-            true,
-            0,
-            b'"',
-            None,
-            FileCompressionType::UNCOMPRESSED,
-        ))
+        Arc::new(
+            StreamingTableExec::try_new(
+                schema.clone(),
+                vec![Arc::new(TestStreamPartition {
+                    schema: schema.clone(),
+                }) as _],
+                Some(&projection),
+                vec![sort_exprs],
+                true,
+            )
+            .unwrap(),
+        )
     }
 }
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 8770c0c423..7a63466a39 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -49,6 +49,7 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
 use bzip2::write::BzEncoder;
 #[cfg(feature = "compression")]
 use bzip2::Compression as BzCompression;
+use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
 #[cfg(feature = "compression")]
 use flate2::write::GzEncoder;
 #[cfg(feature = "compression")]
@@ -298,6 +299,41 @@ pub fn csv_exec_sorted(
     ))
 }
 
+// construct a stream partition for test purposes
+pub(crate) struct TestStreamPartition {
+    pub schema: SchemaRef,
+}
+
+impl PartitionStream for TestStreamPartition {
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+        unreachable!()
+    }
+}
+
+/// Create an unbounded stream exec
+pub fn stream_exec_ordered(
+    schema: &SchemaRef,
+    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+) -> Arc<dyn ExecutionPlan> {
+    let sort_exprs = sort_exprs.into_iter().collect();
+
+    Arc::new(
+        StreamingTableExec::try_new(
+            schema.clone(),
+            vec![Arc::new(TestStreamPartition {
+                schema: schema.clone(),
+            }) as _],
+            None,
+            vec![sort_exprs],
+            true,
+        )
+        .unwrap(),
+    )
+}
+
 /// A mock execution plan that simply returns the provided statistics
 #[derive(Debug, Clone)]
 pub struct StatisticsExec {


Reply via email to