This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-hive.git

commit ddbb538e15f18ef83db15fd40bed00912256d5b7
Author: Wencong Liu <104502720+wencong...@users.noreply.github.com>
AuthorDate: Fri May 24 09:34:24 2024 +0800

    [FLINK-35221][hive] Support SQL 2011 reserved keywords as identifiers in HiveParser (#18)
---
 .../flink/connectors/hive/HiveTableSink.java       |   3 +-
 .../delegation/hive/HiveParserConstants.java       |   2 +
 .../delegation/hive/parse/FromClauseASTParser.g    |   3 +
 .../planner/delegation/hive/parse/HiveASTParser.g  |   7 +
 .../delegation/hive/parse/IdentifiersASTParser.g   |  24 +++
 .../delegation/hive/parse/SelectClauseASTParser.g  |   3 +
 ...portSQL11ReservedKeywordAsIdentifierITTest.java | 223 +++++++++++++++++++++
 7 files changed, 263 insertions(+), 2 deletions(-)
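
In short, the commit wires Hive's hive.support.sql11.reserved.keywords option
into HiveParser: when the option is set to false on the catalog's HiveConf,
SQL 2011 reserved keywords parse as plain identifiers in the Hive dialect. A
minimal usage sketch, pieced together from the ITCase added below (the
"orders"/"timestamp" names are illustrative, not part of the commit):

    HiveCatalog hiveCatalog = HiveTestUtils.createHiveCatalog();
    // Opt out of SQL 2011 keyword reservation; the option defaults to true.
    hiveCatalog
            .getHiveConf()
            .setBoolean("hive.support.sql11.reserved.keywords", false);
    hiveCatalog.open();

    TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
    tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
    tableEnv.useCatalog(hiveCatalog.getName());

    // "timestamp" is a SQL 2011 reserved keyword; with the option disabled
    // it now parses as an ordinary column name.
    tableEnv.executeSql("create table orders (id int, timestamp int)");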

diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index f55fed51..e89cdbaf 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -607,8 +607,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
         builder.setOverwrite(overwrite);
         builder.setIsToLocal(isToLocal);
         builder.setStaticPartitions(staticPartitionSpec);
-        builder.setTempPath(
-                new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));
+        builder.setPath(new org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));
         builder.setOutputFileConfig(fileNaming);
         builder.setIdentifier(identifier);
         builder.setPartitionCommitPolicyFactory(
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java
index 3bd3e0ff..ab7744f1 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserConstants.java
@@ -30,4 +30,6 @@ public class HiveParserConstants {
     /* Constants for insert overwrite directory */
     public static final String IS_INSERT_DIRECTORY = "is-insert-directory";
     public static final String IS_TO_LOCAL_DIRECTORY = "is-to-local-directory";
+    public static final String HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS =
+            "hive.support.sql11.reserved.keywords";
 }
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g
index 77ed8364..6e773957 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/FromClauseASTParser.g
@@ -37,6 +37,9 @@ k=3;
       RecognitionException e) {
     gParent.errors.add(new HiveASTParseError(gParent, e, tokenNames));
   }
+  protected boolean useSQL11ReservedKeywordsForIdentifier() {
+    return gParent.useSQL11ReservedKeywordsForIdentifier();
+  }
 }
 
 @rulecatch {
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveASTParser.g b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveASTParser.g
index 367d4b21..ec44d95d 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveASTParser.g
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveASTParser.g
@@ -417,6 +417,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveASTParseError;
+import static org.apache.flink.table.planner.delegation.hive.HiveParserConstants.HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS;
 }
 
 
@@ -721,6 +722,12 @@ import org.apache.flink.table.planner.delegation.hive.copy.HiveASTParseError;
   public void setHiveConf(Configuration hiveConf) {
     this.hiveConf = hiveConf;
   }
+  protected boolean useSQL11ReservedKeywordsForIdentifier() {
+    if(hiveConf==null){
+      return false;
+    }
+    return !hiveConf.getBoolean(HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS, true);
+  }
 }
 
 @rulecatch {
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/IdentifiersASTParser.g b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/IdentifiersASTParser.g
index 3ab8631d..4d21bd9b 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/IdentifiersASTParser.g
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/IdentifiersASTParser.g
@@ -37,6 +37,9 @@ k=3;
       RecognitionException e) {
     gParent.errors.add(new HiveASTParseError(gParent, e, tokenNames));
   }
+  protected boolean useSQL11ReservedKeywordsForIdentifier() {
+    return gParent.useSQL11ReservedKeywordsForIdentifier();
+  }
 }
 
 @rulecatch {
@@ -730,6 +733,8 @@ identifier
     :
     Identifier
     | nonReserved -> Identifier[$nonReserved.start]
+    // The reserved keywords in SQL 2011 can be used as identifiers if useSQL11ReservedKeywordsForIdentifier() == true.
+    | {useSQL11ReservedKeywordsForIdentifier()}? sql11ReservedKeywordsUsedAsIdentifier -> Identifier[$sql11ReservedKeywordsUsedAsIdentifier.start]
     ;
 
 functionIdentifier
@@ -806,3 +811,22 @@ sql11ReservedKeywordsUsedAsFunctionName
     :
     KW_IF | KW_ARRAY | KW_MAP | KW_BIGINT | KW_BINARY | KW_BOOLEAN | KW_CURRENT_DATE | KW_CURRENT_TIMESTAMP | KW_DATE | KW_DOUBLE | KW_FLOAT | KW_GROUPING | KW_INT | KW_SMALLINT | KW_TIMESTAMP
     ;
+
+//The following SQL2011 reserved keywords can be used as identifiers if useSQL11ReservedKeywordsForIdentifier() == true.
+sql11ReservedKeywordsUsedAsIdentifier
+    :
+    KW_ALL | KW_ALTER | KW_ARRAY | KW_AS | KW_AUTHORIZATION | KW_BETWEEN | KW_BIGINT | KW_BINARY | KW_BOOLEAN
+    | KW_BOTH | KW_BY | KW_CREATE | KW_CUBE | KW_CURRENT_DATE | KW_CURRENT_TIMESTAMP | KW_CURSOR | KW_DATE | KW_DECIMAL | KW_DELETE | KW_DESCRIBE
+    | KW_DOUBLE | KW_DROP | KW_EXISTS | KW_EXTERNAL | KW_FALSE | KW_FETCH | KW_FLOAT | KW_FOR | KW_FULL | KW_GRANT
+    | KW_GROUP | KW_GROUPING | KW_IMPORT | KW_IN | KW_INNER | KW_INSERT | KW_INT | KW_INTERSECT | KW_INTO | KW_IS | KW_LATERAL
+    | KW_LEFT | KW_LIKE | KW_LOCAL | KW_NONE | KW_NULL | KW_OF | KW_ORDER | KW_OUT | KW_OUTER | KW_PARTITION
+    | KW_PERCENT | KW_PROCEDURE | KW_RANGE | KW_READS | KW_REVOKE | KW_RIGHT
+    | KW_ROLLUP | KW_ROW | KW_ROWS | KW_SET | KW_SMALLINT | KW_TABLE | KW_TIMESTAMP | KW_TO | KW_TRIGGER | KW_TRUE
+    | KW_TRUNCATE | KW_UNION | KW_UPDATE | KW_USER | KW_USING | KW_VALUES | KW_WITH
+    | KW_REGEXP | KW_RLIKE
+    | KW_PRIMARY
+    | KW_FOREIGN
+    | KW_CONSTRAINT
+    | KW_REFERENCES
+    | KW_PRECISION
+    ;
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/SelectClauseASTParser.g b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/SelectClauseASTParser.g
index c141d310..ac64e031 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/SelectClauseASTParser.g
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/SelectClauseASTParser.g
@@ -37,6 +37,9 @@ k=3;
       RecognitionException e) {
     gParent.errors.add(new HiveASTParseError(gParent, e, tokenNames));
   }
+  protected boolean useSQL11ReservedKeywordsForIdentifier() {
+    return gParent.useSQL11ReservedKeywordsForIdentifier();
+  }
 }
 
 @rulecatch {
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest.java
new file mode 100644
index 00000000..a652ae72
--- /dev/null
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.module.CoreModule;
+import org.apache.flink.table.module.hive.HiveModule;
+import org.apache.flink.table.planner.delegation.hive.HiveParserConstants;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThatNoException;
+
+/** Test with SQL11 reserved keywords in hive queries. */
+class HiveDialectSupportSQL11ReservedKeywordAsIdentifierITTest {
+    private static HiveCatalog hiveCatalog;
+    private static TableEnvironment tableEnv;
+    private static List<String> sql11ReservedKeywords;
+
+    @BeforeAll
+    static void setup() throws Exception {
+        hiveCatalog = HiveTestUtils.createHiveCatalog();
+        hiveCatalog
+                .getHiveConf()
+                .setBoolean(HiveParserConstants.HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS, false);
+        hiveCatalog.open();
+        tableEnv = getTableEnvWithHiveCatalog();
+        sql11ReservedKeywords =
+                Arrays.asList(
+                        "ALL",
+                        "ALTER",
+                        "ARRAY",
+                        "AS",
+                        "AUTHORIZATION",
+                        "BETWEEN",
+                        "BIGINT",
+                        "BINARY",
+                        "BOOLEAN",
+                        "BOTH",
+                        "BY",
+                        "CREATE",
+                        "CUBE",
+                        "CURRENT_DATE",
+                        "CURRENT_TIMESTAMP",
+                        "CURSOR",
+                        "DATE",
+                        "DECIMAL",
+                        "DELETE",
+                        "DESCRIBE",
+                        "DOUBLE",
+                        "DROP",
+                        "EXISTS",
+                        "EXTERNAL",
+                        "FALSE",
+                        "FETCH",
+                        "FLOAT",
+                        "FOR",
+                        "FULL",
+                        "GRANT",
+                        "GROUP",
+                        "GROUPING",
+                        "IMPORT",
+                        "IN",
+                        "INNER",
+                        "INSERT",
+                        "INT",
+                        "INTERSECT",
+                        "INTO",
+                        "IS",
+                        "LATERAL",
+                        "LEFT",
+                        "LIKE",
+                        "LOCAL",
+                        "NONE",
+                        "NULL",
+                        "OF",
+                        "ORDER",
+                        "OUT",
+                        "OUTER",
+                        "PARTITION",
+                        "PERCENT",
+                        "PROCEDURE",
+                        "RANGE",
+                        "READS",
+                        "REVOKE",
+                        "RIGHT",
+                        "ROLLUP",
+                        "ROW",
+                        "ROWS",
+                        "SET",
+                        "SMALLINT",
+                        "TABLE",
+                        "TIMESTAMP",
+                        "TO",
+                        "TRIGGER",
+                        "TRUE",
+                        "TRUNCATE",
+                        "UNION",
+                        "UPDATE",
+                        "USER",
+                        "USING",
+                        "VALUES",
+                        "WITH",
+                        "REGEXP",
+                        "RLIKE",
+                        "PRIMARY",
+                        "FOREIGN",
+                        "CONSTRAINT",
+                        "REFERENCES",
+                        "PRECISION");
+    }
+
+    @Test
+    void testReservedKeywordAsIdentifierInDDL() {
+        List<String> toRun =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "create table table1 (x int, %s int)",
+                                "create table table2 (x int) partitioned by (%s string, q string)",
+                                "create table table3 (\n"
+                                        + "  a int,\n"
+                                        + "  %s struct<f1: boolean, f2: string, f3: struct<f4: int, f5: double>, f6: int>\n"
+                                        + ")"));
+        Random random = new Random();
+        for (String queryTemplate : toRun) {
+            // Select a random keyword.
+            String chosenKeyword =
+                    sql11ReservedKeywords.get(random.nextInt(sql11ReservedKeywords.size()));
+            String finalQuery = String.format(queryTemplate, chosenKeyword);
+            runQuery(finalQuery);
+        }
+    }
+
+    @Test
+    void testReservedKeywordAsIdentifierInDQL() {
+        List<String> toRun =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "create table table4(id int,name string,dep string,%s int,age int)",
+                                "select avg(%s) over (partition by dep) as avgsal from table4",
+                                "select dep,name,%s from (select dep,name,%s,rank() over "
+                                        + "(partition by dep order by %s desc) as rnk from table4) a where rnk=1",
+                                "select %s,sum(cnt) over (order by %s)/sum(cnt) over "
+                                        + "(order by %s ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) from"
+                                        + " (select %s,count(*) as cnt from table4 group by %s) a"));
+        Random random = new Random();
+        // Select a random keyword.
+        String chosenKeyword =
+                sql11ReservedKeywords.get(random.nextInt(sql11ReservedKeywords.size()));
+        for (String queryTemplate : toRun) {
+            String finalQuery = queryTemplate.replace("%s", chosenKeyword);
+            runQuery(finalQuery);
+        }
+    }
+
+    @Test
+    void testReservedKeywordAsIdentifierInDML() {
+        List<String> toRun =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "create table table5 (%s string, value string)",
+                                "create table table6(key int, ten int, one int, value string)",
+                                "from table5 insert overwrite table table6 map table5.%s,"
+                                        + " CAST(table5.%s / 10 AS INT), CAST(table5.%s % 10 AS INT),"
+                                        + " table5.value using 'cat' as (tkey, ten, one, tvalue)"
+                                        + " distribute by tvalue, tkey"));
+        Random random = new Random();
+        // Select a random keyword.
+        String chosenKeyword =
+                sql11ReservedKeywords.get(random.nextInt(sql11ReservedKeywords.size()));
+        for (String queryTemplate : toRun) {
+            String finalQuery = queryTemplate.replace("%s", chosenKeyword);
+            runQuery(finalQuery);
+        }
+    }
+
+    private void runQuery(String query) {
+        assertThatNoException()
+                .isThrownBy(
+                        () -> CollectionUtil.iteratorToList(tableEnv.executeSql(query).collect()));
+    }
+
+    private static TableEnvironment getTableEnvWithHiveCatalog() {
+        TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tableEnv.useCatalog(hiveCatalog.getName());
+        // automatically load hive module in hive-compatible mode
+        HiveModule hiveModule = new HiveModule(hiveCatalog.getHiveVersion());
+        CoreModule coreModule = CoreModule.INSTANCE;
+        for (String loaded : tableEnv.listModules()) {
+            tableEnv.unloadModule(loaded);
+        }
+        tableEnv.loadModule("hive", hiveModule);
+        tableEnv.loadModule("core", coreModule);
+        return tableEnv;
+    }
+}
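
One detail worth noting in the HiveASTParser.g hunk above: the guard is
deliberately double-negated, so parsing only changes on an explicit opt-out.
The cases, spelled out as comments on the same expression:

    // useSQL11ReservedKeywordsForIdentifier() is true only when a HiveConf is
    // present AND hive.support.sql11.reserved.keywords is explicitly false:
    //   hiveConf == null                -> false (keywords stay reserved)
    //   option unset (defaults to true) -> false (keywords stay reserved)
    //   option set to false             -> true  (keywords usable as identifiers)
    return !hiveConf.getBoolean(HIVE_SUPPORT_SQL_11_RESERVED_KEYWORDS, true);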
