This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 992340edabc118480464e962a7a9f3a0752e24a1
Author: JingsongLi <[email protected]>
AuthorDate: Wed Jun 23 17:53:20 2021 +0800

    [FLINK-23054][table] Join unique/pk optimization should be based on upsert key
---
 .../nodes/physical/stream/StreamPhysicalJoin.scala | 28 +++++++-------
 .../planner/plan/stream/sql/join/JoinTest.xml      | 43 ++++++++++++++++++++++
 .../planner/plan/stream/sql/join/JoinTest.scala    | 34 +++++++++++++++++
 3 files changed, 92 insertions(+), 13 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
index 98ca415..aba3acb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
@@ -19,14 +19,15 @@
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
-import org.apache.flink.table.planner.plan.nodes.exec.{InputProperty, ExecNode}
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin
 import 
org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalJoin
 import org.apache.flink.table.planner.plan.utils.JoinUtil
 
 import org.apache.calcite.plan._
-import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rel.core.{Exchange, Join, JoinRelType}
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexNode
@@ -67,11 +68,11 @@ class StreamPhysicalJoin(
    */
   def inputUniqueKeyContainsJoinKey(inputOrdinal: Int): Boolean = {
     val input = getInput(inputOrdinal)
-    val inputUniqueKeys = getCluster.getMetadataQuery.getUniqueKeys(input)
+    val joinKeys = if (inputOrdinal == 0) joinSpec.getLeftKeys else 
joinSpec.getRightKeys
+    val inputUniqueKeys = getUniqueKeys(input, joinKeys)
     if (inputUniqueKeys != null) {
-      val joinKeys = if (inputOrdinal == 0) joinSpec.getLeftKeys else 
joinSpec.getRightKeys
       inputUniqueKeys.exists {
-        uniqueKey => joinKeys.forall(uniqueKey.toArray.contains(_))
+        uniqueKey => joinKeys.forall(uniqueKey.contains(_))
       }
     } else {
       false
@@ -98,21 +99,22 @@ class StreamPhysicalJoin(
           JoinUtil.analyzeJoinInput(
               
InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(left.getRowType)),
               joinSpec.getLeftKeys,
-              getUniqueKeys(left)))
+              getUniqueKeys(left, joinSpec.getLeftKeys)))
       .item(
           "rightInputSpec",
           JoinUtil.analyzeJoinInput(
               
InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(right.getRowType)),
               joinSpec.getRightKeys,
-              getUniqueKeys(right)))
+              getUniqueKeys(right, joinSpec.getRightKeys)))
   }
 
