[
https://issues.apache.org/jira/browse/FLINK-26419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
David Morávek updated FLINK-26419:
----------------------------------
Affects Version/s: 1.14.3
1.13.6
1.15.0
> StreamConfig.toString uses a wrong classloader
> ----------------------------------------------
>
> Key: FLINK-26419
> URL: https://issues.apache.org/jira/browse/FLINK-26419
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Affects Versions: 1.15.0, 1.13.6, 1.14.3
> Reporter: David Morávek
> Priority: Major
>
> The StreamConfig#toString method needs a classloader to deserialize some bits
> in order to provide user-friendly output. Unfortunately, it doesn't have
> access to the user classloader, so it uses whatever classloader has loaded
> the StreamConfig class.
> This method will break if we, for example, use a custom KeyPartitioner that
> comes from the user code, because we don't have a proper classloader for
> deserializing it.
> This problem becomes visible when extended serialization debug info is enabled
> (which is useful, e.g., for debugging UDF serialization issues).
> {code}
> env.java.opts: "-Dsun.io.serialization.extendedDebugInfo=true"
> {code}
> Truncated exception snippet:
> {code}
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not
> instantiate non chained outputs.
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getNonChainedOutputs(StreamConfig.java:388)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.api.graph.StreamConfig.toString(StreamConfig.java:691)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at java.lang.String.valueOf(String.java:2951) ~[?:?]
> at java.lang.StringBuilder.append(StringBuilder.java:168) ~[?:?]
> at java.util.AbstractMap.toString(AbstractMap.java:556) ~[?:?]
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1422)
> ~[?:?]
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179) ~[?:?]
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
> ~[?:?]
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:546)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.api.graph.StreamConfig.setTransitiveChainedTaskConfigs(StreamConfig.java:492)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:475)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:418)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:409)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:409)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:409)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:409)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:409)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:375)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:176)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:114)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:959)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.submitAndGetJobClientFuture(EmbeddedExecutor.java:122)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.execute(EmbeddedExecutor.java:104)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1956)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:137)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1833)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:139)
> ~[beam-all.jar:?]
> at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:103)
> ~[beam-all.jar:?]
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323) ~[beam-all.jar:?]
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309) ~[beam-all.jar:?]
> at org.apache.dmvk.beam.TestPipeline.main(TestPipeline.java:51)
> ~[beam-all.jar:?]
> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:?]
> at
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:?]
> at
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:?]
> at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
> at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
> at
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159)
> [flink-dist_2.11-1.13.6.jar:1.13.6]
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> [flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> [flink-dist_2.11-1.13.6.jar:1.13.6]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.11-1.13.6.jar:1.13.6]
> Caused by: java.lang.ClassNotFoundException:
> org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector
> at
> jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
> ~[?:?]
> at
> jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
> ~[?:?]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
> at java.lang.Class.forName0(Native Method) ~[?:?]
> at java.lang.Class.forName(Class.java:398) ~[?:?]
> at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1995) ~[?:?]
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1862)
> ~[?:?]
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2169)
> ~[?:?]
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
> ~[?:?]
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
> ~[?:?]
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358) ~[?:?]
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
> ~[?:?]
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
> ~[?:?]
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
> ~[?:?]
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358) ~[?:?]
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
> ~[?:?]
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
> ~[?:?]
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
> ~[?:?]
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
> ~[?:?]
> at java.util.ArrayList.readObject(ArrayList.java:929) ~[?:?]
> at jdk.internal.reflect.GeneratedMethodAccessor24.invoke(Unknown
> Source) ~[?:?]
> at
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:?]
> at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
> at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1175) ~[?:?]
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2325) ~[?:?]
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
> ~[?:?]
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
> ~[?:?]
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
> ~[?:?]
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
> ~[?:?]
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getNonChainedOutputs(StreamConfig.java:385)
> ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> ... 53 more
> {code}
> Full log: https://gist.github.com/dmvk/012027477fbf2436b474007310df9cac
--
This message was sent by Atlassian Jira
(v8.20.1#820001)