This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ffd2520 [FLINK-23962][table-planner] Fix updateKind trait propagating problem in changelog inference for DAG optimizing
ffd2520 is described below
commit ffd2520620069840b88cde72e2f02fd99a96ea70
Author: shuo.cs <[email protected]>
AuthorDate: Wed Aug 25 18:18:15 2021 +0800
[FLINK-23962][table-planner] Fix updateKind trait propagating problem in changelog inference for DAG optimizing
This closes #16979
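For context, a minimal SQL sketch of the affected scenario (a simplified form of the testPropagateUpdateKindAmongRelNodeBlocks case added below; table, view and sink names follow that test): several sinks read the same aggregated view, so the view is optimized as a shared RelNodeBlock that must honor the strictest updateKind requirement of all its parent blocks.

    -- shared aggregate view, optimized as its own RelNodeBlock
    CREATE VIEW v1 AS
      SELECT SUM(number) AS number, word FROM MyTable GROUP BY word;

    -- sink1 has no primary key and accepts updates, so it requires UPDATE_BEFORE;
    -- sink2 declares a primary key (upsert), so it does not
    INSERT INTO sink1 SELECT number, word FROM v1 WHERE word > 'a';
    INSERT INTO sink2 SELECT * FROM v1;

Previously updateKind was handled in two separate passes (inferTraits, then propagateTraits), and a requirement discovered in a parent block could fail to reach blocks further down the DAG (e.g. through v2 into v1 in the test below). The rework first infers the modifyKind property for every block independently and then infers and propagates updateKind and miniBatchInterval from each parent block down to its child blocks, so a shared block is finally optimized with the requirements of all of its consumers.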
---
.../StreamCommonSubGraphBasedOptimizer.scala | 139 +++++++--------------
.../physical/stream/ChangelogModeInferenceTest.xml | 65 ++++++++++
.../stream/ChangelogModeInferenceTest.scala | 64 ++++++++++
3 files changed, 176 insertions(+), 92 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
index 7fc101f..fb9e10f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
@@ -26,11 +26,10 @@ import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode, ModifyKindSet, ModifyKindSetTraitDef, UpdateKind, UpdateKindTraitDef}
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.calcite.{LegacySink, Sink}
-import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDataStreamScan, StreamPhysicalIntermediateTableScan, StreamPhysicalRel}
+import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDataStreamScan, StreamPhysicalIntermediateTableScan, StreamPhysicalLegacyTableSourceScan, StreamPhysicalRel, StreamPhysicalTableSourceScan}
import org.apache.flink.table.planner.plan.optimize.program.{FlinkStreamProgram, StreamOptimizeContext}
import org.apache.flink.table.planner.plan.schema.IntermediateRelTable
import org.apache.flink.table.planner.plan.stats.FlinkStatistic
-import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
import org.apache.flink.table.planner.utils.TableConfigUtils
import org.apache.flink.util.Preconditions
@@ -40,7 +39,6 @@ import org.apache.calcite.rex.RexBuilder
import java.util
import java.util.Collections
-
import scala.collection.JavaConversions._
/**
@@ -85,12 +83,14 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
return sinkBlocks
}
- // infer updateAsRetraction property and miniBatchInterval property for all input blocks
+ // TODO FLINK-24048: Move changeLog inference out of optimizing phase
+ // infer modifyKind property for each block independently
+ sinkBlocks.foreach(b => optimizeBlock(b, isSinkBlock = true))
+ // infer and propagate updateKind and miniBatchInterval property for each block
sinkBlocks.foreach { b =>
- inferTraits(b, b.isUpdateBeforeRequired, b.getMiniBatchInterval, isSinkBlock = true)
+ propagateUpdateKindAndMiniBatchInterval(
+ b, b.isUpdateBeforeRequired, b.getMiniBatchInterval, isSinkBlock = true)
}
- // propagate updateAsRetraction property and miniBatchInterval property to all input blocks
- sinkBlocks.foreach(propagateTraits(_, isSinkBlock = true))
// clear the intermediate result
sinkBlocks.foreach(resetIntermediateResult)
// optimize recursively RelNodeBlock
@@ -184,102 +184,56 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
}
/**
- * Infer UpdateAsRetraction property and MiniBatchInterval property for each block.
- * NOTES: this method should not change the original RelNode tree.
- *
- * @param block The [[RelNodeBlock]] instance.
- * @param updateBeforeRequired True if UPDATE_BEFORE message is required for updates
- * @param miniBatchInterval mini-batch interval of the block.
- * @param isSinkBlock True if the given block is sink block.
- */
- private def inferTraits(
+ * Infer updateKind and MiniBatchInterval property for each block.
+ * Optimize order: from parent block to child blocks.
+ * NOTES: this method should not change the original RelNode tree.
+ *
+ * @param block The [[RelNodeBlock]] instance.
+ * @param updateBeforeRequired True if UPDATE_BEFORE message is required for updates
+ * @param miniBatchInterval mini-batch interval of the block.
+ * @param isSinkBlock True if the given block is sink block.
+ */
+ private def propagateUpdateKindAndMiniBatchInterval(
block: RelNodeBlock,
updateBeforeRequired: Boolean,
miniBatchInterval: MiniBatchInterval,
isSinkBlock: Boolean): Unit = {
+ val blockLogicalPlan = block.getPlan
+ // infer updateKind and miniBatchInterval with required trait
+ val optimizedPlan = optimizeTree(
+ blockLogicalPlan, updateBeforeRequired, miniBatchInterval, isSinkBlock)
+ // propagate the inferred updateKind and miniBatchInterval to the child blocks
+ propagateTraits(optimizedPlan)
block.children.foreach {
child =>
- if (child.getNewOutputNode.isEmpty) {
- inferTraits(
- child,
- updateBeforeRequired = false,
- miniBatchInterval = MiniBatchInterval.NONE,
- isSinkBlock = false)
- }
- }
-
- val blockLogicalPlan = block.getPlan
- blockLogicalPlan match {
- case _: LegacySink | _: Sink =>
- require(isSinkBlock)
- val optimizedPlan = optimizeTree(
- blockLogicalPlan, updateBeforeRequired, miniBatchInterval, isSinkBlock = true)
- block.setOptimizedPlan(optimizedPlan)
-
- case o =>
- val optimizedPlan = optimizeTree(
- o, updateBeforeRequired, miniBatchInterval, isSinkBlock = isSinkBlock)
- val name = createUniqueIntermediateRelTableName
- val modifyKindSetTrait = optimizedPlan.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
- val intermediateRelTable = createIntermediateRelTable(
- name,
- optimizedPlan,
- modifyKindSetTrait.modifyKindSet,
- // use false for block trait inference and propagation
- isUpdateBeforeRequired = false)
- val newTableScan = wrapIntermediateRelTableToTableScan(intermediateRelTable, name)
- block.setNewOutputNode(newTableScan)
- block.setOutputTableName(name)
- block.setOptimizedPlan(optimizedPlan)
+ propagateUpdateKindAndMiniBatchInterval(
+ child,
+ updateBeforeRequired = child.isUpdateBeforeRequired,
+ miniBatchInterval = child.getMiniBatchInterval,
+ isSinkBlock = false)
}
- }
- /**
- * Propagate updateBeforeRequired property and miniBatchInterval property to all input blocks.
- *
- * @param block The [[RelNodeBlock]] instance.
- * @param isSinkBlock True if the given block is sink block.
- */
- private def propagateTraits(block: RelNodeBlock, isSinkBlock: Boolean): Unit = {
-
- // process current block
- def shipTraits(
- rel: RelNode,
- miniBatchInterval: MiniBatchInterval): Unit = {
- rel match {
- case _: StreamPhysicalDataStreamScan | _: StreamPhysicalIntermediateTableScan =>
- val scan = rel.asInstanceOf[TableScan]
- val updateKindTrait = scan.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
- val miniBatchIntervalTrait = scan.getTraitSet.getTrait(MiniBatchIntervalTraitDef.INSTANCE)
- val tableName = scan.getTable.getQualifiedName.mkString(".")
- val inputBlocks = block.children.filter(b => tableName.equals(b.getOutputTableName))
- Preconditions.checkArgument(inputBlocks.size <= 1)
- if (inputBlocks.size == 1) {
- val mergedInterval = if (isSinkBlock) {
- // traits of sinkBlock have already been
- // initialized before first round of optimization.
- miniBatchIntervalTrait.getMiniBatchInterval
- } else {
- FlinkRelOptUtil.mergeMiniBatchInterval(
- miniBatchIntervalTrait.getMiniBatchInterval, miniBatchInterval)
- }
- val newInterval = FlinkRelOptUtil.mergeMiniBatchInterval(
- inputBlocks.head.getMiniBatchInterval,mergedInterval)
- inputBlocks.head.setMiniBatchInterval(newInterval)
-
- if (updateKindTrait.updateKind == UpdateKind.BEFORE_AND_AFTER) {
- inputBlocks.head.setUpdateBeforeRequired(true)
- }
- }
- case ser: StreamPhysicalRel => ser.getInputs.foreach { e =>
- shipTraits(e, miniBatchInterval)
+ def propagateTraits(rel: RelNode): Unit = rel match {
+ case _: StreamPhysicalDataStreamScan | _: StreamPhysicalIntermediateTableScan |
+ _: StreamPhysicalLegacyTableSourceScan | _: StreamPhysicalTableSourceScan =>
+ val scan = rel.asInstanceOf[TableScan]
+ val updateKindTrait = scan.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
+ val miniBatchIntervalTrait = scan.getTraitSet.getTrait(MiniBatchIntervalTraitDef.INSTANCE)
+ val tableName = scan.getTable.getQualifiedName.mkString(".")
+ val inputBlocks = block.children.filter(b => tableName.equals(b.getOutputTableName))
+ Preconditions.checkArgument(inputBlocks.size <= 1)
+ if (inputBlocks.size == 1) {
+ val childBlock = inputBlocks.head
+ // propagate miniBatchInterval trait to child block
+ childBlock.setMiniBatchInterval(miniBatchIntervalTrait.getMiniBatchInterval)
+ // propagate updateKind trait to child block
+ val requireUB = updateKindTrait.updateKind == UpdateKind.BEFORE_AND_AFTER
+ childBlock.setUpdateBeforeRequired(requireUB || childBlock.isUpdateBeforeRequired)
}
- }
+ case ser: StreamPhysicalRel => ser.getInputs.foreach(e => propagateTraits(e))
+ case _ => // do nothing
}
-
- shipTraits(block.getOptimizedPlan, block.getMiniBatchInterval)
- block.children.foreach(propagateTraits(_, isSinkBlock = false))
}
/**
@@ -290,6 +244,7 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
private def resetIntermediateResult(block: RelNodeBlock): Unit = {
block.setNewOutputNode(null)
block.setOutputTableName(null)
+ block.setOptimizedPlan(null)
block.children.foreach {
child =>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
index ad647c8..13b8870 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
@@ -220,4 +220,69 @@ Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS procti
]]>
</Resource>
</TestCase>
+ <TestCase name="testPropagateUpdateKindAmongRelNodeBlocks">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[number, word])
++- LogicalProject(number=[$0], word=[$1])
+ +- LogicalFilter(condition=[>($1, _UTF-16LE'a')])
+ +- LogicalUnion(all=[true])
+ :- LogicalProject(number=[+($1, 1)], word=[$0])
+ : +- LogicalAggregate(group=[{0}], number=[SUM($1)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]])
+ +- LogicalProject(number=[-($1, 1)], word=[$0])
+ +- LogicalAggregate(group=[{0}], number=[SUM($1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[number, word])
++- LogicalProject(number=[$0], word=[$1])
+ +- LogicalFilter(condition=[<($1, _UTF-16LE'a')])
+ +- LogicalUnion(all=[true])
+ :- LogicalProject(number=[+($1, 1)], word=[$0])
+ : +- LogicalAggregate(group=[{0}], number=[SUM($1)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]])
+ +- LogicalProject(number=[-($1, 1)], word=[$0])
+ +- LogicalAggregate(group=[{0}], number=[SUM($1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]])
+
+LogicalSink(table=[default_catalog.default_database.sink2], fields=[number, word])
++- LogicalProject(number=[$1], word=[$0])
+ +- LogicalAggregate(group=[{0}], number=[SUM($1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[number, word], changelogMode=[NONE])
++- Calc(select=[number, word], where=[>(word, _UTF-16LE'a')], changelogMode=[I,UB,UA])
+ +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
+ :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA])
+ : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
+ : +- Exchange(distribution=[hash[word]], changelogMode=[I])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
+ +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA])
+ +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[word]], changelogMode=[I])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
+
+Sink(table=[default_catalog.default_database.sink1], fields=[number, word], changelogMode=[NONE])
++- Calc(select=[number, word], where=[<(word, _UTF-16LE'a')], changelogMode=[I,UB,UA])
+ +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
+ :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA])
+ : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
+ : +- Exchange(distribution=[hash[word]], changelogMode=[I])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
+ +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA])
+ +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[word]], changelogMode=[I])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
+
+Sink(table=[default_catalog.default_database.sink2], fields=[number, word], changelogMode=[NONE])
++- Calc(select=[number, word], changelogMode=[I,UB,UA])
+ +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[word]], changelogMode=[I])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
index f1a4d39..99dfa2c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
import org.apache.flink.api.common.time.Time
import org.apache.flink.table.api.{ExplainDetail, _}
import org.apache.flink.table.api.config.OptimizerConfigOptions
+import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
import org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram
import org.apache.flink.table.planner.utils.{AggregatePhaseStrategy, TableTestBase}
@@ -181,4 +182,67 @@ class ChangelogModeInferenceTest extends TableTestBase {
""".stripMargin
util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
}
+
+ @Test
+ def testPropagateUpdateKindAmongRelNodeBlocks(): Unit = {
+ util.tableEnv.getConfig.getConfiguration.setBoolean(
+ RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+ true)
+ util.addTable(
+ """
+ |create table sink1 (
+ | a INT,
+ | b VARCHAR
+ |) with (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+ util.addTable(
+ """
+ |create table sink2 (
+ | a INT,
+ | b VARCHAR,
+ | primary key (b) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql(
+ """
+ |CREATE VIEW v1 AS
+ |SELECT
+ | SUM(number) AS number, word
+ |FROM MyTable
+ |GROUP BY word
+ |""".stripMargin)
+
+ util.tableEnv.executeSql(
+ """
+ |CREATE VIEW v2 AS
+ |SELECT number + 1 AS number, word FROM v1
+ |UNION ALL
+ |SELECT number - 1 AS number, word FROM v1
+ |""".stripMargin)
+
+ val statementSet = util.tableEnv.createStatementSet()
+ // sink1 requires UB
+ statementSet.addInsertSql(
+ """
+ |INSERT INTO sink1 SELECT number, word FROM v2 WHERE word > 'a'
+ |""".stripMargin)
+ // the second insert into sink1 also requires UB
+ statementSet.addInsertSql(
+ """
+ |INSERT INTO sink1 SELECT number, word FROM v2 WHERE word < 'a'
+ |""".stripMargin)
+ // sink2 does not require UB
+ statementSet.addInsertSql(
+ """
+ |INSERT INTO sink2 SELECT * FROM v1
+ |""".stripMargin)
+ util.verifyRelPlan(statementSet, ExplainDetail.CHANGELOG_MODE)
+ }
}