[ 
https://issues.apache.org/jira/browse/FLINK-26419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Morávek updated FLINK-26419:
----------------------------------
    Affects Version/s: 1.14.3
                       1.13.6
                       1.15.0

> StreamConfig.toString uses a wrong classloader
> ----------------------------------------------
>
>                 Key: FLINK-26419
>                 URL: https://issues.apache.org/jira/browse/FLINK-26419
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.15.0, 1.13.6, 1.14.3
>            Reporter: David Morávek
>            Priority: Major
>
> StreamConfig#toString method needs a classloader to de-serialize some bits to 
> be able to provide a user friendly output. Unfortunately it doesn't has 
> access to user classloader, so it uses whatever classloader that has loaded 
> the StreamConfig class.
> This method will break if we for example use a custom KeyPartitioner that 
> comes from the user code, because we don't have a proper classloader for 
> de-serializing it.
> This problem becomes visible when extended serialization debug is enabled 
> (which is useful eg. for debugging UDF serialization issues).
> {code}
> env.java.opts: "-Dsun.io.serialization.extendedDebugInfo=true"
> {code}
> Truncated exception snippet:
> {code}
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not 
> instantiate non chained outputs.
>       at 
> org.apache.flink.streaming.api.graph.StreamConfig.getNonChainedOutputs(StreamConfig.java:388)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.streaming.api.graph.StreamConfig.toString(StreamConfig.java:691)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at java.lang.String.valueOf(String.java:2951) ~[?:?]
>       at java.lang.StringBuilder.append(StringBuilder.java:168) ~[?:?]
>       at java.util.AbstractMap.toString(AbstractMap.java:556) ~[?:?]
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1422) 
> ~[?:?]
>       at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179) ~[?:?]
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349) 
> ~[?:?]
>       at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:546)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.streaming.api.graph.StreamConfig.setTransitiveChainedTaskConfigs(StreamConfig.java:492)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:475)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:418)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:409)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:409)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:409)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:409)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:409)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:375)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:176)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:114)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:959)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.submitAndGetJobClientFuture(EmbeddedExecutor.java:122)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.execute(EmbeddedExecutor.java:104)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1956)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:137)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1833)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:139)
>  ~[beam-all.jar:?]
>       at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:103) 
> ~[beam-all.jar:?]
>       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323) ~[beam-all.jar:?]
>       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309) ~[beam-all.jar:?]
>       at org.apache.dmvk.beam.TestPipeline.main(TestPipeline.java:51) 
> ~[beam-all.jar:?]
>       at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
> ~[?:?]
>       at 
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  ~[?:?]
>       at 
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:?]
>       at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>       at 
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159)
>  [flink-dist_2.11-1.13.6.jar:1.13.6]
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
> [flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>  [flink-dist_2.11-1.13.6.jar:1.13.6]
>       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> [flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> [flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> [flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  [flink-dist_2.11-1.13.6.jar:1.13.6]
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector
>       at 
> jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) 
> ~[?:?]
>       at 
> jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>  ~[?:?]
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
>       at java.lang.Class.forName0(Native Method) ~[?:?]
>       at java.lang.Class.forName(Class.java:398) ~[?:?]
>       at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1995) ~[?:?]
>       at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1862) 
> ~[?:?]
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2169) 
> ~[?:?]
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679) 
> ~[?:?]
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464) 
> ~[?:?]
>       at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358) ~[?:?]
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196) 
> ~[?:?]
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679) 
> ~[?:?]
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464) 
> ~[?:?]
>       at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358) ~[?:?]
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196) 
> ~[?:?]
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679) 
> ~[?:?]
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:493) 
> ~[?:?]
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:451) 
> ~[?:?]
>       at java.util.ArrayList.readObject(ArrayList.java:929) ~[?:?]
>       at jdk.internal.reflect.GeneratedMethodAccessor24.invoke(Unknown 
> Source) ~[?:?]
>       at 
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:?]
>       at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
>       at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1175) ~[?:?]
>       at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2325) ~[?:?]
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196) 
> ~[?:?]
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679) 
> ~[?:?]
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:493) 
> ~[?:?]
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:451) 
> ~[?:?]
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       at 
> org.apache.flink.streaming.api.graph.StreamConfig.getNonChainedOutputs(StreamConfig.java:385)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>       ... 53 more
> {code}
> Full log: https://gist.github.com/dmvk/012027477fbf2436b474007310df9cac



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to