This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 865a05678a0 [FLINK-30922][table-planner] Fix errors when reference 
metadata columns in column list of a partial-insert statement
865a05678a0 is described below

commit 865a05678a0cee4a3d013ddc8685d9420cc59b12
Author: Shuiqiang Chen <[email protected]>
AuthorDate: Tue Mar 7 09:26:03 2023 +0800

    [FLINK-30922][table-planner] Fix errors when reference metadata columns in 
column list of a partial-insert statement
    
    This closes #22109
---
 .../apache/flink/table/utils/TableSchemaUtils.java | 20 +++++++++-
 .../table/planner/calcite/FlinkTypeFactory.scala   | 12 ++++++
 .../planner/calcite/PreValidateReWriter.scala      |  4 +-
 .../planner/plan/common/PartialInsertTest.xml      | 38 ++++++++++++++++++
 .../planner/plan/common/PartialInsertTest.scala    | 45 ++++++++++++++++++++++
 5 files changed, 116 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
index c23c68664f3..90aa855e17d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
@@ -32,6 +32,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /** Utilities to {@link TableSchema}. */
@@ -48,13 +49,30 @@ public class TableSchemaUtils {
      * additional columns.
      */
     public static TableSchema getPhysicalSchema(TableSchema tableSchema) {
+        return getTableSchema(tableSchema, TableColumn::isPhysical);
+    }
+
+    /**
+     * Returns a {@link TableSchema} which consists of all persisted columns. That means
+     * computed columns and virtual metadata columns are filtered out.
+     *
+     * <p>Unlike {@link TableSchemaUtils#getPhysicalSchema(TableSchema)}, it includes all physical
+     * columns as well as metadata columns declared without the VIRTUAL keyword.
+     */
+    public static TableSchema getPersistedSchema(TableSchema tableSchema) {
+        return getTableSchema(tableSchema, TableColumn::isPersisted);
+    }
+
+    /** Build a {@link TableSchema} with columns filtered by a given 
columnFilter. */
+    private static TableSchema getTableSchema(
+            TableSchema tableSchema, Function<TableColumn, Boolean> 
columnFilter) {
         Preconditions.checkNotNull(tableSchema);
         TableSchema.Builder builder = new TableSchema.Builder();
         tableSchema
                 .getTableColumns()
                 .forEach(
                         tableColumn -> {
-                            if (tableColumn.isPhysical()) {
+                            if (columnFilter.apply(tableColumn)) {
                                 builder.field(tableColumn.getName(), 
tableColumn.getType());
                             }
                         });
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
index 20f70c089e2..e7cfb468a60 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
@@ -284,6 +284,18 @@ class FlinkTypeFactory(
     buildRelNodeRowType(TableSchemaUtils.getPhysicalSchema(tableSchema))
   }
 
+  /**
+   * Creates a struct type with the persisted columns using FlinkTypeFactory.
+   *
+   * @param tableSchema
+   *   schema to convert to Calcite's specific one
+   * @return
+   *   a struct type with the field names and field types of the schema's persisted columns.
+   */
+  def buildPersistedRelNodeRowType(tableSchema: TableSchema): RelDataType = {
+    buildRelNodeRowType(TableSchemaUtils.getPersistedSchema(tableSchema))
+  }
+
   /**
    * Creates a struct type with the input fieldNames and input fieldTypes 
using FlinkTypeFactory.
    *
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 86e922b568d..ea53f092469 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -371,10 +371,10 @@ object PreValidateReWriter {
     table.unwrap(classOf[FlinkPreparingTableBase]) match {
       case t: CatalogSourceTable =>
         val schema = t.getCatalogTable.getSchema
-        
typeFactory.asInstanceOf[FlinkTypeFactory].buildPhysicalRelNodeRowType(schema)
+        
typeFactory.asInstanceOf[FlinkTypeFactory].buildPersistedRelNodeRowType(schema)
       case t: LegacyCatalogSourceTable[_] =>
         val schema = t.catalogTable.getSchema
-        
typeFactory.asInstanceOf[FlinkTypeFactory].buildPhysicalRelNodeRowType(schema)
+        
typeFactory.asInstanceOf[FlinkTypeFactory].buildPersistedRelNodeRowType(schema)
       case _ =>
         table.getRowType
     }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
index 1ede5abe839..4bbe9087b92 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
@@ -536,4 +536,42 @@ 
Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d,
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testPartialInsertWithPersistedMetadata[isBatch: true]">
+    <Resource name="sql">
+         <![CDATA[INSERT INTO metadata_sink (a,b,c,d,e,f) SELECT a,b,c,d,e,123 
FROM MyTable]]>
+       </Resource>
+    <Resource name="ast">
+         <![CDATA[
+LogicalSink(table=[default_catalog.default_database.metadata_sink], fields=[a, 
b, c, d, e, f])
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
f=[CAST(123:BIGINT):BIGINT])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+         <![CDATA[
+Sink(table=[default_catalog.default_database.metadata_sink], fields=[a, b, c, 
d, e, f])
++- Calc(select=[a, b, c, d, e, CAST(123 AS BIGINT) AS f])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPartialInsertWithPersistedMetadata[isBatch: false]">
+       <Resource name="sql">
+         <![CDATA[INSERT INTO metadata_sink (a,b,c,d,e,f) SELECT a,b,c,d,e,123 
FROM MyTable]]>
+       </Resource>
+       <Resource name="ast">
+         <![CDATA[
+LogicalSink(table=[default_catalog.default_database.metadata_sink], fields=[a, 
b, c, d, e, f])
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
f=[CAST(123:BIGINT):BIGINT])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+       </Resource>
+       <Resource name="optimized rel plan">
+         <![CDATA[
+Sink(table=[default_catalog.default_database.metadata_sink], fields=[a, b, c, 
d, e, f])
++- Calc(select=[a, b, c, d, e, CAST(123 AS BIGINT) AS f])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+       </Resource>
+  </TestCase>
 </Root>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
index e9b8aea3207..a6f3edcfd5b 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
@@ -59,6 +59,21 @@ class PartialInsertTest(isBatch: Boolean) extends 
TableTestBase {
                               |)
                               |""".stripMargin)
 
+  util.tableEnv.executeSql(s"""create table metadata_sink (
+                              |  `a` INT,
+                              |  `b` STRING,
+                              |  `c` STRING,
+                              |  `d` STRING,
+                              |  `e` DOUBLE,
+                              |  `f` BIGINT METADATA,
+                              |  `g` INT METADATA VIRTUAL,
+                              |  `h` AS `a` + 1
+                              |) with (
+                              |  'connector' = 'values',
+                              |  'sink-insert-only' = 'false',
+                              |  'writable-metadata' = 'f:BIGINT, g:INT'
+                              |)""".stripMargin)
+
   @Test
   def testPartialInsertWithComplexReorder(): Unit = {
     util.verifyRelPlanInsert(
@@ -118,6 +133,36 @@ class PartialInsertTest(isBatch: Boolean) extends 
TableTestBase {
       "INSERT INTO partitioned_sink (e,a,g,f,c,d) " +
         "SELECT e,a,456,123,c,d FROM MyTable ORDER BY a,e,c,d")
   }
+
+  @Test
+  def testPartialInsertWithPersistedMetadata(): Unit = {
+    util.verifyRelPlanInsert(
+      "INSERT INTO metadata_sink (a,b,c,d,e,f) " +
+        "SELECT a,b,c,d,e,123 FROM MyTable"
+    )
+  }
+
+  @Test
+  def testPartialInsertWithVirtualMetaDataColumn(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(
+      "SQL validation failed. At line 1, column 38: Unknown target column 'g'")
+    util.verifyRelPlanInsert(
+      "INSERT INTO metadata_sink (a,b,c,d,e,g) " +
+        "SELECT a,b,c,d,e,123 FROM MyTable"
+    )
+  }
+
+  @Test
+  def testPartialInsertWithComputedColumn(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(
+      "SQL validation failed. At line 1, column 38: Unknown target column 'h'")
+    util.verifyRelPlanInsert(
+      "INSERT INTO metadata_sink (a,b,c,d,e,h) " +
+        "SELECT a,b,c,d,e,123 FROM MyTable"
+    )
+  }
 }
 
 object PartialInsertTest {

Reply via email to