This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new f3a8af5 [FLINK-26460][table-planner] Fix Unsupported type when convertTypeToSpec: MAP
f3a8af5 is described below
commit f3a8af5831b1b39025bc61ff7dc446d9435c197f
Author: Tartarus0zm <[email protected]>
AuthorDate: Fri Nov 12 14:54:57 2021 +0800
[FLINK-26460][table-planner] Fix Unsupported type when convertTypeToSpec: MAP
(cherry picked from commit 33e7c84fb0f6aadf9d228c41c0ba6808634a7e36)
This closes #18967
---
.../planner/calcite/PreValidateReWriter.scala | 22 ++++++++++---
.../planner/plan/stream/sql/TableSinkTest.xml | 16 ++++++++++
.../planner/plan/stream/sql/TableSinkTest.scala | 21 +++++++++++++
.../runtime/stream/sql/TableSinkITCase.scala | 36 ++++++++++++++++++++++
4 files changed, 91 insertions(+), 4 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 39c0aa5..3d930b0 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.calcite
import org.apache.flink.sql.parser.SqlProperty
import org.apache.flink.sql.parser.dml.RichSqlInsert
import org.apache.flink.sql.parser.dql.SqlRichExplain
+import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects, notSupported}
import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable,
FlinkPreparingTableBase, LegacyCatalogSourceTable}
@@ -30,12 +31,12 @@ import org.apache.calcite.plan.RelOptTable
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory,
RelDataTypeField}
import org.apache.calcite.runtime.{CalciteContextException, Resources}
-import org.apache.calcite.sql.`type`.SqlTypeUtil
+import org.apache.calcite.sql.`type`.{SqlTypeName, SqlTypeUtil}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.sql.util.SqlBasicVisitor
import org.apache.calcite.sql.validate.{SqlValidatorException,
SqlValidatorTable, SqlValidatorUtil}
-import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
+import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
import org.apache.calcite.util.Static.RESOURCE
import java.util
@@ -391,8 +392,21 @@ object PreValidateReWriter {
== desiredType)) {
node
} else {
-        SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO,
-          node, SqlTypeUtil.convertTypeToSpec(desiredType))
+        // See FLINK-26460 for more details
+        val sqlDataTypeSpec =
+          if (SqlTypeUtil.isNull(currentType) && SqlTypeUtil.isMap(desiredType)) {
+            val keyType = desiredType.getKeyType
+            val valueType = desiredType.getValueType
+            new SqlDataTypeSpec(
+              new SqlMapTypeNameSpec(
+                SqlTypeUtil.convertTypeToSpec(keyType).withNullable(keyType.isNullable),
+                SqlTypeUtil.convertTypeToSpec(valueType).withNullable(valueType.isNullable),
+                SqlParserPos.ZERO),
+              SqlParserPos.ZERO)
+          } else {
+            SqlTypeUtil.convertTypeToSpec(desiredType)
+          }
+        SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO, node, sqlDataTypeSpec)
}
}
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
index 28c1048..669d1a8 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
@@ -32,6 +32,22 @@ Sink(table=[default_catalog.default_database.appendSink],
fields=[EXPR$0, c], ch
]]>
</Resource>
</TestCase>
+ <TestCase name="testInsertPartColumn">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.zm_test], fields=[a, m1,
m2, m3, m4])
++- LogicalProject(a=[CAST($0):BIGINT], m1=[null:(VARCHAR(2147483647) CHARACTER
SET "UTF-16LE", BIGINT) MAP], m2=[null:(VARCHAR(2147483647) CHARACTER SET
"UTF-16LE", BIGINT) MAP], m3=[null:(VARCHAR(2147483647) CHARACTER SET
"UTF-16LE", BIGINT) MAP], m4=[null:(VARCHAR(2147483647) CHARACTER SET
"UTF-16LE", BIGINT) MAP])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.zm_test], fields=[a, m1, m2, m3,
m4], changelogMode=[NONE])
++- Calc(select=[CAST(a) AS a, null:(VARCHAR(2147483647) CHARACTER SET
"UTF-16LE", BIGINT) MAP AS m1, null:(VARCHAR(2147483647) CHARACTER SET
"UTF-16LE", BIGINT) MAP AS m2, null:(VARCHAR(2147483647) CHARACTER SET
"UTF-16LE", BIGINT) MAP AS m3, null:(VARCHAR(2147483647) CHARACTER SET
"UTF-16LE", BIGINT) MAP AS m4], changelogMode=[I])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c], changelogMode=[I])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testAppendUpsertAndRetractSink">
<Resource name="ast">
<![CDATA[
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index 461c4fc..5c79f35 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -464,4 +464,25 @@ class TableSinkTest extends TableTestBase {
| WHERE rank_number < 10
|""".stripMargin)
}
+
+ @Test
+ def testInsertPartColumn(): Unit = {
+ util.addTable(
+ s"""
+ |CREATE TABLE zm_test (
+ | `a` BIGINT,
+ | `m1` MAP<STRING, BIGINT>,
+ | `m2` MAP<STRING NOT NULL, BIGINT>,
+ | `m3` MAP<STRING, BIGINT NOT NULL>,
+ | `m4` MAP<STRING NOT NULL, BIGINT NOT NULL>
+ |) WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'true'
+ |)
+ |""".stripMargin)
+ val stmtSet = util.tableEnv.createStatementSet()
+ stmtSet.addInsertSql(
+ "INSERT INTO zm_test(`a`) SELECT `a` FROM MyTable")
+ util.verifyRelPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
+ }
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
index 20b61a3..a056873 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
@@ -154,4 +154,40 @@ class TableSinkITCase(mode: StateBackendMode) extends
StreamingWithStateTestBase
val expected = List("+I[jason, 4]")
assertEquals(expected.sorted, result.sorted)
}
+
+ @Test
+ def testInsertPartColumn(): Unit = {
+ tEnv.executeSql(
+ """
+ |CREATE TABLE zm_test (
+ | `person` String,
+ | `votes` BIGINT,
+ | `m1` MAP<STRING, BIGINT>,
+ | `m2` MAP<STRING NOT NULL, BIGINT>,
+ | `m3` MAP<STRING, BIGINT NOT NULL>,
+ | `m4` MAP<STRING NOT NULL, BIGINT NOT NULL>
+ |) WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'true'
+ |)
+ |""".stripMargin)
+
+ tEnv.executeSql(
+ """
+ |insert into zm_test(`person`, `votes`)
+ | select
+ | `person`,
+ | `votes`
+ | from
+ | src
+ |""".stripMargin).await()
+
+ val result = TestValuesTableFactory.getResults("zm_test")
+ val expected = List(
+ "+I[jason, 1, null, null, null, null]",
+ "+I[jason, 1, null, null, null, null]",
+ "+I[jason, 1, null, null, null, null]",
+ "+I[jason, 1, null, null, null, null]")
+ assertEquals(expected.sorted, result.sorted)
+ }
}