alamb commented on code in PR #17962:
URL: https://github.com/apache/datafusion/pull/17962#discussion_r2415009464


##########
datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs:
##########
@@ -46,223 +47,207 @@ use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
 use datafusion::datasource::memory::MemorySourceConfig;
 use datafusion_physical_plan::repartition::RepartitionExec;
 use datafusion_physical_plan::{
-    collect, displayable, get_plan_string, ExecutionPlan, Partitioning,
+    collect, displayable, ExecutionPlan, Partitioning,
 };
 
 use object_store::memory::InMemory;
 use object_store::ObjectStore;
 use rstest::rstest;
 use url::Url;
 
-/// Runs the `replace_with_order_preserving_variants` sub-rule and asserts
-/// the plan against the original and expected plans.
-///
-/// # Parameters
-///
-/// * `$EXPECTED_PLAN_LINES`: Expected input plan.
-/// * `EXPECTED_OPTIMIZED_PLAN_LINES`: Optimized plan when the flag
-///   `prefer_existing_sort` is `false`.
-/// * `EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: Optimized plan when
-///   the flag `prefer_existing_sort` is `true`.
-/// * `$PLAN`: The plan to optimize.
-macro_rules! assert_optimized_prefer_sort_on_off {
-    ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, 
$EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, 
$PREFER_EXISTING_SORT: expr, $SOURCE_UNBOUNDED: expr) => {
-        if $PREFER_EXISTING_SORT {
-            assert_optimized!(
-                $EXPECTED_PLAN_LINES,
-                $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES,
-                $PLAN,
-                $PREFER_EXISTING_SORT,
-                $SOURCE_UNBOUNDED
-            );
-        } else {
-            assert_optimized!(
-                $EXPECTED_PLAN_LINES,
-                $EXPECTED_OPTIMIZED_PLAN_LINES,
-                $PLAN,
-                $PREFER_EXISTING_SORT,
-                $SOURCE_UNBOUNDED
-            );
-        }
-    };
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum Boundedness {
+    Unbounded,
+    Bounded,
 }
 
