[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz updated FLINK-22483:
-------------------------------------
    Release Note: 
Flink no longer re-loads checkpoint metadata from the external storage before restoring the task state after a failover (except when the JobManager fails over or changes leadership). This results in less external I/O and faster failover.

Please note that this changes public interfaces around `CompletedCheckpointStore`, which users can override by providing a custom implementation of HA Services.

  was:
This changes a semi-public interface around CompletedCheckpointStore.

The CompletedCheckpointStore#recover() method has been removed, as we only need to "recover" once, after the JobManagerRunner gains leadership. We now expect CheckpointRecoveryFactory to always return an already recovered CompletedCheckpointStore. To make this new behavior more explicit, we've renamed the CheckpointRecoveryFactory#createCompletedCheckpointStore factory method to CheckpointRecoveryFactory#createRecoveredCompletedCheckpointStore.
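
As a rough illustration of this contract (recover once when leadership is gained, then serve every task failover from memory), here is a minimal, self-contained Java sketch. All names in it (`ExternalStorage`, `SimpleCheckpointStore`, `createRecoveredStore`) are hypothetical stand-ins, not Flink's real `CompletedCheckpointStore` or `CheckpointRecoveryFactory` APIs; it only shows where the external read happens under the behavior described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the "recover once, then reuse" contract.
// None of these types are Flink classes.
public class RecoveredStoreSketch {

    // Stand-in for external storage holding checkpoint metadata (e.g. S3 or MinIO).
    static final class ExternalStorage {
        private final List<Long> checkpointIds = new ArrayList<>();
        final AtomicInteger reads = new AtomicInteger();

        void persist(long checkpointId) {
            checkpointIds.add(checkpointId);
        }

        // Each call models one potentially slow or blocking remote read.
        List<Long> readAll() {
            reads.incrementAndGet();
            return new ArrayList<>(checkpointIds);
        }
    }

    // Hypothetical store: it is handed already-recovered state and has no recover() method.
    static final class SimpleCheckpointStore {
        private final ExternalStorage storage;
        private final List<Long> checkpoints;

        SimpleCheckpointStore(ExternalStorage storage, List<Long> recovered) {
            this.storage = storage;
            this.checkpoints = recovered;
        }

        // Write-through, so that a future leader can recover this checkpoint.
        void addCheckpoint(long checkpointId) {
            storage.persist(checkpointId);
            checkpoints.add(checkpointId);
        }

        // Used on task failover: answered from memory, without external I/O.
        Long latestCheckpoint() {
            return checkpoints.isEmpty() ? null : checkpoints.get(checkpoints.size() - 1);
        }
    }

    // Hypothetical analogue of createRecoveredCompletedCheckpointStore: the only
    // place that reads from external storage, invoked when leadership is gained.
    static SimpleCheckpointStore createRecoveredStore(ExternalStorage storage) {
        return new SimpleCheckpointStore(storage, storage.readAll());
    }

    public static void main(String[] args) {
        ExternalStorage storage = new ExternalStorage();
        storage.persist(1L);

        SimpleCheckpointStore store = createRecoveredStore(storage); // leadership gained
        store.addCheckpoint(2L);

        // Three task failovers, each restoring from the latest checkpoint in memory.
        for (int i = 0; i < 3; i++) {
            System.out.println("restore from checkpoint " + store.latestCheckpoint());
        }
        System.out.println("external reads: " + storage.reads.get());
    }
}
```

Running `main` should print the restore line three times followed by `external reads: 1`; only creating a fresh store, as a new leader would, reads the storage again.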


> Recover checkpoints when JobMaster gains leadership
> ---------------------------------------------------
>
>                 Key: FLINK-22483
>                 URL: https://issues.apache.org/jira/browse/FLINK-22483
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.13.0
>            Reporter: Robert Metzger
>            Assignee: David Morávek
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>
> Recovering checkpoints (from the CompletedCheckpointStore) is a potentially long-lasting/blocking operation, for example if the file system implementation is retrying to connect to an unavailable storage backend.
> Currently, we are calling the CompletedCheckpointStore.recover() method from the main thread of the JobManager, making it unresponsive to any RPC call while the recover method is blocked:
> {code}
> 2021-04-02 20:33:31,384 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job XXX switched from state RUNNING to RESTARTING.
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to minio.minio.svc:9000 [minio.minio.svc/XXXX] failed: Connection refused (Connection refused)
>       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207) ~[?:?]
>       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153) ~[?:?]
>       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802) ~[?:?]
>       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770) ~[?:?]
>       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744) ~[?:?]
>       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704) ~[?:?]
>       at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686) ~[?:?]
>       at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[?:?]
>       at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[?:?]
>       at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062) ~[?:?]
>       at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008) ~[?:?]
>       at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1490) ~[?:?]
>       at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:905) ~[?:?]
>       at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>       at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:902) ~[?:?]
>       at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:887) ~[?:?]
>       at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:880) ~[?:?]
>       at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:819) ~[?:?]
>       at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>       at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:818) ~[?:?]
>       at java.io.BufferedInputStream.read1(BufferedInputStream.java:284) ~[?:1.8.0_282]
>       at XXX.recover(KubernetesHaCheckpointStore.java:69) ~[vvp-flink-ha-kubernetes-flink112-1.1.0.jar:?]
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1511) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1451) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:421) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:314) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) ~[?:1.8.0_282]
>       at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701) ~[?:1.8.0_282]
>       at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_282]
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>       at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> Caused by: org.apache.http.conn.HttpHostConnectException: Connect to minio.minio.svc:9000 [minio.minio.svc/10.115.246.236] failed: Connection refused (Connection refused)
>       at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:156) ~[?:?]
>       at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374) ~[?:?]
>       at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source) ~[?:?]
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_282]
>       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
>       at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76) ~[?:?]
>       at com.amazonaws.http.conn.$Proxy18.connect(Unknown Source) ~[?:?]
>       at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393) ~[?:?]
>       at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236) ~[?:?]
>       at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[?:?]
>       at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[?:?]
>       at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[?:?]
>       at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[?:?]
>       at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72) ~[?:?]
>       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1330) ~[?:?]
>       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145) ~[?:?]
>       ... 67 more
> Caused by: java.net.ConnectException: Connection refused (Connection refused)
>       at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_282]
>       at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[?:1.8.0_282]
>       at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[?:1.8.0_282]
>       at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[?:1.8.0_282]
>       at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_282]
>       at java.net.Socket.connect(Socket.java:607) ~[?:1.8.0_282]
>       at org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75) ~[?:?]
>       at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142) ~[?:?]
>       at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374) ~[?:?]
>       at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source) ~[?:?]
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_282]
>       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
>       at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76) ~[?:?]
>       at com.amazonaws.http.conn.$Proxy18.connect(Unknown Source) ~[?:?]
>       at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393) ~[?:?]
>       at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236) ~[?:?]
>       at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[?:?]
>       at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[?:?]
>       at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[?:?]
>       at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[?:?]
>       at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72) ~[?:?]
>       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1330) ~[?:?]
>       at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145) ~[?:?]
>       ... 67 more
> {code}
> By moving the recovery to the start of the JobManager (which happens asynchronously after the JobMaster has gained leadership), Flink will remain responsive, reporting the job in the INITIALIZING state while recovery is in progress.
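
The proposed fix can be sketched in a few lines of plain Java: the blocking recovery runs on a separate I/O executor, while status requests are answered immediately. This is a simplified, hypothetical model, not Flink's actual JobMaster code; `AsyncRecoverySketch`, `recoverLatestCheckpoint`, the checkpoint id `42`, and the `CountDownLatch` standing in for the unreachable MinIO backend are all assumptions. A real implementation would likely also hand the result back to the component's main thread rather than updating state from the I/O thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model: checkpoint recovery runs off the main thread, so status
// requests keep being answered (INITIALIZING) while storage is unreachable.
public class AsyncRecoverySketch {

    enum JobStatus { INITIALIZING, RUNNING, FAILED }

    private final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
    private volatile JobStatus status = JobStatus.INITIALIZING;
    private CompletableFuture<Long> recoveryFuture;

    // Stand-in for reading checkpoint metadata; blocks until the latch opens,
    // modelling a storage backend that refuses connections for a while.
    private static long recoverLatestCheckpoint(CountDownLatch storageAvailable)
            throws InterruptedException {
        storageAvailable.await();
        return 42L;
    }

    // Called once leadership is gained; returns immediately.
    void start(CountDownLatch storageAvailable) {
        recoveryFuture = CompletableFuture
                .supplyAsync(() -> {
                    try {
                        return recoverLatestCheckpoint(storageAvailable);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException(e);
                    }
                }, ioExecutor)
                // The returned future completes only after the status update has run.
                .whenComplete((id, error) ->
                        status = (error == null) ? JobStatus.RUNNING : JobStatus.FAILED);
    }

    // An RPC-style query that never waits for recovery.
    JobStatus requestJobStatus() {
        return status;
    }

    long awaitRecovery() {
        return recoveryFuture.join();
    }

    void shutdown() {
        ioExecutor.shutdown();
    }

    public static void main(String[] args) {
        AsyncRecoverySketch jobMaster = new AsyncRecoverySketch();
        CountDownLatch storageAvailable = new CountDownLatch(1);

        jobMaster.start(storageAvailable);
        System.out.println("status while storage is down: " + jobMaster.requestJobStatus());

        storageAvailable.countDown(); // the storage backend becomes reachable again
        long checkpointId = jobMaster.awaitRecovery();
        System.out.println("recovered checkpoint " + checkpointId + ", status " + jobMaster.requestJobStatus());

        jobMaster.shutdown();
    }
}
```

Running `main` first reports `INITIALIZING` while recovery is still blocked, then `RUNNING` once the simulated storage becomes reachable. Chaining `whenComplete` before storing the future makes `awaitRecovery()` wait for the status update as well, not just for the recovery itself.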



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
