godfreyhe commented on code in PR #21672:
URL: https://github.com/apache/flink/pull/21672#discussion_r1089903224


##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml:
##########
@@ -663,7 +663,7 @@ Calc(select=[EXPR$0, EXPR$1, EXPR$2])
    +- Exchange(distribution=[hash[b, b0]])
       +- LocalHashAggregate(groupBy=[b, b0], select=[b, b0, Partial_COUNT(a) 
AS count$0, Partial_COUNT(id) AS count$1, Partial_SUM(a0) AS sum$2])
          +- HashJoin(joinType=[InnerJoin], where=[(a = a0)], select=[b, a, id, 
a0, b0], build=[right])
-            :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])

Review Comment:
   Does this PR change the default shuffle mode to blocking shuffle?



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml:
##########
@@ -71,17 +75,21 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY 
$1 ORDER BY $2 NULLS
       <![CDATA[
 Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS 
EXPR$1, w2$o0 AS EXPR$2, w0$o2 AS EXPR$3, CAST((CASE((w3$o0 > 0), w3$o1, 
null:INTEGER) / w3$o0) AS INTEGER) AS EXPR$4])
 +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[MAX(a) AS w0$o0 
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, 
w1$o0, w0$o1, w3$o0, w3$o1, w2$o0, w0$o0])
-   +- Sort(orderBy=[c ASC, a ASC])
-      +- Exchange(distribution=[hash[c]])
-         +- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[COUNT(a) 
AS w3$o0, $SUM0(a) AS w3$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
window#1=[RANK(*) AS w2$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, w2$o0])
-            +- Sort(orderBy=[b ASC, c ASC])
-               +- Exchange(distribution=[hash[b]])
-                  +- OverAggregate(orderBy=[c ASC, a ASC], window#0=[MIN(a) AS 
w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, 
w0$o2, w1$o0, w0$o1])
-                     +- Sort(orderBy=[c ASC, a ASC])
-                        +- OverAggregate(orderBy=[b ASC], window#0=[COUNT(a) 
AS w0$o2, $SUM0(a) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w0$o2, w1$o0])
-                           +- Sort(orderBy=[b ASC])
-                              +- Exchange(distribution=[single])
-                                 +- 
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[forward])
+      +- Sort(orderBy=[c ASC, a ASC])
+         +- Exchange(distribution=[hash[c]])
+            +- OverAggregate(partitionBy=[b], orderBy=[c ASC], 
window#0=[COUNT(a) AS w3$o0, $SUM0(a) AS w3$o1 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], window#1=[RANK(*) AS w2$o0 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, w2$o0])
+               +- Exchange(distribution=[forward])
+                  +- Sort(orderBy=[b ASC, c ASC])
+                     +- Exchange(distribution=[hash[b]])
+                        +- OverAggregate(orderBy=[c ASC, a ASC], 
window#0=[MIN(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w0$o2, w1$o0, w0$o1])
+                           +- Exchange(distribution=[forward])
+                              +- Sort(orderBy=[c ASC, a ASC])
+                                 +- Exchange(distribution=[forward])
+                                    +- OverAggregate(orderBy=[b ASC], 
window#0=[COUNT(a) AS w0$o2, $SUM0(a) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0])
+                                       +- Sort(orderBy=[b ASC])

Review Comment:
   There should be an `Exchange(distribution=[forward])` between Sort and 
OverAggregate



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SingleRowJoinTest.xml:
##########
@@ -237,20 +237,18 @@ LogicalProject(EXPR$0=[*($0, 0.1:DECIMAL(2, 1))])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a2, EXPR$1])
-+- NestedLoopJoin(joinType=[InnerJoin], where=[(EXPR$1 > $f0)], select=[a2, 
EXPR$1, $f0], build=[right], singleRowJoin=[true])
-   :- Exchange(distribution=[any])
-   :  +- HashAggregate(isMerge=[true], groupBy=[a2], select=[a2, 
Final_SUM(sum$0) AS EXPR$1])
-   :     +- Exchange(distribution=[hash[a2]])
-   :        +- LocalHashAggregate(groupBy=[a2], select=[a2, Partial_SUM(a1) AS 
sum$0])
-   :           +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, A, source: [TestTableSource(a1, a2)]]], fields=[a1, 
a2])(reuse_id=[1])
-   +- Exchange(distribution=[broadcast])
-      +- HashAggregate(isMerge=[false], select=[SINGLE_VALUE(EXPR$0) AS $f0])
-         +- Calc(select=[($f0 * 0.1) AS EXPR$0])
-            +- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS $f0])
-               +- Exchange(distribution=[single])
-                  +- LocalHashAggregate(select=[Partial_SUM(a1) AS sum$0])
-                     +- Calc(select=[a1])
-                        +- Reused(reference_id=[1])
++- MultipleInput(readOrder=[0,1], 
members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[(EXPR$1 > $f0)], 
select=[a2, EXPR$1, $f0], build=[right], singleRowJoin=[true])\n:- 
HashAggregate(isMerge=[true], groupBy=[a2], select=[a2, Final_SUM(sum$0) AS 
EXPR$1])\n:  +- [#2] Exchange(distribution=[hash[a2]])\n+- [#1] 
Exchange(distribution=[broadcast])\n])
+   :- Exchange(distribution=[broadcast])
+   :  +- HashAggregate(isMerge=[false], select=[SINGLE_VALUE(EXPR$0) AS $f0])
+   :     +- Calc(select=[($f0 * 0.1) AS EXPR$0])
+   :        +- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS $f0])
+   :           +- Exchange(distribution=[single])
+   :              +- LocalHashAggregate(select=[Partial_SUM(a1) AS sum$0])
+   :                 +- Calc(select=[a1])
+   :                    +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, A, source: [TestTableSource(a1, a2)]]], fields=[a1, 
a2])(reuse_id=[1])
+   +- Exchange(distribution=[hash[a2]])
+      +- LocalHashAggregate(groupBy=[a2], select=[a2, Partial_SUM(a1) AS 
sum$0])
+         +- Reused(reference_id=[1])

