This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new a898dae  [FLINK-15269][table] Fix INSERT OVERWRITE/PARTITION syntax shouldn't be limited to Hive dialect (#10587)
a898dae is described below

commit a898daeddb7764b4a79a71e5b1a7025dc66518b0
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Dec 20 14:35:09 2019 +0800

    [FLINK-15269][table] Fix INSERT OVERWRITE/PARTITION syntax shouldn't be limited to Hive dialect (#10587)
---
 .../src/main/codegen/includes/parserImpls.ftl       | 19 ++++++++-----------
 .../flink/sql/parser/utils/ParserResource.java      |  7 ++-----
 .../sql/parser/validate/FlinkSqlConformance.java    | 21 +++------------------
 .../ParserResource.properties                       |  3 +--
 .../flink/sql/parser/FlinkSqlParserImplTest.java    | 19 +++++++++++++------
 .../operations/SqlToOperationConverterTest.java     | 12 ++++++------
 .../plan/batch/sql/PartitionableSinkTest.scala      |  4 +---
 .../plan/stream/sql/PartitionableSinkTest.scala     |  4 +---
 .../runtime/batch/sql/PartitionableSinkITCase.scala |  1 -
 .../table/sqlexec/SqlToOperationConverterTest.java  | 12 ++++++------
 10 files changed, 41 insertions(+), 61 deletions(-)
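
In short, this commit moves the parser's dialect gate: INSERT OVERWRITE and
INSERT ... PARTITION now parse under every dialect, while CREATE TABLE ...
PARTITIONED BY becomes the one construct still restricted to the HIVE dialect.
A minimal sketch of the resulting behavior on the 1.10 Table API (the table
names t1/t2/t3 are illustrative and assumed to be registered already; this
snippet is not part of the patch):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.TableEnvironment;

    public class DialectGateSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

            // Parses under the DEFAULT dialect after this change; previously
            // this required setSqlDialect(SqlDialect.HIVE) first.
            tEnv.sqlUpdate("INSERT OVERWRITE t1 PARTITION (a=1) SELECT b, c FROM t2");

            // Still HIVE-only after this change: creating a partitioned table.
            tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
            tEnv.sqlUpdate(
                    "CREATE TABLE t3 (a BIGINT, b VARCHAR) PARTITIONED BY (a) WITH ('x' = 'y')");
        }
    }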

diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 58cd248..9e7bd8e 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -570,7 +570,12 @@ SqlCreate SqlCreateTable(Span s, boolean replace) :
     }]
     [
         <PARTITIONED> <BY>
-        partitionColumns = ParenthesizedSimpleIdentifierList()
+        partitionColumns = ParenthesizedSimpleIdentifierList() {
+            if (!((FlinkSqlConformance) this.conformance).allowCreatePartitionedTable()) {
+                throw SqlUtil.newContextException(getPos(),
+                    ParserResource.RESOURCE.createPartitionedTableIsOnlyAllowedForHive());
+            }
+        }
     ]
     [
         <WITH>
@@ -636,10 +641,7 @@ SqlNode RichSqlInsert() :
         <INTO>
     |
         <OVERWRITE> {
-            if (!((FlinkSqlConformance) this.conformance).allowInsertOverwrite()) {
-                throw SqlUtil.newContextException(getPos(),
-                    ParserResource.RESOURCE.overwriteIsOnlyAllowedForHive());
-            } else if (RichSqlInsert.isUpsert(keywords)) {
+            if (RichSqlInsert.isUpsert(keywords)) {
                 throw SqlUtil.newContextException(getPos(),
                     ParserResource.RESOURCE.overwriteIsOnlyUsedWithInsert());
             }
@@ -672,12 +674,7 @@ SqlNode RichSqlInsert() :
         }
     ]
     [
-        <PARTITION> PartitionSpecCommaList(partitionList) {
-            if (!((FlinkSqlConformance) this.conformance).allowInsertIntoPartition()) {
-                throw SqlUtil.newContextException(getPos(),
-                    ParserResource.RESOURCE.partitionIsOnlyAllowedForHive());
-            }
-        }
+        <PARTITION> PartitionSpecCommaList(partitionList)
     ]
     source = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) {
        return new RichSqlInsert(s.end(source), keywordList, extendedKeywordList, table, source,
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 8d7d59d..5989a37 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -33,12 +33,9 @@ public interface ParserResource {
        @Resources.BaseMessage("Multiple WATERMARK statements is not supported 
yet.")
        Resources.ExInst<ParseException> multipleWatermarksUnsupported();
 
-       @Resources.BaseMessage("OVERWRITE expression is only allowed for HIVE 
dialect.")
-       Resources.ExInst<ParseException> overwriteIsOnlyAllowedForHive();
-
        @Resources.BaseMessage("OVERWRITE expression is only used with INSERT 
statement.")
        Resources.ExInst<ParseException> overwriteIsOnlyUsedWithInsert();
 
-       @Resources.BaseMessage("PARTITION expression is only allowed for HIVE 
dialect.")
-       Resources.ExInst<ParseException> partitionIsOnlyAllowedForHive();
+       @Resources.BaseMessage("Creating partitioned table is only allowed for 
HIVE dialect.")
+       Resources.ExInst<ParseException> 
createPartitionedTableIsOnlyAllowedForHive();
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
index 94cb6be..d53b050 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
@@ -147,24 +147,9 @@ public enum FlinkSqlConformance implements SqlConformance {
        }
 
        /**
-        * Whether to allow "insert into tbl1 partition(col1=val1)" grammar.
+        * Whether to allow "create table T(i int, j int) partitioned by (i)" 
grammar.
         */
-       public boolean allowInsertIntoPartition() {
-               switch (this) {
-                       case HIVE:
-                               return true;
-               }
-               return false;
-       }
-
-       /**
-        * Whether to allow "insert overwrite tbl1 partition(col1=val1)" 
grammar.
-        */
-       public boolean allowInsertOverwrite() {
-               switch (this) {
-                       case HIVE:
-                               return true;
-               }
-               return false;
+       public boolean allowCreatePartitionedTable() {
+               return this == FlinkSqlConformance.HIVE;
        }
 }
diff --git a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
index a48c4da..9993e7c 100644
--- a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
+++ b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
@@ -17,6 +17,5 @@
 # See wrapper class org.apache.calcite.runtime.CalciteResource.
 #
 MultipleWatermarksUnsupported=Multiple WATERMARK statements is not supported yet.
-OverwriteIsOnlyAllowedForHive=OVERWRITE expression is only allowed for HIVE dialect.
 OverwriteIsOnlyUsedWithInsert=OVERWRITE expression is only used with INSERT statement.
-PartitionIsOnlyAllowedForHive=PARTITION expression is only allowed for HIVE dialect.
+CreatePartitionedTableIsOnlyAllowedForHive=Creating partitioned table is only allowed for HIVE dialect.
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index ea0ff1c..6bb594c 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -203,6 +203,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
        @Test
        public void testCreateTable() {
+               conformance0 = FlinkSqlConformance.HIVE;
                check("CREATE TABLE tbl1 (\n" +
                                "  a bigint,\n" +
                                "  h varchar, \n" +
@@ -235,6 +236,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
        @Test
        public void testCreateTableWithComment() {
+               conformance0 = FlinkSqlConformance.HIVE;
                check("CREATE TABLE tbl1 (\n" +
                                "  a bigint comment 'test column comment 
AAA.',\n" +
                                "  h varchar, \n" +
@@ -269,6 +271,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
        @Test
        public void testCreateTableWithPrimaryKeyAndUniqueKey() {
+               conformance0 = FlinkSqlConformance.HIVE;
                check("CREATE TABLE tbl1 (\n" +
                                "  a bigint comment 'test column comment 
AAA.',\n" +
                                "  h varchar, \n" +
@@ -533,6 +536,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
        @Test
        public void testCreateInvalidPartitionedTable() {
+               conformance0 = FlinkSqlConformance.HIVE;
                String sql = "create table sls_stream1(\n" +
                        "  a bigint,\n" +
                        "  b VARCHAR,\n" +
@@ -543,7 +547,16 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
                        ") with ( 'x' = 'y', 'asd' = 'dada')";
                sql(sql).node(new ValidationMatcher()
                        .fails("Partition column [C] not defined in columns, at 
line 6, column 3"));
+       }
 
+       @Test
+       public void testNotAllowedCreatePartition() {
+               conformance0 = FlinkSqlConformance.DEFAULT;
+               String sql = "create table sls_stream1(\n" +
+                               "  a bigint,\n" +
+                               "  b VARCHAR\n" +
+                               ") PARTITIONED BY (a^)^ with ( 'x' = 'y', 'asd' 
= 'dada')";
+               sql(sql).fails("Creating partitioned table is only allowed for 
HIVE dialect.");
        }
 
        @Test
@@ -598,7 +611,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
        @Test
        public void testInsertPartitionSpecs() {
-               conformance0 = FlinkSqlConformance.HIVE;
                final String sql1 = "insert into emps(x,y) partition (x='ab', 
y='bc') select * from emps";
                final String expected = "INSERT INTO `EMPS` (`X`, `Y`)\n"
                        + "PARTITION (`X` = 'ab', `Y` = 'bc')\n"
@@ -620,7 +632,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
        @Test
        public void testInsertCaseSensitivePartitionSpecs() {
-               conformance0 = FlinkSqlConformance.HIVE;
                final String expected = "INSERT INTO `emps` (`x`, `y`)\n"
                        + "PARTITION (`x` = 'ab', `y` = 'bc')\n"
                        + "(SELECT *\n"
@@ -632,7 +643,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
        @Test
        public void testInsertExtendedColumnAsStaticPartition1() {
-               conformance0 = FlinkSqlConformance.HIVE;
                String expected = "INSERT INTO `EMPS` EXTEND (`Z` BOOLEAN) 
(`X`, `Y`)\n"
                        + "PARTITION (`Z` = 'ab')\n"
                        + "(SELECT *\n"
@@ -643,7 +653,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
        @Test(expected = SqlParseException.class)
        public void testInsertExtendedColumnAsStaticPartition2() {
-               conformance0 = FlinkSqlConformance.HIVE;
                sql("insert into emps(x, y, z boolean) partition (z='ab') 
select * from emps")
                        .node(new ValidationMatcher()
                                .fails("Extended columns not allowed under the 
current SQL conformance level"));
@@ -651,7 +660,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
        @Test
        public void testInsertOverwrite() {
-               conformance0 = FlinkSqlConformance.HIVE;
                // non-partitioned
                check("INSERT OVERWRITE myDB.myTbl SELECT * FROM src",
                        "INSERT OVERWRITE `MYDB`.`MYTBL`\n"
@@ -668,7 +676,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
        @Test
        public void testInvalidUpsertOverwrite() {
-               conformance0 = FlinkSqlConformance.HIVE;
                sql("UPSERT ^OVERWRITE^ myDB.myTbl SELECT * FROM src")
                        .fails("OVERWRITE expression is only used with INSERT 
statement.");
        }
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index bf3567b..e9aa754 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -244,8 +244,8 @@ public class SqlToOperationConverterTest {
                        "    'connector' = 'kafka', \n" +
                        "    'kafka.topic' = 'log.test'\n" +
                        ")\n";
-               FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-               final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+               FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
+               final CalciteParser parser = getParserBySqlDialect(SqlDialect.HIVE);
                Operation operation = parse(sql, planner, parser);
                assert operation instanceof CreateTableOperation;
                CreateTableOperation op = (CreateTableOperation) operation;
@@ -263,8 +263,8 @@ public class SqlToOperationConverterTest {
 
        @Test(expected = SqlConversionException.class)
        public void testCreateTableWithPkUniqueKeys() {
-               FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-               final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+               FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
+               final CalciteParser parser = getParserBySqlDialect(SqlDialect.HIVE);
                final String sql = "CREATE TABLE tbl1 (\n" +
                        "  a bigint,\n" +
                        "  b varchar, \n" +
@@ -351,8 +351,8 @@ public class SqlToOperationConverterTest {
        @Test
        public void testSqlInsertWithStaticPartition() {
                final String sql = "insert into t1 partition(a=1) select b, c, 
d from t2";
-               FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
-               final CalciteParser parser = getParserBySqlDialect(SqlDialect.HIVE);
+               FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+               final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
                Operation operation = parse(sql, planner, parser);
                assert operation instanceof CatalogSinkModifyOperation;
                CatalogSinkModifyOperation sinkModifyOperation = (CatalogSinkModifyOperation) operation;
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.scala
index 0fd5626..84d5659 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSinkTest.scala
@@ -30,9 +30,7 @@ import org.junit.Test
 
 class PartitionableSinkTest extends TableTestBase {
 
-  val conf = new TableConfig
-  conf.setSqlDialect(SqlDialect.HIVE)
-  private val util = batchTestUtil(conf)
+  private val util = batchTestUtil()
   util.addTableSource[(Long, Long, Long)]("MyTable", 'a, 'b, 'c)
   PartitionableSinkITCase.registerTableSink(
     util.tableEnv,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/PartitionableSinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/PartitionableSinkTest.scala
index b1e0841..f77ebce 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/PartitionableSinkTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/PartitionableSinkTest.scala
@@ -30,9 +30,7 @@ import org.junit.Test
 
 class PartitionableSinkTest extends TableTestBase {
 
-  val conf = new TableConfig
-  conf.setSqlDialect(SqlDialect.HIVE)
-  private val util = streamTestUtil(conf)
+  private val util = streamTestUtil()
   util.addTableSource[(Long, Long, Long)]("MyTable", 'a, 'b, 'c)
   PartitionableSinkITCase.registerTableSink(
     util.tableEnv,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index 2fc0b23..12579dc 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -70,7 +70,6 @@ class PartitionableSinkITCase extends BatchTestBase {
     tEnv.getConfig
       .getConfiguration
       .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 3)
-    tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
     registerCollection("nonSortTable", testData, type3, "a, b, c", 
dataNullables)
     registerCollection("sortTable", testData1, type3, "a, b, c", dataNullables)
     PartitionableSinkITCase.init()
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 7609994..6438c25 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -244,8 +244,8 @@ public class SqlToOperationConverterTest {
                        "    'connector' = 'kafka', \n" +
                        "    'kafka.topic' = 'log.test'\n" +
                        ")\n";
-               final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-               SqlNode node = getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql);
+               final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
+               SqlNode node = getParserBySqlDialect(SqlDialect.HIVE).parse(sql);
                assert node instanceof SqlCreateTable;
                Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
                assert operation instanceof CreateTableOperation;
@@ -305,8 +305,8 @@ public class SqlToOperationConverterTest {
                        "    'connector' = 'kafka', \n" +
                        "    'kafka.topic' = 'log.test'\n" +
                        ")\n";
-               final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-               SqlNode node = getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql);
+               final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
+               SqlNode node = getParserBySqlDialect(SqlDialect.HIVE).parse(sql);
                assert node instanceof SqlCreateTable;
                SqlToOperationConverter.convert(planner, catalogManager, node);
        }
@@ -314,8 +314,8 @@ public class SqlToOperationConverterTest {
        @Test
        public void testSqlInsertWithStaticPartition() {
                final String sql = "insert into t1 partition(a=1) select b, c, 
d from t2";
-               FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
-               SqlNode node = getParserBySqlDialect(SqlDialect.HIVE).parse(sql);
+               FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+               SqlNode node = getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql);
                assert node instanceof RichSqlInsert;
                Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
                assert operation instanceof CatalogSinkModifyOperation;
