Stefan Richter created FLINK-4182:
-------------------------------------
Summary: HA recovery not working properly under ApplicationMaster failures.
Key: FLINK-4182
URL: https://issues.apache.org/jira/browse/FLINK-4182
Project: Flink
Issue Type: Bug
Components: Distributed Coordination, State Backends, Checkpointing
Affects Versions: 1.0.3
Reporter: Stefan Richter
When TaskManagers and the ApplicationMaster are killed at random, a job sometimes does not recover properly in HA mode.
This shows up with different symptoms. In one case, for example, the job dies with the following exception:
{code}
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
	at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
	at da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:257)
	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:116)
	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:88)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1084)
	... 26 more
Caused by: java.io.IOException: Failed to copy from blob store.
	at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:358)
	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:248)
	... 29 more
Caused by: java.io.IOException: gs:///flink/recovery/blob/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0 does not exist.
	at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:121)
	at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:93)
	at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:355)
	... 30 more
{code}
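Reading the trace bottom-up: BlobServer.getURL does not find the library locally, tries to copy it from the HA blob store, and FileSystemBlobStore.get reports that the file does not exist under the recovery path. The plain-Java sketch below models that lookup chain under stated assumptions. It is not Flink's BlobServer or FileSystemBlobStore code; the class BlobLookupSketch, its methods, and the directory layout (a local cache directory that falls back to a shared HA storage directory on a miss) are hypothetical. Whatever the root cause, once the blob file is missing from HA storage, the lookup fails with the same nested messages as above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/**
 * Hypothetical, simplified model of the lookup chain in the stack trace:
 * a local cache miss falls back to the HA blob store, and a missing file
 * there surfaces as "Failed to copy from blob store."
 */
public class BlobLookupSketch {

    private final Path localCacheDir; // hypothetical: per-process local blob cache
    private final Path haStorageDir;  // hypothetical: shared HA storage (gs://... in the report)

    public BlobLookupSketch(Path localCacheDir, Path haStorageDir) {
        this.localCacheDir = localCacheDir;
        this.haStorageDir = haStorageDir;
    }

    /** Returns the local file for the blob, fetching it from HA storage on a cache miss. */
    public Path getLocalFile(String hash) throws IOException {
        Path local = localCacheDir.resolve("blob_" + hash);
        if (Files.exists(local)) {
            return local; // cache hit: no access to HA storage needed
        }
        try {
            copyFromHaStore(hash, local);
        } catch (IOException e) {
            // Wrap the low-level failure, mirroring the nesting in the trace.
            throw new IOException("Failed to copy from blob store.", e);
        }
        return local;
    }

    private void copyFromHaStore(String hash, Path target) throws IOException {
        Path remote = haStorageDir.resolve("blob_" + hash);
        if (!Files.exists(remote)) {
            throw new IOException(remote + " does not exist.");
        }
        Files.createDirectories(target.getParent());
        Files.copy(remote, target, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path local = Files.createTempDirectory("local-cache");
        Path ha = Files.createTempDirectory("ha-store");
        BlobLookupSketch lookup = new BlobLookupSketch(local, ha);
        // A fresh process: empty local cache, and the blob is absent from HA storage.
        try {
            lookup.getLocalFile("7fafffe9595cd06aff213b81b5da7b1682e1d6b0");
        } catch (IOException e) {
            System.out.println(e.getMessage() + " <- " + e.getCause().getMessage());
        }
    }
}
```

Running main prints the outer message followed by its cause, the same "Failed to copy from blob store." / "... does not exist." pair that ends the trace above.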
In other cases, testing with a streaming state machine job and a Kafka source, I noticed that stream events are dropped. My guess is that the value state is not restored properly, because all invalid transactions in the log start from the initial state, which is the default value of the value state.
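The reasoning above can be illustrated with a small plain-Java model (not Flink code). It assumes a keyed state machine whose per-key state reads as a default INITIAL value when nothing is stored; a HashMap stands in for Flink's keyed value state, and the names StateMachineSketch, State, recoverWithoutState, and the transition table are hypothetical. If a failover brings the job back without its keyed state, every key falls back to the default, so each transition that was valid before the failure is now checked against INITIAL and flagged as invalid.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical keyed state machine. A HashMap stands in for Flink's keyed
 * value state; a missing entry reads as the default value INITIAL.
 */
public class StateMachineSketch {

    public enum State { INITIAL, A, B, TERMINAL }

    private final Map<String, State> valueState = new HashMap<>();
    private final List<String> invalid = new ArrayList<>();

    // Hypothetical transition table: INITIAL -> A -> B -> TERMINAL.
    static boolean isValid(State from, State to) {
        return (from == State.INITIAL && to == State.A)
                || (from == State.A && to == State.B)
                || (from == State.B && to == State.TERMINAL);
    }

    /** Applies one event; a key with no stored state starts from INITIAL. */
    public void process(String key, State target) {
        State current = valueState.getOrDefault(key, State.INITIAL);
        if (isValid(current, target)) {
            valueState.put(key, target);
        } else {
            invalid.add(key + ": " + current + " -> " + target);
        }
    }

    /** Simulates a failover after which the keyed state was NOT restored. */
    public void recoverWithoutState() {
        valueState.clear();
    }

    public List<String> invalidTransitions() {
        return invalid;
    }

    public static void main(String[] args) {
        StateMachineSketch sm = new StateMachineSketch();
        sm.process("k1", State.A);
        sm.process("k2", State.A);
        sm.process("k2", State.B);
        sm.recoverWithoutState();          // failover, but state comes back empty
        sm.process("k1", State.B);         // was A -> B, now INITIAL -> B
        sm.process("k2", State.TERMINAL);  // was B -> TERMINAL, now INITIAL -> TERMINAL
        sm.invalidTransitions().forEach(System.out::println);
    }
}
```

main prints `k1: INITIAL -> B` and `k2: INITIAL -> TERMINAL`: every reported invalid transition starts from the default state, which matches the pattern observed in the log. Without the call to recoverWithoutState, the same events produce no invalid transitions.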
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)