This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 679436390db [FLINK-32827][table-runtime] Fix the operator fusion codegen may not take effect when enabling runtime filter
679436390db is described below

commit 679436390db2ac1b54584dbafb6e1091f2f16ada
Author: fengli <[email protected]>
AuthorDate: Wed Aug 16 18:33:19 2023 +0800

    [FLINK-32827][table-runtime] Fix the operator fusion codegen may not take effect when enabling runtime filter
    
    This closes #23225
---
 .../runtimefilter/BatchExecRuntimeFilter.java      |  30 +++++
 .../spec/RuntimeFilterFusionCodegenSpec.scala      | 140 +++++++++++++++++++++
 .../runtime/batch/sql/RuntimeFilterITCase.java     |  26 ++--
 3 files changed, 188 insertions(+), 8 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecRuntimeFilter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecRuntimeFilter.java
index 160a9c5db7e..7cbe6c90397 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecRuntimeFilter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecRuntimeFilter.java
@@ -24,6 +24,9 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.codegen.runtimefilter.RuntimeFilterCodeGenerator;
 import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.fusion.OpFusionCodegenSpecGenerator;
+import org.apache.flink.table.planner.plan.fusion.generator.TwoInputOpFusionCodegenSpecGenerator;
+import org.apache.flink.table.planner.plan.fusion.spec.RuntimeFilterFusionCodegenSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
@@ -91,4 +94,31 @@ public class BatchExecRuntimeFilter extends ExecNodeBase<RowData>
                 0,
                 false);
     }
+
+    @Override
+    public boolean supportFusionCodegen() {
+        return true;
+    }
+
+    @Override
+    protected OpFusionCodegenSpecGenerator translateToFusionCodegenSpecInternal(
+            PlannerBase planner, ExecNodeConfig config) {
+        OpFusionCodegenSpecGenerator leftInput =
+                getInputEdges().get(0).translateToFusionCodegenSpec(planner);
+        OpFusionCodegenSpecGenerator rightInput =
+                getInputEdges().get(1).translateToFusionCodegenSpec(planner);
+        OpFusionCodegenSpecGenerator runtimeFilterGenerator =
+                new TwoInputOpFusionCodegenSpecGenerator(
+                        leftInput,
+                        rightInput,
+                        0L,
+                        (RowType) getOutputType(),
+                        new RuntimeFilterFusionCodegenSpec(
+                                new CodeGeneratorContext(
+                                        config, planner.getFlinkContext().getClassLoader()),
+                                probeIndices));
+        leftInput.addOutput(1, runtimeFilterGenerator);
+        rightInput.addOutput(2, runtimeFilterGenerator);
+        return runtimeFilterGenerator;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/RuntimeFilterFusionCodegenSpec.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/RuntimeFilterFusionCodegenSpec.scala
new file mode 100644
index 00000000000..8e5273a8c4d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/RuntimeFilterFusionCodegenSpec.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.fusion.spec
+
+import org.apache.flink.runtime.operators.util.BloomFilter
+import org.apache.flink.table.data.binary.BinaryRowData
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, newName, newNames}
+import org.apache.flink.table.planner.plan.fusion.{OpFusionCodegenSpecBase, OpFusionContext}
+import org.apache.flink.table.planner.typeutils.RowTypeUtils
+import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.util.Preconditions
+
+import java.util
+
+/** The operator fusion codegen spec for RuntimeFilter. */
+class RuntimeFilterFusionCodegenSpec(opCodegenCtx: CodeGeneratorContext, probeIndices: Array[Int])
+  extends OpFusionCodegenSpecBase(opCodegenCtx) {
+
+  private lazy val buildInputId = 1
+
+  private var buildContext: OpFusionContext = _
+  private var probeContext: OpFusionContext = _
+  private var buildType: RowType = _
+  private var probeType: RowType = _
+
+  private var buildComplete: String = _
+  private var filterTerm: String = _
+
+  override def setup(opFusionContext: OpFusionContext): Unit = {
+    super.setup(opFusionContext)
+    val inputContexts = fusionContext.getInputFusionContexts
+    assert(inputContexts.size == 2)
+    buildContext = inputContexts.get(0)
+    probeContext = inputContexts.get(1)
+
+    buildType = buildContext.getOutputType
+    probeType = probeContext.getOutputType
+  }
+
+  override def variablePrefix(): String = "runtimeFilter"
+
+  override def doProcessProduce(codegenCtx: CodeGeneratorContext): Unit = {
+    // call build side first, then call probe side
+    buildContext.processProduce(codegenCtx)
+    probeContext.processProduce(codegenCtx)
+  }
+
+  override def doEndInputProduce(codegenCtx: CodeGeneratorContext): Unit = {
+    // call build side first, then call probe side
+    buildContext.endInputProduce(codegenCtx)
+    probeContext.endInputProduce(codegenCtx)
+  }
+
+  override def doProcessConsume(
+      inputId: Int,
+      inputVars: util.List[GeneratedExpression],
+      row: GeneratedExpression): String = {
+    if (inputId == buildInputId) {
+      buildComplete = newName("buildComplete")
+      opCodegenCtx.addReusableMember(s"private transient boolean $buildComplete;")
+      opCodegenCtx.addReusableOpenStatement(s"$buildComplete = false;")
+
+      filterTerm = newName("filter")
+      val filterClass = className[BloomFilter]
+      opCodegenCtx.addReusableMember(s"private transient $filterClass $filterTerm;")
+
+      s"""
+         |${className[Preconditions]}.checkState(!$buildComplete, "Should not build completed.");
+         |if ($filterTerm == null && !${row.resultTerm}.isNullAt(1)) {
+         |    $filterTerm = $filterClass.fromBytes(${row.resultTerm}.getBinary(1));
+         |}
+         |""".stripMargin
+    } else {
+      val Seq(probeKeyTerm, probeKeyWriterTerm) = newNames("probeKeyTerm", "probeKeyWriterTerm")
+      // project probe key row from input
+      val probeKeyExprs = probeIndices.map(idx => inputVars.get(idx))
+      val keyProjectionCode = getExprCodeGenerator
+        .generateResultExpression(
+          probeKeyExprs,
+          RowTypeUtils.projectRowType(probeType, probeIndices),
+          classOf[BinaryRowData],
+          probeKeyTerm,
+          outRowWriter = Option(probeKeyWriterTerm))
+        .code
+
+      val found = newName("found")
+      s"""
+         |${className[Preconditions]}.checkState($buildComplete, "Should build completed.");
+         |
+         |boolean $found = true;
+         |if ($filterTerm != null) {
+         |  // compute the hash code of probe key
+         |  $keyProjectionCode
+         |  final int hashCode = $probeKeyTerm.hashCode();
+         |  if (!$filterTerm.testHash(hashCode)) {
+         |    $found = false;
+         |  }
+         |}
+         |// if found, call downstream to consume the row
+         |if($found) {
+         |  ${row.code}
+         |  ${fusionContext.processConsume(null, row.resultTerm)}
+         |}
+         |""".stripMargin
+    }
+  }
+
+  override def doEndInputConsume(inputId: Int): String = {
+    if (inputId == buildInputId) {
+      s"""
+         |${className[Preconditions]}.checkState(!$buildComplete, "Should not build completed.");
+         |LOG.info("RuntimeFilter build completed.");
+         |$buildComplete = true;
+         |""".stripMargin
+    } else {
+      s"""
+         |${className[Preconditions]}.checkState($buildComplete, "Should build completed.");
+         |LOG.info("Finish RuntimeFilter probe phase.");
+         |// call downstream endInput method
+         |${fusionContext.endInputConsume()}
+         |""".stripMargin
+    }
+  }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RuntimeFilterITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RuntimeFilterITCase.java
index a77720f6ee3..7dd127f8cc5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RuntimeFilterITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RuntimeFilterITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.runtime.batch.sql;
 import org.apache.flink.api.common.BatchShuffleMode;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
@@ -49,8 +50,10 @@ class RuntimeFilterITCase extends BatchTestBase {
 
     static Stream<Arguments> parameters() {
         return Stream.of(
-                Arguments.of(BatchShuffleMode.ALL_EXCHANGES_BLOCKING),
-                Arguments.of(BatchShuffleMode.ALL_EXCHANGES_PIPELINED));
+                Arguments.of(BatchShuffleMode.ALL_EXCHANGES_BLOCKING, true),
+                Arguments.of(BatchShuffleMode.ALL_EXCHANGES_BLOCKING, false),
+                Arguments.of(BatchShuffleMode.ALL_EXCHANGES_PIPELINED, true),
+                Arguments.of(BatchShuffleMode.ALL_EXCHANGES_PIPELINED, false));
     }
 
     @BeforeEach
@@ -112,9 +115,11 @@ class RuntimeFilterITCase extends BatchTestBase {
                 false);
     }
 
