This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new b1e618f [FLINK-18315][table-planner-blink] Fix INSERT INTO
partitioned table with VALUES doesn't work correctly
b1e618f is described below
commit b1e618fedad889f580b5a50e21429443b2585d7c
Author: Danny Chan <[email protected]>
AuthorDate: Mon Jun 22 16:12:23 2020 +0800
[FLINK-18315][table-planner-blink] Fix INSERT INTO partitioned table with
VALUES doesn't work correctly
Each row of the VALUES clause is now patched up with the static partition field values.
This closes #12734
---
.../hive/TableEnvHiveConnectorITCase.java | 11 ++++
.../planner/calcite/PreValidateReWriter.scala | 72 +++++++++++++++++++---
.../plan/batch/sql/PartitionableSinkTest.xml | 19 ++++++
.../plan/batch/sql/PartitionableSinkTest.scala | 4 --
.../batch/sql/PartitionableSinkITCase.scala | 10 +++
5 files changed, 103 insertions(+), 13 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
index f2a8108..8cdf001 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
@@ -688,6 +688,17 @@ public class TableEnvHiveConnectorITCase {
tableEnv.executeSql("drop table if exists dest");
}
+ @Test
+ public void testInsertPartitionWithValuesSource() {
+ TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+ tableEnv.executeSql("create table dest (x int) partitioned by
(p1 int,p2 string)");
+ TableEnvUtil.execInsertSqlAndWaitResult(tableEnv,
+ "insert into dest partition (p1=1) values(1,
'a')");
+ List<Row> results =
Lists.newArrayList(tableEnv.sqlQuery("select * from dest").execute().collect());
+ assertEquals("[1,1,a]", results.toString());
+ tableEnv.executeSql("drop table if exists dest");
+ }
+
private TableEnvironment getTableEnvWithHiveCatalog() {
TableEnvironment tableEnv =
HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 4cb2aae..91e584c 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -32,10 +32,11 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.sql.util.SqlBasicVisitor
import org.apache.calcite.sql.validate.{SqlValidatorException,
SqlValidatorTable, SqlValidatorUtil}
-import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlLiteral, SqlNode,
SqlNodeList, SqlSelect, SqlUtil}
+import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlLiteral,
SqlNode, SqlNodeList, SqlSelect, SqlUtil}
import org.apache.calcite.util.Static.RESOURCE
import java.util
+import java.util.Collections
import scala.collection.JavaConversions._
@@ -49,10 +50,14 @@ class PreValidateReWriter(
case r: RichSqlInsert if r.getStaticPartitions.nonEmpty => r.getSource
match {
case select: SqlSelect =>
appendPartitionProjects(r, validator, typeFactory, select,
r.getStaticPartitions)
+ case values: SqlCall if values.getKind == SqlKind.VALUES =>
+ val newSource = appendPartitionProjects(r, validator, typeFactory,
values,
+ r.getStaticPartitions)
+ r.setOperand(2, newSource)
case source =>
throw new ValidationException(
- s"INSERT INTO <table> PARTITION statement only support SELECT
clause for now," +
- s" '$source' is not supported yet.")
+ s"INSERT INTO <table> PARTITION statement only support "
+ + s"SELECT and VALUES clause for now, '$source' is not supported
yet.")
}
case _ =>
}
@@ -81,21 +86,22 @@ object PreValidateReWriter {
* @param sqlInsert RichSqlInsert instance
* @param validator Validator
* @param typeFactory type factory
- * @param select Source sql select
+ * @param source Source to rewrite
* @param partitions Static partition statements
*/
def appendPartitionProjects(sqlInsert: RichSqlInsert,
validator: FlinkCalciteSqlValidator,
typeFactory: RelDataTypeFactory,
- select: SqlSelect,
- partitions: SqlNodeList): Unit = {
+ source: SqlCall,
+ partitions: SqlNodeList): SqlCall = {
+ assert(source.getKind == SqlKind.SELECT || source.getKind ==
SqlKind.VALUES)
val calciteCatalogReader =
validator.getCatalogReader.unwrap(classOf[CalciteCatalogReader])
val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
val table = calciteCatalogReader.getTable(names)
if (table == null) {
// There is no table exists in current catalog,
// just skip to let other validation error throw.
- return
+ return source
}
val targetRowType = createTargetRowType(typeFactory,
calciteCatalogReader, table, sqlInsert.getTargetColumnList)
@@ -115,12 +121,26 @@ object PreValidateReWriter {
assignedFields.put(targetField.getIndex,
maybeCast(value, value.createSqlType(typeFactory),
targetField.getType, typeFactory))
}
+ source match {
+ case select: SqlSelect =>
+ rewriteSelect(validator, select, targetRowType, assignedFields)
+ case values: SqlCall if values.getKind == SqlKind.VALUES =>
+ rewriteValues(values, targetRowType, assignedFields)
+ }
+ }
+
+ private def rewriteSelect(
+ validator: FlinkCalciteSqlValidator,
+ select: SqlSelect,
+ targetRowType: RelDataType,
+ assignedFields: util.LinkedHashMap[Integer, SqlNode]): SqlCall = {
// Expands the select list first in case there is a star(*).
// Validates the select first to register the where scope.
validator.validate(select)
- val selectList = validator.expandStar(select.getSelectList, select, false)
- val currentNodes = new util.ArrayList[SqlNode](selectList.getList)
+ val sourceList = validator.expandStar(select.getSelectList, select,
false).getList
+
val fixedNodes = new util.ArrayList[SqlNode]
+ val currentNodes = new util.ArrayList[SqlNode](sourceList)
0 until targetRowType.getFieldList.length foreach {
idx =>
if (assignedFields.containsKey(idx)) {
@@ -135,6 +155,40 @@ object PreValidateReWriter {
fixedNodes.addAll(currentNodes)
}
select.setSelectList(new SqlNodeList(fixedNodes,
select.getSelectList.getParserPosition))
+ select
+ }
+
+ private def rewriteValues(
+ values: SqlCall,
+ targetRowType: RelDataType,
+ assignedFields: util.LinkedHashMap[Integer, SqlNode]): SqlCall = {
+ val fixedNodes = new util.ArrayList[SqlNode]
+ 0 until values.getOperandList.size() foreach {
+ valueIdx =>
+ val value = values.getOperandList.get(valueIdx)
+ val valueAsList = if (value.getKind == SqlKind.ROW) {
+ value.asInstanceOf[SqlCall].getOperandList
+ } else {
+ Collections.singletonList(value)
+ }
+ val currentNodes = new util.ArrayList[SqlNode](valueAsList)
+ val fieldNodes = new util.ArrayList[SqlNode]
+ 0 until targetRowType.getFieldList.length foreach {
+ fieldIdx =>
+ if (assignedFields.containsKey(fieldIdx)) {
+ fieldNodes.add(assignedFields.get(fieldIdx))
+ } else if (currentNodes.size() > 0) {
+ fieldNodes.add(currentNodes.remove(0))
+ }
+ }
+ // Although it is error case, we still append the old remaining
+ // value items to new item list.
+ if (currentNodes.size > 0) {
+ fieldNodes.addAll(currentNodes)
+ }
+
fixedNodes.add(SqlStdOperatorTable.ROW.createCall(value.getParserPosition,
fieldNodes))
+ }
+ SqlStdOperatorTable.VALUES.createCall(values.getParserPosition, fixedNodes)
}
/**
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.xml
index 36968ce..b79e97f 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.xml
@@ -94,6 +94,25 @@
LegacySink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b, c])
]]>
</Resource>
</TestCase>
+ <TestCase name="testStaticWithValues">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO sink PARTITION (b=1, c=1) VALUES (5)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`sink`],
fields=[a, b, c])
++- LogicalProject(a=[5:BIGINT], b=[1:BIGINT], c=[1:BIGINT])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LegacySink(name=[`default_catalog`.`default_database`.`sink`], fields=[a, b,
c])
++- Calc(select=[5:BIGINT AS a, 1:BIGINT AS b, 1:BIGINT AS c])
+ +- Values(tuples=[[{ 0 }]], values=[ZERO])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testWrongFields">
<Resource name="sql">
<![CDATA[INSERT INTO sink PARTITION (b=1) SELECT a, b, c FROM MyTable]]>
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.scala
index cfcf38b..3247460 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.scala
@@ -79,10 +79,6 @@ class PartitionableSinkTest extends TableTestBase {
@Test
def testStaticWithValues(): Unit = {
- thrown.expect(classOf[ValidationException])
- thrown.expectMessage(
- "INSERT INTO <table> PARTITION statement only support SELECT clause for
now," +
- " 'VALUES ROW(5)' is not supported yet")
util.verifyPlanInsert("INSERT INTO sink PARTITION (b=1, c=1) VALUES (5)")
}
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index 1820047..acfce80 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -182,6 +182,16 @@ class PartitionableSinkITCase extends BatchTestBase {
}
@Test
+ def testInsertWithStaticPartitionAndValuesSource(): Unit = {
+ registerTableSink(partitionColumns = Array("b", "c"))
+ execInsertSqlAndWaitResult("insert into sinkTable partition(b=1)\n"
+ + "(values (1, 'Hello world, how are you?'), (4, '你好,陌生人,我是'), (2,
'Hello'))")
+ assertEquals(List("1,1,Hello world, how are you?"), RESULT1.toList)
+ assertEquals(List("4,1,你好,陌生人,我是"), RESULT2.toList)
+ assertEquals(List("2,1,Hello"), RESULT3.toList)
+ }
+
+ @Test
def testStaticPartitionNotInPartitionFields(): Unit = {
expectedEx.expect(classOf[ValidationException])
registerTableSink(tableName = "sinkTable2", rowType = type4,