This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new f6a617b [FLINK-26460][table-planner] Fix Unsupported type when
convertTypeToSpec: MAP
f6a617b is described below
commit f6a617b50f618af8f5284acca64d61328b4a7448
Author: Tartarus0zm <[email protected]>
AuthorDate: Fri Apr 1 11:33:35 2022 +0800
[FLINK-26460][table-planner] Fix Unsupported type when convertTypeToSpec:
MAP
(cherry picked from commit 33e7c84fb0f6aadf9d228c41c0ba6808634a7e36)
This closes #1896
---
.../planner/calcite/PreValidateReWriter.scala | 22 ++++++++++---
.../planner/plan/stream/sql/TableSinkTest.xml | 16 ++++++++++
.../planner/plan/stream/sql/TableSinkTest.scala | 21 +++++++++++++
.../runtime/stream/sql/TableSinkITCase.scala | 36 ++++++++++++++++++++++
4 files changed, 91 insertions(+), 4 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 39c0aa5..3d930b0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.calcite
 import org.apache.flink.sql.parser.SqlProperty
 import org.apache.flink.sql.parser.dml.RichSqlInsert
 import org.apache.flink.sql.parser.dql.SqlRichExplain
+import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects, notSupported}
 import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable, FlinkPreparingTableBase, LegacyCatalogSourceTable}
@@ -30,12 +31,12 @@ import org.apache.calcite.plan.RelOptTable
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, RelDataTypeField}
 import org.apache.calcite.runtime.{CalciteContextException, Resources}
-import org.apache.calcite.sql.`type`.SqlTypeUtil
+import org.apache.calcite.sql.`type`.{SqlTypeName, SqlTypeUtil}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.sql.util.SqlBasicVisitor
 import org.apache.calcite.sql.validate.{SqlValidatorException, SqlValidatorTable, SqlValidatorUtil}
-import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
+import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
 import org.apache.calcite.util.Static.RESOURCE

 import java.util
@@ -391,8 +392,21 @@ object PreValidateReWriter {
         == desiredType)) {
       node
     } else {
-      SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO,
-        node, SqlTypeUtil.convertTypeToSpec(desiredType))
+      // See FLINK-26460 for more details
+      val sqlDataTypeSpec =
+        if (SqlTypeUtil.isNull(currentType) && SqlTypeUtil.isMap(desiredType)) {
+          val keyType = desiredType.getKeyType
+          val valueType = desiredType.getValueType
+          new SqlDataTypeSpec(
+            new SqlMapTypeNameSpec(
+              SqlTypeUtil.convertTypeToSpec(keyType).withNullable(keyType.isNullable),
+              SqlTypeUtil.convertTypeToSpec(valueType).withNullable(valueType.isNullable),
+              SqlParserPos.ZERO),
+            SqlParserPos.ZERO)
+        } else {
+          SqlTypeUtil.convertTypeToSpec(desiredType)
+        }
+      SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO, node, sqlDataTypeSpec)
     }
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
index b7dea9f..78d2822 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
@@ -748,4 +748,20 @@
Sink(table=[default_catalog.default_database.SinkJoinChangeLog], fields=[person,
]]>
</Resource>
</TestCase>
+  <TestCase name="testInsertPartColumn">
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.zm_test], fields=[a, m1, m2, m3, m4])
++- LogicalProject(a=[CAST($0):BIGINT], m1=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m2=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m4=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.zm_test], fields=[a, m1, m2, m3, m4], changelogMode=[NONE])
++- Calc(select=[CAST(a AS BIGINT) AS a, null:(VARCHAR(2147483647), BIGINT) MAP AS m1, null:(VARCHAR(2147483647), BIGINT) MAP AS m2, null:(VARCHAR(2147483647), BIGINT) MAP AS m3, null:(VARCHAR(2147483647), BIGINT) MAP AS m4], changelogMode=[I])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], changelogMode=[I])
+]]>
+    </Resource>
+  </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index f760fb5..e8bfaee 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -771,6 +771,27 @@ class TableSinkTest extends TableTestBase {
util.verifyAstPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
}
+
+  @Test
+  def testInsertPartColumn(): Unit = {
+    util.addTable(
+      s"""
+         |CREATE TABLE zm_test (
+         |  `a` BIGINT,
+         |  `m1` MAP<STRING, BIGINT>,
+         |  `m2` MAP<STRING NOT NULL, BIGINT>,
+         |  `m3` MAP<STRING, BIGINT NOT NULL>,
+         |  `m4` MAP<STRING NOT NULL, BIGINT NOT NULL>
+         |) WITH (
+         |  'connector' = 'values',
+         |  'sink-insert-only' = 'true'
+         |)
+         |""".stripMargin)
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(
+      "INSERT INTO zm_test(`a`) SELECT `a` FROM MyTable")
+    util.verifyRelPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
+  }
}
/** tests table factory use ParallelSourceFunction which support parallelism
by env*/
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
index 20b61a3..a056873 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
@@ -154,4 +154,40 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
val expected = List("+I[jason, 4]")
assertEquals(expected.sorted, result.sorted)
}
+
+  @Test
+  def testInsertPartColumn(): Unit = {
+    tEnv.executeSql(
+      """
+        |CREATE TABLE zm_test (
+        |  `person` String,
+        |  `votes` BIGINT,
+        |  `m1` MAP<STRING, BIGINT>,
+        |  `m2` MAP<STRING NOT NULL, BIGINT>,
+        |  `m3` MAP<STRING, BIGINT NOT NULL>,
+        |  `m4` MAP<STRING NOT NULL, BIGINT NOT NULL>
+        |) WITH (
+        |  'connector' = 'values',
+        |  'sink-insert-only' = 'true'
+        |)
+        |""".stripMargin)
+
+    tEnv.executeSql(
+      """
+        |insert into zm_test(`person`, `votes`)
+        |  select
+        |    `person`,
+        |    `votes`
+        |  from
+        |    src
+        |""".stripMargin).await()
+
+    val result = TestValuesTableFactory.getResults("zm_test")
+    val expected = List(
+      "+I[jason, 1, null, null, null, null]",
+      "+I[jason, 1, null, null, null, null]",
+      "+I[jason, 1, null, null, null, null]",
+      "+I[jason, 1, null, null, null, null]")
+    assertEquals(expected.sorted, result.sorted)
+  }
}