mustafasrepo commented on code in PR #5880:
URL: https://github.com/apache/arrow-datafusion/pull/5880#discussion_r1159635631


##########
datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs:
##########
@@ -2225,21 +2221,39 @@ mod tests {
             true,
         )
         .await?;
-        let df = ctx.sql("EXPLAIN SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left 
as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < 
t2.a1 + 10").await?;
-        let physical_plan = df.create_physical_plan().await?;
-        let task_ctx = ctx.task_ctx();
-        let results = collect(physical_plan.clone(), task_ctx).await.unwrap();
-        let formatted = pretty_format_batches(&results).unwrap().to_string();
-        let found = formatted
-            .lines()
-            .any(|line| line.contains("SymmetricHashJoinExec"));
-        assert!(found);
+        let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN 
right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
+        let dataframe = ctx.sql(sql).await?;
+        let physical_plan = dataframe.create_physical_plan().await?;
+        let formatted = 
crate::physical_plan::displayable(physical_plan.as_ref())
+            .indent()
+            .to_string();
+        let expected = {
+            [
+                "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: 
\"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { 
left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, 
cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: 
BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, 
cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: 
Literal { value: Int64(3) } } }, op: And, right: BinaryExpr { left: CastExpr { 
expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: 
CastOptions { safe: false } }, op: Lt, right: BinaryExpr { left: CastExpr { 
expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: 
CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(10) } } 
} }",
+                "  CoalesceBatchesExec: target_batch_size=8192",
+                "    RepartitionExec: partitioning=Hash([Column { name: 
\"a2\", index: 1 }], 8), input_partitions=1",
+                // "   CsvExec: files={1 group: [[tempdir/left.csv]]}, 
has_header=false, limit=None, projection=[a1, a2]",
+                "  CoalesceBatchesExec: target_batch_size=8192",
+                "    RepartitionExec: partitioning=Hash([Column { name: 
\"a2\", index: 1 }], 8), input_partitions=1",
+                // "   CsvExec: files={1 group: [[tempdir/right.csv]]}, 
has_header=false, limit=None, projection=[a1, a2]"
+            ]
+        };
+        let mut actual: Vec<&str> = formatted.trim().lines().collect();
+        // Remove CSV lines
+        actual.remove(3);
+        actual.remove(5);
+
+        assert_eq!(
+            expected,
+            actual[..],
+            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+        );
         Ok(())
     }
 
     #[tokio::test(flavor = "multi_thread")]
     async fn join_change_in_planner_without_sort() -> Result<()> {

Review Comment:
   I think these tests only check physical plan generation. Since we are not 
executing anything, `flavor = "multi_thread"` should be unnecessary. You can 
write the tests `join_change_in_planner_without_sort_not_allowed` and 
`join_change_in_planner_without_sort` without `flavor = "multi_thread"`, as in 
`join_change_in_planner`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to