This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e37a063685 [FLINK-27658][runtime] SafetyNetWrapperClassLoader expose 
addURL method to allow registering jar dynamically
2e37a063685 is described below

commit 2e37a06368596ca9ed04366a879f2facfb7ef509
Author: fengli <[email protected]>
AuthorDate: Wed Jun 1 16:04:44 2022 +0800

    [FLINK-27658][runtime] SafetyNetWrapperClassLoader expose addURL method to 
allow registering jar dynamically
---
 .../java/org/apache/flink/client/ClientUtils.java  |   2 +-
 .../cli/CliFrontendDynamicPropertiesTest.java      |   2 +-
 .../DefaultPackagedProgramRetrieverTest.java       |   2 +-
 .../org/apache/flink}/util/ClassLoaderUtil.java    |   5 +-
 .../flink/util/FlinkUserCodeClassLoader.java       |   5 +
 .../flink/util}/FlinkUserCodeClassLoaders.java     |  15 +-
 .../apache/flink}/util/ClassLoaderUtilsTest.java   |   4 +-
 .../flink/util}/FlinkUserCodeClassLoadersTest.java | 164 ++++++++++++++-------
 .../flink/util/UserClassLoaderJarTestUtils.java    |  11 +-
 .../formats/avro/AvroKryoClassloadingTest.java     |   2 +-
 .../librarycache/BlobLibraryCacheManager.java      |   1 +
 .../jobmaster/JobManagerSharedServices.java        |   2 +-
 .../TaskManagerServicesConfiguration.java          |   2 +-
 .../librarycache/BlobLibraryCacheManagerTest.java  |   1 +
 .../BlobLibraryCacheRecoveryITCase.java            |   1 +
 .../ClassLoaderDeserializationTest.java            |  94 ++++++++++++
 .../state/RocksDbMultiClassLoaderTest.java         |   2 +-
 .../flink/streaming/api/graph/StreamConfig.java    |   2 +-
 .../flink/table/client/cli/CliClientITCase.java    |   4 +-
 .../client/gateway/context/SessionContextTest.java |   4 +-
 .../client/gateway/local/LocalExecutorITCase.java  |   4 +-
 .../batch/sql/PartitionableSourceITCase.scala      |   4 +-
 22 files changed, 247 insertions(+), 86 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java 
b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 38a52385980..95a223a2135 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -30,9 +30,9 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.runtime.client.JobInitializationException;
-import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.function.SupplierWithException;
 
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
index a45f728c8b3..374eb8f8b60 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.client.cli;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ParentFirstClassLoader;
 import org.apache.flink.util.ChildFirstClassLoader;
+import org.apache.flink.util.FlinkUserCodeClassLoaders.ParentFirstClassLoader;
 
 import org.apache.commons.cli.Options;
 import org.junit.AfterClass;
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
index d35c0593c48..bb3e34f6749 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
@@ -29,12 +29,12 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.core.testutils.FlinkMatchers;
-import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.util.ChildFirstClassLoader;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ClassLoaderUtil.java
 b/flink-core/src/main/java/org/apache/flink/util/ClassLoaderUtil.java
similarity index 98%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ClassLoaderUtil.java
rename to flink-core/src/main/java/org/apache/flink/util/ClassLoaderUtil.java
index 120e4039d6b..742bb0581a6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ClassLoaderUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ClassLoaderUtil.java
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.util;
+package org.apache.flink.util;
 
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.annotation.Internal;
 
 import java.io.File;
 import java.io.IOException;
@@ -30,6 +30,7 @@ import java.util.jar.JarFile;
  * Utilities for information with respect to class loaders, specifically class 
loaders for the
  * dynamic loading of user defined classes.
  */
