[ 
https://issues.apache.org/jira/browse/FLINK-16470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17056583#comment-17056583
 ] 

Jason Kania commented on FLINK-16470:
-------------------------------------

[~gjy], the restart strategy was just the default and had no delay, so that 
explains the rapid restarts. I did not realize that the restart strategy was 
related to this interaction with ZooKeeper and the RegionStrategy. Thanks 
for connecting the two. I will try an explicitly configured restart strategy 
and see what happens the next time a similar failure occurs.
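
For reference, a minimal sketch of what I expect to try in flink-conf.yaml, 
assuming the fixed-delay restart strategy. The attempt count and delay below 
are placeholder values I picked for illustration, not tuned or recommended 
settings:

```yaml
# Hypothetical values: pause between restart attempts so that a short
# NFS/network outage does not turn into a tight restart loop.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 10 s
```

With a delay of this size, a network outage of under 30 seconds would 
trigger only a few recovery attempts instead of a continuous stream of them.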

> Network failure causes Checkpoint Coordinator to flood disk with exceptions
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-16470
>                 URL: https://issues.apache.org/jira/browse/FLINK-16470
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Coordination
>    Affects Versions: 1.9.2
>         Environment: Latest patch current Ubuntu release with latest java 8 
> JRE.
>            Reporter: Jason Kania
>            Priority: Major
>
> When a networking error occurred that prevented access to the shared folder 
> mounted over NFS, the CheckpointCoordinator flooded the logs with the 
> following:
>  
> {{org.apache.flink.util.FlinkException: Could not retrieve checkpoint 158365 
> from state handle under /0000000000000158365. This indicates that the 
> retrieved state handle is broken. Try cleaning the state handle store.}}
> {{ at 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:345)}}
> {{ at 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.recover(ZooKeeperCompletedCheckpointStore.java:175)}}
> {{ at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1014)}}
> {{ at 
> org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.resetTasks(AdaptedRestartPipelinedRegionStrategyNG.java:205)}}
> {{ at 
> org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.lambda$createResetAndRescheduleTasksCallback$1(AdaptedRestartPipelinedRegionStrategyNG.java:149)}}
> {{ at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$3(FutureUtils.java:202)}}
> {{ at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$4(FutureUtils.java:226)}}
> {{ at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)}}
> {{ at java.util.concurrent.FutureTask.run(FutureTask.java:266)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)}}
> {{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)}}
> {{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)}}
> {{ at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)}}
> {{ at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)}}
> {{ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}}
> {{ at akka.actor.Actor.aroundReceive(Actor.scala:517)}}
> {{ at akka.actor.Actor.aroundReceive$(Actor.scala:515)}}
> {{ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)}}
> {{ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)}}
> {{ at akka.actor.ActorCell.invoke(ActorCell.scala:561)}}
> {{ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)}}
> {{ at akka.dispatch.Mailbox.run(Mailbox.scala:225)}}
> {{ at akka.dispatch.Mailbox.exec(Mailbox.scala:235)}}
> {{ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)}}
> {{ at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)}}
> {{ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)}}
> {{ at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)}}
> {{Caused by: java.io.FileNotFoundException: 
> /mnt/shared/completedCheckpoint53ed9d9197f7 (No such file or directory)}}
> {{ at java.io.FileInputStream.open0(Native Method)}}
> {{ at java.io.FileInputStream.open(FileInputStream.java:195)}}
> {{ at java.io.FileInputStream.<init>(FileInputStream.java:138)}}
> {{ at 
> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)}}
> {{ at 
> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)}}
> {{ at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)}}
> {{ at 
> org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:64)}}
> {{ at 
> org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:57)}}
> {{ at 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:339)}}
> {{ ... 32 more}}
>  
> The result was very high CPU utilization, high disk IO and a cascade of other 
> application failures. In this situation, there should be a backoff on the 
> retries to not bring down the whole node. Recovery may have been possible 
> because the network outage lasted less than 30 seconds.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
