This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 992340edabc118480464e962a7a9f3a0752e24a1 Author: JingsongLi <[email protected]> AuthorDate: Wed Jun 23 17:53:20 2021 +0800 [FLINK-23054][table] Join unique/pk optimization should based on upsert key --- .../nodes/physical/stream/StreamPhysicalJoin.scala | 28 +++++++------- .../planner/plan/stream/sql/join/JoinTest.xml | 43 ++++++++++++++++++++++ .../planner/plan/stream/sql/join/JoinTest.scala | 34 +++++++++++++++++ 3 files changed, 92 insertions(+), 13 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala index 98ca415..aba3acb 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala @@ -19,14 +19,15 @@ package org.apache.flink.table.planner.plan.nodes.physical.stream import org.apache.flink.table.planner.calcite.FlinkTypeFactory +import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery import org.apache.flink.table.runtime.typeutils.InternalTypeInfo -import org.apache.flink.table.planner.plan.nodes.exec.{InputProperty, ExecNode} +import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty} import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalJoin import org.apache.flink.table.planner.plan.utils.JoinUtil import org.apache.calcite.plan._ -import org.apache.calcite.rel.core.{Join, JoinRelType} +import org.apache.calcite.rel.core.{Exchange, Join, JoinRelType} import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rex.RexNode @@ -67,11 +68,11 @@ class StreamPhysicalJoin( */ def inputUniqueKeyContainsJoinKey(inputOrdinal: Int): Boolean = { val input = getInput(inputOrdinal) - val inputUniqueKeys = getCluster.getMetadataQuery.getUniqueKeys(input) + val joinKeys = if (inputOrdinal == 0) joinSpec.getLeftKeys else joinSpec.getRightKeys + val inputUniqueKeys = getUniqueKeys(input, joinKeys) if (inputUniqueKeys != null) { - val joinKeys = if (inputOrdinal == 0) joinSpec.getLeftKeys else joinSpec.getRightKeys inputUniqueKeys.exists { - uniqueKey => joinKeys.forall(uniqueKey.toArray.contains(_)) + uniqueKey => joinKeys.forall(uniqueKey.contains(_)) } } else { false @@ -98,21 +99,22 @@ class StreamPhysicalJoin( JoinUtil.analyzeJoinInput( InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(left.getRowType)), joinSpec.getLeftKeys, - getUniqueKeys(left))) + getUniqueKeys(left, joinSpec.getLeftKeys))) .item( "rightInputSpec", JoinUtil.analyzeJoinInput( InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(right.getRowType)), joinSpec.getRightKeys, - getUniqueKeys(right))) + getUniqueKeys(right, joinSpec.getRightKeys))) } - private def getUniqueKeys(input: RelNode): List[Array[Int]] = { - val uniqueKeys = cluster.getMetadataQuery.getUniqueKeys(input) - if (uniqueKeys == null || uniqueKeys.isEmpty) { + private def getUniqueKeys(input: RelNode, keys: Array[Int]): List[Array[Int]] = { + val upsertKeys = FlinkRelMetadataQuery.reuseOrCreate(cluster.getMetadataQuery) + .getUpsertKeysInKeyGroupRange(input, keys) + if (upsertKeys == null || upsertKeys.isEmpty) { List.empty } else { - uniqueKeys.map(_.asList.map(_.intValue).toArray).toList + upsertKeys.map(_.asList.map(_.intValue).toArray).toList } } @@ -125,8 +127,8 @@ class StreamPhysicalJoin( override def translateToExecNode(): ExecNode[_] = { new StreamExecJoin( joinSpec, - getUniqueKeys(left), - getUniqueKeys(right), + getUniqueKeys(left, joinSpec.getLeftKeys), + getUniqueKeys(right, joinSpec.getRightKeys), InputProperty.DEFAULT, InputProperty.DEFAULT, FlinkTypeFactory.toLogicalRowType(getRowType), diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml index 08bbce2..a7e5385 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml @@ -423,6 +423,49 @@ Calc(select=[a1]) ]]> </Resource> </TestCase> + <TestCase name="testJoinDisorderChangeLog"> + <Resource name="sql"> + <![CDATA[ +SELECT T1.person, T1.sum_votes, T1.prize, T2.age FROM + (SELECT T.person, T.sum_votes, award.prize FROM + (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) T, + award + WHERE T.sum_votes = award.votes) T1, people T2 + WHERE T1.person = T2.person +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(person=[$0], sum_votes=[$1], prize=[$2], age=[$4]) ++- LogicalFilter(condition=[=($0, $3)]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalProject(person=[$0], sum_votes=[$1], prize=[$3]) + : +- LogicalFilter(condition=[=($1, $2)]) + : +- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalAggregate(group=[{0}], sum_votes=[SUM($1)]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, src]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, award]]) + +- LogicalTableScan(table=[[default_catalog, default_database, people]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[person, sum_votes, prize, age]) ++- Join(joinType=[InnerJoin], where=[(person = person0)], select=[person, sum_votes, prize, person0, age], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) + :- Exchange(distribution=[hash[person]]) + : +- Calc(select=[person, sum_votes, prize]) + : +- Join(joinType=[InnerJoin], where=[(sum_votes = votes)], select=[person, sum_votes, votes, prize], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) + : :- Exchange(distribution=[hash[sum_votes]]) + : : +- GroupAggregate(groupBy=[person], select=[person, SUM(votes) AS sum_votes]) + : : +- Exchange(distribution=[hash[person]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[person, votes]) + : +- Exchange(distribution=[hash[votes]]) + : +- TableSourceScan(table=[[default_catalog, default_database, award]], fields=[votes, prize]) + +- Exchange(distribution=[hash[person]]) + +- TableSourceScan(table=[[default_catalog, default_database, people]], fields=[person, age]) +]]> + </Resource> + </TestCase> <TestCase name="testJoinWithSort"> <Resource name="sql"> <