fsk119 commented on code in PR #20001:
URL: https://github.com/apache/flink/pull/20001#discussion_r911729065


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -69,6 +69,10 @@ public URLClassLoader getUserClassLoader() {
         return userClassLoader;
     }
 
+    public void updateUserClasLoader(MutableURLClassLoader userClassLoader) {
+        this.userClassLoader = userClassLoader;
+    }
+

Review Comment:
   The behaviour of `registerResource` with the same key is a little confusing 
for users. A user may update the jar at the same path, but here we just log the 
problem and continue. When the user then tries to use the function, they may 
get an exception such as class not found.
   
   Throwing the exception earlier will make everything clearer. It's difficult 
for others to debug classloading problems.
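
A minimal sketch of the fail-fast behaviour suggested above. `FailFastResourceRegistry` is a hypothetical stand-in, not Flink's `ResourceManager`, and the choice of `IllegalStateException` is an assumption; the point is only that a second registration under the same key is rejected immediately instead of being logged and ignored.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical registry illustrating fail-fast registration; not Flink's ResourceManager. */
class FailFastResourceRegistry {
    // Keys (for example jar paths) that have already been registered.
    private final Set<String> registered = new LinkedHashSet<>();

    /** Registers a resource key and rejects a second registration of the same key. */
    void register(String resourceKey) {
        // Set.add returns false when the key is already present.
        if (!registered.add(resourceKey)) {
            throw new IllegalStateException(
                    "Resource '" + resourceKey + "' is already registered; "
                            + "re-registering the same path is not supported.");
        }
    }

    boolean isRegistered(String resourceKey) {
        return registered.contains(resourceKey);
    }
}
```

With this shape the user sees the problem at registration time, not later as a class-not-found error when the function is first used.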



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -69,6 +69,10 @@ public URLClassLoader getUserClassLoader() {
         return userClassLoader;
     }
 
+    public void updateUserClasLoader(MutableURLClassLoader userClassLoader) {

Review Comment:
   updateUserClasLoader -> updateUserClassLoader
   
   If the update only works for the SQL Client embedded mode, I think it's not 
desirable behavior. I think it's better to introduce a client-level 
classloader, which extends MutableClassloader. The client classloader holds 
the real classloader used to load classes. When removing a jar, the client 
classloader builds a new classloader and holds it. When the session closes, it 
closes all the classloaders created by the client classloader.
   
   Compared to the current implementation:
   
   1. We don't need `Supplier<Classloader>` anymore and the code is cleaner.
   2. If we want to support `REMOVE JAR` on the gateway side, it can 
prevent the resource leak.
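
The client-level classloader idea could look roughly like the sketch below. Everything here is an assumption made for illustration: `ClientClassLoader`, `addJar`, `removeJar` and `createdCount` are hypothetical names, and the inner loader is a plain `URLClassLoader` where Flink would use its own classloader types.

```java
import java.io.Closeable;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical client-level loader; the name and API are illustrative, not Flink's. */
class ClientClassLoader extends ClassLoader implements Closeable {
    // Jars currently visible, keyed by external form to avoid URL.equals().
    private final Map<String, URL> jars = new LinkedHashMap<>();
    // Every inner loader ever built, so all of them can be closed with the session.
    private final List<URLClassLoader> created = new ArrayList<>();
    // The "real" loader that currently serves class lookups.
    private volatile URLClassLoader delegate;

    ClientClassLoader(ClassLoader parent) {
        super(parent);
        rebuild();
    }

    synchronized void addJar(URL jar) {
        if (jars.putIfAbsent(jar.toExternalForm(), jar) == null) {
            rebuild();
        }
    }

    synchronized void removeJar(URL jar) {
        if (jars.remove(jar.toExternalForm()) != null) {
            rebuild();
        }
    }

    synchronized int createdCount() {
        return created.size();
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        // Callers keep one stable reference; lookups go to the current inner loader.
        Class<?> clazz = delegate.loadClass(name);
        if (resolve) {
            resolveClass(clazz);
        }
        return clazz;
    }

    @Override
    public synchronized void close() throws IOException {
        IOException first = null;
        for (URLClassLoader loader : created) {
            try {
                loader.close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        created.clear();
        if (first != null) {
            throw first;
        }
    }

    // Builds a fresh inner loader over the current jar set and remembers it for close().
    private void rebuild() {
        delegate = new URLClassLoader(jars.values().toArray(new URL[0]), getParent());
        created.add(delegate);
    }
}
```

Replaced inner loaders stay open until `close()` because classes loaded from them may still be in use; closing them all at session end is what prevents the leak.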



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java:
##########
@@ -0,0 +1,167 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.table.resource.ResourceManagerTest;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for catalog and system functions in a table environment. */
+public class FunctionITCase extends BatchTestBase {
+
+    private static String jarPath;
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        File jarFile =
+                UserClassLoaderJarTestUtils.createJarFile(
+                        TEMPORARY_FOLDER.newFolder("test-jar"),
+                        "test-classloader-udf.jar",
+                        ResourceManagerTest.LOWER_UDF_CLASS,
+                        ResourceManagerTest.LOWER_UDF_CODE);
+        jarPath = jarFile.toURI().toString();
+    }
+
+    @Test
+    public void testCreateTemporarySystemFunctionByUsingJar() {
+        String ddl =
+                String.format(
+                        "CREATE TEMPORARY SYSTEM FUNCTION f10 AS '%s' USING JAR '%s'",
+                        ResourceManagerTest.LOWER_UDF_CLASS, jarPath);
+        tEnv().executeSql(ddl);

Review Comment:
   It seems you use the same ResourceManager in all tests. If so, the created 
jar will only be added for the first test?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java:
##########
@@ -0,0 +1,167 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.table.resource.ResourceManagerTest;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for catalog and system functions in a table environment. */
+public class FunctionITCase extends BatchTestBase {

Review Comment:
   What about adding a Python jar?
   
   Could you add some negative cases:
   
   - Using a jar that doesn't exist.
   - Using a jar that doesn't contain the required class.
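
For reference, both negative cases end the same way at the JDK level: the class cannot be found. The sketch below uses only `java.net.URLClassLoader`; `MissingJarProbe` and `canLoad` are hypothetical names, and how Flink wraps or reports the failure in a real test is not shown here.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical probe showing the raw JDK behaviour behind both negative cases. */
class MissingJarProbe {
    /** Returns true only if className can be loaded from jarUrl or the bootstrap loader. */
    static boolean canLoad(URL jarUrl, String className) {
        // A null parent restricts lookups to the bootstrap loader plus the given jar.
        try (URLClassLoader loader = new URLClassLoader(new URL[] {jarUrl}, null)) {
            loader.loadClass(className);
            return true;
        } catch (ClassNotFoundException | IOException e) {
            // A missing jar and a jar without the class both surface here.
            return false;
        }
    }
}
```

Creating a `URLClassLoader` over a non-existent jar does not fail by itself; the error appears only when a class is requested, which is why an explicit negative test is useful.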



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -54,7 +54,7 @@ public class ResourceManager implements Closeable {
 
     private final Path localResourceDir;
     private final Map<ResourceUri, URL> resourceInfos;
-    private final MutableURLClassLoader userClassLoader;
+    private MutableURLClassLoader userClassLoader;
 
     public ResourceManager(Configuration config, MutableURLClassLoader userClassLoader) {
         this.localResourceDir =

Review Comment:
   On the gateway side, I would prefer the gateway itself to control the 
directory. For example, I want to create the directory with 
`sql-gateway-1.14-<SessionHandle>`. Could you share some thoughts on how I can 
do this?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java:
##########
@@ -34,4 +35,29 @@ public interface FunctionDefinitionFactory {
      * @return a {@link FunctionDefinition}
      */
     FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction);
+
+    /**
+     * Creates a {@link FunctionDefinition} from given {@link CatalogFunction}. If the {@link
+     * CatalogFunction} is created by user defined resource, the user of {@link
+     * FunctionDefinitionFactory} needs to override this method explicitly. The implementation logic
+     * needs to use the user classloader to load custom classes instead of the thread context
+     * classloader.
+     *
+     * @param name name of the {@link CatalogFunction}
+     * @param catalogFunction the catalog function
+     * @param userClassLoader the class loader is used to load user defined function's class
+     * @return

Review Comment:
   @return the definition of the function.



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ResourceManagerMocks.java:
##########
@@ -0,0 +1,49 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.resource.ResourceManager;
+import org.apache.flink.util.ClassLoaderUtil;
+import org.apache.flink.util.MutableURLClassLoader;
+
+import java.net.URL;
+
+/** Mock implementations of {@link ResourceManager} for testing purposes. */
+public class ResourceManagerMocks {
+
+    public static ResourceManager createEmptyResourceManager() {

Review Comment:
   I think we can reuse the same name and code. Could the no-argument 
`createEmptyResourceManager` just invoke the method that takes parameters?



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ResourceManagerMocks.java:
##########
@@ -0,0 +1,49 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.resource.ResourceManager;
+import org.apache.flink.util.ClassLoaderUtil;
+import org.apache.flink.util.MutableURLClassLoader;
+
+import java.net.URL;
+
+/** Mock implementations of {@link ResourceManager} for testing purposes. */
+public class ResourceManagerMocks {

Review Comment:
   Actually I think it's not a mock of the `ResourceManager`; it offers some 
utils for others to use. I am not sure whether this method is commonly useful 
to others. What about moving it to the `ResourceManager` and letting others 
use it?



##########
flink-core/src/main/java/org/apache/flink/util/ClassLoaderUtil.java:
##########
@@ -151,6 +155,26 @@ public static boolean validateClassLoadable(ClassNotFoundException cnfe, ClassLo
         }
     }
 
+    /** Build MutableURLClassLoader from urls and configuration. */

Review Comment:
   ```
       /** Build {@link MutableURLClassLoader} from urls and configuration. */
   ```
   
   However, it also needs the parent classloader.
   
   It would be better to move the create method to the MutableURLClassLoader. 
Please refer to `URLClassLoader#newInstance`.
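
A sketch of what a factory method on the loader class itself might look like, modeled on the JDK's `URLClassLoader.newInstance(URL[], ClassLoader)`. `SketchMutableURLClassLoader` is a hypothetical stand-in, not Flink's `MutableURLClassLoader`, and the configuration parameter from the PR is left out to keep the example self-contained.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical stand-in for a mutable URL class loader with a static factory. */
class SketchMutableURLClassLoader extends URLClassLoader {

    // Construction goes through the factory so callers have a single entry point.
    private SketchMutableURLClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    /** Factory in the style of URLClassLoader.newInstance(URL[], ClassLoader). */
    static SketchMutableURLClassLoader newInstance(URL[] urls, ClassLoader parent) {
        return new SketchMutableURLClassLoader(urls, parent);
    }

    // Widen the protected URLClassLoader.addURL so jars can be added after creation.
    @Override
    public void addURL(URL url) {
        super.addURL(url);
    }
}
```

Keeping the factory on the loader class means the javadoc, the parent classloader argument and the construction logic live in one place.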



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -139,21 +137,25 @@ public static SessionContext create(
         // Init classloader
         // --------------------------------------------------------------------------------------------------------------
 
-        URLClassLoader classLoader =
-                buildClassLoader(Collections.emptySet(), Collections.emptySet(), configuration);
+        // override to use SafetyNetWrapperClassLoader
+        configuration.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, true);
+        final MutableURLClassLoader userClassLoader =
+                ClassLoaderUtil.buildMutableURLClassLoader(
+                        new URL[0], SessionContext.class.getClassLoader(), configuration);
 
         // --------------------------------------------------------------------------------------------------------------
         // Init session state
         // --------------------------------------------------------------------------------------------------------------
 
-        ModuleManager moduleManager = new ModuleManager();
+        final ResourceManager resourceManager = new ResourceManager(configuration, userClassLoader);
+        final ModuleManager moduleManager = new ModuleManager();
 
         final EnvironmentSettings settings = EnvironmentSettings.fromConfiguration(configuration);
 
         CatalogManager catalogManager =
                 CatalogManager.newBuilder()
                         // Currently, the classloader is only used by DataTypeFactory.
-                        .classLoader(classLoader)
+                        .classLoaderSupplier(() -> resourceManager.getUserClassLoader())

Review Comment:
   Why not use `userClassLoader`? I think it will not change during the 
lifecycle of the `Session`.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -139,21 +137,25 @@ public static SessionContext create(
         // Init classloader
         // --------------------------------------------------------------------------------------------------------------
 
-        URLClassLoader classLoader =
-                buildClassLoader(Collections.emptySet(), Collections.emptySet(), configuration);
+        // override to use SafetyNetWrapperClassLoader

Review Comment:
   nit: It would be better to log this behaviour.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##########
@@ -649,9 +649,15 @@ private void validateAndPrepareFunction(CatalogFunction function)
             }
             // Skip validation if it's not a UserDefinedFunction.
         } else if (function.getFunctionLanguage() == FunctionLanguage.JAVA) {
+            // If the resource of UDF used is not empty, register it to classloader before
+            // validate.
+            registerFunctionResource(name, function.getFunctionResources());

Review Comment:
   It seems you only download the resource when the function is in Java, but 
the syntax doesn't have this limitation.
   ```
   CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] 
   [catalog_name.db_name.]function_name AS class_name [LANGUAGE JAVA|SCALA|PYTHON] 
   [USING JAR 'resource_path' [, JAR 'resource_path']*];
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -260,23 +266,26 @@ public static TableEnvironmentImpl create(Configuration configuration) {
     }
 
     public static TableEnvironmentImpl create(EnvironmentSettings settings) {
-        final ClassLoader classLoader = settings.getUserClassLoader();
+        final MutableURLClassLoader userClassLoader =
+                ClassLoaderUtil.buildMutableURLClassLoader(
+                        new URL[0], settings.getUserClassLoader(), settings.getConfiguration());

Review Comment:
   I think we'd better read the value of the `pipeline.jars` option in the 
configuration to build the user classloader.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java:
##########
@@ -366,6 +422,45 @@ public void testUserDefinedTemporaryCatalogFunction() throws Exception {
         tEnv().executeSql(dropFunctionDDL);
     }
 
+    @Test
+    public void testUserDefinedTemporarySystemFunctionByUsingJar() throws Exception {
+        String functionDDL =
+                String.format(
+                        "create temporary system function lowerUdf as '%s' using jar '%s'",
+                        ResourceManagerTest.LOWER_UDF_CLASS, jarPath);
+
+        String dropFunctionDDL = "drop temporary system function lowerUdf";
+        testUserDefinedFunctionByUsingJar(functionDDL);
+        // delete the function
+        tEnv().executeSql(dropFunctionDDL);

Review Comment:
   It would be better to move this to
   ```
   @After
   // drop function if exists
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##########
@@ -662,14 +668,35 @@ private FunctionDefinition getFunctionDefinition(String name, CatalogFunction fu
             // directly.
             return ((InlineCatalogFunction) function).getDefinition();
         }
+        // If the resource of UDF used is not empty, register it to classloader before
+        // validate.
+        registerFunctionResource(name, function.getFunctionResources());
+
         return UserDefinedFunctionHelper.instantiateFunction(
-                classLoader,
+                resourceManager.getUserClassLoader(),
                 // future
                 config,
                 name,
                 function);
     }
 
+    private void registerFunctionResource(String functionName, List<ResourceUri> resourceUris) {
+        if (!resourceUris.isEmpty()) {

Review Comment:
   Remove this condition. `forEach` does nothing when the list contains no 
elements.
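
To illustrate why the guard is redundant: `forEach` on an empty list never invokes its action. `EmptyForEachDemo` and `registerAll` are hypothetical names, and plain strings stand in for `ResourceUri`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo: forEach on an empty list simply performs no calls. */
class EmptyForEachDemo {
    /** Collects every URI visited by forEach, without any isEmpty() guard. */
    static List<String> registerAll(List<String> resourceUris) {
        List<String> registered = new ArrayList<>();
        // No emptiness check needed: the action runs zero times for an empty list.
        resourceUris.forEach(registered::add);
        return registered;
    }
}
```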



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##########
@@ -662,14 +668,35 @@ private FunctionDefinition getFunctionDefinition(String name, CatalogFunction fu
             // directly.
             return ((InlineCatalogFunction) function).getDefinition();
         }
+        // If the resource of UDF used is not empty, register it to classloader before
+        // validate.
+        registerFunctionResource(name, function.getFunctionResources());
+
         return UserDefinedFunctionHelper.instantiateFunction(
-                classLoader,
+                resourceManager.getUserClassLoader(),
                 // future
                 config,
                 name,
                 function);
     }
 
+    private void registerFunctionResource(String functionName, List<ResourceUri> resourceUris) {
+        if (!resourceUris.isEmpty()) {
+            resourceUris.forEach(
+                    resourceUri -> {
+                        try {
+                            resourceManager.registerResource(resourceUri);
+                        } catch (IOException e) {
+                            throw new ValidationException(

Review Comment:
   I think we should do something like a transaction and remove all downloaded 
resources if registration fails.
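
One possible shape for the transaction-like cleanup, as a sketch under stated assumptions: `AllOrNothingDownloader` and `Fetcher` are hypothetical, and only local files are rolled back. Undoing URLs already added to a classloader would need separate handling in a real implementation.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical all-or-nothing download helper; not part of Flink. */
class AllOrNothingDownloader {

    /** Hypothetical download step that returns the local path of the fetched resource. */
    interface Fetcher {
        Path fetch(String uri) throws IOException;
    }

    /** Fetches every URI, deleting the files fetched so far if any single fetch fails. */
    static List<Path> fetchAll(List<String> uris, Fetcher fetcher) throws IOException {
        List<Path> fetched = new ArrayList<>();
        try {
            for (String uri : uris) {
                fetched.add(fetcher.fetch(uri));
            }
            return fetched;
        } catch (IOException | RuntimeException e) {
            // Roll back: remove everything downloaded before the failure.
            for (Path path : fetched) {
                try {
                    Files.deleteIfExists(path);
                } catch (IOException cleanupFailure) {
                    e.addSuppressed(cleanupFailure);
                }
            }
            throw e;
        }
    }
}
```

Cleanup failures are attached as suppressed exceptions so that the original download error is the one the caller sees.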



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java:
##########
@@ -34,4 +35,29 @@ public interface FunctionDefinitionFactory {
      * @return a {@link FunctionDefinition}
      */
     FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction);
+
+    /**
+     * Creates a {@link FunctionDefinition} from given {@link CatalogFunction}. If the {@link
+     * CatalogFunction} is created by user defined resource, the user of {@link
+     * FunctionDefinitionFactory} needs to override this method explicitly. The implementation logic
+     * needs to use the user classloader to load custom classes instead of the thread context
+     * classloader.
+     *
+     * @param name name of the {@link CatalogFunction}
+     * @param catalogFunction the catalog function
+     * @param userClassLoader the class loader is used to load user defined function's class
+     * @return
+     */
+    default FunctionDefinition createFunctionDefinition(

Review Comment:
   I don't quite understand why we need to expose this. It seems users can 
only add a temporary function with the USING JAR syntax, which is stored in the 
CatalogManager. If so, why does the catalog need this?


