godfreyhe commented on code in PR #20497:
URL: https://github.com/apache/flink/pull/20497#discussion_r940177474


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java:
##########
@@ -84,4 +100,108 @@ protected void visitNode(ExecNode<?> node) {
 
         return execGraph;
     }
+
+    private ExecNodeGraph enforceDimSideBlockingExchange(
+            ExecNodeGraph execGraph, ProcessorContext context) {
+        if (context.getPlanner()
+                        .getTableConfig()
+                        .getConfiguration()
+                        .get(ExecutionOptions.BATCH_SHUFFLE_MODE)
+                == BatchShuffleMode.ALL_EXCHANGES_BLOCKING) {
+            return execGraph;
+        }
+
+        Set<ExecNode<?>> nodesRequiredBlockingOutputs = new HashSet<>();
+        // Find all the dynamic filter collector node
+        AbstractExecNodeExactlyOnceVisitor 
nodesRequiredBlockingOutputsCollector =
+                new AbstractExecNodeExactlyOnceVisitor() {
+                    @Override
+                    protected void visitNode(ExecNode<?> node) {
+                        if (node instanceof 
BatchExecDynamicFilteringDataCollector) {
+                            nodesRequiredBlockingOutputs.add(node);
+                        }
+
+                        // Here either it is added in the above lines or by 
its children nodes.
+                        if (nodesRequiredBlockingOutputs.contains(node)) {
+                            node.getInputEdges().stream()
+                                    .map(ExecEdge::getSource)
+                                    
.forEach(nodesRequiredBlockingOutputs::add);
+                        }
+
+                        visitInputs(node);
+                    }
+                };
+        execGraph
+                .getRootNodes()
+                .forEach(node -> 
node.accept(nodesRequiredBlockingOutputsCollector));
+
+        // Now we make all the output edges in nodesRequiredBlockingOutputs to 
be blocking.
+        AbstractExecNodeExactlyOnceVisitor blockingEnforcerVisitor =
+                new AbstractExecNodeExactlyOnceVisitor() {
+                    @Override
+                    protected void visitNode(ExecNode<?> node) {
+                        visitInputs(node);
+
+                        // We only contains the edges that the source is in 
the set, but
+                        // the target does not.
+                        if (nodesRequiredBlockingOutputs.contains(node)) {
+                            return;
+                        }
+
+                        for (int i = 0; i < node.getInputEdges().size(); ++i) {
+                            ExecEdge edge = node.getInputEdges().get(i);
+                            ExecNode<?> source = edge.getSource();
+
+                            // We only contains the edges that the source is 
in the set, but

Review Comment:
   Ditto: contains -> contain



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java:
##########
@@ -84,4 +100,108 @@ protected void visitNode(ExecNode<?> node) {
 
         return execGraph;
     }
+
+    private ExecNodeGraph enforceDimSideBlockingExchange(

Review Comment:
   I think we can do this after
`entry.getKey().setNeedDynamicFilteringDependency(true);`. We just need to
create an Exchange as the direct input of the scan.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DynamicFilteringITCase.java:
##########
@@ -118,8 +124,10 @@ public void before() throws Exception {
                         dataId3));
     }
 
-    @Test
-    public void testSimpleDynamicFiltering() {
+    @ParameterizedTest(name = "mode = {0}")
+    @MethodSource("parameters")

Review Comment:
   Change them to class level?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java:
##########
@@ -84,4 +100,108 @@ protected void visitNode(ExecNode<?> node) {
 
         return execGraph;
     }
+
+    private ExecNodeGraph enforceDimSideBlockingExchange(
+            ExecNodeGraph execGraph, ProcessorContext context) {
+        if (context.getPlanner()
+                        .getTableConfig()
+                        .getConfiguration()
+                        .get(ExecutionOptions.BATCH_SHUFFLE_MODE)
+                == BatchShuffleMode.ALL_EXCHANGES_BLOCKING) {
+            return execGraph;
+        }
+
+        Set<ExecNode<?>> nodesRequiredBlockingOutputs = new HashSet<>();
+        // Find all the dynamic filter collector node

Review Comment:
   Find all the dynamic filter collector nodes and their inputs



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java:
##########
@@ -84,4 +100,108 @@ protected void visitNode(ExecNode<?> node) {
 
         return execGraph;
     }
+
+    private ExecNodeGraph enforceDimSideBlockingExchange(
+            ExecNodeGraph execGraph, ProcessorContext context) {
+        if (context.getPlanner()
+                        .getTableConfig()
+                        .getConfiguration()
+                        .get(ExecutionOptions.BATCH_SHUFFLE_MODE)
+                == BatchShuffleMode.ALL_EXCHANGES_BLOCKING) {
+            return execGraph;
+        }
+
+        Set<ExecNode<?>> nodesRequiredBlockingOutputs = new HashSet<>();
+        // Find all the dynamic filter collector node
+        AbstractExecNodeExactlyOnceVisitor 
nodesRequiredBlockingOutputsCollector =
+                new AbstractExecNodeExactlyOnceVisitor() {
+                    @Override
+                    protected void visitNode(ExecNode<?> node) {
+                        if (node instanceof 
BatchExecDynamicFilteringDataCollector) {
+                            nodesRequiredBlockingOutputs.add(node);
+                        }
+
+                        // Here either it is added in the above lines or by 
its children nodes.
+                        if (nodesRequiredBlockingOutputs.contains(node)) {
+                            node.getInputEdges().stream()
+                                    .map(ExecEdge::getSource)
+                                    
.forEach(nodesRequiredBlockingOutputs::add);
+                        }
+
+                        visitInputs(node);
+                    }
+                };
+        execGraph
+                .getRootNodes()
+                .forEach(node -> 
node.accept(nodesRequiredBlockingOutputsCollector));
+
+        // Now we make all the output edges in nodesRequiredBlockingOutputs to 
be blocking.
+        AbstractExecNodeExactlyOnceVisitor blockingEnforcerVisitor =
+                new AbstractExecNodeExactlyOnceVisitor() {
+                    @Override
+                    protected void visitNode(ExecNode<?> node) {
+                        visitInputs(node);
+
+                        // We only contains the edges that the source is in 
the set, but

Review Comment:
   contains -> contain



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to