This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new f6a617b  [FLINK-26460][table-planner] Fix Unsupported type when convertTypeToSpec: MAP
f6a617b is described below

commit f6a617b50f618af8f5284acca64d61328b4a7448
Author: Tartarus0zm <[email protected]>
AuthorDate: Fri Apr 1 11:33:35 2022 +0800

    [FLINK-26460][table-planner] Fix Unsupported type when convertTypeToSpec: MAP
    
    (cherry picked from commit 33e7c84fb0f6aadf9d228c41c0ba6808634a7e36)
    
    This closes #1896
---
 .../planner/calcite/PreValidateReWriter.scala      | 22 ++++++++++---
 .../planner/plan/stream/sql/TableSinkTest.xml      | 16 ++++++++++
 .../planner/plan/stream/sql/TableSinkTest.scala    | 21 +++++++++++++
 .../runtime/stream/sql/TableSinkITCase.scala       | 36 ++++++++++++++++++++++
 4 files changed, 91 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 39c0aa5..3d930b0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.calcite
 import org.apache.flink.sql.parser.SqlProperty
 import org.apache.flink.sql.parser.dml.RichSqlInsert
 import org.apache.flink.sql.parser.dql.SqlRichExplain
+import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects, notSupported}
 import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable, FlinkPreparingTableBase, LegacyCatalogSourceTable}
@@ -30,12 +31,12 @@ import org.apache.calcite.plan.RelOptTable
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, RelDataTypeField}
 import org.apache.calcite.runtime.{CalciteContextException, Resources}
-import org.apache.calcite.sql.`type`.SqlTypeUtil
+import org.apache.calcite.sql.`type`.{SqlTypeName, SqlTypeUtil}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.sql.util.SqlBasicVisitor
 import org.apache.calcite.sql.validate.{SqlValidatorException, SqlValidatorTable, SqlValidatorUtil}
-import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
+import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
 import org.apache.calcite.util.Static.RESOURCE
 
 import java.util
@@ -391,8 +392,21 @@ object PreValidateReWriter {
       == desiredType)) {
       node
     } else {
-      SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO,
-        node, SqlTypeUtil.convertTypeToSpec(desiredType))
+      // See FLINK-26460 for more details
+      val sqlDataTypeSpec =
+        if (SqlTypeUtil.isNull(currentType) && SqlTypeUtil.isMap(desiredType)) {
+          val keyType = desiredType.getKeyType
+          val valueType = desiredType.getValueType
+          new SqlDataTypeSpec(
+            new SqlMapTypeNameSpec(
+              SqlTypeUtil.convertTypeToSpec(keyType).withNullable(keyType.isNullable),
+              SqlTypeUtil.convertTypeToSpec(valueType).withNullable(valueType.isNullable),
+              SqlParserPos.ZERO),
+            SqlParserPos.ZERO)
+        } else {
+          SqlTypeUtil.convertTypeToSpec(desiredType)
+        }
+      SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO, node, sqlDataTypeSpec)
     }
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
index b7dea9f..78d2822 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
@@ -748,4 +748,20 @@ Sink(table=[default_catalog.default_database.SinkJoinChangeLog], fields=[person,
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testInsertPartColumn">
+       <Resource name="ast">
+         <![CDATA[
+LogicalSink(table=[default_catalog.default_database.zm_test], fields=[a, m1, m2, m3, m4])
++- LogicalProject(a=[CAST($0):BIGINT], m1=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m2=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m4=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+       </Resource>
+       <Resource name="optimized rel plan">
+         <![CDATA[
+Sink(table=[default_catalog.default_database.zm_test], fields=[a, m1, m2, m3, m4], changelogMode=[NONE])
++- Calc(select=[CAST(a AS BIGINT) AS a, null:(VARCHAR(2147483647), BIGINT) MAP AS m1, null:(VARCHAR(2147483647), BIGINT) MAP AS m2, null:(VARCHAR(2147483647), BIGINT) MAP AS m3, null:(VARCHAR(2147483647), BIGINT) MAP AS m4], changelogMode=[I])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], changelogMode=[I])
+]]>
+       </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index f760fb5..e8bfaee 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -771,6 +771,27 @@ class TableSinkTest extends TableTestBase {
 
     util.verifyAstPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
   }
+
+  @Test
+  def testInsertPartColumn(): Unit = {
+    util.addTable(
+      s"""
+         |CREATE TABLE zm_test (
+         |  `a` BIGINT,
+         |  `m1` MAP<STRING, BIGINT>,
+         |  `m2` MAP<STRING NOT NULL, BIGINT>,
+         |  `m3` MAP<STRING, BIGINT NOT NULL>,
+         |  `m4` MAP<STRING NOT NULL, BIGINT NOT NULL>
+         |) WITH (
+         |  'connector' = 'values',
+         |  'sink-insert-only' = 'true'
+         |)
+         |""".stripMargin)
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(
+      "INSERT INTO zm_test(`a`) SELECT `a` FROM MyTable")
+    util.verifyRelPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
+  }
 }
 
 /** tests table factory use ParallelSourceFunction which support parallelism by env*/
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
index 20b61a3..a056873 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
@@ -154,4 +154,40 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
     val expected = List("+I[jason, 4]")
     assertEquals(expected.sorted, result.sorted)
   }
+
+  @Test
+  def testInsertPartColumn(): Unit = {
+    tEnv.executeSql(
+      """
+        |CREATE TABLE zm_test (
+        |  `person` String,
+        |  `votes` BIGINT,
+        |  `m1` MAP<STRING, BIGINT>,
+        |  `m2` MAP<STRING NOT NULL, BIGINT>,
+        |  `m3` MAP<STRING, BIGINT NOT NULL>,
+        |  `m4` MAP<STRING NOT NULL, BIGINT NOT NULL>
+        |) WITH (
+        |  'connector' = 'values',
+        |  'sink-insert-only' = 'true'
+        |)
+        |""".stripMargin)
+
+    tEnv.executeSql(
+      """
+        |insert into zm_test(`person`, `votes`)
+        |  select
+        |    `person`,
+        |    `votes`
+        |  from
+        |    src
+        |""".stripMargin).await()
+
+    val result = TestValuesTableFactory.getResults("zm_test")
+    val expected = List(
+      "+I[jason, 1, null, null, null, null]",
+      "+I[jason, 1, null, null, null, null]",
+      "+I[jason, 1, null, null, null, null]",
+      "+I[jason, 1, null, null, null, null]")
+    assertEquals(expected.sorted, result.sorted)
+  }
 }

Reply via email to