This is an automated email from the ASF dual-hosted git repository.
wanglijie pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new 9cd04bd004f [FLINK-32827][table-runtime] Fix the operator fusion
codegen may not take effect when enabling runtime filter
9cd04bd004f is described below
commit 9cd04bd004fbf2d0fa4266cf07909c0bbcc94813
Author: fengli <[email protected]>
AuthorDate: Wed Aug 16 18:33:19 2023 +0800
[FLINK-32827][table-runtime] Fix the operator fusion codegen may not take
effect when enabling runtime filter
This closes #23225
---
.../runtimefilter/BatchExecRuntimeFilter.java | 30 +++++
.../spec/RuntimeFilterFusionCodegenSpec.scala | 140 +++++++++++++++++++++
.../runtime/batch/sql/RuntimeFilterITCase.java | 26 ++--
3 files changed, 188 insertions(+), 8 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecRuntimeFilter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecRuntimeFilter.java
index 160a9c5db7e..7cbe6c90397 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecRuntimeFilter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecRuntimeFilter.java
@@ -24,6 +24,9 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import
org.apache.flink.table.planner.codegen.runtimefilter.RuntimeFilterCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.fusion.OpFusionCodegenSpecGenerator;
+import
org.apache.flink.table.planner.plan.fusion.generator.TwoInputOpFusionCodegenSpecGenerator;
+import
org.apache.flink.table.planner.plan.fusion.spec.RuntimeFilterFusionCodegenSpec;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
@@ -91,4 +94,31 @@ public class BatchExecRuntimeFilter extends
ExecNodeBase<RowData>
0,
false);
}
+
+ @Override
+ public boolean supportFusionCodegen() {
+ return true; // unconditionally reports that this node supports operator fusion codegen
+ }
+
+ @Override
+ protected OpFusionCodegenSpecGenerator
translateToFusionCodegenSpecInternal(
+ PlannerBase planner, ExecNodeConfig config) { // builds the two-input fusion generator for this runtime filter node
+ OpFusionCodegenSpecGenerator leftInput = // fusion generator translated from input edge 0
+ getInputEdges().get(0).translateToFusionCodegenSpec(planner);
+ OpFusionCodegenSpecGenerator rightInput = // fusion generator translated from input edge 1
+ getInputEdges().get(1).translateToFusionCodegenSpec(planner);
+ OpFusionCodegenSpecGenerator runtimeFilterGenerator =
+ new TwoInputOpFusionCodegenSpecGenerator(
+ leftInput,
+ rightInput,
+ 0L, // NOTE(review): presumably the managed memory requested by this operator (none) - confirm against the constructor
+ (RowType) getOutputType(),
+ new RuntimeFilterFusionCodegenSpec(
+ new CodeGeneratorContext(
+ config,
planner.getFlinkContext().getClassLoader()),
+ probeIndices));
+ leftInput.addOutput(1, runtimeFilterGenerator); // left input feeds this generator as input id 1 (the id RuntimeFilterFusionCodegenSpec treats as build side)
+ rightInput.addOutput(2, runtimeFilterGenerator); // right input feeds this generator as input id 2 (handled as probe side by the spec)
+ return runtimeFilterGenerator;
+ }
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/RuntimeFilterFusionCodegenSpec.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/RuntimeFilterFusionCodegenSpec.scala
new file mode 100644
index 00000000000..8e5273a8c4d
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/RuntimeFilterFusionCodegenSpec.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.fusion.spec
+
+import org.apache.flink.runtime.operators.util.BloomFilter
+import org.apache.flink.table.data.binary.BinaryRowData
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext,
GeneratedExpression}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{className,
newName, newNames}
+import org.apache.flink.table.planner.plan.fusion.{OpFusionCodegenSpecBase,
OpFusionContext}
+import org.apache.flink.table.planner.typeutils.RowTypeUtils
+import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.util.Preconditions
+
+import java.util
+
+/** The operator fusion codegen spec for RuntimeFilter: field 1 of a build-side row (input id 1) carries the serialized bloom filter; probe-side rows are passed downstream only when the hash of their probe key passes the filter, or unconditionally when no filter was received. */
+class RuntimeFilterFusionCodegenSpec(opCodegenCtx: CodeGeneratorContext,
probeIndices: Array[Int])
+ extends OpFusionCodegenSpecBase(opCodegenCtx) {
+
+ private lazy val buildInputId = 1 // input id of the build side; every other input id is handled as probe side
+
+ private var buildContext: OpFusionContext = _
+ private var probeContext: OpFusionContext = _
+ private var buildType: RowType = _
+ private var probeType: RowType = _
+
+ private var buildComplete: String = _ // name of the generated boolean member; assigned in doProcessConsume for the build input
+ private var filterTerm: String = _ // name of the generated BloomFilter member; assigned in doProcessConsume for the build input
+
+ override def setup(opFusionContext: OpFusionContext): Unit = {
+ super.setup(opFusionContext)
+ val inputContexts = fusionContext.getInputFusionContexts
+ assert(inputContexts.size == 2) // exactly one build input and one probe input
+ buildContext = inputContexts.get(0) // first input context is the build side
+ probeContext = inputContexts.get(1) // second input context is the probe side
+
+ buildType = buildContext.getOutputType
+ probeType = probeContext.getOutputType
+ }
+
+ override def variablePrefix(): String = "runtimeFilter"
+
+ override def doProcessProduce(codegenCtx: CodeGeneratorContext): Unit = {
+ // call build side first, then call probe side
+ // NOTE(review): the order presumably matters because the probe-side code interpolates buildComplete/filterTerm, which are only assigned while the build-side consume code is generated - confirm
+ buildContext.processProduce(codegenCtx)
+ probeContext.processProduce(codegenCtx)
+ }
+
+ override def doEndInputProduce(codegenCtx: CodeGeneratorContext): Unit = {
+ // call build side first, then call probe side
+ buildContext.endInputProduce(codegenCtx)
+ probeContext.endInputProduce(codegenCtx)
+ }
+
+ override def doProcessConsume(
+ inputId: Int,
+ inputVars: util.List[GeneratedExpression],
+ row: GeneratedExpression): String = {
+ if (inputId == buildInputId) { // build side: declare the members and emit code that deserializes the filter
+ buildComplete = newName("buildComplete")
+ opCodegenCtx.addReusableMember(s"private transient boolean
$buildComplete;")
+ opCodegenCtx.addReusableOpenStatement(s"$buildComplete = false;")
+
+ filterTerm = newName("filter")
+ val filterClass = className[BloomFilter]
+ opCodegenCtx.addReusableMember(s"private transient $filterClass
$filterTerm;")
+
+ s"""
+ |${className[Preconditions]}.checkState(!$buildComplete, "Should not
build completed.");
+ |if ($filterTerm == null && !${row.resultTerm}.isNullAt(1)) {
+ | $filterTerm =
$filterClass.fromBytes(${row.resultTerm}.getBinary(1));
+ |}
+ |""".stripMargin
+ } else { // probe side: emit code that tests the probe key against the filter and forwards matching rows
+ val Seq(probeKeyTerm, probeKeyWriterTerm) = newNames("probeKeyTerm",
"probeKeyWriterTerm")
+ // project probe key row from input
+ val probeKeyExprs = probeIndices.map(idx => inputVars.get(idx))
+ val keyProjectionCode = getExprCodeGenerator
+ .generateResultExpression(
+ probeKeyExprs,
+ RowTypeUtils.projectRowType(probeType, probeIndices),
+ classOf[BinaryRowData],
+ probeKeyTerm,
+ outRowWriter = Option(probeKeyWriterTerm))
+ .code
+
+ val found = newName("found") // generated local flag: stays true when no filter exists or the hash test passes
+ s"""
+ |${className[Preconditions]}.checkState($buildComplete, "Should build
completed.");
+ |
+ |boolean $found = true;
+ |if ($filterTerm != null) {
+ | // compute the hash code of probe key
+ | $keyProjectionCode
+ | final int hashCode = $probeKeyTerm.hashCode();
+ | if (!$filterTerm.testHash(hashCode)) {
+ | $found = false;
+ | }
+ |}
+ |// if found, call downstream to consume the row
+ |if($found) {
+ | ${row.code}
+ | ${fusionContext.processConsume(null, row.resultTerm)}
+ |}
+ |""".stripMargin
+ }
+ }
+
+ override def doEndInputConsume(inputId: Int): String = {
+ if (inputId == buildInputId) { // build side ended: emit code that marks the build as complete
+ s"""
+ |${className[Preconditions]}.checkState(!$buildComplete, "Should not
build completed.");
+ |LOG.info("RuntimeFilter build completed.");
+ |$buildComplete = true;
+ |""".stripMargin
+ } else { // probe side ended: emit code that propagates endInput downstream
+ s"""
+ |${className[Preconditions]}.checkState($buildComplete, "Should build
completed.");
+ |LOG.info("Finish RuntimeFilter probe phase.");
+ |// call downstream endInput method
+ |${fusionContext.endInputConsume()}
+ |""".stripMargin
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RuntimeFilterITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RuntimeFilterITCase.java
index a77720f6ee3..7dd127f8cc5 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RuntimeFilterITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RuntimeFilterITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.runtime.batch.sql;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
@@ -49,8 +50,10 @@ class RuntimeFilterITCase extends BatchTestBase {
static Stream<Arguments> parameters() {
return Stream.of(
- Arguments.of(BatchShuffleMode.ALL_EXCHANGES_BLOCKING),
- Arguments.of(BatchShuffleMode.ALL_EXCHANGES_PIPELINED));
+ Arguments.of(BatchShuffleMode.ALL_EXCHANGES_BLOCKING, true),
+ Arguments.of(BatchShuffleMode.ALL_EXCHANGES_BLOCKING, false),
+ Arguments.of(BatchShuffleMode.ALL_EXCHANGES_PIPELINED, true),
+ Arguments.of(BatchShuffleMode.ALL_EXCHANGES_PIPELINED, false));
}
@BeforeEach
@@ -112,9 +115,11 @@ class RuntimeFilterITCase extends BatchTestBase {
false);
}
- @ParameterizedTest(name = "mode = {0}")
+ @ParameterizedTest(name = "mode = {0}, ofcg = {1}")
@MethodSource("parameters")
- void testSimpleRuntimeFilter(BatchShuffleMode shuffleMode) {
+ void testSimpleRuntimeFilter(BatchShuffleMode shuffleMode, boolean ofcg) {
+ tEnv.getConfig()
+
.set(ExecutionConfigOptions.TABLE_EXEC_OPERATOR_FUSION_CODEGEN_ENABLED, ofcg);
configBatchShuffleMode(tEnv.getConfig(), shuffleMode);
checkResult(
"select * from fact, dim where x = a and z = 3",
@@ -131,9 +136,11 @@ class RuntimeFilterITCase extends BatchTestBase {
false);
}
- @ParameterizedTest(name = "mode = {0}")
+ @ParameterizedTest(name = "mode = {0}, ofcg = {1}")
@MethodSource("parameters")
- void testRuntimeFilterWithBuildSidePushDown(BatchShuffleMode shuffleMode) {
+ void testRuntimeFilterWithBuildSidePushDown(BatchShuffleMode shuffleMode,
boolean ofcg) {
+ tEnv.getConfig()
+
.set(ExecutionConfigOptions.TABLE_EXEC_OPERATOR_FUSION_CODEGEN_ENABLED, ofcg);
configBatchShuffleMode(tEnv.getConfig(), shuffleMode);
// The following two config are used to let the build side is a direct
Agg (without
// Exchange)
@@ -162,9 +169,12 @@ class RuntimeFilterITCase extends BatchTestBase {
false);
}
- @ParameterizedTest(name = "mode = {0}")
+ @ParameterizedTest(name = "mode = {0}, ofcg = {1}")
@MethodSource("parameters")
- void testRuntimeFilterWithProbeSidePushDown(BatchShuffleMode shuffleMode)
throws Exception {
+ void testRuntimeFilterWithProbeSidePushDown(BatchShuffleMode shuffleMode,
boolean ofcg)
+ throws Exception {
+ tEnv.getConfig()
+
.set(ExecutionConfigOptions.TABLE_EXEC_OPERATOR_FUSION_CODEGEN_ENABLED, ofcg);
configBatchShuffleMode(tEnv.getConfig(), shuffleMode);
tEnv.executeSql(
String.format(