Giuliano Caliari created FLINK-5633:
---------------------------------------

             Summary: ClassCastException: X cannot be cast to X when re-submitting a job.
                 Key: FLINK-5633
                 URL: https://issues.apache.org/jira/browse/FLINK-5633
             Project: Flink
          Issue Type: Bug
          Components: Job-Submission, YARN
    Affects Versions: 1.1.4
            Reporter: Giuliano Caliari
            Priority: Minor


I’m running a job on my local cluster. The first time I submit the job,
everything works, but whenever I cancel and re-submit the same job, it fails
with:

{quote}
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
        at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
        at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
        at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
        at au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
        at au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.App$class.main(App.scala:76)
        at au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
        at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
        at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
        at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:415)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:397)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:749)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.collectWithTimestamp(StreamSourceContexts.java:272)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:261)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:88)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:255)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: au.com.my.package.schema.p.WowTransaction cannot be cast to au.com.my.package.schema.p.WowTransaction
        at au.com.my.package.pTraitor.OneTrait$$anonfun$execute$4.apply(Traitor.scala:132)
        at org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:763)
        at org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:72)
        at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:65)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:412)
        ... 14 more
{quote}
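The message "X cannot be cast to X" is less paradoxical than it looks: the JVM identifies a runtime class by its fully qualified name together with the class loader that defined it, so the same bytecode defined by two different loaders yields two incompatible classes. The Scala sketch below tries to reproduce that effect outside Flink. `Payload`, `IsolatingLoader` and `ClassIdentityDemo` are hypothetical names for illustration; the loader only stands in for a per-job FlinkUserCodeClassLoader, and the sketch assumes the compiled `Payload.class` is readable as a resource from the application class path and a JDK 9 or later (for `InputStream.readAllBytes`).

```scala
// Hypothetical stand-in for a user-code class such as WowTransaction.
class Payload

// Hypothetical loader modelling a per-job user-code class loader: it defines
// the one `isolated` class itself (child-first) from the bytecode it finds on
// the parent's class path, and delegates every other class to the parent.
final class IsolatingLoader(parent: ClassLoader, isolated: String)
    extends ClassLoader(parent) {

  override def loadClass(name: String, resolve: Boolean): Class[_] =
    if (name != isolated) super.loadClass(name, resolve)
    else
      synchronized {
        val already = findLoadedClass(name)
        if (already != null) already
        else {
          val path = name.replace('.', '/') + ".class"
          val in = parent.getResourceAsStream(path)
          require(in != null, s"bytecode for $name not found as a resource")
          val bytes = try in.readAllBytes() finally in.close()
          defineClass(name, bytes, 0, bytes.length)
        }
      }
}

object ClassIdentityDemo {
  /** Loads Payload through two fresh loaders ("job A" and "job B") and tries
    * to cast an instance created via A to the class defined by B.
    * Returns (both classes have the same name, cast error message if any). */
  def reproduce(): (Boolean, Option[String]) = {
    val appLoader = classOf[Payload].getClassLoader
    val name = classOf[Payload].getName
    val classA = new IsolatingLoader(appLoader, name).loadClass(name)
    val classB = new IsolatingLoader(appLoader, name).loadClass(name)
    val instanceA =
      classA.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
    val castError =
      try { classB.cast(instanceA); None }
      catch { case e: ClassCastException => Some(e.getMessage) }
    (classA.getName == classB.getName, castError)
  }

  def main(args: Array[String]): Unit = {
    val (sameName, castError) = reproduce()
    println(s"same class name: $sameName")
    println(s"cast error: ${castError.getOrElse("none")}")
  }
}
```

If the assumptions hold, running `ClassIdentityDemo.main` should report that both classes share a name while the cast still fails, which has the same shape as the WowTransaction error above.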

This happens on versions 1.1.4 and 1.2.

Here's a great description of the problem, provided by Yury Ruchin:

{quote}
In a YARN setup, classes are loaded from several sources: the Flink lib
directory, the YARN lib directories, and the user code. The first two are
handled by the system class loader; the last one is loaded by
FlinkUserCodeClassLoader.

My streaming job parses Avro-encoded data using the SpecificRecord facility.
In essence, the job looks like this: Source -> Avro parser (Map) -> Sink.
Parallelism is 1. The job runs inside a long-lived YARN session. I have a
subclass of SpecificRecord; say its name is MySpecificRecord. From a
class-loading perspective, the Avro library classes, including SpecificRecord,
are loaded by the system class loader from the YARN lib dir, so they are shared
across different Flink tasks within a task manager. MySpecificRecord, on the
other hand, is in the job fat jar, so it gets loaded by
FlinkUserCodeClassLoader. On every job restart, the task gets a new
FlinkUserCodeClassLoader instance, so classes from user code are confined to a
task instance.

Simply put, the parsing itself looks like this:

val bean = new SpecificDatumReader[MySpecificRecord](MySpecificRecord.getClassSchema).read(...)

Now, the scenario:

1. I start my job. Parsing is initiated, so SpecificDatumReader and
SpecificData get loaded by the system class loader. A new
FlinkUserCodeClassLoader is instantiated; let's call this instance "A".
MySpecificRecord then gets loaded by A.

2. SpecificData has a singleton, SpecificData.INSTANCE, which holds a cache
mapping a string key derived from the Avro schema to the implementing class.
So during parsing, MySpecificRecord (A) gets cached there.

3. I stop the job and re-submit it. The JVM process is the same, so all
standard Avro classes, including SpecificData, remain loaded. A new task
instance is created and gets a new FlinkUserCodeClassLoader instance; let's
call it "B". A new incarnation of the MySpecificRecord class is loaded by B.
From the JVM's standpoint, MySpecificRecord (B) is different from
MySpecificRecord (A), even though their bytecode is identical.

4. The job starts parsing again. SpecificDatumReader consults 
SpecificData.INSTANCE's cache for any stashed classes and finds 
MySpecificRecord (A) there.

5. SpecificDatumReader uses the cached MySpecificRecord (A) to instantiate a 
bean for filling the parsed data in.

6. SpecificDatumReader hands the filled instance of MySpecificRecord (A) back
to the job.

7. The job tries to cast MySpecificRecord (A) to MySpecificRecord (B).

8. ClassCastException :^(

I fixed the issue by not using the SpecificData.INSTANCE singleton (even
though using it is common and expected practice). Instead, I give every parser
a new instance of SpecificData. This way, the class cache is confined to a
parser instance and is discarded along with it.

{quote}
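Steps 1 to 8 and the workaround can be modelled without Avro or Flink. In the toy Scala sketch below, a runtime class is represented as a (name, loader) pair, and `ClassCache` is a hypothetical stand-in for a SpecificData-style cache that memoises the first class it resolves for a schema name. Sharing one cache across two "jobs" reproduces the stale entry from step 4; giving each parser its own cache, as in the workaround, does not. None of `RuntimeClass`, `ClassCache` or `CacheScenario` are real Avro or Flink APIs.

```scala
import scala.collection.mutable

// Toy model: a runtime class is identified by its name AND its defining
// loader, as on the JVM. All names here are hypothetical, not Avro/Flink APIs.
final case class RuntimeClass(name: String, loader: String)

// Hypothetical stand-in for SpecificData's class cache: the first class
// resolved for a schema name is memoised and returned on every later call.
final class ClassCache {
  private val bySchemaName = mutable.Map.empty[String, RuntimeClass]

  def resolve(schemaName: String, currentLoader: String): RuntimeClass =
    bySchemaName.getOrElseUpdate(schemaName, RuntimeClass(schemaName, currentLoader))
}

object CacheScenario {
  /** Simulates one job run whose user code is loaded by `loader`.
    * Returns true if the class the reader instantiates matches the class
    * the job itself sees; false models the ClassCastException in step 8. */
  def runJob(loader: String, cache: ClassCache): Boolean = {
    val jobsOwnClass = RuntimeClass("MySpecificRecord", loader)
    val readerClass = cache.resolve("MySpecificRecord", loader)
    readerClass == jobsOwnClass
  }

  def main(args: Array[String]): Unit = {
    // One cache outliving both jobs, like SpecificData.INSTANCE held by the
    // system class loader.
    val shared = new ClassCache
    println(s"shared cache, job A: ${runJob("A", shared)}")
    println(s"shared cache, job B: ${runJob("B", shared)}")
    // Workaround: every parser (and therefore every job) gets its own cache.
    println(s"per-parser cache, job A: ${runJob("A", new ClassCache)}")
    println(s"per-parser cache, job B: ${runJob("B", new ClassCache)}")
  }
}
```

`main` prints true, false, true, true. In Avro's Java API the same idea corresponds to passing a fresh `SpecificData` (for example `new SpecificData(classOf[MySpecificRecord].getClassLoader)`) to the `SpecificDatumReader(writerSchema, readerSchema, data)` constructor instead of relying on the default singleton; it is worth checking which constructors the Avro version on your class path provides.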

A discussion of the error can be found at:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-get-help-on-ClassCastException-when-re-submitting-a-job-td10972.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