-  private def getUniqueKeys(input: RelNode): List[Array[Int]] = {
-    val uniqueKeys = cluster.getMetadataQuery.getUniqueKeys(input)
-    if (uniqueKeys == null || uniqueKeys.isEmpty) {
+  private def getUniqueKeys(input: RelNode, keys: Array[Int]): 
List[Array[Int]] = {
+    val upsertKeys = 
FlinkRelMetadataQuery.reuseOrCreate(cluster.getMetadataQuery)
+        .getUpsertKeysInKeyGroupRange(input, keys)
+    if (upsertKeys == null || upsertKeys.isEmpty) {
       List.empty
     } else {
-      uniqueKeys.map(_.asList.map(_.intValue).toArray).toList
+      upsertKeys.map(_.asList.map(_.intValue).toArray).toList
     }
 
   }
@@ -125,8 +127,8 @@ class StreamPhysicalJoin(
   override def translateToExecNode(): ExecNode[_] = {
     new StreamExecJoin(
         joinSpec,
-        getUniqueKeys(left),
-        getUniqueKeys(right),
+        getUniqueKeys(left, joinSpec.getLeftKeys),
+        getUniqueKeys(right, joinSpec.getRightKeys),
         InputProperty.DEFAULT,
         InputProperty.DEFAULT,
         FlinkTypeFactory.toLogicalRowType(getRowType),
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
index 08bbce2..a7e5385 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
@@ -423,6 +423,49 @@ Calc(select=[a1])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testJoinDisorderChangeLog">
+    <Resource name="sql">
+      <![CDATA[
+SELECT T1.person, T1.sum_votes, T1.prize, T2.age FROM
+ (SELECT T.person, T.sum_votes, award.prize FROM
+   (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) T,
+   award
+   WHERE T.sum_votes = award.votes) T1, people T2
+ WHERE T1.person = T2.person
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(person=[$0], sum_votes=[$1], prize=[$2], age=[$4])
++- LogicalFilter(condition=[=($0, $3)])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalProject(person=[$0], sum_votes=[$1], prize=[$3])
+      :  +- LogicalFilter(condition=[=($1, $2)])
+      :     +- LogicalJoin(condition=[true], joinType=[inner])
+      :        :- LogicalAggregate(group=[{0}], sum_votes=[SUM($1)])
+      :        :  +- LogicalTableScan(table=[[default_catalog, 
default_database, src]])
+      :        +- LogicalTableScan(table=[[default_catalog, default_database, 
award]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, people]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[person, sum_votes, prize, age])
++- Join(joinType=[InnerJoin], where=[(person = person0)], select=[person, 
sum_votes, prize, person0, age], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
+   :- Exchange(distribution=[hash[person]])
+   :  +- Calc(select=[person, sum_votes, prize])
+   :     +- Join(joinType=[InnerJoin], where=[(sum_votes = votes)], 
select=[person, sum_votes, votes, prize], leftInputSpec=[HasUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
+   :        :- Exchange(distribution=[hash[sum_votes]])
+   :        :  +- GroupAggregate(groupBy=[person], select=[person, SUM(votes) 
AS sum_votes])
+   :        :     +- Exchange(distribution=[hash[person]])
+   :        :        +- TableSourceScan(table=[[default_catalog, 
default_database, src]], fields=[person, votes])
+   :        +- Exchange(distribution=[hash[votes]])
+   :           +- TableSourceScan(table=[[default_catalog, default_database, 
award]], fields=[votes, prize])
+   +- Exchange(distribution=[hash[person]])
+      +- TableSourceScan(table=[[default_catalog, default_database, people]], 
fields=[person, age])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testJoinWithSort">
     <Resource name="sql">
       <![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
index 12f0424..fd70df2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
@@ -299,4 +299,38 @@ class JoinTest extends TableTestBase {
         |""".stripMargin)
     util.verifyExecPlan("SELECT A.a1 FROM A LEFT JOIN tableWithCompositePk T 
ON A.a1 = T.pk1")
   }
+
+  @Test
+  def testJoinDisorderChangeLog(): Unit = {
+    util.tableEnv.executeSql(
+      """
+        |CREATE TABLE src (person String, votes BIGINT) WITH(
+        |  'connector' = 'values'
+        |)
+        |""".stripMargin)
+
+    util.tableEnv.executeSql(
+      """
+        |CREATE TABLE award (votes BIGINT, prize DOUBLE, PRIMARY KEY(votes) 
NOT ENFORCED) WITH(
+        |  'connector' = 'values'
+        |)
+        |""".stripMargin)
+
+    util.tableEnv.executeSql(
+      """
+        |CREATE TABLE people (person STRING, age INT, PRIMARY KEY(person) NOT 
ENFORCED) WITH(
+        |  'connector' = 'values'
+        |)
+        |""".stripMargin)
+
+    util.verifyExecPlan(
+      """
+        |SELECT T1.person, T1.sum_votes, T1.prize, T2.age FROM
+        | (SELECT T.person, T.sum_votes, award.prize FROM
+        |   (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) 
T,
+        |   award
+        |   WHERE T.sum_votes = award.votes) T1, people T2
+        | WHERE T1.person = T2.person
+        |""".stripMargin)
+  }
 }

Reply via email to