This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e1117f0897cee184622bf5cce772c869d95afb7e
Author: JingsongLi <[email protected]>
AuthorDate: Wed Jun 23 17:47:01 2021 +0800

    [FLINK-23054][table] TemporalJoinRewrite should based on upsert key
---
 .../rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
index 474bfc8..c812f2d 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
@@ -161,21 +161,21 @@ class TemporalJoinRewriteWithUniqueKeyRule extends 
RelOptRule(
     val rightFields = snapshot.getRowType.getFieldList
     val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(snapshot.getCluster.getMetadataQuery)
 
-    val uniqueKeySet = fmq.getUniqueKeys(snapshot.getInput())
+    val upsertKeySet = fmq.getUpsertKeys(snapshot.getInput())
     val fields = snapshot.getRowType.getFieldList
 
-    if (uniqueKeySet != null && uniqueKeySet.size() > 0) {
+    if (upsertKeySet != null && upsertKeySet.size() > 0) {
       val leftFieldCnt = leftInput.getRowType.getFieldCount
-      val uniqueKeySetInputRefs = uniqueKeySet.filter(_.nonEmpty)
+      val upsertKeySetInputRefs = upsertKeySet.filter(_.nonEmpty)
         .map(_.toArray
           .map(fields)
-          // build InputRef of unique key in snapshot
+          // build InputRef of upsert key in snapshot
           .map(f => rexBuilder.makeInputRef(
             f.getType,
             leftFieldCnt + rightFields.indexOf(f)))
           .toSeq)
-      // select shortest unique key as primary key
-      uniqueKeySetInputRefs
+      // select shortest upsert key as primary key
+      upsertKeySetInputRefs
         .toArray
         .sortBy(_.length)
         .headOption

Reply via email to