lirui-apache commented on a change in pull request #15151:
URL: https://github.com/apache/flink/pull/15151#discussion_r594115614



##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
##########
@@ -67,6 +172,180 @@
 
     @Override
     public List<Operation> parse(String statement) {
-        return super.parse(statement);
+        CatalogManager catalogManager = getCatalogManager();
+        Catalog currentCatalog =
+                
catalogManager.getCatalog(catalogManager.getCurrentCatalog()).orElse(null);
+        if (!(currentCatalog instanceof HiveCatalog)) {
+            LOG.warn("Current catalog is not HiveCatalog. Falling back to 
Flink's planner.");
+            return super.parse(statement);
+        }
+        HiveConf hiveConf = new HiveConf(((HiveCatalog) 
currentCatalog).getHiveConf());
+        hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, 
"nonstrict");
+        hiveConf.set("hive.allow.udf.load.on.demand", "false");

Review comment:
       `HiveConf.ConfVars.HIVE_ALLOW_UDF_LOAD_ON_DEMAND` was not available in older Hive versions.

##########
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
##########
@@ -308,7 +308,7 @@ public void testAlterTable() throws Exception {
         // change location
         String newLocation = warehouse + "/tbl1_new_location";
         tableEnv.executeSql(
-                String.format("alter table `default`.tbl1 set location '%s'", 
newLocation));
+                String.format("alter table default.tbl1 set location '%s'", 
newLocation));

Review comment:
       We do. This is to verify that users no longer have to escape the `default` keyword in Hive dialect.

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
##########
@@ -41,6 +72,80 @@
 
     private static final Logger LOG = 
LoggerFactory.getLogger(HiveParser.class);
 
+    private static final Method setCurrentTSMethod =
+            HiveReflectionUtils.tryGetMethod(
+                    SessionState.class, "setupQueryCurrentTimestamp", new 
Class[0]);
+
+    // need to maintain the ASTNode types for DDLs
+    private static final Set<Integer> DDL_NODES;
+
+    static {
+        DDL_NODES =
+                new HashSet<>(
+                        Arrays.asList(
+                                HiveASTParser.TOK_ALTERTABLE,
+                                HiveASTParser.TOK_ALTERVIEW,
+                                HiveASTParser.TOK_CREATEDATABASE,
+                                HiveASTParser.TOK_DROPDATABASE,
+                                HiveASTParser.TOK_SWITCHDATABASE,
+                                HiveASTParser.TOK_DROPTABLE,
+                                HiveASTParser.TOK_DROPVIEW,
+                                HiveASTParser.TOK_DROP_MATERIALIZED_VIEW,
+                                HiveASTParser.TOK_DESCDATABASE,
+                                HiveASTParser.TOK_DESCTABLE,
+                                HiveASTParser.TOK_DESCFUNCTION,
+                                HiveASTParser.TOK_MSCK,
+                                HiveASTParser.TOK_ALTERINDEX_REBUILD,
+                                HiveASTParser.TOK_ALTERINDEX_PROPERTIES,
+                                HiveASTParser.TOK_SHOWDATABASES,
+                                HiveASTParser.TOK_SHOWTABLES,
+                                HiveASTParser.TOK_SHOWCOLUMNS,
+                                HiveASTParser.TOK_SHOW_TABLESTATUS,
+                                HiveASTParser.TOK_SHOW_TBLPROPERTIES,
+                                HiveASTParser.TOK_SHOW_CREATEDATABASE,
+                                HiveASTParser.TOK_SHOW_CREATETABLE,
+                                HiveASTParser.TOK_SHOWFUNCTIONS,
+                                HiveASTParser.TOK_SHOWPARTITIONS,
+                                HiveASTParser.TOK_SHOWINDEXES,
+                                HiveASTParser.TOK_SHOWLOCKS,
+                                HiveASTParser.TOK_SHOWDBLOCKS,
+                                HiveASTParser.TOK_SHOW_COMPACTIONS,
+                                HiveASTParser.TOK_SHOW_TRANSACTIONS,
+                                HiveASTParser.TOK_ABORT_TRANSACTIONS,
+                                HiveASTParser.TOK_SHOWCONF,
+                                HiveASTParser.TOK_SHOWVIEWS,
+                                HiveASTParser.TOK_CREATEINDEX,
+                                HiveASTParser.TOK_DROPINDEX,
+                                HiveASTParser.TOK_ALTERTABLE_CLUSTER_SORT,
+                                HiveASTParser.TOK_LOCKTABLE,
+                                HiveASTParser.TOK_UNLOCKTABLE,
+                                HiveASTParser.TOK_LOCKDB,
+                                HiveASTParser.TOK_UNLOCKDB,
+                                HiveASTParser.TOK_CREATEROLE,
+                                HiveASTParser.TOK_DROPROLE,
+                                HiveASTParser.TOK_GRANT,
+                                HiveASTParser.TOK_REVOKE,
+                                HiveASTParser.TOK_SHOW_GRANT,
+                                HiveASTParser.TOK_GRANT_ROLE,
+                                HiveASTParser.TOK_REVOKE_ROLE,
+                                HiveASTParser.TOK_SHOW_ROLE_GRANT,
+                                HiveASTParser.TOK_SHOW_ROLE_PRINCIPALS,
+                                HiveASTParser.TOK_SHOW_ROLE_PRINCIPALS,
+                                HiveASTParser.TOK_ALTERDATABASE_PROPERTIES,
+                                HiveASTParser.TOK_ALTERDATABASE_OWNER,
+                                HiveASTParser.TOK_TRUNCATETABLE,
+                                HiveASTParser.TOK_SHOW_SET_ROLE,
+                                HiveASTParser.TOK_CACHE_METADATA,
+                                HiveASTParser.TOK_CREATEMACRO,
+                                HiveASTParser.TOK_DROPMACRO,
+                                HiveASTParser.TOK_CREATETABLE,
+                                HiveASTParser.TOK_CREATEFUNCTION,
+                                HiveASTParser.TOK_DROPFUNCTION,
+                                HiveASTParser.TOK_RELOADFUNCTION,
+                                HiveASTParser.TOK_CREATEVIEW,
+                                HiveASTParser.TOK_ALTERDATABASE_LOCATION));
+    }
+
     private final PlannerContext plannerContext;
     private final FlinkCalciteCatalogReader catalogReader;
     private final FrameworkConfig frameworkConfig;

Review comment:
       These will soon get used to process DQL/DML, so I'd rather not remove them and then add them back.

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
##########
@@ -67,6 +172,180 @@
 
     @Override
     public List<Operation> parse(String statement) {
-        return super.parse(statement);
+        CatalogManager catalogManager = getCatalogManager();
+        Catalog currentCatalog =
+                
catalogManager.getCatalog(catalogManager.getCurrentCatalog()).orElse(null);
+        if (!(currentCatalog instanceof HiveCatalog)) {
+            LOG.warn("Current catalog is not HiveCatalog. Falling back to 
Flink's planner.");
+            return super.parse(statement);
+        }
+        HiveConf hiveConf = new HiveConf(((HiveCatalog) 
currentCatalog).getHiveConf());
+        hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, 
"nonstrict");
+        hiveConf.set("hive.allow.udf.load.on.demand", "false");
+        hiveConf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
+        HiveShim hiveShim =
+                HiveShimLoader.loadHiveShim(((HiveCatalog) 
currentCatalog).getHiveVersion());
+        try {
+            // creates SessionState
+            startSessionState(hiveConf, catalogManager);
+            return processCmd(statement, hiveConf, hiveShim, (HiveCatalog) 
currentCatalog);
+        } finally {
+            clearSessionState(hiveConf);
+        }
+    }
+
+    private List<Operation> processCmd(
+            String cmd, HiveConf hiveConf, HiveShim hiveShim, HiveCatalog 
hiveCatalog) {
+        try {
+            final HiveParserContext context = new HiveParserContext(hiveConf);
+            // parse statement to get AST
+            final ASTNode node = HiveASTParseUtils.parse(cmd, context);
+            // generate Calcite plan
+            Operation operation;
+            if (DDL_NODES.contains(node.getType())) {
+                HiveParserQueryState queryState = new 
HiveParserQueryState(hiveConf);
+                HiveParserDDLSemanticAnalyzer ddlAnalyzer =
+                        new HiveParserDDLSemanticAnalyzer(
+                                queryState,
+                                context,
+                                hiveCatalog,
+                                getCatalogManager().getCurrentDatabase());
+                Serializable work = ddlAnalyzer.analyzeInternal(node);
+                DDLOperationConverter ddlConverter =
+                        new DDLOperationConverter(getCatalogManager(), 
hiveShim);
+                if (work instanceof HiveParserCreateViewDesc) {
+                    return super.parse(cmd);

Review comment:
       Because `HiveParser` cannot parse queries at the moment.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to