This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new d08e108  [FLINK-23962][table-planner] Fix updateKind trait propagating problem in changelog inference for DAG optimizing
d08e108 is described below

commit d08e10849fd711cf375fcfbf05b9e1fe6d6ce94f
Author: shuo.cs <[email protected]>
AuthorDate: Wed Aug 25 18:18:15 2021 +0800

    [FLINK-23962][table-planner] Fix updateKind trait propagating problem in changelog inference for DAG optimizing
    
    This closes #16979
    
    (cherry picked from commit ffd2520620069840b88cde72e2f02fd99a96ea70)
---
 .../StreamCommonSubGraphBasedOptimizer.scala       | 139 +++++++--------------
 .../physical/stream/ChangelogModeInferenceTest.xml |  65 ++++++++++
 .../stream/ChangelogModeInferenceTest.scala        |  64 ++++++++++
 3 files changed, 176 insertions(+), 92 deletions(-)
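
At a high level, the patch splits changelog inference in
StreamCommonSubGraphBasedOptimizer#doOptimize into two passes instead of the
previous infer-then-propagate traversals. The Scala fragment below is only an
illustrative paraphrase of the control flow visible in the diff;
optimizeBlock, propagateUpdateKindAndMiniBatchInterval and
resetIntermediateResult are the methods actually touched by the patch, while
the comments are a simplified reading of it:

    // Pass 1: optimize every sink block first, so that the ModifyKindSet trait
    // of each block's output is known before any update kind is decided.
    sinkBlocks.foreach(b => optimizeBlock(b, isSinkBlock = true))

    // Pass 2: starting from the sink blocks, optimize again with the required
    // UpdateKind / MiniBatchInterval and push the inferred traits down into
    // the child blocks before those are visited, so a block shared by several
    // consumers sees every consumer's requirement.
    sinkBlocks.foreach { b =>
      propagateUpdateKindAndMiniBatchInterval(
        b, b.isUpdateBeforeRequired, b.getMiniBatchInterval, isSinkBlock = true)
    }

    // The intermediate results produced in pass 1 are cleared, so the final
    // optimization of each RelNodeBlock runs with the propagated traits.
    sinkBlocks.foreach(resetIntermediateResult)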

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
index 7fc101f..fb9e10f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
@@ -26,11 +26,10 @@ import org.apache.flink.table.planner.delegation.StreamPlanner
 import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode, ModifyKindSet, ModifyKindSetTraitDef, UpdateKind, UpdateKindTraitDef}
 import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.calcite.{LegacySink, Sink}
-import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDataStreamScan, StreamPhysicalIntermediateTableScan, StreamPhysicalRel}
+import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDataStreamScan, StreamPhysicalIntermediateTableScan, StreamPhysicalLegacyTableSourceScan, StreamPhysicalRel, StreamPhysicalTableSourceScan}
 import org.apache.flink.table.planner.plan.optimize.program.{FlinkStreamProgram, StreamOptimizeContext}
 import org.apache.flink.table.planner.plan.schema.IntermediateRelTable
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
-import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
 import org.apache.flink.table.planner.utils.TableConfigUtils
 import org.apache.flink.util.Preconditions
 
@@ -40,7 +39,6 @@ import org.apache.calcite.rex.RexBuilder
 
 import java.util
 import java.util.Collections
-
 import scala.collection.JavaConversions._
 
 /**
@@ -85,12 +83,14 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
       return sinkBlocks
     }
 
-    // infer updateAsRetraction property and miniBatchInterval property for all input blocks
+    // TODO FLINK-24048: Move changeLog inference out of optimizing phase
+    // infer modifyKind property for each block independently
+    sinkBlocks.foreach(b => optimizeBlock(b, isSinkBlock = true))
+    // infer and propagate updateKind and miniBatchInterval property for each block
     sinkBlocks.foreach { b =>
-      inferTraits(b, b.isUpdateBeforeRequired, b.getMiniBatchInterval, isSinkBlock = true)
+      propagateUpdateKindAndMiniBatchInterval(
+        b, b.isUpdateBeforeRequired, b.getMiniBatchInterval, isSinkBlock = true)
     }
-    // propagate updateAsRetraction property and miniBatchInterval property to all input blocks
-    sinkBlocks.foreach(propagateTraits(_, isSinkBlock = true))
     // clear the intermediate result
     sinkBlocks.foreach(resetIntermediateResult)
     // optimize recursively RelNodeBlock
@@ -184,102 +184,56 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
   }
 
   /**
-    * Infer UpdateAsRetraction property and MiniBatchInterval property for each block.
-    * NOTES: this method should not change the original RelNode tree.
-    *
-    * @param block              The [[RelNodeBlock]] instance.
-    * @param updateBeforeRequired True if UPDATE_BEFORE message is required for updates
-    * @param miniBatchInterval  mini-batch interval of the block.
-    * @param isSinkBlock        True if the given block is sink block.
-    */
-  private def inferTraits(
+   * Infer updateKind and MiniBatchInterval property for each block.
+   * Optimize order: from parent block to child blocks.
+   * NOTES: this method should not change the original RelNode tree.
+   *
+   * @param block              The [[RelNodeBlock]] instance.
+   * @param updateBeforeRequired True if UPDATE_BEFORE message is required for updates
+   * @param miniBatchInterval  mini-batch interval of the block.
+   * @param isSinkBlock        True if the given block is sink block.
+   */
+  private def propagateUpdateKindAndMiniBatchInterval(
       block: RelNodeBlock,
       updateBeforeRequired: Boolean,
       miniBatchInterval: MiniBatchInterval,
       isSinkBlock: Boolean): Unit = {
+    val blockLogicalPlan = block.getPlan
+    // infer updateKind and miniBatchInterval with required trait
+    val optimizedPlan = optimizeTree(
+      blockLogicalPlan, updateBeforeRequired, miniBatchInterval, isSinkBlock)
+    // propagate the inferred updateKind and miniBatchInterval to the child blocks
+    propagateTraits(optimizedPlan)
 
     block.children.foreach {
       child =>
-        if (child.getNewOutputNode.isEmpty) {
-          inferTraits(
-            child,
-            updateBeforeRequired = false,
-            miniBatchInterval = MiniBatchInterval.NONE,
-            isSinkBlock = false)
-        }
-    }
-
-    val blockLogicalPlan = block.getPlan
-    blockLogicalPlan match {
-      case _: LegacySink | _: Sink =>
-        require(isSinkBlock)
-        val optimizedPlan = optimizeTree(
-          blockLogicalPlan, updateBeforeRequired, miniBatchInterval, isSinkBlock = true)
-        block.setOptimizedPlan(optimizedPlan)
-
-      case o =>
-        val optimizedPlan = optimizeTree(
-          o, updateBeforeRequired, miniBatchInterval, isSinkBlock = isSinkBlock)
-        val name = createUniqueIntermediateRelTableName
-        val modifyKindSetTrait = optimizedPlan.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
-        val intermediateRelTable = createIntermediateRelTable(
-          name,
-          optimizedPlan,
-          modifyKindSetTrait.modifyKindSet,
-          // use false for block trait inference and propagation
-          isUpdateBeforeRequired = false)
-        val newTableScan = wrapIntermediateRelTableToTableScan(intermediateRelTable, name)
-        block.setNewOutputNode(newTableScan)
-        block.setOutputTableName(name)
-        block.setOptimizedPlan(optimizedPlan)
+        propagateUpdateKindAndMiniBatchInterval(
+          child,
+          updateBeforeRequired = child.isUpdateBeforeRequired,
+          miniBatchInterval = child.getMiniBatchInterval,
+          isSinkBlock = false)
     }
-  }
 
-  /**
-    * Propagate updateBeforeRequired property and miniBatchInterval property to all input blocks.
-    *
-    * @param block The [[RelNodeBlock]] instance.
-    * @param isSinkBlock True if the given block is sink block.
-    */
-  private def propagateTraits(block: RelNodeBlock, isSinkBlock: Boolean): Unit = {
-
-    // process current block
-    def shipTraits(
-        rel: RelNode,
-        miniBatchInterval: MiniBatchInterval): Unit = {
-      rel match {
-        case _: StreamPhysicalDataStreamScan | _: StreamPhysicalIntermediateTableScan =>
-          val scan = rel.asInstanceOf[TableScan]
-          val updateKindTrait = scan.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
-          val miniBatchIntervalTrait = scan.getTraitSet.getTrait(MiniBatchIntervalTraitDef.INSTANCE)
-          val tableName = scan.getTable.getQualifiedName.mkString(".")
-          val inputBlocks = block.children.filter(b => tableName.equals(b.getOutputTableName))
-          Preconditions.checkArgument(inputBlocks.size <= 1)
-          if (inputBlocks.size == 1) {
-            val mergedInterval = if (isSinkBlock) {
-              // traits of sinkBlock have already been
-              // initialized before first round of optimization.
-              miniBatchIntervalTrait.getMiniBatchInterval
-            } else {
-              FlinkRelOptUtil.mergeMiniBatchInterval(
-                miniBatchIntervalTrait.getMiniBatchInterval, miniBatchInterval)
-            }
-            val newInterval = FlinkRelOptUtil.mergeMiniBatchInterval(
-              inputBlocks.head.getMiniBatchInterval,mergedInterval)
-            inputBlocks.head.setMiniBatchInterval(newInterval)
-
-            if (updateKindTrait.updateKind == UpdateKind.BEFORE_AND_AFTER) {
-              inputBlocks.head.setUpdateBeforeRequired(true)
-            }
-          }
-        case ser: StreamPhysicalRel => ser.getInputs.foreach { e =>
-          shipTraits(e, miniBatchInterval)
+    def propagateTraits(rel: RelNode): Unit = rel match {
+      case _: StreamPhysicalDataStreamScan | _: StreamPhysicalIntermediateTableScan |
+           _: StreamPhysicalLegacyTableSourceScan | _: StreamPhysicalTableSourceScan =>
+        val scan = rel.asInstanceOf[TableScan]
+        val updateKindTrait = scan.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
+        val miniBatchIntervalTrait = scan.getTraitSet.getTrait(MiniBatchIntervalTraitDef.INSTANCE)
+        val tableName = scan.getTable.getQualifiedName.mkString(".")
+        val inputBlocks = block.children.filter(b => tableName.equals(b.getOutputTableName))
+        Preconditions.checkArgument(inputBlocks.size <= 1)
+        if (inputBlocks.size == 1) {
+          val childBlock = inputBlocks.head
+          // propagate miniBatchInterval trait to child block
+          childBlock.setMiniBatchInterval(miniBatchIntervalTrait.getMiniBatchInterval)
+          // propagate updateKind trait to child block
+          val requireUB = updateKindTrait.updateKind == UpdateKind.BEFORE_AND_AFTER
+          childBlock.setUpdateBeforeRequired(requireUB || childBlock.isUpdateBeforeRequired)
         }
-      }
+      case ser: StreamPhysicalRel => ser.getInputs.foreach(e => propagateTraits(e))
+      case _ => // do nothing
     }
-
-    shipTraits(block.getOptimizedPlan, block.getMiniBatchInterval)
-    block.children.foreach(propagateTraits(_, isSinkBlock = false))
   }
 
   /**
@@ -290,6 +244,7 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
   private def resetIntermediateResult(block: RelNodeBlock): Unit = {
     block.setNewOutputNode(null)
     block.setOutputTableName(null)
+    block.setOptimizedPlan(null)
 
     block.children.foreach {
       child =>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
index ad647c8..13b8870 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
@@ -220,4 +220,69 @@ Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS procti
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testPropagateUpdateKindAmongRelNodeBlocks">
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[number, word])
++- LogicalProject(number=[$0], word=[$1])
+   +- LogicalFilter(condition=[>($1, _UTF-16LE'a')])
+      +- LogicalUnion(all=[true])
+         :- LogicalProject(number=[+($1, 1)], word=[$0])
+         :  +- LogicalAggregate(group=[{0}], number=[SUM($1)])
+         :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]])
+         +- LogicalProject(number=[-($1, 1)], word=[$0])
+            +- LogicalAggregate(group=[{0}], number=[SUM($1)])
+               +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[number, word])
++- LogicalProject(number=[$0], word=[$1])
+   +- LogicalFilter(condition=[<($1, _UTF-16LE'a')])
+      +- LogicalUnion(all=[true])
+         :- LogicalProject(number=[+($1, 1)], word=[$0])
+         :  +- LogicalAggregate(group=[{0}], number=[SUM($1)])
+         :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]])
+         +- LogicalProject(number=[-($1, 1)], word=[$0])
+            +- LogicalAggregate(group=[{0}], number=[SUM($1)])
+               +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]])
+
+LogicalSink(table=[default_catalog.default_database.sink2], fields=[number, word])
++- LogicalProject(number=[$1], word=[$0])
+   +- LogicalAggregate(group=[{0}], number=[SUM($1)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[number, word], changelogMode=[NONE])
++- Calc(select=[number, word], where=[>(word, _UTF-16LE'a')], changelogMode=[I,UB,UA])
+   +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
+      :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA])
+      :  +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
+      :     +- Exchange(distribution=[hash[word]], changelogMode=[I])
+      :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
+      +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA])
+         +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
+            +- Exchange(distribution=[hash[word]], changelogMode=[I])
+               +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
+
+Sink(table=[default_catalog.default_database.sink1], fields=[number, word], changelogMode=[NONE])
++- Calc(select=[number, word], where=[<(word, _UTF-16LE'a')], changelogMode=[I,UB,UA])
+   +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
+      :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA])
+      :  +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
+      :     +- Exchange(distribution=[hash[word]], changelogMode=[I])
+      :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
+      +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA])
+         +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
+            +- Exchange(distribution=[hash[word]], changelogMode=[I])
+               +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
+
+Sink(table=[default_catalog.default_database.sink2], fields=[number, word], changelogMode=[NONE])
++- Calc(select=[number, word], changelogMode=[I,UB,UA])
+   +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
+      +- Exchange(distribution=[hash[word]], changelogMode=[I])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
index f1a4d39..99dfa2c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.table.api.{ExplainDetail, _}
 import org.apache.flink.table.api.config.OptimizerConfigOptions
+import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
 import org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram
 import org.apache.flink.table.planner.utils.{AggregatePhaseStrategy, TableTestBase}
 
@@ -181,4 +182,67 @@ class ChangelogModeInferenceTest extends TableTestBase {
       """.stripMargin
     util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
   }
+
+  @Test
+  def testPropagateUpdateKindAmongRelNodeBlocks(): Unit = {
+    util.tableEnv.getConfig.getConfiguration.setBoolean(
+      RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      true)
+    util.addTable(
+      """
+        |create table sink1 (
+        |  a INT,
+        |  b VARCHAR
+        |) with (
+        |  'connector' = 'values',
+        |  'sink-insert-only' = 'false'
+        |)
+        |""".stripMargin)
+    util.addTable(
+      """
+        |create table sink2 (
+        |  a INT,
+        |  b VARCHAR,
+        |  primary key (b) not enforced
+        |) with (
+        |  'connector' = 'values',
+        |  'sink-insert-only' = 'false'
+        |)
+        |""".stripMargin)
+
+    util.tableEnv.executeSql(
+      """
+        |CREATE VIEW v1 AS
+        |SELECT
+        |  SUM(number) AS number, word
+        |FROM MyTable
+        |GROUP BY word
+        |""".stripMargin)
+
+    util.tableEnv.executeSql(
+      """
+        |CREATE VIEW v2 AS
+        |SELECT number + 1 AS number, word FROM v1
+        |UNION ALL
+        |SELECT number - 1 AS number, word FROM v1
+        |""".stripMargin)
+
+    val statementSet = util.tableEnv.createStatementSet()
+    // sink1 requires UB
+    statementSet.addInsertSql(
+      """
+        |INSERT INTO sink1 SELECT number, word FROM v2 WHERE word > 'a'
+        |""".stripMargin)
+    // the second insert into sink1 also requires UB
+    statementSet.addInsertSql(
+      """
+        |INSERT INTO sink1 SELECT number, word FROM v2 WHERE word < 'a'
+        |""".stripMargin)
+    // sink2 does not require UB
+    statementSet.addInsertSql(
+      """
+        |INSERT INTO sink2 SELECT * FROM v1
+        |""".stripMargin)
+    util.verifyRelPlan(statementSet, ExplainDetail.CHANGELOG_MODE)
+  }
 }
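
Taken together, the new test covers exactly the situation the fix targets: two
INSERT statements read the union view v2 (built on the aggregate view v1) and
a third reads v1 directly; sink1 requires UPDATE_BEFORE messages while sink2
declares a primary key and does not, so the stricter requirement has to win
for the shared blocks. A rough standalone equivalent of the statement set,
assuming a streaming TableEnvironment named tEnv with the same sources, views
and sinks registered (this sketch is not part of the patch):

    // Sketch only: mirrors testPropagateUpdateKindAmongRelNodeBlocks above.
    val stmtSet = tEnv.createStatementSet()
    // Both queries read the shared view v2 (and through it v1); because sink1
    // needs UPDATE_BEFORE, that requirement must reach the shared aggregation.
    stmtSet.addInsertSql("INSERT INTO sink1 SELECT number, word FROM v2 WHERE word > 'a'")
    stmtSet.addInsertSql("INSERT INTO sink1 SELECT number, word FROM v2 WHERE word < 'a'")
    // sink2 has a primary key, so UPDATE_AFTER-only output is acceptable here.
    stmtSet.addInsertSql("INSERT INTO sink2 SELECT * FROM v1")
    stmtSet.execute()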
