This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new d1b8c5f  [FLINK-22157][table-planner-blink] Fix join & select a portion of composite primary key will cause ArrayIndexOutOfBoundsException
d1b8c5f is described below

commit d1b8c5fd54e1d104387ef6acb94b9a9698378ed3
Author: TsReaper <[email protected]>
AuthorDate: Fri Apr 9 15:39:34 2021 +0800

    [FLINK-22157][table-planner-blink] Fix join & select a portion of composite primary key will cause ArrayIndexOutOfBoundsException
    
    This closes #16305
    
    (cherry picked from commit 92fbe7f1fe5f0eade036b4184cdbab8f9b791647)
---
 .../plan/metadata/FlinkRelMdUniqueKeys.scala       | 26 ++++----
 .../plan/batch/sql/agg/HashAggregateTest.xml       | 65 ++++++++++++++++++++
 .../plan/batch/sql/agg/SortAggregateTest.xml       | 70 ++++++++++++++++++++++
 .../planner/plan/stream/sql/join/JoinTest.xml      | 24 ++++++++
 .../plan/batch/sql/agg/AggregateTestBase.scala     | 20 ++++++-
 .../plan/metadata/FlinkRelMdUniqueKeysTest.scala   | 14 +++++
 .../planner/plan/metadata/MetadataTestUtil.scala   | 33 ++++++++++
 .../planner/plan/stream/sql/join/JoinTest.scala    | 15 +++++
 8 files changed, 255 insertions(+), 12 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
index a9b9745..7e632ad 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
@@ -66,23 +66,27 @@ class FlinkRelMdUniqueKeys private extends 
MetadataHandler[BuiltInMetadata.Uniqu
         val catalogTable = sourceTable.catalogTable
         catalogTable match {
           case act: CatalogTable =>
+            val builder = ImmutableSet.builder[ImmutableBitSet]()
+
             val schema = act.getSchema
             if (schema.getPrimaryKey.isPresent) {
               // use relOptTable's type which may be projected based on 
original schema
               val columns = relOptTable.getRowType.getFieldNames
-              val columnIndices = schema.getPrimaryKey.get().getColumns map { 
c =>
-                columns.indexOf(c)
-              }
-              val builder = ImmutableSet.builder[ImmutableBitSet]()
-              builder.add(ImmutableBitSet.of(columnIndices:_*))
-              val uniqueSet = sourceTable.uniqueKeysSet().orElse(null)
-              if (uniqueSet != null) {
-                builder.addAll(uniqueSet)
+              val primaryKeyColumns = schema.getPrimaryKey.get().getColumns
+              // we check this because a portion of a composite primary key is 
not unique
+              if (columns.containsAll(primaryKeyColumns)) {
+                val columnIndices = primaryKeyColumns.map(c => 
columns.indexOf(c))
+                builder.add(ImmutableBitSet.of(columnIndices: _*))
               }
-              builder.build()
-            } else {
-              sourceTable.uniqueKeysSet.orElse(null)
             }
+
+            val uniqueSet = sourceTable.uniqueKeysSet.orElse(null)
+            if (uniqueSet != null) {
+              builder.addAll(uniqueSet)
+            }
+
+            val result = builder.build()
+            if (result.isEmpty) null else result
         }
       case table: FlinkPreparingTableBase => table.uniqueKeysSet.orElse(null)
       case _ => null
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
index a9daeed..e7c41d3 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
@@ -1071,4 +1071,69 @@ HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) 
AS EXPR$0, Final_SUM(sum$
 ]]>
     </Resource>
   </TestCase>
