This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5463f244ec6 [FLINK-29781][table-planner] Fix ChangelogNormalize uses wrong keys after transformation by WatermarkAssignerChangelogNormalizeTransposeRule
5463f244ec6 is described below

commit 5463f244ec69f623d75c15374b55bb8695e92b3e
Author: lincoln.lil <[email protected]>
AuthorDate: Thu Oct 27 21:24:22 2022 +0800

    [FLINK-29781][table-planner] Fix ChangelogNormalize uses wrong keys after transformation by WatermarkAssignerChangelogNormalizeTransposeRule
    
    This closes #21225
---
 ...arkAssignerChangelogNormalizeTransposeRule.java | 11 ++++++
 .../stream/StreamPhysicalChangelogNormalize.scala  |  4 ++
 ...AssignerChangelogNormalizeTransposeRuleTest.xml | 43 +++++++++++++++++++++-
 ...signerChangelogNormalizeTransposeRuleTest.scala | 36 ++++++++++++++++++
 4 files changed, 93 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
index 1c532d0ca65..276b36389f3 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
@@ -47,6 +47,7 @@ import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.util.mapping.Mappings;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -359,6 +360,16 @@ public class 
WatermarkAssignerChangelogNormalizeTransposeRule
                                         inputNode,
                                         nodeAndTrait.f1.getTrait(
                                                 
FlinkRelDistributionTraitDef.INSTANCE()));
+            } else if (currentNode instanceof 
StreamPhysicalChangelogNormalize) {
+                final List<String> inputNodeFields = 
inputNode.getRowType().getFieldNames();
+                final List<String> currentNodeFields = 
currentNode.getRowType().getFieldNames();
+                int[] remappedUniqueKeys =
+                        Arrays.stream(((StreamPhysicalChangelogNormalize) 
currentNode).uniqueKeys())
+                                .map(ukIdx -> 
inputNodeFields.indexOf(currentNodeFields.get(ukIdx)))
+                                .toArray();
+                currentNode =
+                        ((StreamPhysicalChangelogNormalize) currentNode)
+                                .copy(nodeAndTrait.f1, inputNode, 
remappedUniqueKeys);
             } else {
                 currentNode =
                         currentNode.copy(nodeAndTrait.f1, 
Collections.singletonList(inputNode));
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalChangelogNormalize.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalChangelogNormalize.scala
index ac3371af21c..6cfb445ee10 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalChangelogNormalize.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalChangelogNormalize.scala
@@ -58,6 +58,10 @@ class StreamPhysicalChangelogNormalize(
       contextResolvedTable)
   }
 