-    @ParameterizedTest(name = "mode = {0}")
+    @ParameterizedTest(name = "mode = {0}, ofcg = {1}")
     @MethodSource("parameters")
-    void testSimpleRuntimeFilter(BatchShuffleMode shuffleMode) {
+    void testSimpleRuntimeFilter(BatchShuffleMode shuffleMode, boolean ofcg) {
+        tEnv.getConfig()
+                .set(ExecutionConfigOptions.TABLE_EXEC_OPERATOR_FUSION_CODEGEN_ENABLED, ofcg);
         configBatchShuffleMode(tEnv.getConfig(), shuffleMode);
         checkResult(
                 "select * from fact, dim where x = a and z = 3",
@@ -131,9 +136,11 @@ class RuntimeFilterITCase extends BatchTestBase {
                 false);
     }
 
-    @ParameterizedTest(name = "mode = {0}")
+    @ParameterizedTest(name = "mode = {0}, ofcg = {1}")
     @MethodSource("parameters")
-    void testRuntimeFilterWithBuildSidePushDown(BatchShuffleMode shuffleMode) {
+    void testRuntimeFilterWithBuildSidePushDown(BatchShuffleMode shuffleMode, boolean ofcg) {
+        tEnv.getConfig()
+                .set(ExecutionConfigOptions.TABLE_EXEC_OPERATOR_FUSION_CODEGEN_ENABLED, ofcg);
         configBatchShuffleMode(tEnv.getConfig(), shuffleMode);
         // The following two config are used to let the build side is a direct Agg (without
         // Exchange)
@@ -162,9 +169,12 @@ class RuntimeFilterITCase extends BatchTestBase {
                 false);
     }
 
-    @ParameterizedTest(name = "mode = {0}")
+    @ParameterizedTest(name = "mode = {0}, ofcg = {1}")
     @MethodSource("parameters")
-    void testRuntimeFilterWithProbeSidePushDown(BatchShuffleMode shuffleMode) throws Exception {
+    void testRuntimeFilterWithProbeSidePushDown(BatchShuffleMode shuffleMode, boolean ofcg)
+            throws Exception {
+        tEnv.getConfig()
+                .set(ExecutionConfigOptions.TABLE_EXEC_OPERATOR_FUSION_CODEGEN_ENABLED, ofcg);
         configBatchShuffleMode(tEnv.getConfig(), shuffleMode);
         tEnv.executeSql(
                 String.format(

Reply via email to