This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ce8ad93  [FLINK-18315][table-planner-blink] Fix INSERT INTO 
partitioned table with VALUES doesn't work correctly
ce8ad93 is described below

commit ce8ad9399f6a36e1197ca090c38cee631097bcc5
Author: yuzhao.cyz <[email protected]>
AuthorDate: Tue Jun 16 16:50:02 2020 +0800

    [FLINK-18315][table-planner-blink] Fix INSERT INTO partitioned table with 
VALUES doesn't work correctly
    
    The rows of the VALUES source are now patched up with the static partition fields.
    
    This closes #12677
---
 .../hive/TableEnvHiveConnectorITCase.java          | 11 ++++
 .../planner/calcite/PreValidateReWriter.scala      | 72 +++++++++++++++++++---
 .../plan/batch/sql/PartitionableSinkTest.xml       | 19 ++++++
 .../plan/batch/sql/PartitionableSinkTest.scala     |  4 --
 .../batch/sql/PartitionableSinkITCase.scala        | 10 +++
 5 files changed, 103 insertions(+), 13 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
index f2a8108..8cdf001 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
@@ -688,6 +688,17 @@ public class TableEnvHiveConnectorITCase {
                tableEnv.executeSql("drop table if exists dest");
        }
 
+       @Test
+       public void testInsertPartitionWithValuesSource() {
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create table dest (x int) partitioned by 
(p1 int,p2 string)");
+               TableEnvUtil.execInsertSqlAndWaitResult(tableEnv,
+                               "insert into dest partition (p1=1) values(1, 
'a')");
+               List<Row> results = 
Lists.newArrayList(tableEnv.sqlQuery("select * from dest").execute().collect());
+               assertEquals("[1,1,a]", results.toString());
+               tableEnv.executeSql("drop table if exists dest");
+       }
+
        private TableEnvironment getTableEnvWithHiveCatalog() {
                TableEnvironment tableEnv = 
HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
                tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 4cb2aae..91e584c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -32,10 +32,11 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.sql.util.SqlBasicVisitor
 import org.apache.calcite.sql.validate.{SqlValidatorException, 
SqlValidatorTable, SqlValidatorUtil}
-import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlLiteral, SqlNode, 
SqlNodeList, SqlSelect, SqlUtil}
+import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlLiteral, 
SqlNode, SqlNodeList, SqlSelect, SqlUtil}
 import org.apache.calcite.util.Static.RESOURCE
 
 import java.util
+import java.util.Collections
 
 import scala.collection.JavaConversions._
 
@@ -49,10 +50,14 @@ class PreValidateReWriter(
       case r: RichSqlInsert if r.getStaticPartitions.nonEmpty => r.getSource 
match {
         case select: SqlSelect =>
           appendPartitionProjects(r, validator, typeFactory, select, 
r.getStaticPartitions)
+        case values: SqlCall if values.getKind == SqlKind.VALUES =>
+          val newSource = appendPartitionProjects(r, validator, typeFactory, 
values,
+            r.getStaticPartitions)
+          r.setOperand(2, newSource)
         case source =>
           throw new ValidationException(
-            s"INSERT INTO <table> PARTITION statement only support SELECT 
clause for now," +
-                s" '$source' is not supported yet.")
+            s"INSERT INTO <table> PARTITION statement only support "
+              + s"SELECT and VALUES clause for now, '$source' is not supported 
yet.")
       }
       case _ =>
     }
