This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new d1b8c5f [FLINK-22157][table-planner-blink] Fix join & select a portion of composite primary key will cause ArrayIndexOutOfBoundsException
d1b8c5f is described below
commit d1b8c5fd54e1d104387ef6acb94b9a9698378ed3
Author: TsReaper <[email protected]>
AuthorDate: Fri Apr 9 15:39:34 2021 +0800
[FLINK-22157][table-planner-blink] Fix join & select a portion of composite primary key will cause ArrayIndexOutOfBoundsException
This closes #16305
(cherry picked from commit 92fbe7f1fe5f0eade036b4184cdbab8f9b791647)
---
.../plan/metadata/FlinkRelMdUniqueKeys.scala | 26 ++++----
.../plan/batch/sql/agg/HashAggregateTest.xml | 65 ++++++++++++++++++++
.../plan/batch/sql/agg/SortAggregateTest.xml | 70 ++++++++++++++++++++++
.../planner/plan/stream/sql/join/JoinTest.xml | 24 ++++++++
.../plan/batch/sql/agg/AggregateTestBase.scala | 20 ++++++-
.../plan/metadata/FlinkRelMdUniqueKeysTest.scala | 14 +++++
.../planner/plan/metadata/MetadataTestUtil.scala | 33 ++++++++++
.../planner/plan/stream/sql/join/JoinTest.scala | 15 +++++
8 files changed, 255 insertions(+), 12 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
index a9b9745..7e632ad 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
@@ -66,23 +66,27 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
val catalogTable = sourceTable.catalogTable
catalogTable match {
case act: CatalogTable =>
+ val builder = ImmutableSet.builder[ImmutableBitSet]()
+
val schema = act.getSchema
if (schema.getPrimaryKey.isPresent) {
// use relOptTable's type which may be projected based on
original schema
val columns = relOptTable.getRowType.getFieldNames
- val columnIndices = schema.getPrimaryKey.get().getColumns map {
c =>
- columns.indexOf(c)
- }
- val builder = ImmutableSet.builder[ImmutableBitSet]()
- builder.add(ImmutableBitSet.of(columnIndices:_*))
- val uniqueSet = sourceTable.uniqueKeysSet().orElse(null)
- if (uniqueSet != null) {
- builder.addAll(uniqueSet)
+ val primaryKeyColumns = schema.getPrimaryKey.get().getColumns
+ // we check this because a portion of a composite primary key is
not unique
+ if (columns.containsAll(primaryKeyColumns)) {
+ val columnIndices = primaryKeyColumns.map(c =>
columns.indexOf(c))
+ builder.add(ImmutableBitSet.of(columnIndices: _*))
}
- builder.build()
- } else {
- sourceTable.uniqueKeysSet.orElse(null)
}
+
+ val uniqueSet = sourceTable.uniqueKeysSet.orElse(null)
+ if (uniqueSet != null) {
+ builder.addAll(uniqueSet)
+ }
+
+ val result = builder.build()
+ if (result.isEmpty) null else result
}
case table: FlinkPreparingTableBase => table.uniqueKeysSet.orElse(null)
case _ => null
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
index a9daeed..e7c41d3 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
@@ -1071,4 +1071,69 @@ HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS EXPR$0, Final_SUM(sum$
]]>
</Resource>
</TestCase>
+ <TestCase
name="testReduceGroupingOnTableWithCompositePrimaryKey[aggStrategy=AUTO]">
+ <Resource name="sql">
+ <![CDATA[SELECT SUM(val) FROM tableWithCompositePk GROUP BY grp1, grp2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+ +- LogicalProject(grp1=[$3], grp2=[$4], val=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
tableWithCompositePk]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[EXPR$0])
++- HashAggregate(isMerge=[true], groupBy=[grp1, grp2], select=[grp1, grp2,
Final_SUM(sum$0) AS EXPR$0])
+ +- Exchange(distribution=[hash[grp1, grp2]])
+ +- LocalHashAggregate(groupBy=[grp1, grp2], select=[grp1, grp2,
Partial_SUM(val) AS sum$0])
+ +- TableSourceScan(table=[[default_catalog, default_database,
tableWithCompositePk, project=[grp1, grp2, val]]], fields=[grp1, grp2, val])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testReduceGroupingOnTableWithCompositePrimaryKey[aggStrategy=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT SUM(val) FROM tableWithCompositePk GROUP BY grp1, grp2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+ +- LogicalProject(grp1=[$3], grp2=[$4], val=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
tableWithCompositePk]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[EXPR$0])
++- HashAggregate(isMerge=[true], groupBy=[grp1, grp2], select=[grp1, grp2,
Final_SUM(sum$0) AS EXPR$0])
+ +- Exchange(distribution=[hash[grp1, grp2]])
+ +- LocalHashAggregate(groupBy=[grp1, grp2], select=[grp1, grp2,
Partial_SUM(val) AS sum$0])
+ +- TableSourceScan(table=[[default_catalog, default_database,
tableWithCompositePk, project=[grp1, grp2, val]]], fields=[grp1, grp2, val])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testReduceGroupingOnTableWithCompositePrimaryKey[aggStrategy=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT SUM(val) FROM tableWithCompositePk GROUP BY grp1, grp2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+ +- LogicalProject(grp1=[$3], grp2=[$4], val=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
tableWithCompositePk]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[EXPR$0])
++- HashAggregate(isMerge=[false], groupBy=[grp1, grp2], select=[grp1, grp2,
SUM(val) AS EXPR$0])
+ +- Exchange(distribution=[hash[grp1, grp2]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
tableWithCompositePk, project=[grp1, grp2, val]]], fields=[grp1, grp2, val])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
index 54034a0..fd2d01b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
@@ -1281,4 +1281,74 @@ SortAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS EXPR$0, Final_SUM(sum$
]]>
</Resource>
</TestCase>
+ <TestCase
name="testReduceGroupingOnTableWithCompositePrimaryKey[aggStrategy=AUTO]">
+ <Resource name="sql">
+ <![CDATA[SELECT SUM(val) FROM tableWithCompositePk GROUP BY grp1, grp2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+ +- LogicalProject(grp1=[$3], grp2=[$4], val=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
tableWithCompositePk]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[EXPR$0])
++- SortAggregate(isMerge=[true], groupBy=[grp1, grp2], select=[grp1, grp2,
Final_SUM(sum$0) AS EXPR$0])
+ +- Sort(orderBy=[grp1 ASC, grp2 ASC])
+ +- Exchange(distribution=[hash[grp1, grp2]])
+ +- LocalSortAggregate(groupBy=[grp1, grp2], select=[grp1, grp2,
Partial_SUM(val) AS sum$0])
+ +- Sort(orderBy=[grp1 ASC, grp2 ASC])
+ +- TableSourceScan(table=[[default_catalog, default_database,
tableWithCompositePk, project=[grp1, grp2, val]]], fields=[grp1, grp2, val])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testReduceGroupingOnTableWithCompositePrimaryKey[aggStrategy=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT SUM(val) FROM tableWithCompositePk GROUP BY grp1, grp2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+ +- LogicalProject(grp1=[$3], grp2=[$4], val=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
tableWithCompositePk]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[EXPR$0])
++- SortAggregate(isMerge=[true], groupBy=[grp1, grp2], select=[grp1, grp2,
Final_SUM(sum$0) AS EXPR$0])
+ +- Sort(orderBy=[grp1 ASC, grp2 ASC])
+ +- Exchange(distribution=[hash[grp1, grp2]])
+ +- LocalSortAggregate(groupBy=[grp1, grp2], select=[grp1, grp2,
Partial_SUM(val) AS sum$0])
+ +- Sort(orderBy=[grp1 ASC, grp2 ASC])
+ +- TableSourceScan(table=[[default_catalog, default_database,
tableWithCompositePk, project=[grp1, grp2, val]]], fields=[grp1, grp2, val])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testReduceGroupingOnTableWithCompositePrimaryKey[aggStrategy=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT SUM(val) FROM tableWithCompositePk GROUP BY grp1, grp2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+ +- LogicalProject(grp1=[$3], grp2=[$4], val=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
tableWithCompositePk]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[EXPR$0])
++- SortAggregate(isMerge=[false], groupBy=[grp1, grp2], select=[grp1, grp2,
SUM(val) AS EXPR$0])
+ +- Sort(orderBy=[grp1 ASC, grp2 ASC])
+ +- Exchange(distribution=[hash[grp1, grp2]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
tableWithCompositePk, project=[grp1, grp2, val]]], fields=[grp1, grp2, val])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
index d484733..cf0b4ca 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
@@ -398,6 +398,30 @@ Calc(select=[a1, a2, b1, b2], changelogMode=[I,UB,UA])
]]>
</Resource>
</TestCase>
+ <TestCase name="testJoinAndSelectOnPartialCompositePrimaryKey">
+ <Resource name="sql">
+ <![CDATA[SELECT A.a1 FROM A LEFT JOIN tableWithCompositePk T ON A.a1 =
T.pk1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0])
++- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, A, source:
[TestTableSource(a1, a2, a3)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
tableWithCompositePk]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a1])
++- Join(joinType=[LeftOuterJoin], where=[=(a1, pk1)], select=[a1, pk1],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1]])
+ : +- Calc(select=[a1])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
A, source: [TestTableSource(a1, a2, a3)]]], fields=[a1, a2, a3])
+ +- Exchange(distribution=[hash[pk1]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
tableWithCompositePk, project=[pk1]]], fields=[pk1])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testJoinWithSort">
<Resource name="sql">
<![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
index 0f35b8c..cc5bb87 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.plan.batch.sql.agg
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.{TableException, Types}
import
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.{VarSum1AggFunction,
VarSum2AggFunction}
import org.apache.flink.table.planner.utils.{BatchTableTestUtil, TableTestBase}
@@ -209,5 +208,24 @@ abstract class AggregateTestBase extends TableTestBase {
util.verifyPlan(sql)
}
+ @Test
+ def testReduceGroupingOnTableWithCompositePrimaryKey(): Unit = {
+ util.tableEnv.executeSql(
+ """
+ |CREATE TABLE tableWithCompositePk (
+ | pk1 INT,
+ | pk2 BIGINT,
+ | val BIGINT,
+ | grp1 BIGINT,
+ | grp2 BIGINT,
+ | PRIMARY KEY (pk1, pk2) NOT ENFORCED
+ |) WITH (
+ | 'connector'='values',
+ | 'bounded'='true'
+ |)
+ |""".stripMargin)
+ util.verifyPlan("SELECT SUM(val) FROM tableWithCompositePk GROUP BY grp1,
grp2")
+ }
+
// TODO supports group sets
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala
index 5e9c565..5252247 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala
@@ -57,6 +57,20 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase {
}
@Test
+ def testGetUniqueKeysOnProjectedTableScanWithPartialCompositePrimaryKey():
Unit = {
+ val table = relBuilder
+ .getRelOptSchema
+ .asInstanceOf[CalciteCatalogReader]
+ .getTable(Seq("projected_table_source_table_with_partial_pk"))
+ .asInstanceOf[TableSourceTable]
+ val tableSourceScan = new StreamExecTableSourceScan(
+ cluster,
+ streamPhysicalTraits,
+ table)
+ assertNull(mq.getUniqueKeys(tableSourceScan))
+ }
+
+ @Test
def testGetUniqueKeysOnValues(): Unit = {
assertNull(mq.getUniqueKeys(logicalValues))
assertNull(mq.getUniqueKeys(emptyValues))
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
index 9a2e0c7..bcb28d2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
@@ -56,6 +56,9 @@ object MetadataTestUtil {
rootSchema.add("TemporalTable2", createTemporalTable2())
rootSchema.add("TemporalTable3", createTemporalTable3())
rootSchema.add("projected_table_source_table",
createProjectedTableSourceTable())
+ rootSchema.add(
+ "projected_table_source_table_with_partial_pk",
+ createProjectedTableSourceTableWithPartialCompositePrimaryKey())
rootSchema
}
@@ -268,6 +271,36 @@ object MetadataTestUtil {
Array("project=[a, c, d]"))
}
+ private def createProjectedTableSourceTableWithPartialCompositePrimaryKey():
Table = {
+ val catalogTable = CatalogTableImpl.fromProperties(
+ Map(
+ "connector" -> "values",
+ "bounded" -> "true",
+ "schema.0.name" -> "a",
+ "schema.0.data-type" -> "BIGINT NOT NULL",
+ "schema.1.name" -> "b",
+ "schema.1.data-type" -> "BIGINT NOT NULL",
+ "schema.primary-key.name" -> "PK_1",
+ "schema.primary-key.columns" -> "a,b")
+ )
+
+ val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
+ val rowType = typeFactory.buildRelNodeRowType(
+ Seq("a"),
+ Seq(new BigIntType(false)))
+
+ new MockTableSourceTable(
+ ObjectIdentifier.of(
+ "default_catalog",
+ "default_database",
+ "projected_table_source_table_with_partial_pk"),
+ rowType,
+ new TestTableSource(),
+ true,
+ catalogTable,
+ Array("project=[a]"))
+ }
+
private def getMetadataTable(
tableSchema: TableSchema,
statistic: FlinkStatistic,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
index a260349..2be8938d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
@@ -284,4 +284,19 @@ class JoinTest extends TableTestBase {
def testRightOuterJoinEquiAndNonEquiPred(): Unit = {
util.verifyPlan("SELECT b, y FROM t RIGHT OUTER JOIN s ON a = z AND b < x")
}
+
+ @Test
+ def testJoinAndSelectOnPartialCompositePrimaryKey(): Unit = {
+ util.tableEnv.executeSql(
+ """
+ |CREATE TABLE tableWithCompositePk (
+ | pk1 INT,
+ | pk2 BIGINT,
+ | PRIMARY KEY (pk1, pk2) NOT ENFORCED
+ |) WITH (
+ | 'connector'='values'
+ |)
+ |""".stripMargin)
+ util.verifyPlan("SELECT A.a1 FROM A LEFT JOIN tableWithCompositePk T ON
A.a1 = T.pk1")
+ }
}