[
https://issues.apache.org/jira/browse/FLINK-31243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17695792#comment-17695792
]
Martijn Visser commented on FLINK-31243:
----------------------------------------
[~amitgurd] The intention was to unblock users from being stuck on Scala
2.12.7 forever (the latest Scala version that Flink's Scala APIs can support).
You can achieve that by not using Flink's Scala APIs/runtime and instead
building your application in your desired Scala version on top of Flink's Java
APIs. If you want to use Flink's Scala APIs, you must use Scala 2.12.7.
> KryoSerializer when loaded from user code classloader cannot load Scala
> extensions from app classloader
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-31243
> URL: https://issues.apache.org/jira/browse/FLINK-31243
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Affects Versions: 1.15.3, 1.16.1
> Environment: OS: Amazon Linux 2
> JVM: Amazon Corretto 11
>
> Reporter: Amit Gurdasani
> Priority: Major
>
> The
> [KryoSerializer|https://github.com/apache/flink/blob/9bf0d9f2c2bcb2bc0c8ab6228bb0a9e76e10ad70/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java]
> uses Class.forName() to dynamically load Scala extensions by name. This
> seems to imply that it references only its own classloader to find these
> extensions. By default, as the application classloader is favored for
> KryoSerializer, this implies that unless the flink-scala artifact is
> available to the application classloader, the Scala extensions cannot be
> loaded. Scala applications that include flink-scala are therefore unable to
> benefit from the Scala extensions to the Kryo Serializer.
> Exception looks like this:
> {noformat}
> java.lang.ClassNotFoundException: org.apache.flink.runtime.types.FlinkScalaKryoInstantiator
>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>     at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>     at java.base/java.lang.Class.forName0(Native Method)
>     at java.base/java.lang.Class.forName(Class.java:315)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:486)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:521)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryo(KryoSerializer.java:720)
>     at software.amazon.kinesisanalytics.kryotest.Main.main(Main.java:16)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
>     at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:239)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> {noformat}
> Example code resulting in this issue:
> Main class for Flink application:
> {noformat}
> package software.amazon.kinesisanalytics.kryotest;
>
> import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> import java.io.Serializable;
>
> public class Main {
>     private static class Something implements Serializable {
>         public static long serialVersionUID = 289034745902347830L;
>     }
>
>     public static void main(String... args) {
>         StreamExecutionEnvironment executionEnvironment = new StreamExecutionEnvironment();
>         KryoSerializer<Something> serializer =
>                 new KryoSerializer<>(Something.class, executionEnvironment.getConfig());
>         serializer.getKryo();
>     }
> }
> {noformat}
> build.gradle for Flink application:
> {code:java}
> plugins {
>     id 'application'
>     id 'java'
>     id 'com.github.johnrengelman.shadow' version '7.1.2'
> }
>
> group 'software.amazon.kinesisanalytics'
> version '0.1'
>
> repositories {
>     mavenCentral()
> }
>
> dependencies {
>     compileOnly 'org.apache.flink:flink-core:1.15.2'
>     compileOnly 'org.apache.flink:flink-streaming-java:1.15.2'
>     implementation 'org.apache.flink:flink-scala_2.12:1.15.2'
> }
>
> shadowJar {
>     dependencies {
>         exclude(dependency('com.esotericsoftware.kryo:.*:.*'))
>         exclude(dependency('com.esotericsoftware.minlog:.*:.*'))
>         exclude(dependency('com.twitter:.*:.*'))
>         exclude(dependency('org.apache.flink:flink-core:.*'))
>         exclude(dependency('org.apache.flink:flink-streaming-java:.*'))
>         exclude(dependency('org.scala-lang:.*:.*'))
>     }
> }
>
> mainClassName = 'software.amazon.kinesisanalytics.kryotest.Main'
> {code}
> Note that the application jar does not include Kryo itself, nor flink-core,
> but does include flink-scala.
> Placing flink-scala in the application classpath eliminates the error, but as
> I understand it, the [point of eliminating
> Scala|https://flink.apache.org/2022/02/22/scala-free-in-one-fifteen/] from
> the Flink application classloader was to allow the only Scala dependencies to
> be loaded by the user code classloader. This issue prevents that from being
> achieved for the Scala extensions to the Kryo Serializer.
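
The classloader behaviour described in the issue can be illustrated with a
small standalone sketch. It is not Flink's code and not necessarily how the
issue was or will be resolved; the class ClassLoaderLookup and the method
tryLoad are hypothetical names introduced only for this example. The sketch
assumes the caller can hand in a "preferred" loader (for instance the user code
classloader) and shows the difference between the single-argument
Class.forName(name), which resolves against the loader of the calling class,
and the three-argument form, which resolves against an explicitly given loader.

```java
import java.util.Optional;

public class ClassLoaderLookup {

    /**
     * Hypothetical helper: tries the explicitly supplied loader first and only
     * then falls back to the loader that defined this class.
     */
    static Optional<Class<?>> tryLoad(String className, ClassLoader preferred) {
        if (preferred != null) {
            try {
                // Three-argument form: resolves against the given loader.
                return Optional.of(Class.forName(className, true, preferred));
            } catch (ClassNotFoundException ignored) {
                // Not visible to the preferred loader; try the fallback below.
            }
        }
        try {
            // Single-argument form: resolves against the loader of the calling
            // class (here, whichever loader defined ClassLoaderLookup).
            return Optional.of(Class.forName(className));
        } catch (ClassNotFoundException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Assumption for this sketch: the context classloader stands in for
        // the user code classloader.
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();

        // A JDK class is visible to every loader.
        System.out.println(tryLoad("java.util.ArrayList", contextLoader).isPresent());

        // The class named in the stack trace is found only if flink-scala is
        // visible to one of the two loaders consulted.
        System.out.println(
                tryLoad("org.apache.flink.runtime.types.FlinkScalaKryoInstantiator", contextLoader)
                        .isPresent());
    }
}
```

With a lookup of this shape, a class packaged only in the application jar would
be found as long as the loader passed as "preferred" can see that jar, no
matter which loader defined the calling class.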