This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new ca16d28a741 [FLINK-27683][table-planner] Fix SQL hints can't work with targetColumns
ca16d28a741 is described below
commit ca16d28a741434f78fba508d8050786cd3281793
Author: yangxin <[email protected]>
AuthorDate: Tue Jun 7 14:19:42 2022 +0800
[FLINK-27683][table-planner] Fix SQL hints can't work with targetColumns
This closes #19892
---
.../planner/calcite/PreValidateReWriter.scala | 9 ++++---
.../planner/plan/stream/sql/TableSinkTest.xml | 16 +++++++++++++
.../planner/plan/stream/sql/TableSinkTest.scala | 18 ++++++++++++++
.../runtime/stream/table/TableSinkITCase.scala | 28 ++++++++++++++++++++++
4 files changed, 68 insertions(+), 3 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 3d930b0968b..1565859c8aa 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -31,12 +31,12 @@ import org.apache.calcite.plan.RelOptTable
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, RelDataTypeField}
import org.apache.calcite.runtime.{CalciteContextException, Resources}
-import org.apache.calcite.sql.`type`.{SqlTypeName, SqlTypeUtil}
+import org.apache.calcite.sql.`type`.SqlTypeUtil
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.sql.util.SqlBasicVisitor
import org.apache.calcite.sql.validate.{SqlValidatorException, SqlValidatorTable, SqlValidatorUtil}
-import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
+import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlTableRef, SqlUtil}
import org.apache.calcite.util.Static.RESOURCE
import java.util
@@ -124,7 +124,10 @@ object PreValidateReWriter {
source: SqlCall,
partitions: SqlNodeList): SqlCall = {
val calciteCatalogReader =
validator.getCatalogReader.unwrap(classOf[CalciteCatalogReader])
- val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
+ val names = sqlInsert.getTargetTable match {
+ case si: SqlIdentifier => si.names
+ case st: SqlTableRef => st.getOperandList.get(0).asInstanceOf[SqlIdentifier].names
+ }
val table = calciteCatalogReader.getTable(names)
if (table == null) {
// There is no table exists in current catalog,
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
index 2677e804d94..3994f68f633 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
@@ -342,4 +342,20 @@
Sink(table=[default_catalog.default_database.SinkJoinChangeLog], fields=[person,
]]>
</Resource>
</TestCase>
+ <TestCase name="testInsertWithTargetColumnsAndSqlHint">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.appendSink], fields=[EXPR$0, c], hints=[[[OPTIONS options:{sink.parallelism=1}]]])
++- LogicalProject(EXPR$0=[+($0, $1)], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.appendSink], fields=[EXPR$0, c], hints=[[[OPTIONS options:{sink.parallelism=1}]]], changelogMode=[NONE])
++- Calc(select=[+(a, b) AS EXPR$0, c], changelogMode=[I])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], changelogMode=[I])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index 5c79f351b96..3a9b9f7560a 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -485,4 +485,22 @@ class TableSinkTest extends TableTestBase {
"INSERT INTO zm_test(`a`) SELECT `a` FROM MyTable")
util.verifyRelPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
}
+
+ @Test
+ def testInsertWithTargetColumnsAndSqlHint(): Unit = {
+ util.addTable(s"""
+ |CREATE TABLE appendSink (
+ | `a` BIGINT,
+ | `b` STRING
+ |) WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+ val stmtSet = util.tableEnv.createStatementSet()
+ stmtSet.addInsertSql(
+ "INSERT INTO appendSink /*+ OPTIONS('sink.parallelism' = '1') */" +
+ "(a, b) SELECT a + b, c FROM MyTable")
+ util.verifyRelPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index 6399c29c578..679140acc5e 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -1419,4 +1419,32 @@ class TableSinkITCase extends StreamingTestBase {
).execute().await()
assertEquals(Seq("+I(42)"),
TestValuesTableFactory.getOnlyRawResults.toList)
}
+
+ @Test
+ def testInsertWithTargetColumnsAndSqlHint(): Unit = {
+ val t = env
+ .fromCollection(smallTupleData3)
+ .toTable(tEnv, 'id, 'num, 'text)
+ tEnv.createTemporaryView("src", t)
+
+ tEnv.executeSql(s"""
+ |CREATE TABLE appendSink (
+ | `t` INT,
+ | `num` BIGINT,
+ | `text` STRING
+ |) WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'true'
+ |)
+ |""".stripMargin)
+ tEnv
+ .executeSql(
+ "INSERT INTO appendSink /*+ OPTIONS('sink.parallelism' = '1') */" +
+ "(t, num, text) SELECT id, num, text FROM src")
+ .await()
+
+ val result = TestValuesTableFactory.getResults("appendSink")
+ val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+ assertEquals(expected.sorted, result.sorted)
+ }
}