-/// Runs the `replace_with_order_preserving_variants` sub-rule and asserts
-/// the plan against the original and expected plans for both bounded and
-/// unbounded cases.
-///
-/// # Parameters
-///
-/// * `EXPECTED_UNBOUNDED_PLAN_LINES`: Expected input unbounded plan.
-/// * `EXPECTED_BOUNDED_PLAN_LINES`: Expected input bounded plan.
-/// * `EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES`: Optimized plan, which is
-///   the same regardless of the value of the `prefer_existing_sort` flag.
-/// * `EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES`: Optimized plan when the flag
-///   `prefer_existing_sort` is `false` for bounded cases.
-/// * `EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: Optimized plan
-///   when the flag `prefer_existing_sort` is `true` for bounded cases.
-/// * `$PLAN`: The plan to optimize.
-/// * `$SOURCE_UNBOUNDED`: Whether the given plan contains an unbounded source.
-macro_rules! assert_optimized_in_all_boundedness_situations {
-    ($EXPECTED_UNBOUNDED_PLAN_LINES: expr,  $EXPECTED_BOUNDED_PLAN_LINES: 
expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, 
$EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, 
$EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, 
$SOURCE_UNBOUNDED: expr, $PREFER_EXISTING_SORT: expr) => {
-        if $SOURCE_UNBOUNDED {
-            assert_optimized_prefer_sort_on_off!(
-                $EXPECTED_UNBOUNDED_PLAN_LINES,
-                $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES,
-                $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES,
-                $PLAN,
-                $PREFER_EXISTING_SORT,
-                $SOURCE_UNBOUNDED
-            );
-        } else {
-            assert_optimized_prefer_sort_on_off!(
-                $EXPECTED_BOUNDED_PLAN_LINES,
-                $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES,
-                $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES,
-                $PLAN,
-                $PREFER_EXISTING_SORT,
-                $SOURCE_UNBOUNDED
-            );
-        }
-    };
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum SortPreference {
+    PreserveOrder,
+    MaximizeParallelism,
 }
 
-/// Runs the `replace_with_order_preserving_variants` sub-rule and asserts
-/// the plan against the original and expected plans.
-///
-/// # Parameters
-///
-/// * `$EXPECTED_PLAN_LINES`: Expected input plan.
-/// * `$EXPECTED_OPTIMIZED_PLAN_LINES`: Expected optimized plan.
-/// * `$PLAN`: The plan to optimize.
-/// * `$PREFER_EXISTING_SORT`: Value of the `prefer_existing_sort` flag.
-#[macro_export]
-macro_rules! assert_optimized {
-        ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, 
$PLAN: expr, $PREFER_EXISTING_SORT: expr, $SOURCE_UNBOUNDED: expr) => {
-            let physical_plan = $PLAN;
-            let formatted = 
displayable(physical_plan.as_ref()).indent(true).to_string();
-            let actual: Vec<&str> = formatted.trim().lines().collect();
-
-            let expected_plan_lines: Vec<&str> = $EXPECTED_PLAN_LINES
-                .iter().map(|s| *s).collect();
-
-            assert_eq!(
-                expected_plan_lines, actual,
-                "\n**Original Plan 
Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n"
-            );
+struct ReplaceTest {
+    plan: Arc<dyn ExecutionPlan>,
+    boundedness: Boundedness,
+    sort_preference: SortPreference,
+}
 
-            let expected_optimized_lines: Vec<&str> = 
$EXPECTED_OPTIMIZED_PLAN_LINES.iter().map(|s| *s).collect();
+impl ReplaceTest {
+    fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+        Self {
+            plan,
+            boundedness: Boundedness::Bounded,
+            sort_preference: SortPreference::MaximizeParallelism,
+        }
+    }
+
+    fn with_boundedness(mut self, boundedness: Boundedness) -> Self {
+        self.boundedness = boundedness;
+        self
+    }
 
-            // Run the rule top-down
-            let mut config = ConfigOptions::new();
-            config.optimizer.prefer_existing_sort=$PREFER_EXISTING_SORT;
-            let plan_with_pipeline_fixer = 
OrderPreservationContext::new_default(physical_plan);
-            let parallel = 
plan_with_pipeline_fixer.transform_up(|plan_with_pipeline_fixer| 
replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, 
&config)).data().and_then(check_integrity)?;
-            let optimized_physical_plan = parallel.plan;
+    fn with_sort_preference(mut self, sort_preference: SortPreference) -> Self {
+        self.sort_preference = sort_preference;
+        self
+    }
 
-            // Get string representation of the plan
-            let actual = get_plan_string(&optimized_physical_plan);
-            assert_eq!(
-                expected_optimized_lines, actual,
-                "\n**Optimized Plan 
Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n"
+    async fn execute_plan(&self) -> String {
+        let mut config = ConfigOptions::new();
+        config.optimizer.prefer_existing_sort =
+            self.sort_preference == SortPreference::PreserveOrder;
+
+        let plan_with_pipeline_fixer = OrderPreservationContext::new_default(
+            self.plan.clone().reset_state().unwrap(),
+        );
+
+        let parallel = plan_with_pipeline_fixer
+            .transform_up(|plan_with_pipeline_fixer| {
+                replace_with_order_preserving_variants(
+                    plan_with_pipeline_fixer,
+                    false,
+                    false,
+                    &config,
+                )
+            })
+            .data()
+            .and_then(check_integrity)
+            .unwrap();
+
+        let optimized_physical_plan = parallel.plan;
+        let optimized_plan_string = displayable(optimized_physical_plan.as_ref())
+            .indent(true)
+            .to_string();
+
+        if self.boundedness == Boundedness::Bounded {
+            let ctx = SessionContext::new();
+            let object_store = InMemory::new();
+            object_store
+                .put(
+                    &object_store::path::Path::from("file_path"),
+                    bytes::Bytes::from("").into(),
+                )
+                .await
+                .expect("could not create object store");
+            ctx.register_object_store(
+                &Url::parse("test://").unwrap(),
+                Arc::new(object_store),
+            );
+            let task_ctx = Arc::new(TaskContext::from(&ctx));
+            let res = collect(optimized_physical_plan, task_ctx).await;
+            assert!(
+                res.is_ok(),
+                "Some errors occurred while executing the optimized physical plan: {:?}\nPlan: {}",
+                res.unwrap_err(), optimized_plan_string
             );
+        }
+
+        optimized_plan_string
+    }
+
+    async fn run(&self) -> String {
+        let input_plan_string = displayable(self.plan.as_ref()).indent(true).to_string();
+
+        let optimized = self.execute_plan().await;
 
-            if !$SOURCE_UNBOUNDED {
-                let ctx = SessionContext::new();
-                let object_store = InMemory::new();
-                object_store.put(&object_store::path::Path::from("file_path"), 
bytes::Bytes::from("").into()).await?;
-                ctx.register_object_store(&Url::parse("test://").unwrap(), 
Arc::new(object_store));
-                let task_ctx = Arc::new(TaskContext::from(&ctx));
-                let res = collect(optimized_physical_plan, task_ctx).await;
-                assert!(
-                    res.is_ok(),
-                    "Some errors occurred while executing the optimized 
physical plan: {:?}", res.unwrap_err()
-                );
-            }
-        };
+        if input_plan_string == optimized {
+            format!("Input / Optimized:\n{input_plan_string}")
+        } else {
+            format!("Input:\n{input_plan_string}\nOptimized:\n{optimized}")
+        }
     }
+}
 
 #[rstest]
 #[tokio::test]
 // Searches for a simple sort and a repartition just after it, the second repartition with 1 input partition should not be affected
 async fn test_replace_multiple_input_repartition_1(
-    #[values(false, true)] source_unbounded: bool,
-    #[values(false, true)] prefer_existing_sort: bool,
+    #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness,

Review Comment:
   This is so much easier to read!



##########
datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs:
##########
@@ -200,66 +177,72 @@ async fn test_replace_multiple_input_repartition_1(
     let sort = sort_exec_with_preserve_partitioning(sort_exprs.clone(), repartition);
     let physical_plan = sort_preserving_merge_exec(sort_exprs, sort);
 
-    // Expected inputs unbounded and bounded
-    let expected_input_unbounded = [
-            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
-            "  SortExec: expr=[a@0 ASC NULLS LAST], 
preserve_partitioning=[true]",
-            "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
-            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        StreamingTableExec: partition_sizes=1, projection=[a, c, 
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
-        ];
-    let expected_input_bounded = [
-            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
-            "  SortExec: expr=[a@0 ASC NULLS LAST], 
preserve_partitioning=[true]",
-            "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
-            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        DataSourceExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
-        ];
-
-    // Expected unbounded result (same for with and without flag)
-    let expected_optimized_unbounded = [
-            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
-            "  RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
-            "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "      StreamingTableExec: partition_sizes=1, projection=[a, c, 
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
-        ];
-
-    // Expected bounded results with and without flag
-    let expected_optimized_bounded = [
-            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
-            "  SortExec: expr=[a@0 ASC NULLS LAST], 
preserve_partitioning=[true]",
-            "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
-            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        DataSourceExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
-        ];
-    let expected_optimized_bounded_sort_preserve = [
-            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
-            "  RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
-            "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "      DataSourceExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
-        ];
-    assert_optimized_in_all_boundedness_situations!(
-        expected_input_unbounded,
-        expected_input_bounded,
-        expected_optimized_unbounded,
-        expected_optimized_bounded,
-        expected_optimized_bounded_sort_preserve,
-        physical_plan,
-        source_unbounded,
-        prefer_existing_sort
-    );
+    let run = ReplaceTest::new(physical_plan)
+        .with_boundedness(boundedness)
+        .with_sort_preference(sort_pref);
+
+    let physical_plan = run.run().await;
+
+    allow_duplicates! {
+    match (boundedness, sort_pref) {
+        (Boundedness::Bounded, SortPreference::MaximizeParallelism) => {

Review Comment:
   Yes, it is much better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to