[ 
https://issues.apache.org/jira/browse/FLINK-31243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694087#comment-17694087
 ] 

Amit Gurdasani edited comment on FLINK-31243 at 2/28/23 11:00 AM:
------------------------------------------------------------------

[~martijnvisser] I can confirm that I still see the error using the 
flink-streaming-scala_2.12 dependency. Revised build.gradle:
{noformat}
plugins {
    id 'application'
    id 'java'
    id 'com.github.johnrengelman.shadow' version '7.1.2'
}

group 'software.amazon.kinesisanalytics'
version '0.1'

repositories {
    mavenCentral()
}

dependencies {
    compileOnly 'org.apache.flink:flink-core:1.15.3'
    implementation 'org.apache.flink:flink-streaming-scala_2.12:1.15.3'
}

shadowJar {
    dependencies {
        exclude(dependency('com.esotericsoftware.kryo:.*:.*'))
        exclude(dependency('com.esotericsoftware.minlog:.*:.*'))
        exclude(dependency('com.twitter:.*:.*'))
        exclude(dependency('org.apache.flink:flink-core:.*'))
        exclude(dependency('org.scala-lang:.*:.*'))
    }
}

mainClassName = 'software.amazon.kinesisanalytics.kryotest.Main'
 {noformat}
I still see "Kryo serializer scala extensions are not available." with the 
exception stack trace in the Description of this Jira issue.


> KryoSerializer when loaded from user code classloader cannot load Scala 
> extensions from app classloader
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-31243
>                 URL: https://issues.apache.org/jira/browse/FLINK-31243
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.15.3, 1.16.1
>         Environment: OS: Amazon Linux 2
> JVM: Amazon Corretto 11
>  
>            Reporter: Amit Gurdasani
>            Priority: Major
>
> The 
> [KryoSerializer|https://github.com/apache/flink/blob/9bf0d9f2c2bcb2bc0c8ab6228bb0a9e76e10ad70/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java]
>  uses Class.forName() to dynamically load Scala extensions by name. The 
> single-argument Class.forName() resolves names through the classloader of 
> the calling class, so KryoSerializer appears to consult only its own 
> classloader to find these extensions. Because KryoSerializer is loaded by 
> the application classloader by default, the Scala extensions cannot be 
> loaded unless the flink-scala artifact is available to that classloader. 
> Scala applications that bundle flink-scala in their own jar are therefore 
> unable to benefit from the Scala extensions to the Kryo serializer.
> Exception looks like this:
> {noformat}
> java.lang.ClassNotFoundException: org.apache.flink.runtime.types.FlinkScalaKryoInstantiator
>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>     at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>     at java.base/java.lang.Class.forName0(Native Method)
>     at java.base/java.lang.Class.forName(Class.java:315)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:486)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:521)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryo(KryoSerializer.java:720)
>     at software.amazon.kinesisanalytics.kryotest.Main.main(Main.java:16)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
>     at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:239)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829){noformat}
> Example code resulting in this issue:
> Main class for Flink application:
> {noformat}
> package software.amazon.kinesisanalytics.kryotest;
> import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import java.io.Serializable;
> public class Main {
>     private static class Something implements Serializable {
>         private static final long serialVersionUID = 289034745902347830L;
>     }
>     public static void main(String... args) {
>         StreamExecutionEnvironment executionEnvironment = new StreamExecutionEnvironment();
>         KryoSerializer<Something> serializer = new KryoSerializer<>(Something.class, executionEnvironment.getConfig());
>         // Triggers the Class.forName() lookup of the Scala extensions.
>         serializer.getKryo();
>     }
> }
> {noformat}
> build.gradle for Flink application:
> {code:java}
> plugins {
>     id 'application'
>     id 'java'
>     id 'com.github.johnrengelman.shadow' version '7.1.2'
> }
> group 'software.amazon.kinesisanalytics'
> version '0.1'
> repositories {
>     mavenCentral()
> }
> dependencies {
>     compileOnly 'org.apache.flink:flink-core:1.15.2'
>     compileOnly 'org.apache.flink:flink-streaming-java:1.15.2'
>     implementation 'org.apache.flink:flink-scala_2.12:1.15.2'
> }
> shadowJar {
>     dependencies {
>         exclude(dependency('com.esotericsoftware.kryo:.*:.*'))
>         exclude(dependency('com.esotericsoftware.minlog:.*:.*'))
>         exclude(dependency('com.twitter:.*:.*'))
>         exclude(dependency('org.apache.flink:flink-core:.*'))
>         exclude(dependency('org.apache.flink:flink-streaming-java:.*'))
>         exclude(dependency('org.scala-lang:.*:.*'))
>     }
> }
> mainClassName = 'software.amazon.kinesisanalytics.kryotest.Main'
>  {code}
> Note that the application jar does not include Kryo itself, nor flink-core, 
> but does include flink-scala.
> Placing flink-scala on the application classpath eliminates the error, but as 
> I understand it, the [point of eliminating 
> Scala|https://flink.apache.org/2022/02/22/scala-free-in-one-fifteen/] from 
> the Flink application classloader was to allow Scala dependencies to be 
> loaded solely by the user code classloader. This issue prevents that from 
> being achieved for the Scala extensions to the Kryo serializer.
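To illustrate the lookup behaviour described in the issue, here is a minimal sketch of a by-name lookup that tries a caller-supplied classloader before falling back to the loader of the calling class. It is not Flink code: the class OptionalExtensionLoader and its find method are hypothetical names, and the sketch assumes the thread context classloader is the user code classloader when the job's main method runs.

```java
import java.util.Optional;

/**
 * Hypothetical helper (not part of Flink) showing how a by-name lookup
 * can consult a preferred classloader before the caller's own loader.
 */
class OptionalExtensionLoader {

    /**
     * Tries the preferred loader first, then falls back to the loader that
     * defined this class, which is what single-argument Class.forName() uses.
     */
    static Optional<Class<?>> find(String className, ClassLoader preferred) {
        if (preferred != null) {
            try {
                // 'false' defers static initialization of the loaded class.
                return Optional.of(Class.forName(className, false, preferred));
            } catch (ClassNotFoundException e) {
                // Not visible to the preferred loader; try the fallback below.
            }
        }
        try {
            return Optional.of(Class.forName(className));
        } catch (ClassNotFoundException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Assumption: the context classloader is the user code classloader.
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        // Class name taken from the stack trace in the issue description.
        String name = "org.apache.flink.runtime.types.FlinkScalaKryoInstantiator";
        System.out.println(find(name, context).isPresent()
                ? "Scala extensions found"
                : "Scala extensions not available");
    }
}
```

Whether a lookup of this kind is the right fix for KryoSerializer is a design question for the Flink maintainers; the sketch only shows why the choice of classloader decides whether a class bundled in the user jar can be found.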



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