@@ -81,21 +86,22 @@ object PreValidateReWriter {
     * @param sqlInsert            RichSqlInsert instance
     * @param validator            Validator
     * @param typeFactory          type factory
-    * @param select               Source sql select
+    * @param source               Source to rewrite
     * @param partitions           Static partition statements
     */
   def appendPartitionProjects(sqlInsert: RichSqlInsert,
       validator: FlinkCalciteSqlValidator,
       typeFactory: RelDataTypeFactory,
-      select: SqlSelect,
-      partitions: SqlNodeList): Unit = {
+      source: SqlCall,
+      partitions: SqlNodeList): SqlCall = {
+    assert(source.getKind == SqlKind.SELECT || source.getKind == 
SqlKind.VALUES)
     val calciteCatalogReader = 
validator.getCatalogReader.unwrap(classOf[CalciteCatalogReader])
     val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
     val table = calciteCatalogReader.getTable(names)
     if (table == null) {
       // There is no table exists in current catalog,
       // just skip to let other validation error throw.
-      return
+      return source
     }
     val targetRowType = createTargetRowType(typeFactory,
       calciteCatalogReader, table, sqlInsert.getTargetColumnList)
@@ -115,12 +121,26 @@ object PreValidateReWriter {
       assignedFields.put(targetField.getIndex,
         maybeCast(value, value.createSqlType(typeFactory), 
targetField.getType, typeFactory))
     }
+    source match {
+      case select: SqlSelect =>
+        rewriteSelect(validator, select, targetRowType, assignedFields)
+      case values: SqlCall if values.getKind == SqlKind.VALUES =>
+        rewriteValues(values, targetRowType, assignedFields)
+    }
+  }
+
+  private def rewriteSelect(
+      validator: FlinkCalciteSqlValidator,
+      select: SqlSelect,
+      targetRowType: RelDataType,
+      assignedFields: util.LinkedHashMap[Integer, SqlNode]): SqlCall = {
     // Expands the select list first in case there is a star(*).
     // Validates the select first to register the where scope.
     validator.validate(select)
-    val selectList = validator.expandStar(select.getSelectList, select, false)
-    val currentNodes = new util.ArrayList[SqlNode](selectList.getList)
+    val sourceList = validator.expandStar(select.getSelectList, select, 
false).getList
+
     val fixedNodes = new util.ArrayList[SqlNode]
+    val currentNodes = new util.ArrayList[SqlNode](sourceList)
     0 until targetRowType.getFieldList.length foreach {
       idx =>
         if (assignedFields.containsKey(idx)) {
@@ -135,6 +155,40 @@ object PreValidateReWriter {
       fixedNodes.addAll(currentNodes)
     }
     select.setSelectList(new SqlNodeList(fixedNodes, 
select.getSelectList.getParserPosition))
+    select
+  }
+
+  private def rewriteValues(
+      values: SqlCall,
+      targetRowType: RelDataType,
+      assignedFields: util.LinkedHashMap[Integer, SqlNode]): SqlCall = {
+    val fixedNodes = new util.ArrayList[SqlNode]
+    0 until values.getOperandList.size() foreach {
+      valueIdx =>
+        val value = values.getOperandList.get(valueIdx)
+        val valueAsList = if (value.getKind == SqlKind.ROW) {
+          value.asInstanceOf[SqlCall].getOperandList
+        } else {
+          Collections.singletonList(value)
+        }
+        val currentNodes = new util.ArrayList[SqlNode](valueAsList)
+        val fieldNodes = new util.ArrayList[SqlNode]
+        0 until targetRowType.getFieldList.length foreach {
+          fieldIdx =>
+            if (assignedFields.containsKey(fieldIdx)) {
+              fieldNodes.add(assignedFields.get(fieldIdx))
+            } else if (currentNodes.size() > 0) {
+              fieldNodes.add(currentNodes.remove(0))
+            }
+        }
+        // Although this is an error case, we still append the remaining
+        // old value items to the new item list.
+        if (currentNodes.size > 0) {
+          fieldNodes.addAll(currentNodes)
+        }
+        
fixedNodes.add(SqlStdOperatorTable.ROW.createCall(value.getParserPosition, 
fieldNodes))
+    }
+    SqlStdOperatorTable.VALUES.createCall(values.getParserPosition, fixedNodes)
   }
 
   /**
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.xml
index 36968ce..b79e97f 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.xml
@@ -94,6 +94,25 @@ 
LegacySink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testStaticWithValues">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO sink PARTITION (b=1, c=1) VALUES (5)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`sink`], 
fields=[a, b, c])
++- LogicalProject(a=[5:BIGINT], b=[1:BIGINT], c=[1:BIGINT])
+   +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LegacySink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b, 
c])
++- Calc(select=[5:BIGINT AS a, 1:BIGINT AS b, 1:BIGINT AS c])
+   +- Values(tuples=[[{ 0 }]], values=[ZERO])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testWrongFields">
     <Resource name="sql">
       <![CDATA[INSERT INTO sink PARTITION (b=1) SELECT a, b, c FROM MyTable]]>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.scala
index cfcf38b..3247460 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.scala
@@ -79,10 +79,6 @@ class PartitionableSinkTest extends TableTestBase {
 
   @Test
   def testStaticWithValues(): Unit = {
-    thrown.expect(classOf[ValidationException])
-    thrown.expectMessage(
-      "INSERT INTO <table> PARTITION statement only support SELECT clause for 
now," +
-          " 'VALUES ROW(5)' is not supported yet")
     util.verifyPlanInsert("INSERT INTO sink PARTITION (b=1, c=1) VALUES (5)")
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index 1820047..acfce80 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -182,6 +182,16 @@ class PartitionableSinkITCase extends BatchTestBase {
   }
 
   @Test
+  def testInsertWithStaticPartitionAndValuesSource(): Unit = {
+    registerTableSink(partitionColumns = Array("b", "c"))
+    execInsertSqlAndWaitResult("insert into sinkTable partition(b=1)\n"
+      + "(values (1, 'Hello world, how are you?'), (4, '你好,陌生人,我是'), (2, 
'Hello'))")
+    assertEquals(List("1,1,Hello world, how are you?"), RESULT1.toList)
+    assertEquals(List("4,1,你好,陌生人,我是"), RESULT2.toList)
+    assertEquals(List("2,1,Hello"), RESULT3.toList)
+  }
+
+  @Test
   def testStaticPartitionNotInPartitionFields(): Unit = {
     expectedEx.expect(classOf[ValidationException])
     registerTableSink(tableName = "sinkTable2", rowType = type4,

Reply via email to