This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2e37a063685 [FLINK-27658][runtime] SafetyNetWrapperClassLoader expose
addURL method to allow registering jar dynamically
2e37a063685 is described below
commit 2e37a06368596ca9ed04366a879f2facfb7ef509
Author: fengli <[email protected]>
AuthorDate: Wed Jun 1 16:04:44 2022 +0800
[FLINK-27658][runtime] SafetyNetWrapperClassLoader expose addURL method to
allow registering jar dynamically
---
.../java/org/apache/flink/client/ClientUtils.java | 2 +-
.../cli/CliFrontendDynamicPropertiesTest.java | 2 +-
.../DefaultPackagedProgramRetrieverTest.java | 2 +-
.../org/apache/flink}/util/ClassLoaderUtil.java | 5 +-
.../flink/util/FlinkUserCodeClassLoader.java | 5 +
.../flink/util}/FlinkUserCodeClassLoaders.java | 15 +-
.../apache/flink}/util/ClassLoaderUtilsTest.java | 4 +-
.../flink/util}/FlinkUserCodeClassLoadersTest.java | 164 ++++++++++++++-------
.../flink/util/UserClassLoaderJarTestUtils.java | 11 +-
.../formats/avro/AvroKryoClassloadingTest.java | 2 +-
.../librarycache/BlobLibraryCacheManager.java | 1 +
.../jobmaster/JobManagerSharedServices.java | 2 +-
.../TaskManagerServicesConfiguration.java | 2 +-
.../librarycache/BlobLibraryCacheManagerTest.java | 1 +
.../BlobLibraryCacheRecoveryITCase.java | 1 +
.../ClassLoaderDeserializationTest.java | 94 ++++++++++++
.../state/RocksDbMultiClassLoaderTest.java | 2 +-
.../flink/streaming/api/graph/StreamConfig.java | 2 +-
.../flink/table/client/cli/CliClientITCase.java | 4 +-
.../client/gateway/context/SessionContextTest.java | 4 +-
.../client/gateway/local/LocalExecutorITCase.java | 4 +-
.../batch/sql/PartitionableSourceITCase.scala | 4 +-
22 files changed, 247 insertions(+), 86 deletions(-)
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 38a52385980..95a223a2135 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -30,9 +30,9 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.client.JobInitializationException;
-import
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.function.SupplierWithException;
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
index a45f728c8b3..374eb8f8b60 100644
---
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
+++
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.client.cli;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
-import
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ParentFirstClassLoader;
import org.apache.flink.util.ChildFirstClassLoader;
+import org.apache.flink.util.FlinkUserCodeClassLoaders.ParentFirstClassLoader;
import org.apache.commons.cli.Options;
import org.junit.AfterClass;
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
index d35c0593c48..bb3e34f6749 100644
---
a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
+++
b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
@@ -29,12 +29,12 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.core.testutils.FlinkMatchers;
-import
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.ChildFirstClassLoader;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ClassLoaderUtil.java
b/flink-core/src/main/java/org/apache/flink/util/ClassLoaderUtil.java
similarity index 98%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/util/ClassLoaderUtil.java
rename to flink-core/src/main/java/org/apache/flink/util/ClassLoaderUtil.java
index 120e4039d6b..742bb0581a6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ClassLoaderUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ClassLoaderUtil.java
@@ -16,9 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.util;
+package org.apache.flink.util;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.annotation.Internal;
import java.io.File;
import java.io.IOException;
@@ -30,6 +30,7 @@ import java.util.jar.JarFile;
* Utilities for information with respect to class loaders, specifically class
loaders for the
* dynamic loading of user defined classes.
*/
+@Internal
public final class ClassLoaderUtil {
/**
diff --git
a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java
b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java
index b605d7bba54..54b1fa61db0 100644
---
a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java
+++
b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java
@@ -67,4 +67,9 @@ public abstract class FlinkUserCodeClassLoader extends
URLClassLoader {
throws ClassNotFoundException {
return super.loadClass(name, resolve);
}
+
+ @Override
+ protected void addURL(URL url) {
+ super.addURL(url);
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
similarity index 95%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
rename to
flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
index 8d9286be178..44dff20a3e4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
+++
b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
@@ -16,11 +16,10 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.execution.librarycache;
+package org.apache.flink.util;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.util.ChildFirstClassLoader;
-import org.apache.flink.util.FlinkUserCodeClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +32,7 @@ import java.util.Enumeration;
import java.util.function.Consumer;
/** Gives the URLClassLoader a nicer name for debugging purposes. */
+@Internal
public class FlinkUserCodeClassLoaders {
private FlinkUserCodeClassLoaders() {}
@@ -110,6 +110,7 @@ public class FlinkUserCodeClassLoaders {
/**
* Regular URLClassLoader that first loads from the parent and only after
that from the URLs.
*/
+ @Internal
public static class ParentFirstClassLoader extends
FlinkUserCodeClassLoader {
ParentFirstClassLoader(
@@ -130,7 +131,8 @@ public class FlinkUserCodeClassLoaders {
* delegate is nulled and can be garbage collected. Additional class
resolution will be resolved
* solely through the bootstrap classloader and most likely result in
ClassNotFound exceptions.
*/
- private static class SafetyNetWrapperClassLoader extends URLClassLoader
implements Closeable {
+ @Internal
+ public static class SafetyNetWrapperClassLoader extends URLClassLoader
implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);
@@ -178,6 +180,11 @@ public class FlinkUserCodeClassLoaders {
return ensureInner().loadClass(name, resolve);
}
+ @Override
+ public void addURL(URL url) {
+ ensureInner().addURL(url);
+ }
+
@Override
public URL getResource(String name) {
return ensureInner().getResource(name);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java
b/flink-core/src/test/java/org/apache/flink/util/ClassLoaderUtilsTest.java
similarity index 98%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java
rename to
flink-core/src/test/java/org/apache/flink/util/ClassLoaderUtilsTest.java
index 2c238de1f32..3574bced1ab 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/ClassLoaderUtilsTest.java
@@ -16,9 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.util.IOUtils;
+package org.apache.flink.util;
import org.junit.Test;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoadersTest.java
b/flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoadersTest.java
similarity index 66%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoadersTest.java
rename to
flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoadersTest.java
index c03e2aedc39..108ef0f8f88 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoadersTest.java
+++
b/flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoadersTest.java
@@ -16,29 +16,22 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.execution.librarycache;
-
-import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
-import org.apache.flink.runtime.util.ClassLoaderUtil;
-import org.apache.flink.testutils.ClassLoaderUtils;
-import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TestLogger;
+package org.apache.flink.util;
+import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
+import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import static
org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
-import static org.hamcrest.CoreMatchers.allOf;
-import static org.hamcrest.CoreMatchers.containsString;
+import static org.assertj.core.api.Assertions.fail;
import static org.hamcrest.CoreMatchers.isA;
-import static org.hamcrest.Matchers.hasItemInArray;
-import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -53,52 +46,23 @@ public class FlinkUserCodeClassLoadersTest extends
TestLogger {
@Rule public ExpectedException expectedException =
ExpectedException.none();
- @Test
- public void testMessageDecodingWithUnavailableClass() throws Exception {
- final ClassLoader systemClassLoader = getClass().getClassLoader();
-
- final String className = "UserClass";
- final URLClassLoader userClassLoader =
- ClassLoaderUtils.compileAndLoadJava(
- temporaryFolder.newFolder(),
- className + ".java",
- "import java.io.Serializable;\n"
- + "public class "
- + className
- + " implements Serializable {}");
-
- RemoteRpcInvocation method =
- new RemoteRpcInvocation(
- className,
- "test",
- new Class<?>[] {
- int.class, Class.forName(className, false,
userClassLoader)
- },
- new Object[] {
- 1, Class.forName(className, false,
userClassLoader).newInstance()
- });
-
- SerializedValue<RemoteRpcInvocation> serializedMethod = new
SerializedValue<>(method);
-
- expectedException.expect(ClassNotFoundException.class);
- expectedException.expect(
- allOf(
- isA(ClassNotFoundException.class),
- hasProperty(
- "suppressed",
- hasItemInArray(
- allOf(
-
isA(ClassNotFoundException.class),
- hasProperty(
- "message",
- containsString(
- "Could not
deserialize 1th parameter type of method test(int, ...).")))))));
-
- RemoteRpcInvocation deserializedMethod =
- serializedMethod.deserializeValue(systemClassLoader);
- deserializedMethod.getMethodName();
-
- userClassLoader.close();
+ public static final String USER_CLASS = "UserClass";
+ public static final String USER_CLASS_CODE =
+ "import java.io.Serializable;\n"
+ + "public class "
+ + USER_CLASS
+ + " implements Serializable {}";
+
+ private static File userJar;
+
+ @BeforeClass
+ public static void prepare() throws Exception {
+ userJar =
+ UserClassLoaderJarTestUtils.createJarFile(
+ temporaryFolder.newFolder("test-jar"),
+ "test-classloader.jar",
+ USER_CLASS,
+ USER_CLASS_CODE);
}
@Test
@@ -292,6 +256,92 @@ public class FlinkUserCodeClassLoadersTest extends
TestLogger {
assertTrue(TestParentFirstClassLoader.isParallelCapable);
}
+ @Test
+ public void testParentFirstClassLoadingByAddURL() throws Exception {
+ // use the code location of this test class as the base code path of all class loaders;
+ // the user jar is only registered afterwards via addURL
+ final URL childCodePath =
getClass().getProtectionDomain().getCodeSource().getLocation();
+
+ final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader
parentClassLoader =
+ (FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
+ createChildFirstClassLoader(childCodePath,
getClass().getClassLoader());
+ final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader
childClassLoader1 =
+ (FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
+ createParentFirstClassLoader(childCodePath,
parentClassLoader);
+ final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader
childClassLoader2 =
+ (FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
+ createParentFirstClassLoader(childCodePath,
parentClassLoader);
+
+ // test class loading before adding the user jar URL to the ClassLoader
+ assertClassNotFoundException(USER_CLASS, false, parentClassLoader);
+ assertClassNotFoundException(USER_CLASS, false, childClassLoader1);
+ assertClassNotFoundException(USER_CLASS, false, childClassLoader2);
+
+ // only add jar url to parent ClassLoader
+ parentClassLoader.addURL(userJar.toURI().toURL());
+
+ // test class loading after adding the jar URL
+ final Class<?> clazz1 = Class.forName(USER_CLASS, false,
parentClassLoader);
+ final Class<?> clazz2 = Class.forName(USER_CLASS, false,
childClassLoader1);
+ final Class<?> clazz3 = Class.forName(USER_CLASS, false,
childClassLoader2);
+
+ assertEquals(clazz1, clazz2);
+ assertEquals(clazz1, clazz3);
+
+ parentClassLoader.close();
+ childClassLoader1.close();
+ childClassLoader2.close();
+ }
+
+ @Test
+ public void testChildFirstClassLoadingByAddURL() throws Exception {
+
+ // use the code location of this test class as the base code path of all class loaders;
+ // the user jar is only registered afterwards via addURL
+ final URL childCodePath =
getClass().getProtectionDomain().getCodeSource().getLocation();
+
+ final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader
parentClassLoader =
+ (FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
+ createChildFirstClassLoader(childCodePath,
getClass().getClassLoader());
+ final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader
childClassLoader1 =
+ (FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
+ createChildFirstClassLoader(childCodePath,
parentClassLoader);
+ final FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader
childClassLoader2 =
+ (FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader)
+ createChildFirstClassLoader(childCodePath,
parentClassLoader);
+
+ // test class loading before adding the user jar URL to the ClassLoader
+ assertClassNotFoundException(USER_CLASS, false, parentClassLoader);
+ assertClassNotFoundException(USER_CLASS, false, childClassLoader1);
+ assertClassNotFoundException(USER_CLASS, false, childClassLoader2);
+
+ // only add jar url to child ClassLoader
+ URL userJarURL = userJar.toURI().toURL();
+ childClassLoader1.addURL(userJarURL);
+ childClassLoader2.addURL(userJarURL);
+
+ // test class loading after adding the jar URL
+ assertClassNotFoundException(USER_CLASS, false, parentClassLoader);
+
+ final Class<?> clazz1 = Class.forName(USER_CLASS, false,
childClassLoader1);
+ final Class<?> clazz2 = Class.forName(USER_CLASS, false,
childClassLoader2);
+
+ assertNotEquals(clazz1, clazz2);
+
+ parentClassLoader.close();
+ childClassLoader1.close();
+ childClassLoader2.close();
+ }
+
+ private void assertClassNotFoundException(
+ String className, boolean initialize, ClassLoader classLoader) {
+ try {
+ Class.forName(className, initialize, classLoader);
+ fail("Should fail.");
+ } catch (ClassNotFoundException e) {
+ }
+ }
+
private static class TestParentFirstClassLoader
extends FlinkUserCodeClassLoaders.ParentFirstClassLoader {
public static boolean isParallelCapable;
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java
b/flink-core/src/test/java/org/apache/flink/util/UserClassLoaderJarTestUtils.java
similarity index 93%
rename from
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java
rename to
flink-core/src/test/java/org/apache/flink/util/UserClassLoaderJarTestUtils.java
index 4765f0dd94d..0785b8d801f 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java
+++
b/flink-core/src/test/java/org/apache/flink/util/UserClassLoaderJarTestUtils.java
@@ -16,9 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.utils;
-
-import org.apache.flink.util.FileUtils;
+package org.apache.flink.util;
import javax.tools.DiagnosticCollector;
import javax.tools.JavaCompiler;
@@ -35,7 +33,12 @@ import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
/** Mainly used for testing classloading. */
-public class TestUserClassLoaderJar {
+public class UserClassLoaderJarTestUtils {
+
+ /** Private constructor to prevent instantiation. */
+ private UserClassLoaderJarTestUtils() {
+ throw new RuntimeException();
+ }
/** Pack the generated UDF class into a JAR and return the path of the
JAR. */
public static File createJarFile(File tmpDir, String jarName, String
className, String javaCode)
diff --git
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
index feb5142ca76..f7db7d29ada 100644
---
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
+++
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.formats.avro;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.testutils.FilteredClassLoader;
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
-import
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
import com.esotericsoftware.kryo.Kryo;
import org.junit.jupiter.api.Test;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 460f83e449b..bd187e745ee 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkUserCodeClassLoader;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
index 4320ae9171a..d403893c06e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
-import
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
@@ -34,6 +33,7 @@ import org.apache.flink.runtime.shuffle.ShuffleServiceLoader;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import javax.annotation.Nonnull;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 18f50ea8b83..5f0116b6204 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -30,9 +30,9 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
-import
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Reference;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index d11c6d8d22f..163a2d63b3c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.UserCodeClassLoader;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index efb43c35437..8aa960aa102 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.TestLogger;
import org.hamcrest.collection.IsEmptyCollection;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/ClassLoaderDeserializationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/ClassLoaderDeserializationTest.java
new file mode 100644
index 00000000000..73dfab1e980
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/ClassLoaderDeserializationTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution.librarycache;
+
+import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
+import org.apache.flink.testutils.ClassLoaderUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.net.URLClassLoader;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.isA;
+import static org.hamcrest.Matchers.hasItemInArray;
+import static org.hamcrest.Matchers.hasProperty;
+
+/** Tests for deserialization with classes missing from the class loader. */
+public class ClassLoaderDeserializationTest extends TestLogger {
+
+ @ClassRule public static TemporaryFolder temporaryFolder = new
TemporaryFolder();
+
+ @Rule public ExpectedException expectedException =
ExpectedException.none();
+
+ @Test
+ public void testMessageDecodingWithUnavailableClass() throws Exception {
+ final ClassLoader systemClassLoader = getClass().getClassLoader();
+
+ final String className = "UserClass";
+ final URLClassLoader userClassLoader =
+ ClassLoaderUtils.compileAndLoadJava(
+ temporaryFolder.newFolder(),
+ className + ".java",
+ "import java.io.Serializable;\n"
+ + "public class "
+ + className
+ + " implements Serializable {}");
+
+ RemoteRpcInvocation method =
+ new RemoteRpcInvocation(
+ className,
+ "test",
+ new Class<?>[] {
+ int.class, Class.forName(className, false,
userClassLoader)
+ },
+ new Object[] {
+ 1, Class.forName(className, false,
userClassLoader).newInstance()
+ });
+
+ SerializedValue<RemoteRpcInvocation> serializedMethod = new
SerializedValue<>(method);
+
+ expectedException.expect(ClassNotFoundException.class);
+ expectedException.expect(
+ allOf(
+ isA(ClassNotFoundException.class),
+ hasProperty(
+ "suppressed",
+ hasItemInArray(
+ allOf(
+
isA(ClassNotFoundException.class),
+ hasProperty(
+ "message",
+ containsString(
+ "Could not
deserialize 1th parameter type of method test(int, ...).")))))));
+
+ RemoteRpcInvocation deserializedMethod =
+ serializedMethod.deserializeValue(systemClassLoader);
+ deserializedMethod.getMethodName();
+
+ userClassLoader.close();
+ }
+}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
index 556ea3d5d64..f03410c7bac 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.contrib.streaming.state;
-import
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.junit.Rule;
import org.junit.Test;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 6f9e674cd52..b1bc3d633a0 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.util.CorruptConfigurationException;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.util.ClassLoaderUtil;
import org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -40,6 +39,7 @@ import
org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
+import org.apache.flink.util.ClassLoaderUtil;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
index 57245a47d90..45ee390095f 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
@@ -28,8 +28,8 @@ import
org.apache.flink.table.client.gateway.context.DefaultContext;
import org.apache.flink.table.client.gateway.local.LocalExecutor;
import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
import org.apache.flink.table.planner.utils.TableTestUtil;
-import org.apache.flink.table.utils.TestUserClassLoaderJar;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
import
org.apache.flink.shaded.guava30.com.google.common.io.PatternFilenameFilter;
@@ -100,7 +100,7 @@ public class CliClientITCase extends AbstractTestBase {
@BeforeClass
public static void setup() throws IOException {
File udfJar =
- TestUserClassLoaderJar.createJarFile(
+ UserClassLoaderJarTestUtils.createJarFile(
tempFolder.newFolder("test-jar"),
"test-classloader-udf.jar",
UserDefinedFunctions.GENERATED_UDF_CLASS,
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
index c8bebe5971f..7f9f1574a85 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
-import org.apache.flink.table.utils.TestUserClassLoaderJar;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -59,7 +59,7 @@ public class SessionContextTest {
@BeforeClass
public static void prepare() throws Exception {
udfJar =
- TestUserClassLoaderJar.createJarFile(
+ UserClassLoaderJarTestUtils.createJarFile(
tempFolder.newFolder("test-jar"),
"test-classloader-udf.jar",
UserDefinedFunctions.GENERATED_UDF_CLASS,
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 79da81f4f29..df1ba55cfe5 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -41,12 +41,12 @@ import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.utils.TestUserClassLoaderJar;
import org.apache.flink.table.utils.print.RowDataToStringConverter;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -101,7 +101,7 @@ public class LocalExecutorITCase extends TestLogger {
public static void setup() throws IOException {
clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
File udfJar =
- TestUserClassLoaderJar.createJarFile(
+ UserClassLoaderJarTestUtils.createJarFile(
tempFolder.newFolder("test-jar"),
"test-classloader-udf.jar",
UserDefinedFunctions.GENERATED_UDF_CLASS,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
index 1a8877d48b2..7947b689f8d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPOR
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.utils.TestingTableEnvironment
-import org.apache.flink.table.utils.TestUserClassLoaderJar
+import org.apache.flink.util.UserClassLoaderJarTestUtils
import org.junit.{Before, Test}
import org.junit.runner.RunWith
@@ -56,7 +56,7 @@ class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val useCatal
private def overrideTableEnv(): Unit = {
val tmpDir: File = TEMPORARY_FOLDER.newFolder()
val udfJarFile: File =
- TestUserClassLoaderJar.createJarFile(tmpDir, "flink-test-udf.jar", "TrimUDF", UDF_CLASS)
+ UserClassLoaderJarTestUtils.createJarFile(tmpDir, "flink-test-udf.jar", "TrimUDF", UDF_CLASS)
val jars: util.List[URL] = util.Collections.singletonList(udfJarFile.toURI.toURL)
val cl = ClientUtils.buildUserCodeClassLoader(
jars,