This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new a898dae [FLINK-15269][table] Fix INSERT OVERWRITE/PARTITION syntax
shouldn't be limited to Hive dialect (#10587)
a898dae is described below
commit a898daeddb7764b4a79a71e5b1a7025dc66518b0
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Dec 20 14:35:09 2019 +0800
[FLINK-15269][table] Fix INSERT OVERWRITE/PARTITION syntax shouldn't be
limited to Hive dialect (#10587)
---
.../src/main/codegen/includes/parserImpls.ftl | 19 ++++++++-----------
.../flink/sql/parser/utils/ParserResource.java | 7 ++-----
.../sql/parser/validate/FlinkSqlConformance.java | 21 +++------------------
.../ParserResource.properties | 3 +--
.../flink/sql/parser/FlinkSqlParserImplTest.java | 19 +++++++++++++------
.../operations/SqlToOperationConverterTest.java | 12 ++++++------
.../plan/batch/sql/PartitionableSinkTest.scala | 4 +---
.../plan/stream/sql/PartitionableSinkTest.scala | 4 +---
.../runtime/batch/sql/PartitionableSinkITCase.scala | 1 -
.../table/sqlexec/SqlToOperationConverterTest.java | 12 ++++++------
10 files changed, 41 insertions(+), 61 deletions(-)
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 58cd248..9e7bd8e 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -570,7 +570,12 @@ SqlCreate SqlCreateTable(Span s, boolean replace) :
}]
[
<PARTITIONED> <BY>
- partitionColumns = ParenthesizedSimpleIdentifierList()
+ partitionColumns = ParenthesizedSimpleIdentifierList() {
+ if (!((FlinkSqlConformance)
this.conformance).allowCreatePartitionedTable()) {
+ throw SqlUtil.newContextException(getPos(),
+
ParserResource.RESOURCE.createPartitionedTableIsOnlyAllowedForHive());
+ }
+ }
]
[
<WITH>
@@ -636,10 +641,7 @@ SqlNode RichSqlInsert() :
<INTO>
|
<OVERWRITE> {
- if (!((FlinkSqlConformance)
this.conformance).allowInsertOverwrite()) {
- throw SqlUtil.newContextException(getPos(),
- ParserResource.RESOURCE.overwriteIsOnlyAllowedForHive());
- } else if (RichSqlInsert.isUpsert(keywords)) {
+ if (RichSqlInsert.isUpsert(keywords)) {
throw SqlUtil.newContextException(getPos(),
ParserResource.RESOURCE.overwriteIsOnlyUsedWithInsert());
}
@@ -672,12 +674,7 @@ SqlNode RichSqlInsert() :
}
]
[
- <PARTITION> PartitionSpecCommaList(partitionList) {
- if (!((FlinkSqlConformance)
this.conformance).allowInsertIntoPartition()) {
- throw SqlUtil.newContextException(getPos(),
- ParserResource.RESOURCE.partitionIsOnlyAllowedForHive());
- }
- }
+ <PARTITION> PartitionSpecCommaList(partitionList)
]
source = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) {
return new RichSqlInsert(s.end(source), keywordList,
extendedKeywordList, table, source,
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 8d7d59d..5989a37 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -33,12 +33,9 @@ public interface ParserResource {
@Resources.BaseMessage("Multiple WATERMARK statements is not supported
yet.")
Resources.ExInst<ParseException> multipleWatermarksUnsupported();
- @Resources.BaseMessage("OVERWRITE expression is only allowed for HIVE
dialect.")
- Resources.ExInst<ParseException> overwriteIsOnlyAllowedForHive();
-
@Resources.BaseMessage("OVERWRITE expression is only used with INSERT
statement.")
Resources.ExInst<ParseException> overwriteIsOnlyUsedWithInsert();
- @Resources.BaseMessage("PARTITION expression is only allowed for HIVE
dialect.")
- Resources.ExInst<ParseException> partitionIsOnlyAllowedForHive();
+ @Resources.BaseMessage("Creating partitioned table is only allowed for
HIVE dialect.")
+ Resources.ExInst<ParseException>
createPartitionedTableIsOnlyAllowedForHive();
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
index 94cb6be..d53b050 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
@@ -147,24 +147,9 @@ public enum FlinkSqlConformance implements SqlConformance {
}
/**
- * Whether to allow "insert into tbl1 partition(col1=val1)" grammar.
+ * Whether to allow "create table T(i int, j int) partitioned by (i)"
grammar.
*/
- public boolean allowInsertIntoPartition() {
- switch (this) {
- case HIVE:
- return true;
- }
- return false;
- }
-
- /**
- * Whether to allow "insert overwrite tbl1 partition(col1=val1)"
grammar.
- */
- public boolean allowInsertOverwrite() {
- switch (this) {
- case HIVE:
- return true;
- }
- return false;
+ public boolean allowCreatePartitionedTable() {
+ return this == FlinkSqlConformance.HIVE;
}
}
diff --git a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
index a48c4da..9993e7c 100644
--- a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
+++ b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
@@ -17,6 +17,5 @@
# See wrapper class org.apache.calcite.runtime.CalciteResource.
#
MultipleWatermarksUnsupported=Multiple WATERMARK statements is not supported
yet.
-OverwriteIsOnlyAllowedForHive=OVERWRITE expression is only allowed for HIVE
dialect.
OverwriteIsOnlyUsedWithInsert=OVERWRITE expression is only used with INSERT
statement.
-PartitionIsOnlyAllowedForHive=PARTITION expression is only allowed for HIVE
dialect.
+CreatePartitionedTableIsOnlyAllowedForHive=Creating partitioned table is only
allowed for HIVE dialect.
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index ea0ff1c..6bb594c 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -203,6 +203,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testCreateTable() {
+ conformance0 = FlinkSqlConformance.HIVE;
check("CREATE TABLE tbl1 (\n" +
" a bigint,\n" +
" h varchar, \n" +
@@ -235,6 +236,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testCreateTableWithComment() {
+ conformance0 = FlinkSqlConformance.HIVE;
check("CREATE TABLE tbl1 (\n" +
" a bigint comment 'test column comment
AAA.',\n" +
" h varchar, \n" +
@@ -269,6 +271,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testCreateTableWithPrimaryKeyAndUniqueKey() {
+ conformance0 = FlinkSqlConformance.HIVE;
check("CREATE TABLE tbl1 (\n" +
" a bigint comment 'test column comment
AAA.',\n" +
" h varchar, \n" +
@@ -533,6 +536,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testCreateInvalidPartitionedTable() {
+ conformance0 = FlinkSqlConformance.HIVE;
String sql = "create table sls_stream1(\n" +
" a bigint,\n" +
" b VARCHAR,\n" +
@@ -543,7 +547,16 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
") with ( 'x' = 'y', 'asd' = 'dada')";
sql(sql).node(new ValidationMatcher()
.fails("Partition column [C] not defined in columns, at
line 6, column 3"));
+ }
+ @Test
+ public void testNotAllowedCreatePartition() {
+ conformance0 = FlinkSqlConformance.DEFAULT;
+ String sql = "create table sls_stream1(\n" +
+ " a bigint,\n" +
+ " b VARCHAR\n" +
+ ") PARTITIONED BY (a^)^ with ( 'x' = 'y', 'asd'
= 'dada')";
+ sql(sql).fails("Creating partitioned table is only allowed for
HIVE dialect.");
}
@Test
@@ -598,7 +611,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testInsertPartitionSpecs() {
- conformance0 = FlinkSqlConformance.HIVE;
final String sql1 = "insert into emps(x,y) partition (x='ab',
y='bc') select * from emps";
final String expected = "INSERT INTO `EMPS` (`X`, `Y`)\n"
+ "PARTITION (`X` = 'ab', `Y` = 'bc')\n"
@@ -620,7 +632,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testInsertCaseSensitivePartitionSpecs() {
- conformance0 = FlinkSqlConformance.HIVE;
final String expected = "INSERT INTO `emps` (`x`, `y`)\n"
+ "PARTITION (`x` = 'ab', `y` = 'bc')\n"
+ "(SELECT *\n"
@@ -632,7 +643,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testInsertExtendedColumnAsStaticPartition1() {
- conformance0 = FlinkSqlConformance.HIVE;
String expected = "INSERT INTO `EMPS` EXTEND (`Z` BOOLEAN)
(`X`, `Y`)\n"
+ "PARTITION (`Z` = 'ab')\n"
+ "(SELECT *\n"
@@ -643,7 +653,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test(expected = SqlParseException.class)
public void testInsertExtendedColumnAsStaticPartition2() {
- conformance0 = FlinkSqlConformance.HIVE;
sql("insert into emps(x, y, z boolean) partition (z='ab')
select * from emps")
.node(new ValidationMatcher()
.fails("Extended columns not allowed under the
current SQL conformance level"));
@@ -651,7 +660,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testInsertOverwrite() {
- conformance0 = FlinkSqlConformance.HIVE;
// non-partitioned
check("INSERT OVERWRITE myDB.myTbl SELECT * FROM src",
"INSERT OVERWRITE `MYDB`.`MYTBL`\n"
@@ -668,7 +676,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testInvalidUpsertOverwrite() {
- conformance0 = FlinkSqlConformance.HIVE;
sql("UPSERT ^OVERWRITE^ myDB.myTbl SELECT * FROM src")
.fails("OVERWRITE expression is only used with INSERT
statement.");
}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index bf3567b..e9aa754 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -244,8 +244,8 @@ public class SqlToOperationConverterTest {
" 'connector' = 'kafka', \n" +
" 'kafka.topic' = 'log.test'\n" +
")\n";
- FlinkPlannerImpl planner =
getPlannerBySqlDialect(SqlDialect.DEFAULT);
- final CalciteParser parser =
getParserBySqlDialect(SqlDialect.DEFAULT);
+ FlinkPlannerImpl planner =
getPlannerBySqlDialect(SqlDialect.HIVE);
+ final CalciteParser parser =
getParserBySqlDialect(SqlDialect.HIVE);
Operation operation = parse(sql, planner, parser);
assert operation instanceof CreateTableOperation;
CreateTableOperation op = (CreateTableOperation) operation;
@@ -263,8 +263,8 @@ public class SqlToOperationConverterTest {
@Test(expected = SqlConversionException.class)
public void testCreateTableWithPkUniqueKeys() {
- FlinkPlannerImpl planner =
getPlannerBySqlDialect(SqlDialect.DEFAULT);
- final CalciteParser parser =
getParserBySqlDialect(SqlDialect.DEFAULT);
+ FlinkPlannerImpl planner =
getPlannerBySqlDialect(SqlDialect.HIVE);
+ final CalciteParser parser =
getParserBySqlDialect(SqlDialect.HIVE);
final String sql = "CREATE TABLE tbl1 (\n" +
" a bigint,\n" +
" b varchar, \n" +
@@ -351,8 +351,8 @@ public class SqlToOperationConverterTest {
@Test
public void testSqlInsertWithStaticPartition() {
final String sql = "insert into t1 partition(a=1) select b, c,
d from t2";
- FlinkPlannerImpl planner =
getPlannerBySqlDialect(SqlDialect.HIVE);
- final CalciteParser parser =
getParserBySqlDialect(SqlDialect.HIVE);
+ FlinkPlannerImpl planner =
getPlannerBySqlDialect(SqlDialect.DEFAULT);
+ final CalciteParser parser =
getParserBySqlDialect(SqlDialect.DEFAULT);
Operation operation = parse(sql, planner, parser);
assert operation instanceof CatalogSinkModifyOperation;
CatalogSinkModifyOperation sinkModifyOperation =
(CatalogSinkModifyOperation) operation;
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.scala
index 0fd5626..84d5659 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.scala
@@ -30,9 +30,7 @@ import org.junit.Test
class PartitionableSinkTest extends TableTestBase {
- val conf = new TableConfig
- conf.setSqlDialect(SqlDialect.HIVE)
- private val util = batchTestUtil(conf)
+ private val util = batchTestUtil()
util.addTableSource[(Long, Long, Long)]("MyTable", 'a, 'b, 'c)
PartitionableSinkITCase.registerTableSink(
util.tableEnv,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/PartitionableSinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/PartitionableSinkTest.scala
index b1e0841..f77ebce 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/PartitionableSinkTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/PartitionableSinkTest.scala
@@ -30,9 +30,7 @@ import org.junit.Test
class PartitionableSinkTest extends TableTestBase {
- val conf = new TableConfig
- conf.setSqlDialect(SqlDialect.HIVE)
- private val util = streamTestUtil(conf)
+ private val util = streamTestUtil()
util.addTableSource[(Long, Long, Long)]("MyTable", 'a, 'b, 'c)
PartitionableSinkITCase.registerTableSink(
util.tableEnv,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index 2fc0b23..12579dc 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -70,7 +70,6 @@ class PartitionableSinkITCase extends BatchTestBase {
tEnv.getConfig
.getConfiguration
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 3)
- tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
registerCollection("nonSortTable", testData, type3, "a, b, c",
dataNullables)
registerCollection("sortTable", testData1, type3, "a, b, c", dataNullables)
PartitionableSinkITCase.init()
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 7609994..6438c25 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -244,8 +244,8 @@ public class SqlToOperationConverterTest {
" 'connector' = 'kafka', \n" +
" 'kafka.topic' = 'log.test'\n" +
")\n";
- final FlinkPlannerImpl planner =
getPlannerBySqlDialect(SqlDialect.DEFAULT);
- SqlNode node =
getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql);
+ final FlinkPlannerImpl planner =
getPlannerBySqlDialect(SqlDialect.HIVE);
+ SqlNode node =
getParserBySqlDialect(SqlDialect.HIVE).parse(sql);
assert node instanceof SqlCreateTable;
Operation operation = SqlToOperationConverter.convert(planner,
catalogManager, node).get();
assert operation instanceof CreateTableOperation;
@@ -305,8 +305,8 @@ public class SqlToOperationConverterTest {
" 'connector' = 'kafka', \n" +
" 'kafka.topic' = 'log.test'\n" +
")\n";
- final FlinkPlannerImpl planner =
getPlannerBySqlDialect(SqlDialect.DEFAULT);
- SqlNode node =
getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql);
+ final FlinkPlannerImpl planner =
getPlannerBySqlDialect(SqlDialect.HIVE);
+ SqlNode node =
getParserBySqlDialect(SqlDialect.HIVE).parse(sql);
assert node instanceof SqlCreateTable;
SqlToOperationConverter.convert(planner, catalogManager, node);
}
@@ -314,8 +314,8 @@ public class SqlToOperationConverterTest {
@Test
public void testSqlInsertWithStaticPartition() {
final String sql = "insert into t1 partition(a=1) select b, c,
d from t2";
- FlinkPlannerImpl planner =
getPlannerBySqlDialect(SqlDialect.HIVE);
- SqlNode node =
getParserBySqlDialect(SqlDialect.HIVE).parse(sql);
+ FlinkPlannerImpl planner =
getPlannerBySqlDialect(SqlDialect.DEFAULT);
+ SqlNode node =
getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql);
assert node instanceof RichSqlInsert;
Operation operation = SqlToOperationConverter.convert(planner,
catalogManager, node).get();
assert operation instanceof CatalogSinkModifyOperation;