+  def copy(traitSet: RelTraitSet, input: RelNode, uniqueKeys: Array[Int]): 
RelNode = {
+    new StreamPhysicalChangelogNormalize(cluster, traitSet, input, uniqueKeys, 
contextResolvedTable)
+  }
+
   override def explainTerms(pw: RelWriter): RelWriter = {
     val fieldNames = getRowType.getFieldNames
     super
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
index b0e1b7e155e..6020b3fd13f 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml
@@ -44,7 +44,7 @@ Calc(select=[currency2, cnt, w$start AS w_start, w$end AS 
w_end], changelogMode=
 +- GroupWindowAggregate(groupBy=[currency2], window=[TumblingGroupWindow('w$, 
currency_time, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
select=[currency2, COUNT(*) AS cnt, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I])
    +- Exchange(distribution=[hash[currency2]], changelogMode=[I,UB,UA,D])
       +- Calc(select=[currency2, currency_time], changelogMode=[I,UB,UA,D])
-         +- ChangelogNormalize(key=[currency2], changelogMode=[I,UB,UA,D])
+         +- ChangelogNormalize(key=[currency], changelogMode=[I,UB,UA,D])
             +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
                +- WatermarkAssigner(rowtime=[currency_time], 
watermark=[-(currency_time, 5000:INTERVAL SECOND)], changelogMode=[UA,D])
                   +- Calc(select=[+(currency, 2) AS currency2, TO_TIMESTAMP(c) 
AS currency_time, currency], changelogMode=[UA,D])
@@ -185,6 +185,47 @@ Calc(select=[currency, cnt, w$start AS w_start, w$end AS 
w_end], changelogMode=[
          +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
             +- WatermarkAssigner(rowtime=[currency_time], 
watermark=[-(currency_time, 5000:INTERVAL SECOND)], changelogMode=[UA,D])
                +- TableSourceScan(table=[[default_catalog, default_database, 
simple_src, project=[currency, currency_time], metadata=[]]], fields=[currency, 
currency_time], changelogMode=[UA,D])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPushdownCalcNotAffectChangelogNormalizeKey">
+    <Resource name="sql">
+      <![CDATA[
+SELECT t1.a, t1.b, t2.f
+FROM t1 INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.ingestion_time
+ ON t1.a = t2.a WHERE t2.f = true
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$1], b=[$2], f=[$6])
++- LogicalFilter(condition=[=($6, true)])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 1}])
+      :- LogicalWatermarkAssigner(rowtime=[ingestion_time], watermark=[$0])
+      :  +- LogicalProject(ingestion_time=[CAST($2):TIMESTAMP(3) *ROWTIME*], 
a=[$0], b=[$1])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, 
t1]])
+      +- LogicalFilter(condition=[=($cor0.a, $2)])
+         +- LogicalSnapshot(period=[$cor0.ingestion_time])
+            +- LogicalWatermarkAssigner(rowtime=[ingestion_time], 
watermark=[$1])
+               +- LogicalProject(k=[$0], ingestion_time=[CAST($3):TIMESTAMP(3) 
*ROWTIME*], a=[$1], f=[$2])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, t2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, f], changelogMode=[I])
++- TemporalJoin(joinType=[InnerJoin], where=[AND(=(a, a0), 
__TEMPORAL_JOIN_CONDITION(ingestion_time, ingestion_time0, 
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(a0), __TEMPORAL_JOIN_LEFT_KEY(a), 
__TEMPORAL_JOIN_RIGHT_KEY(a0)))], select=[ingestion_time, a, b, 
ingestion_time0, a0, f], changelogMode=[I])
+   :- Exchange(distribution=[hash[a]], changelogMode=[I])
+   :  +- WatermarkAssigner(rowtime=[ingestion_time], 
watermark=[ingestion_time], changelogMode=[I])
+   :     +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) *ROWTIME*) AS 
ingestion_time, a, b], changelogMode=[I])
+   :        +- TableSourceScan(table=[[default_catalog, default_database, 
t1]], fields=[a, b, ingestion_time], changelogMode=[I])
+   +- Exchange(distribution=[hash[a]], changelogMode=[I,UB,UA,D])
+      +- Calc(select=[ingestion_time, a, f], where=[f], 
changelogMode=[I,UB,UA,D])
+         +- ChangelogNormalize(key=[a], changelogMode=[I,UB,UA,D])
+            +- Exchange(distribution=[hash[a]], changelogMode=[I,UA,D])
+               +- WatermarkAssigner(rowtime=[ingestion_time], 
watermark=[ingestion_time], changelogMode=[I,UA,D])
+                  +- Calc(select=[CAST(ingestion_time AS TIMESTAMP(3) 
*ROWTIME*) AS ingestion_time, a, f], changelogMode=[I,UA,D])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, t2, project=[a, f], metadata=[ts]]], fields=[a, f, 
ingestion_time], changelogMode=[I,UA,D])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
index 935a7d11ae2..c6b9b3b99fa 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala
@@ -166,4 +166,40 @@ class WatermarkAssignerChangelogNormalizeTransposeRuleTest 
extends TableTestBase
         |""".stripMargin
     util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
   }
+
+  @Test
+  def testPushdownCalcNotAffectChangelogNormalizeKey(): Unit = {
+    util.addTable("""
+                    |CREATE TABLE t1 (
+                    |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',
+                    |  a VARCHAR NOT NULL,
+                    |  b VARCHAR NOT NULL,
+                    |  WATERMARK FOR ingestion_time AS ingestion_time
+                    |) WITH (
+                    | 'connector' = 'values',
+                    | 'readable-metadata' = 'ts:TIMESTAMP(3)'
+                    |)
+      """.stripMargin)
+    util.addTable("""
+                    |CREATE TABLE t2 (
+                    |  k VARBINARY,
+                    |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',
+                    |  a VARCHAR NOT NULL,
+                    |  f BOOLEAN NOT NULL,
+                    |  WATERMARK FOR `ingestion_time` AS `ingestion_time`,
+                    |  PRIMARY KEY (`a`) NOT ENFORCED
+                    |) WITH (
+                    | 'connector' = 'values',
+                    | 'readable-metadata' = 'ts:TIMESTAMP(3)',
+                    | 'changelog-mode' = 'I,UA,D'
+                    |)
+      """.stripMargin)
+    val sql =
+      """
+        |SELECT t1.a, t1.b, t2.f
+        |FROM t1 INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.ingestion_time
+        | ON t1.a = t2.a WHERE t2.f = true
+        |""".stripMargin
+    util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
+  }
 }

Reply via email to