This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new ca16d28a741 [FLINK-27683][table-planner] Fix SQL hints can't work with 
targetColumns
ca16d28a741 is described below

commit ca16d28a741434f78fba508d8050786cd3281793
Author: yangxin <[email protected]>
AuthorDate: Tue Jun 7 14:19:42 2022 +0800

    [FLINK-27683][table-planner] Fix SQL hints can't work with targetColumns
    
    This closes #19892
---
 .../planner/calcite/PreValidateReWriter.scala      |  9 ++++---
 .../planner/plan/stream/sql/TableSinkTest.xml      | 16 +++++++++++++
 .../planner/plan/stream/sql/TableSinkTest.scala    | 18 ++++++++++++++
 .../runtime/stream/table/TableSinkITCase.scala     | 28 ++++++++++++++++++++++
 4 files changed, 68 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 3d930b0968b..1565859c8aa 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -31,12 +31,12 @@ import org.apache.calcite.plan.RelOptTable
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, 
RelDataTypeField}
 import org.apache.calcite.runtime.{CalciteContextException, Resources}
-import org.apache.calcite.sql.`type`.{SqlTypeName, SqlTypeUtil}
+import org.apache.calcite.sql.`type`.SqlTypeUtil
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.sql.util.SqlBasicVisitor
 import org.apache.calcite.sql.validate.{SqlValidatorException, 
SqlValidatorTable, SqlValidatorUtil}
-import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, 
SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
+import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, 
SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlTableRef, 
SqlUtil}
 import org.apache.calcite.util.Static.RESOURCE
 
 import java.util
@@ -124,7 +124,10 @@ object PreValidateReWriter {
       source: SqlCall,
       partitions: SqlNodeList): SqlCall = {
     val calciteCatalogReader = 
validator.getCatalogReader.unwrap(classOf[CalciteCatalogReader])
-    val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
+    val names = sqlInsert.getTargetTable match {
+      case si: SqlIdentifier => si.names
+      case st: SqlTableRef => 
st.getOperandList.get(0).asInstanceOf[SqlIdentifier].names
+    }
     val table = calciteCatalogReader.getTable(names)
     if (table == null) {
       // There is no table exists in current catalog,
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
index 2677e804d94..3994f68f633 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
@@ -342,4 +342,20 @@ 
Sink(table=[default_catalog.default_database.SinkJoinChangeLog], fields=[person,
 ]]>
     </Resource>
   </TestCase>
+       <TestCase name="testInsertWithTargetColumnsAndSqlHint">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalSink(table=[default_catalog.default_database.appendSink], 
fields=[EXPR$0, c], hints=[[[OPTIONS options:{sink.parallelism=1}]]])
++- LogicalProject(EXPR$0=[+($0, $1)], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+Sink(table=[default_catalog.default_database.appendSink], fields=[EXPR$0, c], 
hints=[[[OPTIONS options:{sink.parallelism=1}]]], changelogMode=[NONE])
++- Calc(select=[+(a, b) AS EXPR$0, c], changelogMode=[I])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c], changelogMode=[I])
+]]>
+               </Resource>
+       </TestCase>
 </Root>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index 5c79f351b96..3a9b9f7560a 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -485,4 +485,22 @@ class TableSinkTest extends TableTestBase {
       "INSERT INTO zm_test(`a`) SELECT `a` FROM MyTable")
     util.verifyRelPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
   }
+
+  @Test
+  def testInsertWithTargetColumnsAndSqlHint(): Unit = {
+    util.addTable(s"""
+                     |CREATE TABLE appendSink (
+                     |  `a` BIGINT,
+                     |  `b` STRING
+                     |) WITH (
+                     |  'connector' = 'values',
+                     |  'sink-insert-only' = 'false'
+                     |)
+                     |""".stripMargin)
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(
+      "INSERT INTO appendSink /*+ OPTIONS('sink.parallelism' = '1') */" +
+        "(a, b) SELECT a + b, c FROM MyTable")
+    util.verifyRelPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
+  }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index 6399c29c578..679140acc5e 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -1419,4 +1419,32 @@ class TableSinkITCase extends StreamingTestBase {
     ).execute().await()
     assertEquals(Seq("+I(42)"), 
TestValuesTableFactory.getOnlyRawResults.toList)
   }
+
+  @Test
+  def testInsertWithTargetColumnsAndSqlHint(): Unit = {
+    val t = env
+      .fromCollection(smallTupleData3)
+      .toTable(tEnv, 'id, 'num, 'text)
+    tEnv.createTemporaryView("src", t)
+
+    tEnv.executeSql(s"""
+                       |CREATE TABLE appendSink (
+                       |  `t` INT,
+                       |  `num` BIGINT,
+                       |  `text` STRING
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'sink-insert-only' = 'true'
+                       |)
+                       |""".stripMargin)
+    tEnv
+      .executeSql(
+        "INSERT INTO appendSink /*+ OPTIONS('sink.parallelism' = '1') */" +
+          "(t, num, text) SELECT id, num, text FROM src")
+      .await()
+
+    val result = TestValuesTableFactory.getResults("appendSink")
+    val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+    assertEquals(expected.sorted, result.sorted)
+  }
 }

Reply via email to