[ 
https://issues.apache.org/jira/browse/FLINK-4182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-4182:
----------------------------------
    Description: 
When randomly killing TaskManager and ApplicationMaster, a job sometimes does 
not properly recover in HA mode.

There can be different symptoms for this. For example, in one case the job is 
dying with the following exception:

{code}
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
        at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
        at da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
        at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:257)
        at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:116)
        at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:88)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1084)
        ... 26 more
Caused by: java.io.IOException: Failed to copy from blob store.
        at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:358)
        at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:248)
        ... 29 more
Caused by: java.io.IOException: gs:///flink/recovery/blob/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0 does not exist.
        at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:121)
        at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:93)
        at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:355)
        ... 30 more
{code}

In other cases, I noticed inconsistencies in the results when testing with a 
streaming state machine job and a Kafka source. My guess is that value state is 
not restored properly, because all invalid transactions in the log start from 
the initial state, which is the default value for the value state.
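
The suspected failure mode can be illustrated with a small, Flink-free 
simulation. This is only a sketch under assumptions: the class name 
`LostStateDemo`, the states, and the transition table are hypothetical and not 
taken from the actual test job, and a `HashMap` stands in for keyed value state 
whose default is the initial state. If the state is lost during recovery, the 
next event for a key is checked against the initial state and reported as 
invalid, which would match the pattern seen in the log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LostStateDemo {
    // Hypothetical states; INITIAL plays the role of the value state's default.
    enum State { INITIAL, A, B }

    // Hypothetical transition table: INITIAL -a-> A, A -b-> B, B -a-> A.
    // Returns null for any transition that is not allowed.
    static State next(State current, char event) {
        if (current == State.INITIAL && event == 'a') return State.A;
        if (current == State.A && event == 'b') return State.B;
        if (current == State.B && event == 'a') return State.A;
        return null;
    }

    // Stand-in for keyed value state (assumption: one state value per key).
    private final Map<String, State> state = new HashMap<>();

    // Collected reports of invalid transitions.
    final List<String> invalid = new ArrayList<>();

    void process(String key, char event) {
        // A missing entry behaves like value state returning its default.
        State current = state.getOrDefault(key, State.INITIAL);
        State following = next(current, event);
        if (following == null) {
            invalid.add(key + ": " + current + " --" + event + "--> ?");
            state.remove(key); // fall back to the default after reporting
        } else {
            state.put(key, following);
        }
    }

    // Simulates a recovery that does not restore the state.
    void recoverWithoutState() {
        state.clear();
    }

    public static void main(String[] args) {
        LostStateDemo job = new LostStateDemo();
        job.process("k1", 'a');    // INITIAL -> A
        job.recoverWithoutState(); // state for k1 is lost
        job.process("k1", 'b');    // valid from A, but evaluated from INITIAL
        System.out.println(job.invalid); // prints [k1: INITIAL --b--> ?]
    }
}
```

With state restored correctly, the second event would move `k1` from `A` to `B` 
and nothing would be reported; with the state dropped, every reported 
transition starts from `INITIAL`.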

  was:
When randomly killing TaskManager and ApplicationMaster, a job sometimes does 
not properly recover in HA mode.

There can be different symptoms for this. For example, in one case the job is 
dying with the following exception:

{code}
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
        at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
        at da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
        at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:257)
        at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:116)
        at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:88)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1084)
        ... 26 more
Caused by: java.io.IOException: Failed to copy from blob store.
        at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:358)
        at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:248)
        ... 29 more
Caused by: java.io.IOException: gs:///flink/recovery/blob/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0 does not exist.
        at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:121)
        at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:93)
        at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:355)
        ... 30 more
{code}

In other cases, I noticed that stream events are dropped when testing with a 
streaming state machine job and a Kafka source. My guess is that value state is 
not restored properly, because all invalid transactions in the log start from 
the initial state, which is the default value for the value state.


> HA recovery not working properly under ApplicationMaster failures.
> ------------------------------------------------------------------
>
>                 Key: FLINK-4182
>                 URL: https://issues.apache.org/jira/browse/FLINK-4182
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, State Backends, Checkpointing
>    Affects Versions: 1.0.3
>            Reporter: Stefan Richter
>
> When randomly killing TaskManager and ApplicationMaster, a job sometimes does 
> not properly recover in HA mode.
> There can be different symptoms for this. For example, in one case the job is 
> dying with the following exception:
> {code}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
>       at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208)
>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
>       at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>       at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
>       at da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>       at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>       at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738)
>       at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
>       at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966)
>       at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>       at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506)
>       at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>       at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>       at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>       at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105)
>       at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>       at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>       at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>       at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>       at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>       at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>       at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>       at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>       at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>       at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>       at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:257)
>       at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:116)
>       at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:88)
>       at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1084)
>       ... 26 more
> Caused by: java.io.IOException: Failed to copy from blob store.
>       at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:358)
>       at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:248)
>       ... 29 more
> Caused by: java.io.IOException: gs:///flink/recovery/blob/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0 does not exist.
>       at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:121)
>       at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:93)
>       at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:355)
>       ... 30 more
> {code}
> In other cases, I noticed inconsistencies in the results when testing with a 
> streaming state machine job and a Kafka source. My guess is that value state 
> is not restored properly, because all invalid transactions in the log start 
> from the initial state, which is the default value for the value state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
