This is an automated email from the ASF dual-hosted git repository.

jingge pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b1544e4e513 [FLINK-32622][table-planner] Optimize mini-batch 
assignment (#23470)
b1544e4e513 is described below

commit b1544e4e513d2b75b350c20dbb1c17a8232c22fd
Author: Jeyhun Karimov <je.kari...@gmail.com>
AuthorDate: Mon Apr 29 04:46:22 2024 +0200

    [FLINK-32622][table-planner] Optimize mini-batch assignment (#23470)
    
    * [FLINK-32622][table-planner] Do not add mini-batch assigner operator if 
it is useless
---
 .../StreamCommonSubGraphBasedOptimizer.scala       |  30 ++++-
 .../table/planner/plan/utils/FlinkRexUtil.scala    |  30 +++++
 .../plan/optimize/MiniBatchOptimizationTest.java   | 137 +++++++++++++++++++++
 .../flink/table/planner/utils/TableTestBase.scala  |  22 ++++
 4 files changed, 214 insertions(+), 5 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
index b9845aaa304..0d7dc4fff69 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
@@ -30,6 +30,7 @@ import 
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysical
 import 
org.apache.flink.table.planner.plan.optimize.program.{FlinkStreamProgram, 
StreamOptimizeContext}
 import org.apache.flink.table.planner.plan.schema.IntermediateRelTable
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import 
org.apache.flink.table.planner.plan.utils.FlinkRexUtil.shouldSkipMiniBatch
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext
 import org.apache.flink.table.planner.utils.TableConfigUtils
 import org.apache.flink.util.Preconditions
@@ -46,10 +47,9 @@ import scala.collection.JavaConversions._
 class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
   extends CommonSubGraphBasedOptimizer {
 
-  override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
-    val tableConfig = planner.getTableConfig
-    // build RelNodeBlock plan
-    val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, 
tableConfig)
+  private def optimizeSinkBlocks(
+      tableConfig: TableConfig,
+      sinkBlocks: Seq[RelNodeBlock]): Seq[RelNodeBlock] = {
     // infer trait properties for sink block
     sinkBlocks.foreach {
       sinkBlock =>
@@ -84,7 +84,6 @@ class StreamCommonSubGraphBasedOptimizer(planner: 
StreamPlanner)
       block.setOptimizedPlan(optimizedTree)
       return sinkBlocks
     }
-
     // TODO FLINK-24048: Move changeLog inference out of optimizing phase
     // infer modifyKind property for each blocks independently
     sinkBlocks.foreach(b => optimizeBlock(b, isSinkBlock = true))
@@ -104,6 +103,27 @@ class StreamCommonSubGraphBasedOptimizer(planner: 
StreamPlanner)
     sinkBlocks
   }
 
+  override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
+    val tableConfig = planner.getTableConfig
+    // build RelNodeBlock plan
+    val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, 
tableConfig)
+    // get the original configuration, and disable it if it is unnecessary
+    val origMiniBatchEnabled = 
tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
+    try {
+      if (origMiniBatchEnabled) {
+        tableConfig.set(
+          ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED,
+          Boolean.box(!shouldSkipMiniBatch(sinkBlocks)))
+      }
+      optimizeSinkBlocks(tableConfig, sinkBlocks)
+    } finally {
+      // revert the changed configuration back in the end
+      tableConfig.getConfiguration.set(
+        ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED,
+        origMiniBatchEnabled)
+    }
+  }
+
   private def optimizeBlock(block: RelNodeBlock, isSinkBlock: Boolean): Unit = 
{
     block.children.foreach {
       child =>
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
index 82590106330..55caaf9fe40 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
@@ -21,6 +21,8 @@ import org.apache.flink.annotation.Experimental
 import org.apache.flink.configuration.ConfigOption
 import org.apache.flink.configuration.ConfigOptions.key
 import org.apache.flink.table.planner.functions.sql.SqlTryCastFunction
+import org.apache.flink.table.planner.plan.nodes.calcite.{LegacySink, Sink}
+import org.apache.flink.table.planner.plan.optimize.RelNodeBlock
 import 
org.apache.flink.table.planner.plan.utils.ExpressionDetail.ExpressionDetail
 import 
org.apache.flink.table.planner.plan.utils.ExpressionFormat.ExpressionFormat
 import org.apache.flink.table.planner.utils.{ShortcutUtils, TableConfigUtils}
@@ -31,6 +33,8 @@ import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.plan.{RelOptPredicateList, RelOptUtil}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Calc, Filter, Project, TableScan, Values}
+import org.apache.calcite.rel.logical.LogicalUnion
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.{SqlAsOperator, SqlKind, SqlOperator}
@@ -90,6 +94,32 @@ object FlinkRexUtil {
     new CnfHelper(rexBuilder, maxCnfNodeCnt).toCnf(rex)
   }
 
+  /**
+   * Returns true if the input blocks only consist of [[Filter]], [[Project]], 
[[TableScan]],
+   * [[Calc]], [[Values]], and [[Sink]] nodes.
+   */
+  def shouldSkipMiniBatch(blocks: Seq[RelNodeBlock]): Boolean = {
+
+    val noMiniBatchRequired = {
+      (node: RelNode) =>
+        node match {
+          case _: Filter | _: Project | _: TableScan | _: Calc | _: Values | 
_: Sink |
+              _: LegacySink =>
+            true
+          case unionNode: LogicalUnion => unionNode.all
+          case _ => false
+        }
+    }
+
+    def nodeTraverser(node: RelNode): Boolean = {
+      noMiniBatchRequired(node) && node.getInputs
+        .map(n => nodeTraverser(n))
+        .forall(r => r)
+    }
+
+    blocks.map(b => nodeTraverser(b.outputNode)).forall(r => r)
+  }
+
   /** Returns true if the RexNode contains any node in the given expected 
[[RexInputRef]] nodes. */
   def containsExpectedInputRef(rex: RexNode, expectedInputRefs: 
ImmutableBitSet): Boolean = {
     val visitor = new InputRefVisitor
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/MiniBatchOptimizationTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/MiniBatchOptimizationTest.java
new file mode 100644
index 00000000000..9a7c97c5e1a
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/MiniBatchOptimizationTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.Collections;
+
+/**
+ * Test for enabling/disabling mini-batch assigner operator based on query 
plan. The optimization is
+ * performed in {@link StreamCommonSubGraphBasedOptimizer}.
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+public class MiniBatchOptimizationTest extends TableTestBase {
+
+    private final StreamTableTestUtil util = 
streamTestUtil(TableConfig.getDefault());
+
+    @Parameter public boolean isMiniBatchEnabled;
+
+    public long miniBatchLatency;
+    public long miniBatchSize;
+
+    @BeforeEach
+    public void setup() {
+        miniBatchLatency = 5L;
+        miniBatchSize = 10L;
+        util.tableEnv()
+                .getConfig()
+                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
isMiniBatchEnabled)
+                .set(
+                        
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+                        Duration.ofSeconds(miniBatchLatency))
+                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 
miniBatchSize);
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE MyTableA (\n"
+                                + "  a BIGINT,\n"
+                                + "  b INT NOT NULL,\n"
+                                + "  c VARCHAR,\n"
+                                + "  d BIGINT\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = 'false')");
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE MyTableB (\n"
+                                + "  a BIGINT,\n"
+                                + "  b INT NOT NULL,\n"
+                                + "  c VARCHAR,\n"
+                                + "  d BIGINT\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = 'false')");
+    }
+
+    @TestTemplate
+    public void testMiniBatchWithAggregation() {
+        final String aggQuery =
+                "SELECT\n"
+                        + "  AVG(a) AS avg_a,\n"
+                        + "  COUNT(*) AS cnt,\n"
+                        + "  count(b) AS cnt_b,\n"
+                        + "  min(b) AS min_b,\n"
+                        + "  MAX(c) FILTER (WHERE a > 1) AS max_c\n"
+                        + "FROM MyTableA";
+
+        if (isMiniBatchEnabled) {
+            util.verifyRelPlanExpected(
+                    aggQuery,
+                    JavaScalaConversionUtil.toScala(
+                            Collections.singletonList("MiniBatchAssigner")));
+        } else {
+            util.verifyRelPlanNotExpected(
+                    aggQuery,
+                    JavaScalaConversionUtil.toScala(
+                            Collections.singletonList("MiniBatchAssigner")));
+        }
+    }
+
+    @TestTemplate
+    public void testMiniBatchWithJoin() {
+        final String joinQuery = "SELECT * FROM MyTableA a, MyTableB b WHERE 
a.a = b.a";
+
+        if (isMiniBatchEnabled) {
+            util.verifyRelPlanExpected(
+                    joinQuery,
+                    JavaScalaConversionUtil.toScala(
+                            Collections.singletonList("MiniBatchAssigner")));
+        } else {
+            util.verifyRelPlanNotExpected(
+                    joinQuery,
+                    JavaScalaConversionUtil.toScala(
+                            Collections.singletonList("MiniBatchAssigner")));
+        }
+    }
+
+    @TestTemplate
+    public void testMiniBatchWithProjectFilter() {
+        final String joinQuery = "SELECT b FROM MyTableA a WHERE a.a > 123";
+        util.verifyRelPlanNotExpected(
+                joinQuery,
+                
JavaScalaConversionUtil.toScala(Collections.singletonList("MiniBatchAssigner")));
+    }
+
+    @Parameters(name = "isMiniBatchEnabled={0}")
+    public static Object[][] data() {
+        return new Object[][] {new Object[] {true}, new Object[] {false}};
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 1e006f3d94b..a4c8d0ec1f7 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -616,6 +616,28 @@ abstract class TableTestUtilBase(test: TableTestBase, 
isStreamingMode: Boolean)
       withQueryBlockAlias = false)
   }
 
+  /**
+   * Verify whether the optimized rel plan for the given SELECT query contains 
all of the
+   * given `expected` strings.
+   */
+  def verifyRelPlanExpected(query: String, notExpected: String*): Unit = {
+    verifyRelPlanExpected(getTableEnv.sqlQuery(query), notExpected: _*)
+  }
+
+  /**
+   * Verify whether the optimized rel plan for the given [[Table]] contains all 
of the
+   * given `expected` strings.
+   */
+  def verifyRelPlanExpected(table: Table, expected: String*): Unit = {
+    require(expected.nonEmpty)
+    val relNode = TableTestUtil.toRelNode(table)
+    val optimizedRel = getPlanner.optimize(relNode)
+    val optimizedPlan = getOptimizedRelPlan(Array(optimizedRel), Array.empty, 
withRowType = false)
+    val result = expected.forall(optimizedPlan.contains(_))
+    val message = s"\nactual 
plan:\n$optimizedPlan\nexpected:\n${expected.mkString(", ")}"
+    assertTrue(result, message)
+  }
+
   /**
    * Verify whether the optimized rel plan for the given SELECT query does not 
contain the
    * `notExpected` strings.

Reply via email to