This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e1117f0897cee184622bf5cce772c869d95afb7e Author: JingsongLi <[email protected]> AuthorDate: Wed Jun 23 17:47:01 2021 +0800 [FLINK-23054][table] TemporalJoinRewrite should be based on upsert key --- .../rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala index 474bfc8..c812f2d 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala @@ -161,21 +161,21 @@ class TemporalJoinRewriteWithUniqueKeyRule extends RelOptRule( val rightFields = snapshot.getRowType.getFieldList val fmq = FlinkRelMetadataQuery.reuseOrCreate(snapshot.getCluster.getMetadataQuery) - val uniqueKeySet = fmq.getUniqueKeys(snapshot.getInput()) + val upsertKeySet = fmq.getUpsertKeys(snapshot.getInput()) val fields = snapshot.getRowType.getFieldList - if (uniqueKeySet != null && uniqueKeySet.size() > 0) { + if (upsertKeySet != null && upsertKeySet.size() > 0) { val leftFieldCnt = leftInput.getRowType.getFieldCount - val uniqueKeySetInputRefs = uniqueKeySet.filter(_.nonEmpty) + val upsertKeySetInputRefs = upsertKeySet.filter(_.nonEmpty) .map(_.toArray .map(fields) - // build InputRef of unique key in snapshot + // build InputRef of upsert key in snapshot .map(f => rexBuilder.makeInputRef( f.getType, leftFieldCnt + rightFields.indexOf(f))) .toSeq) - // select shortest unique key as primary key - uniqueKeySetInputRefs + // select shortest upsert key as primary key + 
upsertKeySetInputRefs .toArray .sortBy(_.length) .headOption
