[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17362667#comment-17362667 ]

Eduardo Winpenny Tejedor commented on FLINK-22483:
--------------------------------------------------

Hi [~trohrmann], sorry for the delay; it's taken me a while to find some spare 
time to crack this one. Here's an update. I see two ways this could be done: 
either move the handling of the {{DefaultCompletedCheckpointStore}} into the 
{{DefaultJobMasterServiceFactory}}, as you suggested, or keep the checkpoint 
recovery in a function close to where it lives today but only call it from the 
{{DefaultJobMasterServiceFactory}}. I'd like to clarify a couple of details 
before committing to either solution; a rough sketch of the first option 
follows below.
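
Below is a minimal, self-contained sketch of the shape I have in mind for the first option: the blocking recovery is pushed onto a dedicated executor inside the factory, and the job master object only comes into existence once recovery has finished. All names in it ({{CheckpointStore}}, {{JobMasterHandle}}, {{createJobMasterService}}) are hypothetical stand-ins, not the real Flink classes.
{code:java}
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class ServiceFactorySketch {

    /** Hypothetical stand-in for CompletedCheckpointStore; recover() may block on I/O. */
    interface CheckpointStore {
        List<String> recover() throws Exception; // e.g. retries against an unreachable S3/minio endpoint
    }

    /** Hypothetical stand-in for the JobMaster, constructed only after recovery has finished. */
    static class JobMasterHandle {
        final List<String> recoveredCheckpoints;

        JobMasterHandle(List<String> recoveredCheckpoints) {
            this.recoveredCheckpoints = recoveredCheckpoints;
        }
    }

    private final Executor ioExecutor = Executors.newSingleThreadExecutor();

    /** The blocking recover() runs on the ioExecutor, so the RPC main thread is never blocked. */
    CompletableFuture<JobMasterHandle> createJobMasterService(CheckpointStore store) {
        return CompletableFuture.supplyAsync(
                        () -> {
                            try {
                                return store.recover(); // potentially long-running / retrying
                            } catch (Exception e) {
                                throw new CompletionException(e);
                            }
                        },
                        ioExecutor)
                .thenApply(JobMasterHandle::new);
    }
}
{code}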

The stack trace in the ticket does show a call to 
{{CompletedCheckpointStore::recover}}, but that call already seems to happen 
[asynchronously|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java#L259].
 I think the call that needs lifting and shifting is [this 
one|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java#L134],
 which is invoked from the {{JobMaster}} constructor. Am I on the right 
track here?

The call to {{CompletedCheckpointStore::recover}} sits deep inside many layers 
of calls; how many of those wrapping layers need to be shifted into the 
{{DefaultJobMasterServiceFactory}}? Moving only the call to 
{{CompletedCheckpointStore::recover}} wouldn't make sense without also moving 
[this whole 
block|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java#L129]:

 
{code:java}
final CheckpointCoordinator checkpointCoordinator =
        newExecutionGraph.getCheckpointCoordinator();

if (checkpointCoordinator != null) {
    // check whether we find a valid checkpoint
    if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
            new HashSet<>(newExecutionGraph.getAllVertices().values()))) {

        // check whether we can restore from a savepoint
        tryRestoreExecutionGraphFromSavepoint(
                newExecutionGraph, jobGraph.getSavepointRestoreSettings());
    }
}
{code}
Again, let me know if I'm not on the right track.

 

Finally, where exactly would the code need to be shifted so that it doesn't 
interfere with the intended execution of the program? Before the instantiation 
of the {{JobMaster}}? After {{JobMaster::start}}? Anywhere in between would 
still be blocking, wouldn't it? Should a separate {{Executor}} be used for 
that? A minimal sketch of what I have in mind follows below.
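
To make the last question concrete, here is a rough, self-contained sketch. It assumes the quoted block is wrapped in a hypothetical blocking {{restoreState()}} and that {{startJobMaster()}} stands in for {{JobMaster::start}}: chaining the restore onto a dedicated I/O executor keeps the calling thread free, whereas running it inline anywhere between construction and start would still block that thread.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StartupOrderSketch {

    /** Hypothetical blocking restore, standing in for the block quoted above. */
    static void restoreState() {
        try {
            Thread.sleep(5_000); // simulate a storage backend that is slow to answer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Hypothetical stand-in for JobMaster::start. */
    static void startJobMaster() {
        System.out.println("JobMaster started on " + Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();

        // Recovery runs on the I/O executor; start() is chained after it, so the caller
        // (the would-be RPC main thread) returns immediately and stays responsive,
        // which would let the job be reported as INITIALIZING in the meantime.
        CompletableFuture<Void> startup =
                CompletableFuture.runAsync(StartupOrderSketch::restoreState, ioExecutor)
                        .thenRun(StartupOrderSketch::startJobMaster);

        System.out.println("main thread is free while recovery is in flight");
        startup.join();
        ioExecutor.shutdown();
    }
}
{code}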

Hope to get cracking on this one soon!

 

> Recover checkpoints when JobMaster gains leadership
> ---------------------------------------------------
>
>                 Key: FLINK-22483
>                 URL: https://issues.apache.org/jira/browse/FLINK-22483
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.13.0
>            Reporter: Robert Metzger
>            Priority: Critical
>              Labels: stale-critical
>             Fix For: 1.14.0
>
>
> Recovering checkpoints (from the CompletedCheckpointStore) is a potentially 
> long-lasting/blocking operation, for example if the file system 
> implementation keeps retrying to connect to an unavailable storage backend.
> Currently, we are calling the CompletedCheckpointStore.recover() method from 
> the main thread of the JobManager, making it unresponsive to any RPC call 
> while the recover method is blocked:
> {code}
> 2021-04-02 20:33:31,384 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job XXX 
> switched from state RUNNING to RESTARTING.
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to 
> minio.minio.svc:9000 [minio.minio.svc/XXXX] failed: Connection refused 
> (Connection refused)
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[?:?]
>       at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062) 
> ~[?:?]
>       at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008) 
> ~[?:?]
>       at 
> com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1490) 
> ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:905)
>  ~[?:?]
>       at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:902)
>  ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:887)
>  ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:880)
>  ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:819)
>  ~[?:?]
>       at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>       at 
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:818)
>  ~[?:?]
>       at java.io.BufferedInputStream.read1(BufferedInputStream.java:284) 
> ~[?:1.8.0_282]
>       at XXX.recover(KubernetesHaCheckpointStore.java:69) 
> ~[vvp-flink-ha-kubernetes-flink112-1.1.0.jar:?]
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1511)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1451)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:421)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:314)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) 
> ~[?:1.8.0_282]
>       at 
> java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701)
>  ~[?:1.8.0_282]
>       at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>  ~[?:1.8.0_282]
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.actor.Actor.aroundReceive(Actor.scala:517) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.actor.Actor.aroundReceive$(Actor.scala:515) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> Caused by: org.apache.http.conn.HttpHostConnectException: Connect to 
> minio.minio.svc:9000 [minio.minio.svc/10.115.246.236] failed: Connection 
> refused (Connection refused)
>       at 
> org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:156)
>  ~[?:?]
>       at 
> org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
>  ~[?:?]
>       at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source) ~[?:?]
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_282]
>       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
>       at 
> com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
>  ~[?:?]
>       at com.amazonaws.http.conn.$Proxy18.connect(Unknown Source) ~[?:?]
>       at 
> org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
>  ~[?:?]
>       at 
> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
>  ~[?:?]
>       at 
> org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) 
> ~[?:?]
>       at 
> org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>  ~[?:?]
>       at 
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>  ~[?:?]
>       at 
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>  ~[?:?]
>       at 
> com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1330)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
>  ~[?:?]
>       ... 67 more
> Caused by: java.net.ConnectException: Connection refused (Connection refused)
>       at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_282]
>       at 
> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) 
> ~[?:1.8.0_282]
>       at 
> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>  ~[?:1.8.0_282]
>       at 
> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) 
> ~[?:1.8.0_282]
>       at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
> ~[?:1.8.0_282]
>       at java.net.Socket.connect(Socket.java:607) ~[?:1.8.0_282]
>       at 
> org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75)
>  ~[?:?]
>       at 
> org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
>  ~[?:?]
>       at 
> org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
>  ~[?:?]
>       at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source) ~[?:?]
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_282]
>       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
>       at 
> com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
>  ~[?:?]
>       at com.amazonaws.http.conn.$Proxy18.connect(Unknown Source) ~[?:?]
>       at 
> org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
>  ~[?:?]
>       at 
> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
>  ~[?:?]
>       at 
> org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) 
> ~[?:?]
>       at 
> org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>  ~[?:?]
>       at 
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>  ~[?:?]
>       at 
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>  ~[?:?]
>       at 
> com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1330)
>  ~[?:?]
>       at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
>  ~[?:?]
>       ... 67 more
> {code}
> By moving the recovery to the start of the JobManager (which happens 
> asynchronously after the JobMaster has gained leadership), Flink will remain 
> responsive (reporting a job in INITIALIZING state).



