lsyldliu commented on code in PR #23282:
URL: https://github.com/apache/flink/pull/23282#discussion_r1308505600


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java:
##########
@@ -52,50 +55,91 @@ public class DynamicFilteringDependencyProcessor implements 
ExecNodeGraphProcess
 
     @Override
     public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext 
context) {
-        ExecNodeGraph factSideProcessedGraph = 
checkIfFactSourceNeedEnforceDependency(execGraph);
+        ExecNodeGraph factSideProcessedGraph =
+                checkIfFactSourceNeedEnforceDependency(execGraph, context);
         return enforceDimSideBlockingExchange(factSideProcessedGraph, context);
     }
 
-    private ExecNodeGraph checkIfFactSourceNeedEnforceDependency(ExecNodeGraph 
execGraph) {
-        Map<BatchExecTableSourceScan, List<ExecNode<?>>> 
dynamicFilteringScanDescendants =
+    private ExecNodeGraph checkIfFactSourceNeedEnforceDependency(
+            ExecNodeGraph execGraph, ProcessorContext context) {
+        Map<BatchExecTableSourceScan, List<DescendantInfo>> 
dynamicFilteringScanDescendants =
                 new HashMap<>();
 
         AbstractExecNodeExactlyOnceVisitor dynamicFilteringScanCollector =
                 new AbstractExecNodeExactlyOnceVisitor() {
                     @Override
                     protected void visitNode(ExecNode<?> node) {
-                        node.getInputEdges().stream()
-                                .map(ExecEdge::getSource)
-                                .forEach(
-                                        input -> {
-                                            // The character of the dynamic 
filter scan is that it
-                                            // has an input.
-                                            if (input instanceof 
BatchExecTableSourceScan
-                                                    && 
input.getInputEdges().size() > 0) {
-                                                dynamicFilteringScanDescendants
-                                                        .computeIfAbsent(
-                                                                
(BatchExecTableSourceScan) input,
-                                                                ignored -> new 
ArrayList<>())
-                                                        .add(node);
-                                            }
-                                        });
+                        for (int i = 0; i < node.getInputEdges().size(); ++i) {
+                            ExecEdge edge = node.getInputEdges().get(i);
+                            ExecNode<?> input = edge.getSource();
+
+                            // The character of the dynamic filter scan is 
that it
+                            // has an input.
+                            if (input instanceof BatchExecTableSourceScan
+                                    && input.getInputEdges().size() > 0) {
+                                dynamicFilteringScanDescendants
+                                        .computeIfAbsent(
+                                                (BatchExecTableSourceScan) 
input,
+                                                ignored -> new ArrayList<>())
+                                        .add(new DescendantInfo(node, i));
+                            }
+                        }
 
                         visitInputs(node);
                     }
                 };
         execGraph.getRootNodes().forEach(node -> 
node.accept(dynamicFilteringScanCollector));
 
-        for (Map.Entry<BatchExecTableSourceScan, List<ExecNode<?>>> entry :
+        for (Map.Entry<BatchExecTableSourceScan, List<DescendantInfo>> entry :
                 dynamicFilteringScanDescendants.entrySet()) {
-            if (entry.getValue().size() == 1) {
-                ExecNode<?> next = entry.getValue().get(0);
-                if (next instanceof BatchExecMultipleInput) {
-                    // the source can be chained with BatchExecMultipleInput
-                    continue;
-                }
-            }
-            // otherwise we need dependencies
-            entry.getKey().setNeedDynamicFilteringDependency(true);
+            BatchExecTableSourceScan tableSourceScan = entry.getKey();
+            BatchExecDynamicFilteringDataCollector 
dynamicFilteringDataCollector =
+                    getDynamicFilteringDataCollector(tableSourceScan);
+
+            // Add exchange between collector and enforcer
+            BatchExecExchange exchange =
+                    new BatchExecExchange(
+                            context.getPlanner().getTableConfig(),
+                            InputProperty.builder()
+                                    
.requiredDistribution(InputProperty.ANY_DISTRIBUTION)
+                                    
.damBehavior(InputProperty.DamBehavior.BLOCKING)
+                                    .build(),
+                            (RowType) 
dynamicFilteringDataCollector.getOutputType(),
+                            "Exchange");
+            exchange.setRequiredExchangeMode(StreamExchangeMode.BATCH);
+            exchange.setInputEdges(
+                    Collections.singletonList(
+                            ExecEdge.builder()
+                                    .source(dynamicFilteringDataCollector)
+                                    .target(exchange)
+                                    .build()));
+
+            // set enforcer inputs
+            BatchExecExecutionOrderEnforcer enforcer =
+                    new BatchExecExecutionOrderEnforcer(
+                            context.getPlanner().getTableConfig(),
+                            Arrays.asList(
+                                    exchange.getInputProperties().get(0), 
InputProperty.DEFAULT),
+                            tableSourceScan.getOutputType(),
+                            "OrderEnforcer");
+            ExecEdge edge1 = 
ExecEdge.builder().source(exchange).target(enforcer).build();
+            ExecEdge edge2 = 
ExecEdge.builder().source(tableSourceScan).target(enforcer).build();
+            enforcer.setInputEdges(Arrays.asList(edge1, edge2));
+
+            // we clear the input of tableSourceScan to avoid cycle in exec 
plan
+            tableSourceScan.setInputEdges(Collections.emptyList());
+            tableSourceScan.setInputProperties(Collections.emptyList());

Review Comment:
   I think it would be better to create a new TableScan here instead of 
changing the final `inputProperties`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to