Tartarus0zm commented on code in PR #22862:
URL: https://github.com/apache/flink/pull/22862#discussion_r1270206798


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/utils/HiveCatalogUtils.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+/**
+ * Utils related to HiveCatalog for the Hive dialect. In the Hive dialect, the catalog instance
+ * obtained at parse time is loaded by FlinkUserClassLoader, while the class HiveCatalog itself is
+ * loaded by Flink's ComponentClassLoader. Because the classloaders differ, we can't cast the
+ * obtained catalog to HiveCatalog directly, so HiveCatalog's methods can only be called via
+ * reflection.
+ *
+ * <p>This is a hack; it can be removed once the Hive connector is split in two, one part for
+ * source/sink and the other only for the Hive dialect. The classloaders will then be the same.
+ */
+public class HiveCatalogUtils {
+
+    public static boolean isHiveCatalog(Catalog catalog) {
+        // `catalog instanceof HiveCatalog` alone is not enough,
+        // as the classloaders for catalog and HiveCatalog may differ
+        return catalog instanceof HiveCatalog
+                || catalog.getClass().getName().equals(HiveCatalog.class.getName());
+    }
+
+    public static HiveConf getHiveConf(Catalog catalog) {

Review Comment:
   This utility class is designed for `HiveCatalog`, so why is the parameter type `Catalog`? Is reflection used to avoid cross-classloader issues?
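   For context, a cross-classloader call like `HiveCatalogUtils.getHiveConf` typically boils down to a plain reflective lookup by method name. A minimal sketch of that pattern, using a hypothetical `FakeCatalog` stand-in (not part of the PR) instead of the real `HiveCatalog`:

```java
import java.lang.reflect.Method;

public class ReflectiveCall {
    // Hypothetical stand-in for a catalog whose concrete class was loaded
    // by a different classloader, so its static type is unusable here.
    public static class FakeCatalog {
        public String getHiveVersion() {
            return "3.1.3";
        }
    }

    // Look up and invoke a no-arg method by name, the way a utility class
    // must when it cannot cast the target to its real type.
    public static Object invokeNoArg(Object target, String methodName) throws Exception {
        Method m = target.getClass().getMethod(methodName);
        return m.invoke(target);
    }

    public static void main(String[] args) throws Exception {
        Object catalog = new FakeCatalog(); // pretend the concrete type is unknown
        System.out.println(invokeNoArg(catalog, "getHiveVersion")); // prints 3.1.3
    }
}
```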



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java:
##########
@@ -212,26 +212,25 @@ public List<Operation> parse(String statement) {
             }
         }
 
-        Optional<Operation> nonSqlOperation =
-                tryProcessHiveNonSqlStatement(
-                        ((HiveCatalog) currentCatalog).getHiveConf(), statement);
+        HiveConf hiveConf = HiveCatalogUtils.getHiveConf(currentCatalog);

Review Comment:
   Why not use `((HiveCatalog) currentCatalog).getHiveConf()` to get the `HiveConf`?
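   Presumably because `currentCatalog` may be an instance of a `HiveCatalog` class defined by a different classloader, in which case `(HiveCatalog) currentCatalog` throws `ClassCastException` even though the class names match. A minimal, self-contained sketch of that behavior (assuming the demo is run from plain class files, not part of the PR):

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderDemo {
    public static void main(String[] args) throws Exception {
        // Where this class's bytecode lives.
        URL here = ClassLoaderDemo.class.getProtectionDomain().getCodeSource().getLocation();
        // A loader with a null parent re-defines the class itself instead of
        // delegating to the loader that originally defined ClassLoaderDemo.
        try (URLClassLoader isolated = new URLClassLoader(new URL[] {here}, null)) {
            Class<?> other = isolated.loadClass(ClassLoaderDemo.class.getName());
            // Same fully-qualified name...
            System.out.println(other.getName().equals(ClassLoaderDemo.class.getName())); // true
            // ...but not assignment-compatible, so a cast would fail at runtime.
            System.out.println(ClassLoaderDemo.class.isAssignableFrom(other)); // false
        }
    }
}
```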



##########
flink-table/flink-table-planner-loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java:
##########
@@ -150,4 +160,30 @@ public PlannerFactory loadPlannerFactory() {
         return FactoryUtil.discoverFactory(
                 this.submoduleClassLoader, PlannerFactory.class, 
PlannerFactory.DEFAULT_IDENTIFIER);
     }
+
+    /**
+     * A class loader extending {@link ComponentClassLoader} which overrides {@link #addURL} so
+     * that URLs can be added to the component classloader after construction.
+     */
+    private static class PlannerComponentClassLoader extends ComponentClassLoader {

Review Comment:
   Why not just use `ComponentClassLoader`? It doesn't seem to extend anything.
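   If the subclass exists only to widen the visibility of `addURL` (which is `protected` on `URLClassLoader`), that would explain it. A generic sketch of that pattern, unrelated to Flink's actual `ComponentClassLoader` internals:

```java
import java.net.URL;
import java.net.URLClassLoader;

// Widens URLClassLoader.addURL from protected to public so callers can
// append jars to the classpath after the loader has been constructed.
public class AddUrlLoader extends URLClassLoader {
    public AddUrlLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    public void addURL(URL url) { // protected in URLClassLoader
        super.addURL(url);
    }

    public static void main(String[] args) throws Exception {
        AddUrlLoader loader = new AddUrlLoader(new URL[0], null);
        loader.addURL(new URL("file:///tmp/extra.jar")); // hypothetical jar path
        System.out.println(loader.getURLs().length); // prints 1
    }
}
```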



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java:
##########
@@ -212,26 +212,25 @@ public List<Operation> parse(String statement) {
             }
         }
 
-        Optional<Operation> nonSqlOperation =
-                tryProcessHiveNonSqlStatement(
-                        ((HiveCatalog) currentCatalog).getHiveConf(), statement);
+        HiveConf hiveConf = HiveCatalogUtils.getHiveConf(currentCatalog);
+        Optional<Operation> nonSqlOperation = tryProcessHiveNonSqlStatement(hiveConf, statement);
         if (nonSqlOperation.isPresent()) {
             return Collections.singletonList(nonSqlOperation.get());
         }
-        HiveConf hiveConf = new HiveConf(((HiveCatalog) currentCatalog).getHiveConf());
-        hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
-        hiveConf.set("hive.allow.udf.load.on.demand", "false");
-        hiveConf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
+        HiveConf hiveConfCopy = new HiveConf(hiveConf);
+        hiveConfCopy.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+        hiveConfCopy.set("hive.allow.udf.load.on.demand", "false");
+        hiveConfCopy.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
         HiveShim hiveShim =
-                HiveShimLoader.loadHiveShim(((HiveCatalog) currentCatalog).getHiveVersion());
+                HiveShimLoader.loadHiveShim(HiveCatalogUtils.getHiveVersion(currentCatalog));
         try {

Review Comment:
   ditto
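   A side note on the copy in the diff above: `new HiveConf(hiveConf)` clones the configuration before the session-specific overrides are applied, so they don't leak into the catalog's shared conf. The same copy-before-mutate pattern sketched with plain `java.util.Properties` (standing in for `HiveConf`, which isn't available here):

```java
import java.util.Properties;

public class ConfCopyDemo {
    public static void main(String[] args) {
        Properties shared = new Properties();
        shared.setProperty("hive.execution.engine", "tez");

        // Copy first, mirroring `new HiveConf(hiveConf)` in the diff,
        // then mutate only the copy.
        Properties session = new Properties();
        session.putAll(shared);
        session.setProperty("hive.execution.engine", "mr");
        session.setProperty("hive.allow.udf.load.on.demand", "false");

        // The shared conf is untouched by the session overrides.
        System.out.println(shared.getProperty("hive.execution.engine"));  // tez
        System.out.println(session.getProperty("hive.execution.engine")); // mr
    }
}
```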



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
