This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new fbae8fc [FLINK-17666][table-planner-blink] Insert into partitioned table can fail with select *
fbae8fc is described below
commit fbae8fc97caa6ffd852963cec2979a4ebf08eb07
Author: Danny Chan <[email protected]>
AuthorDate: Tue Jun 16 16:18:47 2020 +0800
[FLINK-17666][table-planner-blink] Insert into partitioned table can fail with select *
This closes #12656
---
.../hive/TableEnvHiveConnectorITCase.java | 19 ++++++++
.../flink/sql/parser/dql/SqlShowCatalogs.java | 2 +-
.../table/planner/calcite/FlinkPlannerImpl.scala | 2 +-
.../planner/calcite/PreValidateReWriter.scala | 15 ++++--
.../batch/sql/PartitionableSinkITCase.scala | 54 +++++++++++++++++++---
5 files changed, 78 insertions(+), 14 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
index dd12899..f2a8108 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
@@ -669,6 +669,25 @@ public class TableEnvHiveConnectorITCase {
}
}
+ @Test
+ public void testInsertPartitionWithStarSource() throws Exception {
+ TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+ tableEnv.executeSql("create table src (x int,y string)");
+ HiveTestUtils.createTextTableInserter(
+ hiveShell,
+ "default",
+ "src")
+ .addRow(new Object[]{1, "a"})
+ .commit();
+ tableEnv.executeSql("create table dest (x int) partitioned by
(p1 int,p2 string)");
+ TableEnvUtil.execInsertSqlAndWaitResult(tableEnv,
+ "insert into dest partition (p1=1) select *
from src");
+ List<Row> results =
Lists.newArrayList(tableEnv.sqlQuery("select * from dest").execute().collect());
+ assertEquals("[1,1,a]", results.toString());
+ tableEnv.executeSql("drop table if exists src");
+ tableEnv.executeSql("drop table if exists dest");
+ }
+
private TableEnvironment getTableEnvWithHiveCatalog() {
TableEnvironment tableEnv =
HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java
index 0821c18..1e2861f 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java
@@ -47,7 +47,7 @@ public class SqlShowCatalogs extends SqlCall {
@Override
public List<SqlNode> getOperandList() {
- return Collections.EMPTY_LIST;
+ return Collections.emptyList();
}
@Override
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index b38fa92..395dce3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -111,7 +111,7 @@ class FlinkPlannerImpl(
private def validate(sqlNode: SqlNode, validator: FlinkCalciteSqlValidator):
SqlNode = {
try {
sqlNode.accept(new PreValidateReWriter(
- validator.getCatalogReader.unwrap(classOf[CalciteCatalogReader]),
typeFactory))
+ validator, typeFactory))
// do extended validation.
sqlNode match {
case node: ExtendedSqlNode =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index e0a7fb9..4cb2aae 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -42,13 +42,13 @@ import scala.collection.JavaConversions._
/** Implements [[org.apache.calcite.sql.util.SqlVisitor]]
* interface to do some rewrite work before sql node validation. */
class PreValidateReWriter(
- val catalogReader: CalciteCatalogReader,
+ val validator: FlinkCalciteSqlValidator,
val typeFactory: RelDataTypeFactory) extends SqlBasicVisitor[Unit] {
override def visit(call: SqlCall): Unit = {
call match {
case r: RichSqlInsert if r.getStaticPartitions.nonEmpty => r.getSource
match {
case select: SqlSelect =>
- appendPartitionProjects(r, catalogReader, typeFactory, select,
r.getStaticPartitions)
+ appendPartitionProjects(r, validator, typeFactory, select,
r.getStaticPartitions)
case source =>
throw new ValidationException(
s"INSERT INTO <table> PARTITION statement only support SELECT
clause for now," +
@@ -79,16 +79,17 @@ object PreValidateReWriter {
* Where the "tpe1" and "tpe2" are data types of column a and c of target
table A.
*
* @param sqlInsert RichSqlInsert instance
- * @param calciteCatalogReader catalog reader
+ * @param validator Validator
* @param typeFactory type factory
* @param select Source sql select
* @param partitions Static partition statements
*/
def appendPartitionProjects(sqlInsert: RichSqlInsert,
- calciteCatalogReader: CalciteCatalogReader,
+ validator: FlinkCalciteSqlValidator,
typeFactory: RelDataTypeFactory,
select: SqlSelect,
partitions: SqlNodeList): Unit = {
+ val calciteCatalogReader =
validator.getCatalogReader.unwrap(classOf[CalciteCatalogReader])
val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
val table = calciteCatalogReader.getTable(names)
if (table == null) {
@@ -114,7 +115,11 @@ object PreValidateReWriter {
assignedFields.put(targetField.getIndex,
maybeCast(value, value.createSqlType(typeFactory),
targetField.getType, typeFactory))
}
- val currentNodes = new
util.ArrayList[SqlNode](select.getSelectList.getList)
+ // Expands the select list first in case there is a star(*).
+ // Validates the select first to register the where scope.
+ validator.validate(select)
+ val selectList = validator.expandStar(select.getSelectList, select, false)
+ val currentNodes = new util.ArrayList[SqlNode](selectList.getList)
val fixedNodes = new util.ArrayList[SqlNode]
0 until targetRowType.getFieldList.length foreach {
idx =>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index 1350724..1820047 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.planner.runtime.batch.sql
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO,
STRING_TYPE_INFO}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.configuration.Configuration
@@ -32,7 +32,7 @@ import org.apache.flink.table.descriptors.DescriptorProperties
import org.apache.flink.table.descriptors.Schema.SCHEMA
import org.apache.flink.table.factories.TableSinkFactory
import org.apache.flink.table.filesystem.FileSystemOptions
-import
org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase._
+import
org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase.{type4,
type_int_string, _}
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.runtime.utils.TestData._
@@ -52,12 +52,11 @@ import scala.collection.JavaConverters._
import scala.collection.Seq
/**
- * Test cases for [[org.apache.flink.table.sinks.PartitionableTableSink]].
- */
+ * Test cases for [[org.apache.flink.table.sinks.PartitionableTableSink]].
+ */
class PartitionableSinkITCase extends BatchTestBase {
private val _expectedException = ExpectedException.none
- private val type4 = new RowTypeInfo(INT_TYPE_INFO, INT_TYPE_INFO,
INT_TYPE_INFO)
@Rule
def expectedEx: ExpectedException = _expectedException
@@ -71,6 +70,7 @@ class PartitionableSinkITCase extends BatchTestBase {
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 3)
registerCollection("nonSortTable", testData, type3, "a, b, c",
dataNullables)
registerCollection("sortTable", testData1, type3, "a, b, c", dataNullables)
+ registerCollection("starTable", testData2, type_int_string, "b, c",
Array(true, true))
PartitionableSinkITCase.init()
}
@@ -159,6 +159,29 @@ class PartitionableSinkITCase extends BatchTestBase {
}
@Test
+ def testInsertWithStaticPartitionAndStarSource(): Unit = {
+ registerTableSink(partitionColumns = Array("b", "c"))
+ execInsertSqlAndWaitResult("insert into sinkTable partition(b=1) select *
from starTable")
+ assertEquals(List(
+ "1,1,Hello world, how are you?",
+ "3,1,I'm fine, thank you",
+ "4,1,你好,陌生人",
+ "4,1,你好,陌生人,我是中国人"),
+ RESULT1.toList)
+ assertEquals(List(
+ "4,1,你好,陌生人,我是",
+ "4,1,你好,陌生人,我是中国人,你来自哪里?"),
+ RESULT2.toList)
+ assertEquals(List(
+ "2,1,Hello",
+ "1,1,Hello world",
+ "2,1,Hi",
+ "3,1,I'm fine, thank",
+ "3,1,I'm fine, thank you, and you?"),
+ RESULT3.toList)
+ }
+
+ @Test
def testStaticPartitionNotInPartitionFields(): Unit = {
expectedEx.expect(classOf[ValidationException])
registerTableSink(tableName = "sinkTable2", rowType = type4,
@@ -183,6 +206,9 @@ class PartitionableSinkITCase extends BatchTestBase {
}
object PartitionableSinkITCase {
+ val type4 = new RowTypeInfo(INT_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
+ val type_int_string = new RowTypeInfo(INT_TYPE_INFO, STRING_TYPE_INFO)
+
val RESULT1 = new JLinkedList[String]()
val RESULT2 = new JLinkedList[String]()
val RESULT3 = new JLinkedList[String]()
@@ -199,8 +225,8 @@ object PartitionableSinkITCase {
}
/**
- * Sink function of unsafe memory.
- */
+ * Sink function of unsafe memory.
+ */
class UnsafeMemorySinkFunction(outputType: TypeInformation[Row])
extends RichSinkFunction[Row] {
private var resultSet: JLinkedList[String] = _
@@ -251,6 +277,20 @@ object PartitionableSinkITCase {
row(4, 4L, "你好,陌生人,我是中国人,你来自哪里?")
)
+ val testData2 = Seq(
+ row(2, "Hi"),
+ row(1, "Hello world"),
+ row(2, "Hello"),
+ row(1, "Hello world, how are you?"),
+ row(3, "I'm fine, thank"),
+ row(3, "I'm fine, thank you"),
+ row(3, "I'm fine, thank you, and you?"),
+ row(4, "你好,陌生人"),
+ row(4, "你好,陌生人,我是"),
+ row(4, "你好,陌生人,我是中国人"),
+ row(4, "你好,陌生人,我是中国人,你来自哪里?")
+ )
+
def registerTableSink(
tEnv: TableEnvironment,
tableName: String,