Review Comment:
   The `Exchange(distribution=[any])` is added in `DeadlockBreakupProcessor` only if 
the shuffle mode is pipelined shuffle.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MultipleInputITCase.scala:
##########
@@ -217,7 +217,7 @@ object MultipleInputITCase {
 
   @Parameters(name = "shuffleMode: {0}")
   def parameters: Array[BatchShuffleMode] =
-    Array(BatchShuffleMode.ALL_EXCHANGES_BLOCKING, 
BatchShuffleMode.ALL_EXCHANGES_PIPELINED)

Review Comment:
   ditto



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/MultipleInputCreationTest.scala:
##########
@@ -355,5 +355,5 @@ object MultipleInputCreationTest {
 
   @Parameters(name = "shuffleMode: {0}")
   def parameters: Array[BatchShuffleMode] =
-    Array(BatchShuffleMode.ALL_EXCHANGES_BLOCKING, 
BatchShuffleMode.ALL_EXCHANGES_PIPELINED)

Review Comment:
   I do think we should remove BatchShuffleMode.ALL_EXCHANGES_PIPELINED. We 
can define the parameter as 
   ```
     def parameters(): util.Collection[Array[java.lang.Object]] = {
       Seq[Array[AnyRef]](
         Array(BatchShuffleMode.ALL_EXCHANGES_BLOCKING, 
adaptive_scheduler_enabled),
         Array(BatchShuffleMode.ALL_EXCHANGES_BLOCKING, 
adaptive_scheduler_disabled),
         Array(BatchShuffleMode.ALL_EXCHANGES_PIPELINED, 
adaptive_scheduler_disabled),
       )
   }
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DynamicFilteringITCase.java:
##########
@@ -48,9 +48,7 @@ public class DynamicFilteringITCase extends BatchTestBase {
     private Catalog catalog;
 
     static Stream<Arguments> parameters() {
-        return Stream.of(
-                Arguments.of(BatchShuffleMode.ALL_EXCHANGES_BLOCKING),
-                Arguments.of(BatchShuffleMode.ALL_EXCHANGES_PIPELINED));
+        return 
Stream.of(Arguments.of(BatchShuffleMode.ALL_EXCHANGES_BLOCKING));

Review Comment:
   Could this PR make sure the `ALL_EXCHANGES_PIPELINED` mode is tested?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to