[ https://issues.apache.org/jira/browse/FLINK-24543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17428726#comment-17428726 ]

Victor Xu commented on FLINK-24543:
-----------------------------------

After the KeeperException$NodeExistsException was thrown, 
*ZooKeeperStateHandleStore.indicatesPossiblyInconsistentState(e)* returned 
*false*, so the cleanup logic deleted both the metadata file and the 
corresponding checkpoint data files, which later led to the 
FileNotFoundException during recovery:
{noformat}
Caused by: java.io.FileNotFoundException: 
/mnt/flink/.flink/ha/flink-.../completedCheckpoint42683d1121c7 (No such file or 
directory)
{noformat}
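
A minimal, hypothetical model of this sequence follows. It assumes (an inference from {{RetryLoop.callWithRetry}} appearing in the quoted stack trace, not something confirmed here) that Curator retried the ZooKeeper transaction after the first attempt had already been applied on the server, so the retry hit NodeExistsException. The nested classes, the in-memory maps and the file name are illustrative stand-ins, not Flink or ZooKeeper APIs; the zero-padded node name mirrors {{/0000000000000001116}} from the log.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the failure sequence in FLINK-24543.
// None of these classes or methods are Flink or ZooKeeper APIs.
final class DanglingPointerDemo {
    static final class NodeExistsException extends Exception {}
    static final class ConnectionLossException extends Exception {}

    // Stand-in for the ZooKeeper namespace: node path -> metadata file name.
    final Map<String, String> zkNodes = new HashMap<>();
    // Stand-in for the HA storage directory holding the metadata files.
    final Set<String> files = new HashSet<>();

    // Deterministic, zero-padded node name per checkpoint ID,
    // e.g. 1116 -> "/0000000000000001116" as in the log.
    static String checkpointIdToPath(long checkpointId) {
        return String.format("/%019d", checkpointId);
    }

    void create(String path, String data) throws NodeExistsException {
        if (zkNodes.containsKey(path)) {
            throw new NodeExistsException();
        }
        zkNodes.put(path, data);
    }

    // Current behaviour (simplified): NodeExistsException counts as a
    // pre-commit failure, i.e. "the write certainly did not happen".
    static boolean indicatesPossiblyInconsistentState(Exception e) {
        return !(e instanceof NodeExistsException);
    }

    void addAndLock(long checkpointId, boolean firstReplyLost) throws Exception {
        String path = checkpointIdToPath(checkpointId);
        String file = "completedCheckpoint-" + checkpointId; // hypothetical name
        files.add(file); // 1. metadata file is written to storage first
        try {
            try {
                // 2. the server applies the create, but the client may only
                //    observe a connection loss and then retry
                create(path, file);
                if (firstReplyLost) {
                    throw new ConnectionLossException();
                }
            } catch (ConnectionLossException lost) {
                create(path, file); // retry fails with NodeExistsException
            }
        } catch (Exception e) {
            if (!indicatesPossiblyInconsistentState(e)) {
                files.remove(file); // 3. cleanup deletes the file ZK still points to
            }
            throw e;
        }
    }
}
{code}

After {{addAndLock(1116L, true)}} throws, the node {{/0000000000000001116}} still exists while the file it points to has been deleted, which is the state that recovery later fails on.
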
The reason is that *NodeExistsException* is included in 
PRE_COMMIT_EXCEPTIONS. But shouldn't we treat this NodeExistsException as a 
possibly inconsistent state and remove it from PRE_COMMIT_EXCEPTIONS? It differs 
from the other ZK commit exceptions (e.g. AuthFailedException, 
BadVersionException): those mean that the ZK write failed, whereas this one 
means that the ZK node is already there, although we don't know whether it is 
the correct one. So I think we should mark it as a possibly inconsistent state 
and not remove the corresponding metadata and checkpoint data files.
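
A sketch of the proposed classification, assuming simplified stand-in exception classes rather than the real {{KeeperException}} subclasses. The actual PRE_COMMIT_EXCEPTIONS set in Flink has more entries; only the two exceptions named above are modeled, plus a hypothetical ConnectionLossException as an example of a failure whose outcome is unknown. It needs Java 9+ for {{Set.of}}.

{code:java}
import java.util.Set;

// Hypothetical sketch of the proposed change. The exception classes are
// simplified stand-ins, not org.apache.zookeeper.KeeperException subclasses.
final class ProposedCommitClassifier {
    static class ZkException extends Exception {}
    static final class AuthFailedException extends ZkException {}
    static final class BadVersionException extends ZkException {}
    static final class NodeExistsException extends ZkException {}
    static final class ConnectionLossException extends ZkException {}

    // Failures that guarantee the ZK write was NOT applied, so the metadata
    // and checkpoint files can be discarded safely. NodeExistsException is
    // intentionally absent: the node exists, but its content is unknown.
    static final Set<Class<? extends ZkException>> PRE_COMMIT_EXCEPTIONS =
            Set.of(AuthFailedException.class, BadVersionException.class);

    static boolean indicatesPossiblyInconsistentState(Exception e) {
        return !PRE_COMMIT_EXCEPTIONS.contains(e.getClass());
    }

    // Files may only be deleted when the failure is known to be pre-commit.
    static boolean mayDiscardState(Exception e) {
        return !indicatesPossiblyInconsistentState(e);
    }
}
{code}

The trade-off is that a NodeExistsException caused by a genuinely foreign node would now leave files behind instead of deleting them.
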


> Zookeeper connection issue causes inconsistent state in Flink
> -------------------------------------------------------------
>
>                 Key: FLINK-24543
>                 URL: https://issues.apache.org/jira/browse/FLINK-24543
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.13.2
>            Reporter: Jun Qin
>            Priority: Major
>
> Env: Flink 1.13.2 with Zookeeper HA
> Here is what happened:
> {code:bash}
> # checkpoint 1116 was triggered
> 2021-10-09 00:16:49,555 [Checkpoint Timer] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering 
> checkpoint 1116 (type=CHECKPOINT) @ 1633738609538 for job 
> a8a4fb85b681a897ba118db64333c9e5.
> # a few seconds later, the zookeeper connection was suspended (it turned out 
> to be a disk issue on the zookeeper side that caused slow fsync and commit)
> 2021-10-09 00:16:58,563 [Curator-ConnectionStateManager-0] WARN  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - 
> Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
> ZooKeeper.
> 2021-10-09 00:16:58,563 [Curator-ConnectionStateManager-0] WARN  
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
> Connection to ZooKeeper suspended. The contender LeaderContender: 
> DefaultDispatcherRunner no longer participates in the leader election.
> # job was switching to suspended
> 2021-10-09 00:16:58,564 [flink-akka.actor.default-dispatcher-61] INFO  
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
> Disconnect job manager 
> [email protected]://[email protected]:50010/user/rpc/jobmanager_3
>  for job a8a4fb85b681a897ba118db64333c9e5 from the resource manager.
> 2021-10-09 00:16:58,565 [flink-akka.actor.default-dispatcher-92] INFO  
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
> Registering job manager 
> [email protected]://[email protected]:50010/user/rpc/jobmanager_3
>  for job a8a4fb85b681a897ba118db64333c9e5.
> 2021-10-09 00:16:58,565 [flink-akka.actor.default-dispatcher-90] INFO  
> org.apache.flink.runtime.jobmaster.JobMaster                 [] - Stopping 
> the JobMaster for job Flink ...(a8a4fb85b681a897ba118db64333c9e5).
> 2021-10-09 00:16:58,567 [flink-akka.actor.default-dispatcher-90] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Flink 
> ... (a8a4fb85b681a897ba118db64333c9e5) switched from state RUNNING to 
> SUSPENDED.
> 2021-10-09 00:16:58,570 [flink-akka.actor.default-dispatcher-86] INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - 
> Closing 
> ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/a8a4fb85b681a897ba118db64333c9e5/job_manager_lock'}.
> 2021-10-09 00:16:58,667 [flink-akka.actor.default-dispatcher-92] INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 
> a8a4fb85b681a897ba118db64333c9e5 reached terminal state SUSPENDED.
> # zookeeper connection restored
> 2021-10-09 00:17:08,225 [Curator-ConnectionStateManager-0] INFO  
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
> Connection to ZooKeeper was reconnected. Leader election can be restarted.
> # received the checkpoint acknowledgement and tried to finalize the 
> checkpoint, but failed to add it to zookeeper due to KeeperException$NodeExistsException
> 2021-10-09 00:17:14,382 [flink-akka.actor.default-dispatcher-90] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: ... 
> (1/5) (09d25852e3e206d6b7fe0d6bc965870f) switched from RUNNING to CANCELING.
> 2021-10-09 00:17:14,382 [jobmanager-future-thread-1] WARN  
> org.apache.flink.runtime.jobmaster.JobMaster                 [] - Error while 
> processing AcknowledgeCheckpoint message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete 
> the pending checkpoint 1116. Failure reason: Failure to finalize checkpoint.
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1227)
>  
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1072)
>  
>       at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>  
>       at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>  
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>  [?:?]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  [?:?]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  [?:?]
>       at java.lang.Thread.run(Thread.java:834) [?:?]
> Caused by: 
> org.apache.flink.runtime.persistence.StateHandleStore$AlreadyExistException: 
> ZooKeeper node /0000000000000001116 already exists.
>       at 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.lambda$addAndLock$0(ZooKeeperStateHandleStore.java:179)
>  
>       at java.util.Optional.map(Optional.java:265) ~[?:?]
>       at 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:177)
>  
>       at 
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.addCheckpoint(DefaultCompletedCheckpointStore.java:182)
>  
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1209)
>  
>       ... 9 more
> Caused by: 
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$NodeExistsException:
>  KeeperErrorCode = NodeExists
>       at 
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException.create(KeeperException.java:122)
>  ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at 
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper.multiInternal(ZooKeeper.java:1015)
>  ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at 
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper.multi(ZooKeeper.java:919)
>  ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at 
> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorTransactionImpl.doOperation(CuratorTransactionImpl.java:197)
>  ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at 
> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorTransactionImpl.access$000(CuratorTransactionImpl.java:37)
>  ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at 
> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:130)
>  ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at 
> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:126)
>  ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at 
> org.apache.flink.shaded.curator4.org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64)
>  ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at 
> org.apache.flink.shaded.curator4.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100)
>  ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at 
> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorTransactionImpl.commit(CuratorTransactionImpl.java:123)
>  ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.writeStoreHandleTransactionally(ZooKeeperStateHandleStore.java:204)
>  
>       at 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:165)
>  
>       at 
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.addCheckpoint(DefaultCompletedCheckpointStore.java:182)
>  
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1209)
>  
> ... 9 more
> # checkpoint coordinator was stopping
> 2021-10-09 00:17:14,385 [flink-akka.actor.default-dispatcher-90] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Stopping 
> checkpoint coordinator for job a8a4fb85b681a897ba118db64333c9e5.
> 2021-10-09 00:17:14,401 [flink-akka.actor.default-dispatcher-90] INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job 
> a8a4fb85b681a897ba118db64333c9e5 has been suspended.
> # clean up
> 2021-10-09 00:17:14,403 
> [AkkaRpcService-Supervisor-Termination-Future-Executor-thread-1] INFO  
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
> Closing 
> ZooKeeperLeaderElectionDriver{leaderPath='/leader/a8a4fb85b681a897ba118db64333c9e5/job_manager_lock'}
> 2021-10-09 00:17:14,404 [cluster-io-thread-2] INFO  
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Released 
> job graph a8a4fb85b681a897ba118db64333c9e5 from 
> ZooKeeperStateHandleStore{namespace='flink/flink-.../jobgraphs'}.
> # however, during recovery, checkpoint 1116 was found in zookeeper, but the 
> metadata file /mnt/flink/.flink/ha/flink-.../completedCheckpoint42683d1121c7 
> had been cleaned up because of the earlier KeeperException$NodeExistsException
> 2021-10-09 00:18:18,678 [jobmanager-future-thread-1] INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Recovering checkpoints from 
> ZooKeeperStateHandleStore{namespace='flink/flink-.../checkpoints/a8a4fb85b681a897ba118db64333c9e5'}.
> 2021-10-09 00:18:18,686 [jobmanager-future-thread-1] INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Found 4 checkpoints in 
> ZooKeeperStateHandleStore{namespace='flink/flink-.../checkpoints/a8a4fb85b681a897ba118db64333c9e5'}.
> 2021-10-09 00:18:18,686 [jobmanager-future-thread-1] INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Trying to fetch 4 checkpoints from storage.
> 2021-10-09 00:18:18,686 [jobmanager-future-thread-1] INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Trying to retrieve checkpoint 1113.
> 2021-10-09 00:18:18,689 [jobmanager-future-thread-1] INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Trying to retrieve checkpoint 1114.
> 2021-10-09 00:18:18,691 [jobmanager-future-thread-1] INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Trying to retrieve checkpoint 1115.
> 2021-10-09 00:18:18,693 [jobmanager-future-thread-1] INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Trying to retrieve checkpoint 1116.
> 2021-10-09 00:18:18,700 [flink-akka.actor.default-dispatcher-18] ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error 
> occurred in the cluster entrypoint.
> org.apache.flink.util.FlinkException: JobMaster for job 
> a8a4fb85b681a897ba118db64333c9e5 failed.
>       at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873)
>  
>       at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459)
>  
>       at 
> org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436)
>  
>       at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415)
>  
>       at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) 
> ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
>  ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>  ~[?:?]
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>  
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>  
>       at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>  
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>  
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
>       at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
>       at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
>       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
>       at akka.actor.Actor.aroundReceive(Actor.scala:517) 
>       at akka.actor.Actor.aroundReceive$(Actor.scala:515) 
>       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
>       at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
>       at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
>       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
>       at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
>       at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
>       at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
> not start the JobMaster.
>       at 
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>  
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
>  ~[?:?]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>  ~[?:?]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  ~[?:?]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  ~[?:?]
>       at java.lang.Thread.run(Thread.java:834) ~[?:?]
> Caused by: java.util.concurrent.CompletionException: 
> java.lang.RuntimeException: org.apache.flink.util.FlinkException: Could not 
> retrieve checkpoint 1116 from state handle under /0000000000000001116. This 
> indicates that the retrieved state handle is broken. Try cleaning the state 
> handle store.
>       at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
>  ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
>  ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
>  ~[?:?]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>  ~[?:?]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  ~[?:?]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  ~[?:?]
>       at java.lang.Thread.run(Thread.java:834) ~[?:?]
> Caused by: java.lang.RuntimeException: org.apache.flink.util.FlinkException: 
> Could not retrieve checkpoint 1116 from state handle under 
> /0000000000000001116. This indicates that the retrieved state handle is 
> broken. Try cleaning the state handle store.
>       at 
> org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316) 
>       at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
>  
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>  ~[?:?]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>  ~[?:?]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  ~[?:?]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  ~[?:?]
>       at java.lang.Thread.run(Thread.java:834) ~[?:?]
> Caused by: org.apache.flink.util.FlinkException: Could not retrieve 
> checkpoint 1116 from state handle under /0000000000000001116. This indicates 
> that the retrieved state handle is broken. Try cleaning the state handle 
> store.
>       at 
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:309)
>  
>       at 
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.recover(DefaultCompletedCheckpointStore.java:151)
>  
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1513)
>  
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
>  
>       at 
> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
>  
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
>  
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
>  
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
>  
>       at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
>  
>       at 
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
>  
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
>  
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) 
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
>  
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>  
>       at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>  
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>  ~[?:?]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>  ~[?:?]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  ~[?:?]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  ~[?:?]
>       at java.lang.Thread.run(Thread.java:834) ~[?:?]
> Caused by: java.io.FileNotFoundException: 
> /mnt/flink/.flink/ha/flink-.../completedCheckpoint42683d1121c7 (No such file 
> or directory)
>       at java.io.FileInputStream.open0(Native Method) ~[?:?]
>       at java.io.FileInputStream.open(FileInputStream.java:219) ~[?:?]
>       at java.io.FileInputStream.<init>(FileInputStream.java:157) ~[?:?]
>       at 
> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>  
>       at 
> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) 
>       at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>  
>       at 
> org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:66)
>  
>       at 
> org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
>  
>       at 
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:298)
>  
>       at 
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.recover(DefaultCompletedCheckpointStore.java:151)
>  
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1513)
>  
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
>  
>       at 
> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
>  
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
>  
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
>  
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
>  
>       at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
>  
>       at 
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
>  
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
>  
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) 
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
>  
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>  
>       at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>  
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>  ~[?:?]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>  ~[?:?]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  ~[?:?]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  ~[?:?]
>       at java.lang.Thread.run(Thread.java:834) ~[?:?]
>  {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