+@Internal
 public final class ClassLoaderUtil {
 
     /**
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java 
b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java
index b605d7bba54..54b1fa61db0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java
@@ -67,4 +67,9 @@ public abstract class FlinkUserCodeClassLoader extends 
URLClassLoader {
             throws ClassNotFoundException {
         return super.loadClass(name, resolve);
     }
+
+    @Override
+    protected void addURL(URL url) {
+        super.addURL(url);
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
 b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
similarity index 95%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
rename to 
flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
index 8d9286be178..44dff20a3e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
+++ 
b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
@@ -16,11 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.execution.librarycache;
+package org.apache.flink.util;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.util.ChildFirstClassLoader;
-import org.apache.flink.util.FlinkUserCodeClassLoader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +32,7 @@ import java.util.Enumeration;
 import java.util.function.Consumer;
 
 /** Gives the URLClassLoader a nicer name for debugging purposes. */
+@Internal
 public class FlinkUserCodeClassLoaders {
 
     private FlinkUserCodeClassLoaders() {}
@@ -110,6 +110,7 @@ public class FlinkUserCodeClassLoaders {
     /**
      * Regular URLClassLoader that first loads from the parent and only after 
that from the URLs.
      */
+    @Internal
     public static class ParentFirstClassLoader extends 
FlinkUserCodeClassLoader {
 
         ParentFirstClassLoader(
@@ -130,7 +131,8 @@ public class FlinkUserCodeClassLoaders {
      * delegate is nulled and can be garbage collected. Additional class 
resolution will be resolved
      * solely through the bootstrap classloader and most likely result in 
ClassNotFound exceptions.
      */
-    private static class SafetyNetWrapperClassLoader extends URLClassLoader 
implements Closeable {
+    @Internal
+    public static class SafetyNetWrapperClassLoader extends URLClassLoader 
implements Closeable {
         private static final Logger LOG =
                 LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);
 
@@ -178,6 +180,11 @@ public class FlinkUserCodeClassLoaders {
             return ensureInner().loadClass(name, resolve);
         }
 
+        @Override
+        public void addURL(URL url) {
+            ensureInner().addURL(url);
+        }
+
         @Override
         public URL getResource(String name) {
             return ensureInner().getResource(name);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java
 b/flink-core/src/test/java/org/apache/flink/util/ClassLoaderUtilsTest.java
similarity index 98%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java
rename to 
flink-core/src/test/java/org/apache/flink/util/ClassLoaderUtilsTest.java
index 2c238de1f32..3574bced1ab 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/ClassLoaderUtilsTest.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.util.IOUtils;
+package org.apache.flink.util;
 
 import org.junit.Test;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoadersTest.java
 
b/flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoadersTest.java
similarity index 66%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoadersTest.java
rename to 
flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoadersTest.java
index c03e2aedc39..108ef0f8f88 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoadersTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoadersTest.java
@@ -16,29 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.execution.librarycache;
-
-import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
-import org.apache.flink.runtime.util.ClassLoaderUtil;
-import org.apache.flink.testutils.ClassLoaderUtils;
-import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TestLogger;
+package org.apache.flink.util;
 
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
 import java.net.URL;
 import java.net.URLClassLoader;
 
 import static 
org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
-import static org.hamcrest.CoreMatchers.allOf;
-import static org.hamcrest.CoreMatchers.containsString;
+import static org.assertj.core.api.Assertions.fail;
 import static org.hamcrest.CoreMatchers.isA;
-import static org.hamcrest.Matchers.hasItemInArray;
-import static org.hamcrest.Matchers.hasProperty;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -53,52 +46,23 @@ public class FlinkUserCodeClassLoadersTest extends 
TestLogger {
 
     @Rule public ExpectedException expectedException = 
ExpectedException.none();
 
-    @Test
-    public void testMessageDecodingWithUnavailableClass() throws Exception {
-        final ClassLoader systemClassLoader = getClass().getClassLoader();
-
-        final String className = "UserClass";
-        final URLClassLoader userClassLoader =
-                ClassLoaderUtils.compileAndLoadJava(
-                        temporaryFolder.newFolder(),
-                        className + ".java",
-                        "import java.io.Serializable;\n"
-                                + "public class "
-                                + className
-                                + " implements Serializable {}");
-
-        RemoteRpcInvocation method =
-                new RemoteRpcInvocation(
-                        className,
-                        "test",
-                        new Class<?>[] {
-                            int.class, Class.forName(className, false, 
userClassLoader)
-                        },
-                        new Object[] {
-                            1, Class.forName(className, false, 
userClassLoader).newInstance()
-                        });
-
-        SerializedValue<RemoteRpcInvocation> serializedMethod = new 
SerializedValue<>(method);
-
-        expectedException.expect(ClassNotFoundException.class);
-        expectedException.expect(
-                allOf(
-                        isA(ClassNotFoundException.class),
-                        hasProperty(
-                                "suppressed",
-                                hasItemInArray(
-                                        allOf(
-                                                
isA(ClassNotFoundException.class),
-                                                hasProperty(
-                                                        "message",
-                                                        containsString(
-                                                                "Could not 
deserialize 1th parameter type of method test(int, ...).")))))));
-
-        RemoteRpcInvocation deserializedMethod =
-                serializedMethod.deserializeValue(systemClassLoader);
-        deserializedMethod.getMethodName();
-
-        userClassLoader.close();
+    public static final String USER_CLASS = "UserClass";
+    public static final String USER_CLASS_CODE =
+            "import java.io.Serializable;\n"
+                    + "public class "
+                    + USER_CLASS
+                    + " implements Serializable {}";
+
+    private static File userJar;
+
+    @BeforeClass
+    public static void prepare() throws Exception {
+        userJar =
+                UserClassLoaderJarTestUtils.createJarFile(
+                        temporaryFolder.newFolder("test-jar"),
+                        "test-classloader.jar",
+                        USER_CLASS,
+                        USER_CLASS_CODE);
     }
 
     @Test
@@ -292,6 +256,92 @@ public class FlinkUserCodeClassLoadersTest extends 
TestLogger {
         assertTrue(TestParentFirstClassLoader.isParallelCapable);
     }
 
+    @Test
+    public void testParentFirstClassLoadingByAddURL() throws Exception {
+        // use the code location of this test class as the initial class path of the
+        // class loaders under test
+        final URL childCodePath = 
getClass().getProtectionDomain().getCodeSource().getLocation();
+
+        final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader 
parentClassLoader =
+                (FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
+                        createChildFirstClassLoader(childCodePath, 
getClass().getClassLoader());
+        final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader 
childClassLoader1 =
+                (FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
+                        createParentFirstClassLoader(childCodePath, 
parentClassLoader);
+        final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader 
childClassLoader2 =
+                (FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
+                        createParentFirstClassLoader(childCodePath, 
parentClassLoader);
+
+        // test class loader before adding the user jar url to the ClassLoader
+        assertClassNotFoundException(USER_CLASS, false, parentClassLoader);
+        assertClassNotFoundException(USER_CLASS, false, childClassLoader1);
+        assertClassNotFoundException(USER_CLASS, false, childClassLoader2);
+
+        // only add jar url to parent ClassLoader
+        parentClassLoader.addURL(userJar.toURI().toURL());
+
+        // test class loader after adding the jar url
+        final Class<?> clazz1 = Class.forName(USER_CLASS, false, 
parentClassLoader);
+        final Class<?> clazz2 = Class.forName(USER_CLASS, false, 
childClassLoader1);
+        final Class<?> clazz3 = Class.forName(USER_CLASS, false, 
childClassLoader2);
+
+        assertEquals(clazz1, clazz2);
+        assertEquals(clazz1, clazz3);
+
+        parentClassLoader.close();
+        childClassLoader1.close();
+        childClassLoader2.close();
+    }
+
+    @Test
+    public void testChildFirstClassLoadingByAddURL() throws Exception {
+
+        // use the code location of this test class as the initial class path of the
+        // class loaders under test
+        final URL childCodePath = 
getClass().getProtectionDomain().getCodeSource().getLocation();
+
+        final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader 
parentClassLoader =
+                (FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
+                        createChildFirstClassLoader(childCodePath, 
getClass().getClassLoader());
+        final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader 
childClassLoader1 =
+                (FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
+                        createChildFirstClassLoader(childCodePath, 
parentClassLoader);
+        final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader 
childClassLoader2 =
+                (FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
+                        createChildFirstClassLoader(childCodePath, 
parentClassLoader);
+
+        // test class loader before adding the user jar url to the ClassLoader
+        assertClassNotFoundException(USER_CLASS, false, parentClassLoader);
+        assertClassNotFoundException(USER_CLASS, false, childClassLoader1);
+        assertClassNotFoundException(USER_CLASS, false, childClassLoader2);
+
+        // only add jar url to child ClassLoader
+        URL userJarURL = userJar.toURI().toURL();
+        childClassLoader1.addURL(userJarURL);
+        childClassLoader2.addURL(userJarURL);
+
+        // test class loader after adding the jar url
+        assertClassNotFoundException(USER_CLASS, false, parentClassLoader);
+
+        final Class<?> clazz1 = Class.forName(USER_CLASS, false, 
childClassLoader1);
+        final Class<?> clazz2 = Class.forName(USER_CLASS, false, 
childClassLoader2);
+
+        assertNotEquals(clazz1, clazz2);
+
+        parentClassLoader.close();
+        childClassLoader1.close();
+        childClassLoader2.close();
+    }
+
+    private void assertClassNotFoundException(
+            String className, boolean initialize, ClassLoader classLoader) {
+        try {
+            Class.forName(className, initialize, classLoader);
+            fail("Should fail.");
+        } catch (ClassNotFoundException e) {
+        }
+    }
+
     private static class TestParentFirstClassLoader
             extends FlinkUserCodeClassLoaders.ParentFirstClassLoader {
         public static boolean isParallelCapable;
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java
 
b/flink-core/src/test/java/org/apache/flink/util/UserClassLoaderJarTestUtils.java
similarity index 93%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java
rename to 
flink-core/src/test/java/org/apache/flink/util/UserClassLoaderJarTestUtils.java
index 4765f0dd94d..0785b8d801f 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java
+++ 
b/flink-core/src/test/java/org/apache/flink/util/UserClassLoaderJarTestUtils.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.utils;
-
-import org.apache.flink.util.FileUtils;
+package org.apache.flink.util;
 
 import javax.tools.DiagnosticCollector;
 import javax.tools.JavaCompiler;
@@ -35,7 +33,12 @@ import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
 
 /** Mainly used for testing classloading. */
-public class TestUserClassLoaderJar {
+public class UserClassLoaderJarTestUtils {
+
+    /** Private constructor to prevent instantiation. */
+    private UserClassLoaderJarTestUtils() {
+        throw new RuntimeException();
+    }
 
     /** Pack the generated UDF class into a JAR and return the path of the 
JAR. */
     public static File createJarFile(File tmpDir, String jarName, String 
className, String javaCode)
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
index feb5142ca76..f7db7d29ada 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.formats.avro;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.testutils.FilteredClassLoader;
 import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
-import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
 
 import com.esotericsoftware.kryo.Kryo;
 import org.junit.jupiter.api.Test;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 460f83e449b..bd187e745ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkUserCodeClassLoader;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.UserCodeClassLoader;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
index 4320ae9171a..d403893c06e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
-import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
@@ -34,6 +33,7 @@ import org.apache.flink.runtime.shuffle.ShuffleServiceLoader;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import javax.annotation.Nonnull;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 18f50ea8b83..5f0116b6204 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -30,9 +30,9 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
 import org.apache.flink.runtime.entrypoint.WorkingDirectory;
-import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.Reference;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index d11c6d8d22f..163a2d63b3c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.UserCodeClassLoader;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index efb43c35437..8aa960aa102 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
 import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.collection.IsEmptyCollection;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/ClassLoaderDeserializationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/ClassLoaderDeserializationTest.java
new file mode 100644
index 00000000000..73dfab1e980
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/ClassLoaderDeserializationTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution.librarycache;
+
+import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
+import org.apache.flink.testutils.ClassLoaderUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.net.URLClassLoader;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.isA;
+import static org.hamcrest.Matchers.hasItemInArray;
+import static org.hamcrest.Matchers.hasProperty;
+
+/** Tests for classloader deserialize utilities. */
+public class ClassLoaderDeserializationTest extends TestLogger {
+
+    @ClassRule public static TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+
+    @Rule public ExpectedException expectedException = 
ExpectedException.none();
+
+    @Test
+    public void testMessageDecodingWithUnavailableClass() throws Exception {
+        final ClassLoader systemClassLoader = getClass().getClassLoader();
+
+        final String className = "UserClass";
+        final URLClassLoader userClassLoader =
+                ClassLoaderUtils.compileAndLoadJava(
+                        temporaryFolder.newFolder(),
+                        className + ".java",
+                        "import java.io.Serializable;\n"
+                                + "public class "
+                                + className
+                                + " implements Serializable {}");
+
+        RemoteRpcInvocation method =
+                new RemoteRpcInvocation(
+                        className,
+                        "test",
+                        new Class<?>[] {
+                            int.class, Class.forName(className, false, 
userClassLoader)
+                        },
+                        new Object[] {
+                            1, Class.forName(className, false, 
userClassLoader).newInstance()
+                        });
+
+        SerializedValue<RemoteRpcInvocation> serializedMethod = new 
SerializedValue<>(method);
+
+        expectedException.expect(ClassNotFoundException.class);
+        expectedException.expect(
+                allOf(
+                        isA(ClassNotFoundException.class),
+                        hasProperty(
+                                "suppressed",
+                                hasItemInArray(
+                                        allOf(
+                                                
isA(ClassNotFoundException.class),
+                                                hasProperty(
+                                                        "message",
+                                                        containsString(
+                                                                "Could not 
deserialize 1th parameter type of method test(int, ...).")))))));
+
+        RemoteRpcInvocation deserializedMethod =
+                serializedMethod.deserializeValue(systemClassLoader);
+        deserializedMethod.getMethodName();
+
+        userClassLoader.close();
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
index 556ea3d5d64..f03410c7bac 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
 
 import org.junit.Rule;
 import org.junit.Test;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 6f9e674cd52..b1bc3d633a0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.util.CorruptConfigurationException;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.util.ClassLoaderUtil;
 import org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -40,6 +39,7 @@ import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
+import org.apache.flink.util.ClassLoaderUtil;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
index 57245a47d90..45ee390095f 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
@@ -28,8 +28,8 @@ import org.apache.flink.table.client.gateway.context.DefaultContext;
 import org.apache.flink.table.client.gateway.local.LocalExecutor;
 import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
 import org.apache.flink.table.planner.utils.TableTestUtil;
-import org.apache.flink.table.utils.TestUserClassLoaderJar;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
 
 import org.apache.flink.shaded.guava30.com.google.common.io.PatternFilenameFilter;
 
@@ -100,7 +100,7 @@ public class CliClientITCase extends AbstractTestBase {
     @BeforeClass
     public static void setup() throws IOException {
         File udfJar =
-                TestUserClassLoaderJar.createJarFile(
+                UserClassLoaderJarTestUtils.createJarFile(
                         tempFolder.newFolder("test-jar"),
                         "test-classloader-udf.jar",
                         UserDefinedFunctions.GENERATED_UDF_CLASS,
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
index c8bebe5971f..7f9f1574a85 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
-import org.apache.flink.table.utils.TestUserClassLoaderJar;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
 
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -59,7 +59,7 @@ public class SessionContextTest {
     @BeforeClass
     public static void prepare() throws Exception {
         udfJar =
-                TestUserClassLoaderJar.createJarFile(
+                UserClassLoaderJarTestUtils.createJarFile(
                         tempFolder.newFolder("test-jar"),
                         "test-classloader-udf.jar",
                         UserDefinedFunctions.GENERATED_UDF_CLASS,
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 79da81f4f29..df1ba55cfe5 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -41,12 +41,12 @@ import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.utils.TestUserClassLoaderJar;
 import org.apache.flink.table.utils.print.RowDataToStringConverter;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
 
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -101,7 +101,7 @@ public class LocalExecutorITCase extends TestLogger {
     public static void setup() throws IOException {
         clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
         File udfJar =
-                TestUserClassLoaderJar.createJarFile(
+                UserClassLoaderJarTestUtils.createJarFile(
                         tempFolder.newFolder("test-jar"),
                         "test-classloader-udf.jar",
                         UserDefinedFunctions.GENERATED_UDF_CLASS,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
index 1a8877d48b2..7947b689f8d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPOR
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.utils.TestingTableEnvironment
-import org.apache.flink.table.utils.TestUserClassLoaderJar
+import org.apache.flink.util.UserClassLoaderJarTestUtils
 
 import org.junit.{Before, Test}
 import org.junit.runner.RunWith
@@ -56,7 +56,7 @@ class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val useCatal
   private def overrideTableEnv(): Unit = {
     val tmpDir: File = TEMPORARY_FOLDER.newFolder()
     val udfJarFile: File =
-      TestUserClassLoaderJar.createJarFile(tmpDir, "flink-test-udf.jar", "TrimUDF", UDF_CLASS)
+      UserClassLoaderJarTestUtils.createJarFile(tmpDir, "flink-test-udf.jar", "TrimUDF", UDF_CLASS)
     val jars: util.List[URL] = util.Collections.singletonList(udfJarFile.toURI.toURL)
     val cl = ClientUtils.buildUserCodeClassLoader(
       jars,

Reply via email to