This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-hive.git
commit ddbb538e15f18ef83db15fd40bed00912256d5b7
Author: Wencong Liu <104502720+wencong...@users.noreply.github.com>
AuthorDate: Fri May 24 09:34:24 2024 +0800

    [FLINK-35221][hive] Support SQL 2011 reserved keywords as identifiers in HiveParser (#18)
---
 .../flink/connectors/hive/HiveTableSink.java       |   3 +-
 .../delegation/hive/HiveParserConstants.java       |   2 +
 .../delegation/hive/parse/FromClauseASTParser.g    |   3 +
 .../planner/delegation/hive/parse/HiveASTParser.g  |   7 +
 .../delegation/hive/parse/IdentifiersASTParser.g   |  24 +++
 .../delegation/hive/parse/SelectClauseASTParser.g  |   3 +
 ...portSQL11ReservedKeywordAsIdentifierITTest.java | 223 +++++++++++++++++++++
 7 files changed, 263 insertions(+), 2 deletions(-)

diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index f55fed51..e89cdbaf 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -607,8 +607,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
         builder.setOverwrite(overwrite);
         builder.setIsToLocal(isToLocal);
         builder.setStaticPartitions(staticPartitionSpec);
-        builder.setTempPath(
-                new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));
+        builder.setPath(new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));
         builder.setOutputFileConfig(fileNaming);
         builder.setIdentifier(identifier);
         builder.setPartitionCommitPolicyFactory(
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java
index 3bd3e0ff..ab7744f1 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java
@@ -30,4 +30,6 @@ public class HiveParserConstants {
     /* Constants for insert overwrite directory */
     public static final String IS_INSERT_DIRECTORY = "is-insert-directory";
     public static final String IS_TO_LOCAL_DIRECTORY = "is-to-local-directory";
+    public static final String HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS =
+            "hive.support.sql11.reserved.keywords";
 }
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g
index 77ed8364..6e773957 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g
@@ -37,6 +37,9 @@ k=3;
                           RecognitionException e) {
     gParent.errors.add(new HiveASTParseError(gParent, e, tokenNames));
   }
+  protected boolean useSQL11ReservedKeywordsForIdentifier() {
+    return gParent.useSQL11ReservedKeywordsForIdentifier();
+  }
 }

 @rulecatch {
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveASTParser.g b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveASTParser.g
index 367d4b21..ec44d95d 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveASTParser.g
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveASTParser.g
@@ -417,6 +417,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveASTParseError;
+import static org.apache.flink.table.planner.delegation.hive.HiveParserConstants.HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS;
 }

@@ -721,6 +722,12 @@ import org.apache.flink.table.planner.delegation.hive.copy.HiveASTParseError;
   public void setHiveConf(Configuration hiveConf) {
     this.hiveConf = hiveConf;
   }
+  protected boolean useSQL11ReservedKeywordsForIdentifier() {
+    if (hiveConf == null) {
+      return false;
+    }
+    return !hiveConf.getBoolean(HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS, true);
+  }
 }

 @rulecatch {
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/IdentifiersASTParser.g b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/IdentifiersASTParser.g
index 3ab8631d..4d21bd9b 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/IdentifiersASTParser.g
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/IdentifiersASTParser.g
@@ -37,6 +37,9 @@ k=3;
                           RecognitionException e) {
     gParent.errors.add(new HiveASTParseError(gParent, e, tokenNames));
   }
+  protected boolean useSQL11ReservedKeywordsForIdentifier() {
+    return gParent.useSQL11ReservedKeywordsForIdentifier();
+  }
 }

 @rulecatch {
@@ -730,6 +733,8 @@ identifier
     :
     Identifier
     | nonReserved -> Identifier[$nonReserved.start]
+    // The reserved keywords in SQL 2011 can be used as identifiers if useSQL11ReservedKeywordsForIdentifier() == true.
+    | {useSQL11ReservedKeywordsForIdentifier()}? sql11ReservedKeywordsUsedAsIdentifier -> Identifier[$sql11ReservedKeywordsUsedAsIdentifier.start]
     ;

 functionIdentifier
@@ -806,3 +811,22 @@ sql11ReservedKeywordsUsedAsFunctionName
     :
     KW_IF | KW_ARRAY | KW_MAP | KW_BIGINT | KW_BINARY | KW_BOOLEAN | KW_CURRENT_DATE | KW_CURRENT_TIMESTAMP | KW_DATE | KW_DOUBLE | KW_FLOAT | KW_GROUPING | KW_INT | KW_SMALLINT | KW_TIMESTAMP
     ;
+
+//The following SQL2011 reserved keywords can be used as identifiers if useSQL11ReservedKeywordsForIdentifier() == true.
+sql11ReservedKeywordsUsedAsIdentifier
+    :
+    KW_ALL | KW_ALTER | KW_ARRAY | KW_AS | KW_AUTHORIZATION | KW_BETWEEN | KW_BIGINT | KW_BINARY | KW_BOOLEAN
+    | KW_BOTH | KW_BY | KW_CREATE | KW_CUBE | KW_CURRENT_DATE | KW_CURRENT_TIMESTAMP | KW_CURSOR | KW_DATE | KW_DECIMAL | KW_DELETE | KW_DESCRIBE
+    | KW_DOUBLE | KW_DROP | KW_EXISTS | KW_EXTERNAL | KW_FALSE | KW_FETCH | KW_FLOAT | KW_FOR | KW_FULL | KW_GRANT
+    | KW_GROUP | KW_GROUPING | KW_IMPORT | KW_IN | KW_INNER | KW_INSERT | KW_INT | KW_INTERSECT | KW_INTO | KW_IS | KW_LATERAL
+    | KW_LEFT | KW_LIKE | KW_LOCAL | KW_NONE | KW_NULL | KW_OF | KW_ORDER | KW_OUT | KW_OUTER | KW_PARTITION
+    | KW_PERCENT | KW_PROCEDURE | KW_RANGE | KW_READS | KW_REVOKE | KW_RIGHT
+    | KW_ROLLUP | KW_ROW | KW_ROWS | KW_SET | KW_SMALLINT | KW_TABLE | KW_TIMESTAMP | KW_TO | KW_TRIGGER | KW_TRUE
+    | KW_TRUNCATE | KW_UNION | KW_UPDATE | KW_USER | KW_USING | KW_VALUES | KW_WITH
+    | KW_REGEXP | KW_RLIKE
+    | KW_PRIMARY
+    | KW_FOREIGN
+    | KW_CONSTRAINT
+    | KW_REFERENCES
+    | KW_PRECISION
+    ;
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/SelectClauseASTParser.g b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/SelectClauseASTParser.g
index c141d310..ac64e031 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/SelectClauseASTParser.g
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/SelectClauseASTParser.g
@@ -37,6 +37,9 @@ k=3;
                           RecognitionException e) {
     gParent.errors.add(new HiveASTParseError(gParent, e, tokenNames));
   }
+  protected boolean useSQL11ReservedKeywordsForIdentifier() {
+    return gParent.useSQL11ReservedKeywordsForIdentifier();
+  }
 }

 @rulecatch {
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest.java
new file mode 100644
index 00000000..a652ae72
--- /dev/null
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.module.CoreModule;
+import org.apache.flink.table.module.hive.HiveModule;
+import org.apache.flink.table.planner.delegation.hive.HiveParserConstants;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThatNoException;
+
+/** Test with SQL11 reserved keywords in hive queries. */
+class HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest {
+    private static HiveCatalog hiveCatalog;
+    private static TableEnvironment tableEnv;
+    private static List<String> sql11ReservedKeywords;
+
+    @BeforeAll
+    static void setup() throws Exception {
+        hiveCatalog = HiveTestUtils.createHiveCatalog();
+        hiveCatalog
+                .getHiveConf()
+                .setBoolean(HiveParserConstants.HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS, false);
+        hiveCatalog.open();
+        tableEnv = getTableEnvWithHiveCatalog();
+        sql11ReservedKeywords =
+                Arrays.asList(
+                        "ALL",
+                        "ALTER",
+                        "ARRAY",
+                        "AS",
+                        "AUTHORIZATION",
+                        "BETWEEN",
+                        "BIGINT",
+                        "BINARY",
+                        "BOOLEAN",
+                        "BOTH",
+                        "BY",
+                        "CREATE",
+                        "CUBE",
+                        "CURRENT_DATE",
+                        "CURRENT_TIMESTAMP",
+                        "CURSOR",
+                        "DATE",
+                        "DECIMAL",
+                        "DELETE",
+                        "DESCRIBE",
+                        "DOUBLE",
+                        "DROP",
+                        "EXISTS",
+                        "EXTERNAL",
+                        "FALSE",
+                        "FETCH",
+                        "FLOAT",
+                        "FOR",
+                        "FULL",
+                        "GRANT",
+                        "GROUP",
+                        "GROUPING",
+                        "IMPORT",
+                        "IN",
+                        "INNER",
+                        "INSERT",
+                        "INT",
+                        "INTERSECT",
+                        "INTO",
+                        "IS",
+                        "LATERAL",
+                        "LEFT",
+                        "LIKE",
+                        "LOCAL",
+                        "NONE",
+                        "NULL",
+                        "OF",
+                        "ORDER",
+                        "OUT",
+                        "OUTER",
+                        "PARTITION",
+                        "PERCENT",
+                        "PROCEDURE",
+                        "RANGE",
+                        "READS",
+                        "REVOKE",
+                        "RIGHT",
+                        "ROLLUP",
+                        "ROW",
+                        "ROWS",
+                        "SET",
+                        "SMALLINT",
+                        "TABLE",
+                        "TIMESTAMP",
+                        "TO",
+                        "TRIGGER",
+                        "TRUE",
+                        "TRUNCATE",
+                        "UNION",
+                        "UPDATE",
+                        "USER",
+                        "USING",
+                        "VALUES",
+                        "WITH",
+                        "REGEXP",
+                        "RLIKE",
+                        "PRIMARY",
+                        "FOREIGN",
+                        "CONSTRAINT",
+                        "REFERENCES",
+                        "PRECISION");
+    }
+
+    @Test
+    void testReservedKeywordAsIdentifierInDDL() {
+        List<String> toRun =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "create table table1 (x int, %s int)",
+                                "create table table2 (x int) partitioned by (%s string, q string)",
+                                "create table table3 (\n"
+                                        + "  a int,\n"
+                                        + "  %s struct<f1: boolean, f2: string, f3: struct<f4: int, f5: double>, f6: int>\n"
+                                        + ")"));
+        Random random = new Random();
+        for (String queryTemplate : toRun) {
+            // Select a random keyword.
+            String chosenKeyword =
+                    sql11ReservedKeywords.get(random.nextInt(sql11ReservedKeywords.size()));
+            String finalQuery = String.format(queryTemplate, chosenKeyword);
+            runQuery(finalQuery);
+        }
+    }
+
+    @Test
+    void testReservedKeywordAsIdentifierInDQL() {
+        List<String> toRun =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "create table table4(id int,name string,dep string,%s int,age int)",
+                                "select avg(%s) over (partition by dep) as avgsal from table4",
+                                "select dep,name,%s from (select dep,name,%s,rank() over "
+                                        + "(partition by dep order by %s desc) as rnk from table4) a where rnk=1",
+                                "select %s,sum(cnt) over (order by %s)/sum(cnt) over "
+                                        + "(order by %s ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) from"
+                                        + " (select %s,count(*) as cnt from table4 group by %s) a"));
+        Random random = new Random();
+        // Select a random keyword.
+        String chosenKeyword =
+                sql11ReservedKeywords.get(random.nextInt(sql11ReservedKeywords.size()));
+        for (String queryTemplate : toRun) {
+            String finalQuery = queryTemplate.replace("%s", chosenKeyword);
+            runQuery(finalQuery);
+        }
+    }
+
+    @Test
+    void testReservedKeywordAsIdentifierInDML() {
+        List<String> toRun =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "create table table5 (%s string, value string)",
+                                "create table table6(key int, ten int, one int, value string)",
+                                "from table5 insert overwrite table table6 map table5.%s,"
+                                        + " CAST(table5.%s / 10 AS INT), CAST(table5.%s % 10 AS INT),"
+                                        + " table5.value using 'cat' as (tkey, ten, one, tvalue)"
+                                        + " distribute by tvalue, tkey"));
+        Random random = new Random();
+        // Select a random keyword.
+        String chosenKeyword =
+                sql11ReservedKeywords.get(random.nextInt(sql11ReservedKeywords.size()));
+        for (String queryTemplate : toRun) {
+            String finalQuery = queryTemplate.replace("%s", chosenKeyword);
+            runQuery(finalQuery);
+        }
+    }
+
+    private void runQuery(String query) {
+        assertThatNoException()
+                .isThrownBy(
+                        () -> CollectionUtil.iteratorToList(tableEnv.executeSql(query).collect()));
+    }
+
+    private static TableEnvironment getTableEnvWithHiveCatalog() {
+        TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tableEnv.useCatalog(hiveCatalog.getName());
+        // automatically load hive module in hive-compatible mode
+        HiveModule hiveModule = new HiveModule(hiveCatalog.getHiveVersion());
+        CoreModule coreModule = CoreModule.INSTANCE;
+        for (String loaded : tableEnv.listModules()) {
+            tableEnv.unloadModule(loaded);
+        }
+        tableEnv.loadModule("hive", hiveModule);
+        tableEnv.loadModule("core", coreModule);
+        return tableEnv;
+    }
+}
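
For context, the change keys off a single Hive option: when hive.support.sql11.reserved.keywords
is set to false, the new useSQL11ReservedKeywordsForIdentifier() predicate in HiveASTParser.g
enables the extra sql11ReservedKeywordsUsedAsIdentifier alternative of the identifier rule.
A minimal usage sketch, reusing the test utilities from the ITCase above (the table and column
names are hypothetical, not part of the commit):

    HiveCatalog hiveCatalog = HiveTestUtils.createHiveCatalog();
    // Disabling the option relaxes the parser so that SQL 2011 reserved
    // keywords are accepted as plain identifiers.
    hiveCatalog
            .getHiveConf()
            .setBoolean(HiveParserConstants.HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS, false);
    hiveCatalog.open();

    TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
    tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
    tableEnv.useCatalog(hiveCatalog.getName());

    // "timestamp" and "date" are SQL 2011 reserved keywords; with the option
    // disabled they now parse as column names without quoting.
    tableEnv.executeSql("create table orders (id int, timestamp string, date string)");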