This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 865a05678a0 [FLINK-30922][table-planner] Fix errors when referencing
metadata columns in the column list of a partial-insert statement
865a05678a0 is described below
commit 865a05678a0cee4a3d013ddc8685d9420cc59b12
Author: Shuiqiang Chen <[email protected]>
AuthorDate: Tue Mar 7 09:26:03 2023 +0800
[FLINK-30922][table-planner] Fix errors when referencing metadata columns in
the column list of a partial-insert statement
This closes #22109
---
.../apache/flink/table/utils/TableSchemaUtils.java | 20 +++++++++-
.../table/planner/calcite/FlinkTypeFactory.scala | 12 ++++++
.../planner/calcite/PreValidateReWriter.scala | 4 +-
.../planner/plan/common/PartialInsertTest.xml | 38 ++++++++++++++++++
.../planner/plan/common/PartialInsertTest.scala | 45 ++++++++++++++++++++++
5 files changed, 116 insertions(+), 3 deletions(-)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
index c23c68664f3..90aa855e17d 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
@@ -32,6 +32,7 @@ import org.apache.flink.util.Preconditions;
import java.util.List;
import java.util.Optional;
+import java.util.function.Function;
import java.util.stream.Collectors;
/** Utilities to {@link TableSchema}. */
@@ -48,13 +49,30 @@ public class TableSchemaUtils {
* additional columns.
*/
public static TableSchema getPhysicalSchema(TableSchema tableSchema) {
+ return getTableSchema(tableSchema, TableColumn::isPhysical);
+ }
+
+ /**
+     * Return {@link TableSchema} which consists of all persisted columns.
That means the
+     * computed columns and virtual metadata columns are filtered out.
+ *
+     * <p>Its difference from {@link
TableSchemaUtils#getPhysicalSchema(TableSchema)} is that it
+     * includes all physical columns and all metadata columns declared
without the VIRTUAL keyword.
+ */
+ public static TableSchema getPersistedSchema(TableSchema tableSchema) {
+ return getTableSchema(tableSchema, TableColumn::isPersisted);
+ }
+
+ /** Build a {@link TableSchema} with columns filtered by a given
columnFilter. */
+ private static TableSchema getTableSchema(
+ TableSchema tableSchema, Function<TableColumn, Boolean>
columnFilter) {
Preconditions.checkNotNull(tableSchema);
TableSchema.Builder builder = new TableSchema.Builder();
tableSchema
.getTableColumns()
.forEach(
tableColumn -> {
- if (tableColumn.isPhysical()) {
+ if (columnFilter.apply(tableColumn)) {
builder.field(tableColumn.getName(),
tableColumn.getType());
}
});
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
index 20f70c089e2..e7cfb468a60 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
@@ -284,6 +284,18 @@ class FlinkTypeFactory(
buildRelNodeRowType(TableSchemaUtils.getPhysicalSchema(tableSchema))
}
+ /**
+   * Creates a struct type with the persisted columns using FlinkTypeFactory.
+ *
+ * @param tableSchema
+ * schema to convert to Calcite's specific one
+   * @return
+   *   a struct type built from the persisted columns of the given schema.
+ */
+ def buildPersistedRelNodeRowType(tableSchema: TableSchema): RelDataType = {
+ buildRelNodeRowType(TableSchemaUtils.getPersistedSchema(tableSchema))
+ }
+
/**
* Creates a struct type with the input fieldNames and input fieldTypes
using FlinkTypeFactory.
*
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 86e922b568d..ea53f092469 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -371,10 +371,10 @@ object PreValidateReWriter {
table.unwrap(classOf[FlinkPreparingTableBase]) match {
case t: CatalogSourceTable =>
val schema = t.getCatalogTable.getSchema
-
typeFactory.asInstanceOf[FlinkTypeFactory].buildPhysicalRelNodeRowType(schema)
+
typeFactory.asInstanceOf[FlinkTypeFactory].buildPersistedRelNodeRowType(schema)
case t: LegacyCatalogSourceTable[_] =>
val schema = t.catalogTable.getSchema
-
typeFactory.asInstanceOf[FlinkTypeFactory].buildPhysicalRelNodeRowType(schema)
+
typeFactory.asInstanceOf[FlinkTypeFactory].buildPersistedRelNodeRowType(schema)
case _ =>
table.getRowType
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
index 1ede5abe839..4bbe9087b92 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
@@ -536,4 +536,42 @@
Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d,
]]>
</Resource>
</TestCase>
+ <TestCase name="testPartialInsertWithPersistedMetadata[isBatch: true]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO metadata_sink (a,b,c,d,e,f) SELECT a,b,c,d,e,123
FROM MyTable]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.metadata_sink], fields=[a,
b, c, d, e, f])
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
f=[CAST(123:BIGINT):BIGINT])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.metadata_sink], fields=[a, b, c,
d, e, f])
++- Calc(select=[a, b, c, d, e, CAST(123 AS BIGINT) AS f])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPartialInsertWithPersistedMetadata[isBatch: false]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO metadata_sink (a,b,c,d,e,f) SELECT a,b,c,d,e,123
FROM MyTable]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.metadata_sink], fields=[a,
b, c, d, e, f])
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
f=[CAST(123:BIGINT):BIGINT])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.metadata_sink], fields=[a, b, c,
d, e, f])
++- Calc(select=[a, b, c, d, e, CAST(123 AS BIGINT) AS f])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
index e9b8aea3207..a6f3edcfd5b 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
@@ -59,6 +59,21 @@ class PartialInsertTest(isBatch: Boolean) extends
TableTestBase {
|)
|""".stripMargin)
+ util.tableEnv.executeSql(s"""create table metadata_sink (
+ | `a` INT,
+ | `b` STRING,
+ | `c` STRING,
+ | `d` STRING,
+ | `e` DOUBLE,
+ | `f` BIGINT METADATA,
+ | `g` INT METADATA VIRTUAL,
+ | `h` AS `a` + 1
+ |) with (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false',
+ | 'writable-metadata' = 'f:BIGINT, g:INT'
+ |)""".stripMargin)
+
@Test
def testPartialInsertWithComplexReorder(): Unit = {
util.verifyRelPlanInsert(
@@ -118,6 +133,36 @@ class PartialInsertTest(isBatch: Boolean) extends
TableTestBase {
"INSERT INTO partitioned_sink (e,a,g,f,c,d) " +
"SELECT e,a,456,123,c,d FROM MyTable ORDER BY a,e,c,d")
}
+
+ @Test
+ def testPartialInsertWithPersistedMetadata(): Unit = {
+ util.verifyRelPlanInsert(
+ "INSERT INTO metadata_sink (a,b,c,d,e,f) " +
+ "SELECT a,b,c,d,e,123 FROM MyTable"
+ )
+ }
+
+ @Test
+ def testPartialInsertWithVirtualMetaDataColumn(): Unit = {
+ expectedException.expect(classOf[ValidationException])
+ expectedException.expectMessage(
+ "SQL validation failed. At line 1, column 38: Unknown target column 'g'")
+ util.verifyRelPlanInsert(
+ "INSERT INTO metadata_sink (a,b,c,d,e,g) " +
+ "SELECT a,b,c,d,e,123 FROM MyTable"
+ )
+ }
+
+ @Test
+ def testPartialInsertWithComputedColumn(): Unit = {
+ expectedException.expect(classOf[ValidationException])
+ expectedException.expectMessage(
+ "SQL validation failed. At line 1, column 38: Unknown target column 'h'")
+ util.verifyRelPlanInsert(
+ "INSERT INTO metadata_sink (a,b,c,d,e,h) " +
+ "SELECT a,b,c,d,e,123 FROM MyTable"
+ )
+ }
}
object PartialInsertTest {