This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new 5466716b20d [FLINK-29781][table-planner] Fix ChangelogNormalize using
wrong keys after transformation by
WatermarkAssignerChangelogNormalizeTransposeRule
5466716b20d is described below
commit 5466716b20d5c720bf29dea560909e7055870555
Author: lincoln.lil <[email protected]>
AuthorDate: Thu Oct 27 21:24:22 2022 +0800
[FLINK-29781][table-planner] Fix ChangelogNormalize using wrong keys after
transformation by WatermarkAssignerChangelogNormalizeTransposeRule
This closes #21225
(cherry picked from commit 5463f244ec69f623d75c15374b55bb8695e92b3e)
---
...arkAssignerChangelogNormalizeTransposeRule.java | 11 ++++++
.../stream/StreamPhysicalChangelogNormalize.scala | 4 ++
...AssignerChangelogNormalizeTransposeRuleTest.xml | 43 +++++++++++++++++++++-
...signerChangelogNormalizeTransposeRuleTest.scala | 36 ++++++++++++++++++
4 files changed, 93 insertions(+), 1 deletion(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
index 1c532d0ca65..276b36389f3 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
@@ -47,6 +47,7 @@ import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.util.mapping.Mappings;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -359,6 +360,16 @@ public class
WatermarkAssignerChangelogNormalizeTransposeRule
inputNode,
nodeAndTrait.f1.getTrait(
FlinkRelDistributionTraitDef.INSTANCE()));
+ } else if (currentNode instanceof
StreamPhysicalChangelogNormalize) {
+ final List<String> inputNodeFields =
inputNode.getRowType().getFieldNames();
+ final List<String> currentNodeFields =
currentNode.getRowType().getFieldNames();
+ int[] remappedUniqueKeys =
+ Arrays.stream(((StreamPhysicalChangelogNormalize)
currentNode).uniqueKeys())
+ .map(ukIdx ->
inputNodeFields.indexOf(currentNodeFields.get(ukIdx)))
+ .toArray();
+ currentNode =
+ ((StreamPhysicalChangelogNormalize) currentNode)
+ .copy(nodeAndTrait.f1, inputNode,
remappedUniqueKeys);
} else {
currentNode =
currentNode.copy(nodeAndTrait.f1,
Collections.singletonList(inputNode));
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalChangelogNormalize.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalChangelogNormalize.scala
index ac3371af21c..6cfb445ee10 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalChangelogNormalize.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalChangelogNormalize.scala
@@ -58,6 +58,10 @@ class StreamPhysicalChangelogNormalize(
contextResolvedTable)
}
+ def copy(traitSet: RelTraitSet, input: RelNode, uniqueKeys: Array[Int]):
RelNode = {
+ new StreamPhysicalChangelogNormalize(cluster, traitSet, input, uniqueKeys,
contextResolvedTable)
+ }
+
override def explainTerms(pw: RelWriter): RelWriter = {
val fieldNames = getRowType.getFieldNames
super
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
index b0e1b7e155e..6020b3fd13f 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
@@ -44,7 +44,7 @@ Calc(select=[currency2, cnt, w$start AS w_start, w$end AS
w_end], changelogMode=
+- GroupWindowAggregate(groupBy=[currency2], window=[TumblingGroupWindow('w$,
currency_time, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime],
select=[currency2, COUNT(*) AS cnt, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I])
+- Exchange(distribution=[hash[currency2]], changelogMode=[I,UB,UA,D])
+- Calc(select=[currency2, currency_time], changelogMode=[I,UB,UA,D])
- +- ChangelogNormalize(key=[currency2], changelogMode=[I,UB,UA,D])
+ +- ChangelogNormalize(key=[currency], changelogMode=[I,UB,UA,D])
+- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
+- WatermarkAssigner(rowtime=[currency_time],
watermark=[-(currency_time, 5000:INTERVAL SECOND)], changelogMode=[UA,D])
+- Calc(select=[+(currency, 2) AS currency2, TO_TIMESTAMP(c)
AS currency_time, currency], changelogMode=[UA,D])
@@ -185,6 +185,47 @@ Calc(select=[currency, cnt, w$start AS w_start, w$end AS
w_end], changelogMode=[
+- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
+- WatermarkAssigner(rowtime=[currency_time],
watermark=[-(currency_time, 5000:INTERVAL SECOND)], changelogMode=[UA,D])
+- TableSourceScan(table=[[default_catalog, default_database,
simple_src, project=[currency, currency_time], metadata=[]]], fields=[currency,
currency_time], changelogMode=[UA,D])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPushdownCalcNotAffectChangelogNormalizeKey">
+ <Resource name="sql">
+ <![CDATA[
+SELECT t1.a, t1.b, t2.f
+FROM t1 INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.ingestion_time
+ ON t1.a = t2.a WHERE t2.f = true
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$1], b=[$2], f=[$6])
++- LogicalFilter(condition=[=($6, true)])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 1}])
+ :- LogicalWatermarkAssigner(rowtime=[ingestion_time], watermark=[$0])
+ : +- LogicalProject(ingestion_time=[CAST($2):TIMESTAMP(3) *ROWTIME*],
a=[$0], b=[$1])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
t1]])
+ +- LogicalFilter(condition=[=($cor0.a, $2)])
+ +- LogicalSnapshot(period=[$cor0.ingestion_time])
+ +- LogicalWatermarkAssigner(rowtime=[ingestion_time],
watermark=[$1])
+ +- LogicalProject(k=[$0], ingestion_time=[CAST($3):TIMESTAMP(3)
*ROWTIME*], a=[$1], f=[$2])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, t2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, f], changelogMode=[I])
++- TemporalJoin(joinType=[InnerJoin], where=[AND(=(a, a0),
__TEMPORAL_JOIN_CONDITION(ingestion_time, ingestion_time0,
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(a0), __TEMPORAL_JOIN_LEFT_KEY(a),
__TEMPORAL_JOIN_RIGHT_KEY(a0)))], select=[ingestion_time, a, b,
ingestion_time0, a0, f], changelogMode=[I])
+ :- Exchange(distribution=[hash[a]], changelogMode=[I])
+ : +- WatermarkAssigner(rowtime=[ingestion_time],
watermark=[ingestion_time], changelogMode=[I])
+ : +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS
ingestion_time, a, b], changelogMode=[I])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
t1]], fields=[a, b, ingestion_time], changelogMode=[I])
+ +- Exchange(distribution=[hash[a]], changelogMode=[I,UB,UA,D])
+ +- Calc(select=[ingestion_time, a, f], where=[f],
changelogMode=[I,UB,UA,D])
+ +- ChangelogNormalize(key=[a], changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[a]], changelogMode=[I,UA,D])
+ +- WatermarkAssigner(rowtime=[ingestion_time],
watermark=[ingestion_time], changelogMode=[I,UA,D])
+ +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3)
*ROWTIME*) AS ingestion_time, a, f], changelogMode=[I,UA,D])
+ +- TableSourceScan(table=[[default_catalog,
default_database, t2, project=[a, f], metadata=[ts]]], fields=[a, f,
ingestion_time], changelogMode=[I,UA,D])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
index 935a7d11ae2..c6b9b3b99fa 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
@@ -166,4 +166,40 @@ class WatermarkAssignerChangelogNormalizeTransposeRuleTest
extends TableTestBase
|""".stripMargin
util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
}
+
+ @Test
+ def testPushdownCalcNotAffectChangelogNormalizeKey(): Unit = {
+ util.addTable("""
+ |CREATE TABLE t1 (
+ | ingestion_time TIMESTAMP(3) METADATA FROM 'ts',
+ | a VARCHAR NOT NULL,
+ | b VARCHAR NOT NULL,
+ | WATERMARK FOR ingestion_time AS ingestion_time
+ |) WITH (
+ | 'connector' = 'values',
+ | 'readable-metadata' = 'ts:TIMESTAMP(3)'
+ |)
+ """.stripMargin)
+ util.addTable("""
+ |CREATE TABLE t2 (
+ | k VARBINARY,
+ | ingestion_time TIMESTAMP(3) METADATA FROM 'ts',
+ | a VARCHAR NOT NULL,
+ | f BOOLEAN NOT NULL,
+ | WATERMARK FOR `ingestion_time` AS `ingestion_time`,
+ | PRIMARY KEY (`a`) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'values',
+ | 'readable-metadata' = 'ts:TIMESTAMP(3)',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ """.stripMargin)
+ val sql =
+ """
+ |SELECT t1.a, t1.b, t2.f
+ |FROM t1 INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.ingestion_time
+ | ON t1.a = t2.a WHERE t2.f = true
+ |""".stripMargin
+ util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
+ }
}