lsyldliu commented on code in PR #23282:
URL: https://github.com/apache/flink/pull/23282#discussion_r1308172368


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java:
##########
@@ -52,50 +55,91 @@ public class DynamicFilteringDependencyProcessor implements 
ExecNodeGraphProcess
 
     @Override
     public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext 
context) {
-        ExecNodeGraph factSideProcessedGraph = 
checkIfFactSourceNeedEnforceDependency(execGraph);
+        ExecNodeGraph factSideProcessedGraph =
+                checkIfFactSourceNeedEnforceDependency(execGraph, context);
         return enforceDimSideBlockingExchange(factSideProcessedGraph, context);
     }
 
-    private ExecNodeGraph checkIfFactSourceNeedEnforceDependency(ExecNodeGraph 
execGraph) {
-        Map<BatchExecTableSourceScan, List<ExecNode<?>>> 
dynamicFilteringScanDescendants =
+    private ExecNodeGraph checkIfFactSourceNeedEnforceDependency(
+            ExecNodeGraph execGraph, ProcessorContext context) {
+        Map<BatchExecTableSourceScan, List<DescendantInfo>> 
dynamicFilteringScanDescendants =
                 new HashMap<>();
 
         AbstractExecNodeExactlyOnceVisitor dynamicFilteringScanCollector =
                 new AbstractExecNodeExactlyOnceVisitor() {
                     @Override
                     protected void visitNode(ExecNode<?> node) {
-                        node.getInputEdges().stream()
-                                .map(ExecEdge::getSource)
-                                .forEach(
-                                        input -> {
-                                            // The character of the dynamic 
filter scan is that it
-                                            // has an input.
-                                            if (input instanceof 
BatchExecTableSourceScan
-                                                    && 
input.getInputEdges().size() > 0) {
-                                                dynamicFilteringScanDescendants
-                                                        .computeIfAbsent(
-                                                                
(BatchExecTableSourceScan) input,
-                                                                ignored -> new 
ArrayList<>())
-                                                        .add(node);
-                                            }
-                                        });
+                        for (int i = 0; i < node.getInputEdges().size(); ++i) {
+                            ExecEdge edge = node.getInputEdges().get(i);
+                            ExecNode<?> input = edge.getSource();
+
+                            // The character of the dynamic filter scan is 
that it
+                            // has an input.
+                            if (input instanceof BatchExecTableSourceScan
+                                    && input.getInputEdges().size() > 0) {
+                                dynamicFilteringScanDescendants
+                                        .computeIfAbsent(
+                                                (BatchExecTableSourceScan) 
input,
+                                                ignored -> new ArrayList<>())
+                                        .add(new DescendantInfo(node, i));
+                            }
+                        }
 
                         visitInputs(node);
                     }
                 };
         execGraph.getRootNodes().forEach(node -> 
node.accept(dynamicFilteringScanCollector));
 
-        for (Map.Entry<BatchExecTableSourceScan, List<ExecNode<?>>> entry :
+        for (Map.Entry<BatchExecTableSourceScan, List<DescendantInfo>> entry :
                 dynamicFilteringScanDescendants.entrySet()) {
-            if (entry.getValue().size() == 1) {
-                ExecNode<?> next = entry.getValue().get(0);
-                if (next instanceof BatchExecMultipleInput) {
-                    // the source can be chained with BatchExecMultipleInput
-                    continue;
-                }
-            }
-            // otherwise we need dependencies
-            entry.getKey().setNeedDynamicFilteringDependency(true);
+            BatchExecTableSourceScan tableSourceScan = entry.getKey();
+            BatchExecDynamicFilteringDataCollector 
dynamicFilteringDataCollector =
+                    getDynamicFilteringDataCollector(tableSourceScan);
+
+            // Add exchange between collector and enforcer
+            BatchExecExchange exchange =
+                    new BatchExecExchange(
+                            context.getPlanner().getTableConfig(),
+                            InputProperty.builder()
+                                    
.requiredDistribution(InputProperty.ANY_DISTRIBUTION)
+                                    
.damBehavior(InputProperty.DamBehavior.BLOCKING)
+                                    .build(),
+                            (RowType) 
dynamicFilteringDataCollector.getOutputType(),
+                            "Exchange");
+            exchange.setRequiredExchangeMode(StreamExchangeMode.BATCH);
+            exchange.setInputEdges(
+                    Collections.singletonList(
+                            ExecEdge.builder()
+                                    .source(dynamicFilteringDataCollector)
+                                    .target(exchange)
+                                    .build()));
+
+            // set enforcer inputs
+            BatchExecExecutionOrderEnforcer enforcer =
+                    new BatchExecExecutionOrderEnforcer(
+                            context.getPlanner().getTableConfig(),
+                            Arrays.asList(
+                                    exchange.getInputProperties().get(0), 
InputProperty.DEFAULT),
+                            tableSourceScan.getOutputType(),
+                            "OrderEnforcer");
+            ExecEdge edge1 = 
ExecEdge.builder().source(exchange).target(enforcer).build();
+            ExecEdge edge2 = 
ExecEdge.builder().source(tableSourceScan).target(enforcer).build();
+            enforcer.setInputEdges(Arrays.asList(edge1, edge2));
+
+            // we clear the input of tableSourceScan to avoid cycle in exec 
plan
+            tableSourceScan.setInputEdges(Collections.emptyList());
+            tableSourceScan.setInputProperties(Collections.emptyList());

Review Comment:
   Can you explain why we need to reset the `inputProperties`?  I'm concerned 
that the `inputProperties` is final from the initial design, we shouldn't 
change it as soon as possible. I think we don't need to reset it without having 
side effects, there are many places that generate the new Exchange without 
resetting it.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala:
##########
@@ -73,15 +73,14 @@ class BatchPlanner(
     val processors = new util.ArrayList[ExecNodeGraphProcessor]()
     // deadlock breakup
     processors.add(new DeadlockBreakupProcessor())
+    if 
(getTableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED))
 {
+      processors.add(new DynamicFilteringDependencyProcessor)
+    }
     // multiple input creation
     if 
(getTableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED))
 {
       processors.add(new MultipleInputNodeCreationProcessor(false))
     }
     processors.add(new ForwardHashExchangeProcessor)
-    if 
(getTableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED))
 {
-      processors.add(new DynamicFilteringDependencyProcessor)
-      processors.add(new ResetTransformationProcessor)

Review Comment:
   The `ResetTransformationProcessor` class can be deleted now.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java:
##########
@@ -204,4 +248,16 @@ private BatchExecExchange createExchange(
 
         return exchange;
     }
+
+    private static class DescendantInfo {
+        /** The DynamicFilteringScan is the {@link inputId}th input of current 
descendant . */

Review Comment:
   @code inputId



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java:
##########
@@ -52,50 +55,91 @@ public class DynamicFilteringDependencyProcessor implements 
ExecNodeGraphProcess
 
     @Override
     public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext 
context) {
-        ExecNodeGraph factSideProcessedGraph = 
checkIfFactSourceNeedEnforceDependency(execGraph);
+        ExecNodeGraph factSideProcessedGraph =
+                checkIfFactSourceNeedEnforceDependency(execGraph, context);
         return enforceDimSideBlockingExchange(factSideProcessedGraph, context);
     }
 
-    private ExecNodeGraph checkIfFactSourceNeedEnforceDependency(ExecNodeGraph 
execGraph) {
-        Map<BatchExecTableSourceScan, List<ExecNode<?>>> 
dynamicFilteringScanDescendants =
+    private ExecNodeGraph checkIfFactSourceNeedEnforceDependency(

Review Comment:
   This method's name should be changed, it always inserts enforce dependency.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/dynamicfiltering/ExecutionOrderEnforcerCodeGenerator.scala:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.codegen.dynamicfiltering
+
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
OperatorCodeGenerator}
+import 
org.apache.flink.table.planner.codegen.CodeGenUtils.{DEFAULT_INPUT1_TERM, 
DEFAULT_INPUT2_TERM}
+import 
org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect
+import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExecutionOrderEnforcer
+import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
+import org.apache.flink.table.types.logical.RowType
+
+/**
+ * Operator code generator for ExecutionOrderEnforcer operator. Input1 is the 
dependent upstream,
+ * input2 is the source. see [[BatchExecExecutionOrderEnforcer]] for details.
+ */
+object ExecutionOrderEnforcerCodeGenerator {
+  def gen(
+      ctx: CodeGeneratorContext,
+      input1Type: RowType,
+      input2Type: RowType): CodeGenOperatorFactory[RowData] = {
+
+    val processElement2Code =
+      s"""
+         |${generateCollect(s"$DEFAULT_INPUT2_TERM")}
+         |""".stripMargin
+
+    new CodeGenOperatorFactory[RowData](
+      OperatorCodeGenerator.generateTwoInputStreamOperator(
+        ctx,
+        "ExecutionOrderEnforcerOperator",
+        "",
+        processElement2Code,

Review Comment:
   ```suggestion
                s"""
            |${generateCollect(s"$DEFAULT_INPUT2_TERM")}
            |""".stripMargin,
   ```



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/dynamicfiltering/ExecutionOrderEnforcerCodeGenerator.scala:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.codegen.dynamicfiltering
+
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
OperatorCodeGenerator}
+import 
org.apache.flink.table.planner.codegen.CodeGenUtils.{DEFAULT_INPUT1_TERM, 
DEFAULT_INPUT2_TERM}
+import 
org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect
+import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExecutionOrderEnforcer
+import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
+import org.apache.flink.table.types.logical.RowType
+
+/**
+ * Operator code generator for ExecutionOrderEnforcer operator. Input1 is the 
dependent upstream,
+ * input2 is the source. see [[BatchExecExecutionOrderEnforcer]] for details.
+ */
+object ExecutionOrderEnforcerCodeGenerator {
+  def gen(
+      ctx: CodeGeneratorContext,
+      input1Type: RowType,
+      input2Type: RowType): CodeGenOperatorFactory[RowData] = {
+
+    val processElement2Code =
+      s"""
+         |${generateCollect(s"$DEFAULT_INPUT2_TERM")}
+         |""".stripMargin
+
+    new CodeGenOperatorFactory[RowData](
+      OperatorCodeGenerator.generateTwoInputStreamOperator(
+        ctx,
+        "ExecutionOrderEnforcerOperator",
+        "",
+        processElement2Code,
+        input1Type,
+        input2Type,
+        DEFAULT_INPUT1_TERM,
+        DEFAULT_INPUT2_TERM,
+        None,
+        // we cannot pass None or use default here, because this operator must 
implement BoundedMultiInput

Review Comment:
   If we don't implement the `BoundedMultiInput`, what is the side effect?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java:
##########
@@ -108,44 +102,10 @@ protected Transformation<RowData> translateToPlanInternal(
             return transformation;

Review Comment:
   This if logic can be removed now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to