This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new fbae8fc  [FLINK-17666][table-planner-blink] Insert into partitioned 
table can fail with select *
fbae8fc is described below

commit fbae8fc97caa6ffd852963cec2979a4ebf08eb07
Author: Danny Chan <[email protected]>
AuthorDate: Tue Jun 16 16:18:47 2020 +0800

    [FLINK-17666][table-planner-blink] Insert into partitioned table can fail 
with select *
    
    This closes #12656
---
 .../hive/TableEnvHiveConnectorITCase.java          | 19 ++++++++
 .../flink/sql/parser/dql/SqlShowCatalogs.java      |  2 +-
 .../table/planner/calcite/FlinkPlannerImpl.scala   |  2 +-
 .../planner/calcite/PreValidateReWriter.scala      | 15 ++++--
 .../batch/sql/PartitionableSinkITCase.scala        | 54 +++++++++++++++++++---
 5 files changed, 78 insertions(+), 14 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
index dd12899..f2a8108 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
@@ -669,6 +669,25 @@ public class TableEnvHiveConnectorITCase {
                }
        }
 
+       @Test
+       public void testInsertPartitionWithStarSource() throws Exception {
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create table src (x int,y string)");
+               HiveTestUtils.createTextTableInserter(
+                               hiveShell,
+                               "default",
+                               "src")
+                               .addRow(new Object[]{1, "a"})
+                               .commit();
+               tableEnv.executeSql("create table dest (x int) partitioned by 
(p1 int,p2 string)");
+               TableEnvUtil.execInsertSqlAndWaitResult(tableEnv,
+                               "insert into dest partition (p1=1) select * 
from src");
+               List<Row> results = 
Lists.newArrayList(tableEnv.sqlQuery("select * from dest").execute().collect());
+               assertEquals("[1,1,a]", results.toString());
+               tableEnv.executeSql("drop table if exists src");
+               tableEnv.executeSql("drop table if exists dest");
+       }
+
        private TableEnvironment getTableEnvWithHiveCatalog() {
                TableEnvironment tableEnv = 
HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
                tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java
index 0821c18..1e2861f 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java
@@ -47,7 +47,7 @@ public class SqlShowCatalogs extends SqlCall {
 
        @Override
        public List<SqlNode> getOperandList() {
-               return Collections.EMPTY_LIST;
+               return Collections.emptyList();
        }
 
        @Override
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index b38fa92..395dce3 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -111,7 +111,7 @@ class FlinkPlannerImpl(
   private def validate(sqlNode: SqlNode, validator: FlinkCalciteSqlValidator): 
SqlNode = {
     try {
       sqlNode.accept(new PreValidateReWriter(
-        validator.getCatalogReader.unwrap(classOf[CalciteCatalogReader]), 
typeFactory))
+        validator, typeFactory))
       // do extended validation.
       sqlNode match {
         case node: ExtendedSqlNode =>
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index e0a7fb9..4cb2aae 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -42,13 +42,13 @@ import scala.collection.JavaConversions._
 /** Implements [[org.apache.calcite.sql.util.SqlVisitor]]
   * interface to do some rewrite work before sql node validation. */
 class PreValidateReWriter(
-    val catalogReader: CalciteCatalogReader,
+    val validator: FlinkCalciteSqlValidator,
     val typeFactory: RelDataTypeFactory) extends SqlBasicVisitor[Unit] {
   override def visit(call: SqlCall): Unit = {
     call match {
       case r: RichSqlInsert if r.getStaticPartitions.nonEmpty => r.getSource 
match {
         case select: SqlSelect =>
-          appendPartitionProjects(r, catalogReader, typeFactory, select, 
r.getStaticPartitions)
+          appendPartitionProjects(r, validator, typeFactory, select, 
r.getStaticPartitions)
         case source =>
           throw new ValidationException(
             s"INSERT INTO <table> PARTITION statement only support SELECT 
clause for now," +
@@ -79,16 +79,17 @@ object PreValidateReWriter {
     * Where the "tpe1" and "tpe2" are data types of column a and c of target 
table A.
     *
     * @param sqlInsert            RichSqlInsert instance
-    * @param calciteCatalogReader catalog reader
+    * @param validator            Validator
     * @param typeFactory          type factory
     * @param select               Source sql select
     * @param partitions           Static partition statements
     */
   def appendPartitionProjects(sqlInsert: RichSqlInsert,
-      calciteCatalogReader: CalciteCatalogReader,
+      validator: FlinkCalciteSqlValidator,
       typeFactory: RelDataTypeFactory,
       select: SqlSelect,
       partitions: SqlNodeList): Unit = {
+    val calciteCatalogReader = 
validator.getCatalogReader.unwrap(classOf[CalciteCatalogReader])
     val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
     val table = calciteCatalogReader.getTable(names)
     if (table == null) {
@@ -114,7 +115,11 @@ object PreValidateReWriter {
       assignedFields.put(targetField.getIndex,
         maybeCast(value, value.createSqlType(typeFactory), 
targetField.getType, typeFactory))
     }
-    val currentNodes = new 
util.ArrayList[SqlNode](select.getSelectList.getList)
+    // Expands the select list first in case there is a star(*).
+    // Validates the select first to register the where scope.
+    validator.validate(select)
+    val selectList = validator.expandStar(select.getSelectList, select, false)
+    val currentNodes = new util.ArrayList[SqlNode](selectList.getList)
     val fixedNodes = new util.ArrayList[SqlNode]
     0 until targetRowType.getFieldList.length foreach {
       idx =>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index 1350724..1820047 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.runtime.batch.sql
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, 
STRING_TYPE_INFO}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.configuration.Configuration
@@ -32,7 +32,7 @@ import org.apache.flink.table.descriptors.DescriptorProperties
 import org.apache.flink.table.descriptors.Schema.SCHEMA
 import org.apache.flink.table.factories.TableSinkFactory
 import org.apache.flink.table.filesystem.FileSystemOptions
-import 
org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase._
+import 
org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase.{type4,
 type_int_string, _}
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData._
@@ -52,12 +52,11 @@ import scala.collection.JavaConverters._
 import scala.collection.Seq
 
 /**
-  * Test cases for [[org.apache.flink.table.sinks.PartitionableTableSink]].
-  */
+ * Test cases for [[org.apache.flink.table.sinks.PartitionableTableSink]].
+ */
 class PartitionableSinkITCase extends BatchTestBase {
 
   private val _expectedException = ExpectedException.none
-  private val type4 = new RowTypeInfo(INT_TYPE_INFO, INT_TYPE_INFO, 
INT_TYPE_INFO)
 
   @Rule
   def expectedEx: ExpectedException = _expectedException
@@ -71,6 +70,7 @@ class PartitionableSinkITCase extends BatchTestBase {
       
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 3)
     registerCollection("nonSortTable", testData, type3, "a, b, c", 
dataNullables)
     registerCollection("sortTable", testData1, type3, "a, b, c", dataNullables)
+    registerCollection("starTable", testData2, type_int_string, "b, c", 
Array(true, true))
     PartitionableSinkITCase.init()
   }
 
@@ -159,6 +159,29 @@ class PartitionableSinkITCase extends BatchTestBase {
   }
 
   @Test
+  def testInsertWithStaticPartitionAndStarSource(): Unit = {
+    registerTableSink(partitionColumns = Array("b", "c"))
+    execInsertSqlAndWaitResult("insert into sinkTable partition(b=1) select * 
from starTable")
+    assertEquals(List(
+      "1,1,Hello world, how are you?",
+      "3,1,I'm fine, thank you",
+      "4,1,你好,陌生人",
+      "4,1,你好,陌生人,我是中国人"),
+      RESULT1.toList)
+    assertEquals(List(
+      "4,1,你好,陌生人,我是",
+      "4,1,你好,陌生人,我是中国人,你来自哪里?"),
+      RESULT2.toList)
+    assertEquals(List(
+      "2,1,Hello",
+      "1,1,Hello world",
+      "2,1,Hi",
+      "3,1,I'm fine, thank",
+      "3,1,I'm fine, thank you, and you?"),
+      RESULT3.toList)
+  }
+
+  @Test
   def testStaticPartitionNotInPartitionFields(): Unit = {
     expectedEx.expect(classOf[ValidationException])
     registerTableSink(tableName = "sinkTable2", rowType = type4,
@@ -183,6 +206,9 @@ class PartitionableSinkITCase extends BatchTestBase {
 }
 
 object PartitionableSinkITCase {
+  val type4 = new RowTypeInfo(INT_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
+  val type_int_string = new RowTypeInfo(INT_TYPE_INFO, STRING_TYPE_INFO)
+
   val RESULT1 = new JLinkedList[String]()
   val RESULT2 = new JLinkedList[String]()
   val RESULT3 = new JLinkedList[String]()
@@ -199,8 +225,8 @@ object PartitionableSinkITCase {
   }
 
   /**
-    * Sink function of unsafe memory.
-    */
+   * Sink function of unsafe memory.
+   */
   class UnsafeMemorySinkFunction(outputType: TypeInformation[Row])
     extends RichSinkFunction[Row] {
     private var resultSet: JLinkedList[String] = _
@@ -251,6 +277,20 @@ object PartitionableSinkITCase {
     row(4, 4L, "你好,陌生人,我是中国人,你来自哪里?")
   )
 
+  val testData2 = Seq(
+    row(2, "Hi"),
+    row(1, "Hello world"),
+    row(2, "Hello"),
+    row(1, "Hello world, how are you?"),
+    row(3, "I'm fine, thank"),
+    row(3, "I'm fine, thank you"),
+    row(3, "I'm fine, thank you, and you?"),
+    row(4, "你好,陌生人"),
+    row(4, "你好,陌生人,我是"),
+    row(4, "你好,陌生人,我是中国人"),
+    row(4, "你好,陌生人,我是中国人,你来自哪里?")
+  )
+
   def registerTableSink(
       tEnv: TableEnvironment,
       tableName: String,

Reply via email to