wuchong commented on code in PR #20211:
URL: https://github.com/apache/flink/pull/20211#discussion_r926684832


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java:
##########
@@ -34,4 +34,29 @@ public interface FunctionDefinitionFactory {
      * @return a {@link FunctionDefinition}
      */
     FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction);

Review Comment:
   Add a default implementation of the old method so that users no longer need
to override it. Also add a deprecation message on the old method that tells
users to implement the new method instead.

   https://github.com/apache/flink/pull/4616/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R51
is a good example of evolving methods.
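
   A minimal sketch of this kind of method evolution, assuming simplified
stand-ins for the Flink types. `EvolvedFactory`, `LegacyFactory`,
`MigratedFactory` and the stub `FunctionDefinition`/`CatalogFunction` classes
are hypothetical and exist only so the example compiles on its own; the
exception thrown by the old method's default body is one possible choice, not
necessarily what the linked PR does.

```java
// Hypothetical stand-ins for the Flink types named in the diff, so the sketch compiles alone.
final class FunctionDefinition {
    final String description;

    FunctionDefinition(String description) {
        this.description = description;
    }
}

final class CatalogFunction {
    private final String className;

    CatalogFunction(String className) {
        this.className = className;
    }

    String getClassName() {
        return className;
    }
}

interface EvolvedFactory {

    /** Assumed shape of the context: it only exposes the session class loader. */
    interface Context {
        ClassLoader getClassLoader();
    }

    /**
     * Old method, now optional to override.
     *
     * @deprecated Implement the three-argument method instead.
     */
    @Deprecated
    default FunctionDefinition createFunctionDefinition(String name, CatalogFunction function) {
        throw new UnsupportedOperationException(
                "Deprecated: implement createFunctionDefinition(String, CatalogFunction, Context).");
    }

    /** New method; by default it falls back to the old one so existing factories keep working. */
    default FunctionDefinition createFunctionDefinition(
            String name, CatalogFunction function, Context context) {
        return createFunctionDefinition(name, function);
    }
}

/** An existing implementation that only knows the old method. */
final class LegacyFactory implements EvolvedFactory {
    @Override
    @Deprecated
    public FunctionDefinition createFunctionDefinition(String name, CatalogFunction function) {
        return new FunctionDefinition("legacy:" + name);
    }
}

/** A migrated implementation that overrides only the new method. */
final class MigratedFactory implements EvolvedFactory {
    @Override
    public FunctionDefinition createFunctionDefinition(
            String name, CatalogFunction function, Context context) {
        return new FunctionDefinition("migrated:" + name + "@" + function.getClassName());
    }
}

final class EvolvedFactoryDemo {
    public static void main(String[] args) {
        EvolvedFactory.Context context = () -> Thread.currentThread().getContextClassLoader();
        CatalogFunction function = new CatalogFunction("com.example.MyUdf");
        // Both factories are called through the new method only.
        System.out.println(
                new LegacyFactory().createFunctionDefinition("f", function, context).description);
        System.out.println(
                new MigratedFactory().createFunctionDefinition("f", function, context).description);
    }
}
```

   Callers only ever invoke the three-argument method, so a legacy factory
and a migrated factory can be used side by side during the deprecation period.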



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java:
##########
@@ -55,27 +57,56 @@ public HiveFunctionDefinitionFactory(HiveShim hiveShim) {
     @Override
     public FunctionDefinition createFunctionDefinition(
             String name, CatalogFunction catalogFunction) {
-        if (catalogFunction.isGeneric()) {
-            return createFunctionDefinitionFromFlinkFunction(name, catalogFunction);
+        return createFunctionDefinition(
+                name, catalogFunction, () -> Thread.currentThread().getContextClassLoader());
+    }
+
+    @Override
+    public FunctionDefinition createFunctionDefinition(
+            String name, CatalogFunction catalogFunction, Context context) {
+        if (isGenericFunction(catalogFunction, context.getClassLoader())) {
+            return createFunctionDefinitionFromFlinkFunction(name, catalogFunction, context);
         }
-        return createFunctionDefinitionFromHiveFunction(name, catalogFunction.getClassName());
+        return createFunctionDefinitionFromHiveFunction(
+                name, catalogFunction.getClassName(), context);
     }
 
     public FunctionDefinition createFunctionDefinitionFromFlinkFunction(
-            String name, CatalogFunction catalogFunction) {
+            String name, CatalogFunction catalogFunction, Context context) {
         return UserDefinedFunctionHelper.instantiateFunction(
-                Thread.currentThread().getContextClassLoader(), null, name, catalogFunction);
+                context.getClassLoader(), null, name, catalogFunction);
+    }
+
+    /**
+     * Distinguish if the function is a generic function.
+     *
+     * @return whether the function is a generic function
+     */
+    private boolean isGenericFunction(CatalogFunction catalogFunction, ClassLoader classLoader) {

Review Comment:
   We can get rid of the `generic` term now, and call this method 
`isFlinkFunction`. 



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java:
##########
@@ -55,27 +57,56 @@ public HiveFunctionDefinitionFactory(HiveShim hiveShim) {
     @Override
     public FunctionDefinition createFunctionDefinition(
             String name, CatalogFunction catalogFunction) {
-        if (catalogFunction.isGeneric()) {
-            return createFunctionDefinitionFromFlinkFunction(name, catalogFunction);
+        return createFunctionDefinition(
+                name, catalogFunction, () -> Thread.currentThread().getContextClassLoader());

Review Comment:
   We no longer need to implement the old two-argument method here.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java:
##########
@@ -127,13 +122,8 @@ public Class<UDFType> getUDFClass() throws ClassNotFoundException {
      * @return the UDF deserialized
      */
     private UDFType deserializeUDF() {
-        try {
-            return (UDFType)
-                    SerializationUtilities.deserializeObject(
-                            udfSerializedString, (Class<Serializable>) getUDFClass());
-        } catch (ClassNotFoundException e) {
-            throw new FlinkHiveUDFException(
-                    String.format("Failed to deserialize function %s.", className), e);
-        }
+        return (UDFType)
+                SerializationUtilities.deserializeObject(
+                        udfSerializedString, (Class<Serializable>) getUDFClass());

Review Comment:
   Will the deserialization use the classloader of `functionClz`? Is there a
test that covers this code path?
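
   To illustrate why the classloader matters here, below is a minimal sketch
that uses plain Java serialization rather than Hive's `SerializationUtilities`
(whose internals are not shown in this diff). It pins class resolution during
deserialization to the classloader of the target class. The names
`ClassLoaderAwareObjectInputStream`, `UdfSerde` and `SampleUdf` are
hypothetical and not part of Flink or Hive.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Resolves classes through an explicit class loader instead of the default one. */
final class ClassLoaderAwareObjectInputStream extends ObjectInputStream {
    private final ClassLoader classLoader;

    ClassLoaderAwareObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
        super(in);
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        try {
            return Class.forName(desc.getName(), false, classLoader);
        } catch (ClassNotFoundException e) {
            // Fall back to the default resolution, which also handles primitive types.
            return super.resolveClass(desc);
        }
    }
}

/** Hypothetical serializable function used only for the demonstration. */
final class SampleUdf implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;

    SampleUdf(String name) {
        this.name = name;
    }
}

final class UdfSerde {
    private UdfSerde() {}

    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Deserializes using the class loader that loaded {@code functionClz}. */
    static <T> T deserialize(byte[] data, Class<T> functionClz) {
        try (ObjectInputStream in = new ClassLoaderAwareObjectInputStream(
                new ByteArrayInputStream(data), functionClz.getClassLoader())) {
            return functionClz.cast(in.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Failed to deserialize " + functionClz.getName(), e);
        }
    }
}

final class UdfSerdeDemo {
    public static void main(String[] args) {
        SampleUdf copy = UdfSerde.deserialize(
                UdfSerde.serialize(new SampleUdf("upper")), SampleUdf.class);
        System.out.println(copy.name);
    }
}
```

   A test for the real code path would presumably load the function class
from a separate user classloader and check that deserialization still succeeds.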



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java:
##########
@@ -34,4 +34,29 @@ public interface FunctionDefinitionFactory {
      * @return a {@link FunctionDefinition}
      */
     FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction);
+
+    /**
+     * Creates a {@link FunctionDefinition} from given {@link CatalogFunction} with the given {@link
+     * Context} containing the class loader of the current session, which is useful when it's needed
+     * to load class from class name.
+     *
+     * <p>The default implementation will call {@link #createFunctionDefinition(String,
+     * CatalogFunction)} directly.
+     *
+     * @param name name of the {@link CatalogFunction}
+     * @param catalogFunction the catalog function
+     * @param context the {@link Context} for creating function definition
+     * @return a {@link FunctionDefinition}
+     */
+    default FunctionDefinition createFunctionDefinition(
+            String name, CatalogFunction catalogFunction, Context context) {
+        return createFunctionDefinition(name, catalogFunction);

Review Comment:
   When calling the old method, we should replace the thread context
classloader with the user classloader (see
`org.apache.flink.table.client.gateway.context.ExecutionContext#wrapClassLoader`).
Otherwise, implementations that still use the old method can't find the user
classes. Please add a test in the planner to cover this problem.
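
   A minimal sketch of the swap-and-restore pattern, assuming a hypothetical
helper `ContextClassLoaders.callWithContextClassLoader`. It is not the
`wrapClassLoader` method referenced above, only an illustration of the idea.

```java
import java.util.function.Supplier;

final class ContextClassLoaders {
    private ContextClassLoaders() {}

    /**
     * Runs the action with the given class loader installed as the thread context class loader
     * and restores the previous one afterwards, even if the action throws.
     */
    static <T> T callWithContextClassLoader(ClassLoader userClassLoader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(userClassLoader);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }
}

final class ContextClassLoaderDemo {
    public static void main(String[] args) {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        // Stand-in for the user class loader of the current session.
        ClassLoader user = new ClassLoader(before) {};

        ClassLoader seen = ContextClassLoaders.callWithContextClassLoader(
                user, () -> Thread.currentThread().getContextClassLoader());

        System.out.println(seen == user);
        System.out.println(Thread.currentThread().getContextClassLoader() == before);
    }
}
```

   In the default implementation of the new method, the call to the old
two-argument method would be the action, with `context.getClassLoader()` as the
user classloader.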



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.