+  <TestCase 
name="testReduceGroupingOnTableWithCompositePrimaryKey[aggStrategy=AUTO]">
+    <Resource name="sql">
+      <![CDATA[SELECT SUM(val) FROM tableWithCompositePk GROUP BY grp1, grp2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+   +- LogicalProject(grp1=[$3], grp2=[$4], val=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
tableWithCompositePk]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- HashAggregate(isMerge=[true], groupBy=[grp1, grp2], select=[grp1, grp2, 
Final_SUM(sum$0) AS EXPR$0])
+   +- Exchange(distribution=[hash[grp1, grp2]])
+      +- LocalHashAggregate(groupBy=[grp1, grp2], select=[grp1, grp2, 
Partial_SUM(val) AS sum$0])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
tableWithCompositePk, project=[grp1, grp2, val]]], fields=[grp1, grp2, val])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testReduceGroupingOnTableWithCompositePrimaryKey[aggStrategy=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT SUM(val) FROM tableWithCompositePk GROUP BY grp1, grp2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+   +- LogicalProject(grp1=[$3], grp2=[$4], val=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
tableWithCompositePk]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- HashAggregate(isMerge=[true], groupBy=[grp1, grp2], select=[grp1, grp2, 
Final_SUM(sum$0) AS EXPR$0])
+   +- Exchange(distribution=[hash[grp1, grp2]])
+      +- LocalHashAggregate(groupBy=[grp1, grp2], select=[grp1, grp2, 
Partial_SUM(val) AS sum$0])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
tableWithCompositePk, project=[grp1, grp2, val]]], fields=[grp1, grp2, val])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testReduceGroupingOnTableWithCompositePrimaryKey[aggStrategy=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT SUM(val) FROM tableWithCompositePk GROUP BY grp1, grp2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+   +- LogicalProject(grp1=[$3], grp2=[$4], val=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
tableWithCompositePk]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- HashAggregate(isMerge=[false], groupBy=[grp1, grp2], select=[grp1, grp2, 
SUM(val) AS EXPR$0])
+   +- Exchange(distribution=[hash[grp1, grp2]])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
tableWithCompositePk, project=[grp1, grp2, val]]], fields=[grp1, grp2, val])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
index 54034a0..fd2d01b 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
@@ -1281,4 +1281,74 @@ SortAggregate(isMerge=[true], select=[Final_SUM(sum$0) 
AS EXPR$0, Final_SUM(sum$
 ]]>
     </Resource>
   </TestCase>
+  <TestCase 
name="testReduceGroupingOnTableWithCompositePrimaryKey[aggStrategy=AUTO]">
+    <Resource name="sql">
+      <![CDATA[SELECT SUM(val) FROM tableWithCompositePk GROUP BY grp1, grp2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+   +- LogicalProject(grp1=[$3], grp2=[$4], val=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
tableWithCompositePk]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- SortAggregate(isMerge=[true], groupBy=[grp1, grp2], select=[grp1, grp2, 
Final_SUM(sum$0) AS EXPR$0])
+   +- Sort(orderBy=[grp1 ASC, grp2 ASC])
+      +- Exchange(distribution=[hash[grp1, grp2]])
+         +- LocalSortAggregate(groupBy=[grp1, grp2], select=[grp1, grp2, 
Partial_SUM(val) AS sum$0])
+            +- Sort(orderBy=[grp1 ASC, grp2 ASC])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
tableWithCompositePk, project=[grp1, grp2, val]]], fields=[grp1, grp2, val])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testReduceGroupingOnTableWithCompositePrimaryKey[aggStrategy=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT SUM(val) FROM tableWithCompositePk GROUP BY grp1, grp2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+   +- LogicalProject(grp1=[$3], grp2=[$4], val=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
tableWithCompositePk]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- SortAggregate(isMerge=[true], groupBy=[grp1, grp2], select=[grp1, grp2, 
Final_SUM(sum$0) AS EXPR$0])
+   +- Sort(orderBy=[grp1 ASC, grp2 ASC])
+      +- Exchange(distribution=[hash[grp1, grp2]])
+         +- LocalSortAggregate(groupBy=[grp1, grp2], select=[grp1, grp2, 
Partial_SUM(val) AS sum$0])
+            +- Sort(orderBy=[grp1 ASC, grp2 ASC])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
tableWithCompositePk, project=[grp1, grp2, val]]], fields=[grp1, grp2, val])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testReduceGroupingOnTableWithCompositePrimaryKey[aggStrategy=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT SUM(val) FROM tableWithCompositePk GROUP BY grp1, grp2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+   +- LogicalProject(grp1=[$3], grp2=[$4], val=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
tableWithCompositePk]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- SortAggregate(isMerge=[false], groupBy=[grp1, grp2], select=[grp1, grp2, 
SUM(val) AS EXPR$0])
+   +- Sort(orderBy=[grp1 ASC, grp2 ASC])
+      +- Exchange(distribution=[hash[grp1, grp2]])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
tableWithCompositePk, project=[grp1, grp2, val]]], fields=[grp1, grp2, val])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
index d484733..cf0b4ca 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
@@ -398,6 +398,30 @@ Calc(select=[a1, a2, b1, b2], changelogMode=[I,UB,UA])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testJoinAndSelectOnPartialCompositePrimaryKey">
+    <Resource name="sql">
+      <![CDATA[SELECT A.a1 FROM A LEFT JOIN tableWithCompositePk T ON A.a1 = 
T.pk1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0])
++- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+   :- LogicalTableScan(table=[[default_catalog, default_database, A, source: 
[TestTableSource(a1, a2, a3)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, 
tableWithCompositePk]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a1])
++- Join(joinType=[LeftOuterJoin], where=[=(a1, pk1)], select=[a1, pk1], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a1]])
+   :  +- Calc(select=[a1])
+   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
A, source: [TestTableSource(a1, a2, a3)]]], fields=[a1, a2, a3])
+   +- Exchange(distribution=[hash[pk1]])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
tableWithCompositePk, project=[pk1]]], fields=[pk1])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testJoinWithSort">
     <Resource name="sql">
       <![CDATA[
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
index 0f35b8c..cc5bb87 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.plan.batch.sql.agg
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.api.{TableException, Types}
 import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.{VarSum1AggFunction,
 VarSum2AggFunction}
 import org.apache.flink.table.planner.utils.{BatchTableTestUtil, TableTestBase}
@@ -209,5 +208,24 @@ abstract class AggregateTestBase extends TableTestBase {
     util.verifyPlan(sql)
   }
 
+  @Test
+  def testReduceGroupingOnTableWithCompositePrimaryKey(): Unit = {
+    util.tableEnv.executeSql(
+      """
+        |CREATE TABLE tableWithCompositePk (
+        |  pk1 INT,
+        |  pk2 BIGINT,
+        |  val BIGINT,
+        |  grp1 BIGINT,
+        |  grp2 BIGINT,
+        |  PRIMARY KEY (pk1, pk2) NOT ENFORCED
+        |) WITH (
+        |  'connector'='values',
+        |  'bounded'='true'
+        |)
+        |""".stripMargin)
+    util.verifyPlan("SELECT SUM(val) FROM tableWithCompositePk GROUP BY grp1, 
grp2")
+  }
+
   // TODO supports group sets
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala
index 5e9c565..5252247 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala
@@ -57,6 +57,20 @@ class FlinkRelMdUniqueKeysTest extends 
FlinkRelMdHandlerTestBase {
   }
 
   @Test
+  def testGetUniqueKeysOnProjectedTableScanWithPartialCompositePrimaryKey(): 
Unit = {
+    val table = relBuilder
+      .getRelOptSchema
+      .asInstanceOf[CalciteCatalogReader]
+      .getTable(Seq("projected_table_source_table_with_partial_pk"))
+      .asInstanceOf[TableSourceTable]
+    val tableSourceScan = new StreamExecTableSourceScan(
+      cluster,
+      streamPhysicalTraits,
+      table)
+    assertNull(mq.getUniqueKeys(tableSourceScan))
+  }
+
+  @Test
   def testGetUniqueKeysOnValues(): Unit = {
     assertNull(mq.getUniqueKeys(logicalValues))
     assertNull(mq.getUniqueKeys(emptyValues))
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
index 9a2e0c7..bcb28d2 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
@@ -56,6 +56,9 @@ object MetadataTestUtil {
     rootSchema.add("TemporalTable2", createTemporalTable2())
     rootSchema.add("TemporalTable3", createTemporalTable3())
     rootSchema.add("projected_table_source_table", 
createProjectedTableSourceTable())
+    rootSchema.add(
+      "projected_table_source_table_with_partial_pk",
+      createProjectedTableSourceTableWithPartialCompositePrimaryKey())
     rootSchema
   }
 
@@ -268,6 +271,36 @@ object MetadataTestUtil {
       Array("project=[a, c, d]"))
   }
 
+  private def createProjectedTableSourceTableWithPartialCompositePrimaryKey(): 
Table = {
+    val catalogTable = CatalogTableImpl.fromProperties(
+      Map(
+        "connector" -> "values",
+        "bounded" -> "true",
+        "schema.0.name" -> "a",
+        "schema.0.data-type" -> "BIGINT NOT NULL",
+        "schema.1.name" -> "b",
+        "schema.1.data-type" -> "BIGINT NOT NULL",
+        "schema.primary-key.name" -> "PK_1",
+        "schema.primary-key.columns" -> "a,b")
+    )
+
+    val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
+    val rowType = typeFactory.buildRelNodeRowType(
+      Seq("a"),
+      Seq(new BigIntType(false)))
+
+    new MockTableSourceTable(
+      ObjectIdentifier.of(
+        "default_catalog",
+        "default_database",
+        "projected_table_source_table_with_partial_pk"),
+      rowType,
+      new TestTableSource(),
+      true,
+      catalogTable,
+      Array("project=[a]"))
+  }
+
   private def getMetadataTable(
       tableSchema: TableSchema,
       statistic: FlinkStatistic,
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
index a260349..2be8938d 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
@@ -284,4 +284,19 @@ class JoinTest extends TableTestBase {
   def testRightOuterJoinEquiAndNonEquiPred(): Unit = {
     util.verifyPlan("SELECT b, y FROM t RIGHT OUTER JOIN s ON a = z AND b < x")
   }
+
+  @Test
+  def testJoinAndSelectOnPartialCompositePrimaryKey(): Unit = {
+    util.tableEnv.executeSql(
+      """
+        |CREATE TABLE tableWithCompositePk (
+        |  pk1 INT,
+        |  pk2 BIGINT,
+        |  PRIMARY KEY (pk1, pk2) NOT ENFORCED
+        |) WITH (
+        |  'connector'='values'
+        |)
+        |""".stripMargin)
+    util.verifyPlan("SELECT A.a1 FROM A LEFT JOIN tableWithCompositePk T ON 
A.a1 = T.pk1")
+  }
 }

Reply via email to