This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new e0af037d991 [FLINK-27683][table-planner] Fix SQL hints can't work with 
targetColumns
e0af037d991 is described below

commit e0af037d9910b6cfd4cc3fd8937289f939bb6d9b
Author: yangxin <[email protected]>
AuthorDate: Tue May 31 10:42:49 2022 +0800

    [FLINK-27683][table-planner] Fix SQL hints can't work with targetColumns
    
    This closes #19847
    
    (cherry picked from commit 9bcc7fd20420bbf90f4b67d98c521a8ddf775d4e)
---
 .../planner/calcite/PreValidateReWriter.scala      |  9 +++++---
 .../planner/plan/stream/sql/TableSinkTest.xml      | 16 +++++++++++++
 .../planner/plan/stream/sql/TableSinkTest.scala    | 17 ++++++++++++++
 .../runtime/stream/table/TableSinkITCase.scala     | 27 ++++++++++++++++++++++
 4 files changed, 66 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index e85ce719c76..86e922b568d 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -30,8 +30,8 @@ import org.apache.calcite.plan.RelOptTable
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, 
RelDataTypeField}
 import org.apache.calcite.runtime.{CalciteContextException, Resources}
-import org.apache.calcite.sql.`type`.{SqlTypeName, SqlTypeUtil}
-import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, 
SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
+import org.apache.calcite.sql.`type`.SqlTypeUtil
+import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, 
SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlTableRef, 
SqlUtil}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.sql.util.SqlBasicVisitor
@@ -119,7 +119,10 @@ object PreValidateReWriter {
       source: SqlCall,
       partitions: SqlNodeList): SqlCall = {
     val calciteCatalogReader = 
validator.getCatalogReader.unwrap(classOf[CalciteCatalogReader])
-    val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
+    val names = sqlInsert.getTargetTable match {
+      case si: SqlIdentifier => si.names
+      case st: SqlTableRef => 
st.getOperandList.get(0).asInstanceOf[SqlIdentifier].names
+    }
     val table = calciteCatalogReader.getTable(names)
     if (table == null) {
       // There is no table exists in current catalog,
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
index 2b8b78770c8..4ae3d066174 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
@@ -779,4 +779,20 @@ 
Sink(table=[default_catalog.default_database.SinkJoinChangeLog], fields=[person,
 ]]>
     </Resource>
   </TestCase>
+       <TestCase name="testInsertWithTargetColumnsAndSqlHint">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalSink(table=[default_catalog.default_database.appendSink], 
fields=[EXPR$0, c], hints=[[[OPTIONS options:{sink.parallelism=1}]]])
++- LogicalProject(EXPR$0=[+($0, $1)], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+Sink(table=[default_catalog.default_database.appendSink], fields=[EXPR$0, c], 
hints=[[[OPTIONS options:{sink.parallelism=1}]]], changelogMode=[NONE])
++- Calc(select=[+(a, b) AS EXPR$0, c], changelogMode=[I])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c], changelogMode=[I])
+]]>
+               </Resource>
+       </TestCase>
 </Root>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index a4f43257447..69621dc9b42 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -57,6 +57,23 @@ class TableSinkTest extends TableTestBase {
       |)
       |""".stripMargin)
 
+  @Test
+  def testInsertWithTargetColumnsAndSqlHint(): Unit = {
+    util.addTable(s"""
+                     |CREATE TABLE appendSink (
+                     |  `a` BIGINT,
+                     |  `b` STRING
+                     |) WITH (
+                     |  'connector' = 'values',
+                     |  'sink-insert-only' = 'false'
+                     |)
+                     |""".stripMargin)
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(
+      "INSERT INTO appendSink /*+ OPTIONS('sink.parallelism' = '1') */(a, b) 
SELECT a + b, c FROM MyTable")
+    util.verifyRelPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
+  }
+
   @Test
   def testInsertMismatchTypeForEmptyChar(): Unit = {
     util.addTable(s"""
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index 0218edcd5f2..f7e71141fc3 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -86,6 +86,33 @@ class TableSinkITCase extends StreamingTestBase {
     assertEquals(expected.sorted, result.sorted)
   }
 
+  @Test
+  def testInsertWithTargetColumnsAndSqlHint(): Unit = {
+    val t = env
+      .fromCollection(smallTupleData3)
+      .toTable(tEnv, 'id, 'num, 'text)
+    tEnv.createTemporaryView("src", t)
+
+    tEnv.executeSql(s"""
+                       |CREATE TABLE appendSink (
+                       |  `t` INT,
+                       |  `num` BIGINT,
+                       |  `text` STRING
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'sink-insert-only' = 'true'
+                       |)
+                       |""".stripMargin)
+    tEnv
+      .executeSql(
+        "INSERT INTO appendSink /*+ OPTIONS('sink.parallelism' = '1') */(t, 
num, text) SELECT id, num, text FROM src")
+      .await()
+
+    val result = TestValuesTableFactory.getResults("appendSink")
+    val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+    assertEquals(expected.sorted, result.sorted)
+  }
+
   @Test
   def testAppendSinkWithNestedRow(): Unit = {
     val t = env

Reply via email to