[
https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
David Morávek updated FLINK-22483:
----------------------------------
Release Note:
This changes a semi-public interface around CompletedCheckpointStore.
CompletedCheckpointRecover#recover() method has been removed as we only need to
"recover" once, after JobManagerRunner gains leadership. We are now expecting
CheckpointRecoveryFactory to always return an already recovered
CompletedCheckpointStore. To make this new behavior more explicit, we've
renamed CheckpointRecoveryFactory#createCompletedCheckpointStore factory method
to CheckpointRecoveryFactory#createRecoveredCompletedCheckpointStore.
> Recover checkpoints when JobMaster gains leadership
> ---------------------------------------------------
>
> Key: FLINK-22483
> URL: https://issues.apache.org/jira/browse/FLINK-22483
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.13.0
> Reporter: Robert Metzger
> Assignee: Eduardo Winpenny Tejedor
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.14.0
>
>
> Recovering checkpoints (from the CompletedCheckpointStore) is a potentially
> long-lasting/blocking operation, for example if the file system
> implementation is retrying to connect to a unavailable storage backend.
> Currently, we are calling the CompletedCheckpointStore.recover() method from
> the main thread of the JobManager, making it unresponsive to any RPC call
> while the recover method is blocked:
> {code}
> 2021-04-02 20:33:31,384 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job XXX
> switched from state RUNNING to RESTARTING.
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to
> minio.minio.svc:9000 [minio.minio.svc/XXXX] failed: Connection refused
> (Connection refused)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[?:?]
> at
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062)
> ~[?:?]
> at
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008)
> ~[?:?]
> at
> com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1490)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:905)
> ~[?:?]
> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:902)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:887)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:880)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:819)
> ~[?:?]
> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:818)
> ~[?:?]
> at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
> ~[?:1.8.0_282]
> at XXX.recover(KubernetesHaCheckpointStore.java:69)
> ~[vvp-flink-ha-kubernetes-flink112-1.1.0.jar:?]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1511)
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1451)
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:421)
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:314)
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
> ~[?:1.8.0_282]
> at
> java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701)
> ~[?:1.8.0_282]
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> ~[?:1.8.0_282]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> Caused by: org.apache.http.conn.HttpHostConnectException: Connect to
> minio.minio.svc:9000 [minio.minio.svc/10.115.246.236] failed: Connection
> refused (Connection refused)
> at
> org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:156)
> ~[?:?]
> at
> org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
> ~[?:?]
> at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source) ~[?:?]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_282]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
> at
> com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
> ~[?:?]
> at com.amazonaws.http.conn.$Proxy18.connect(Unknown Source) ~[?:?]
> at
> org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
> ~[?:?]
> at
> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
> ~[?:?]
> at
> org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
> ~[?:?]
> at
> org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
> ~[?:?]
> at
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
> ~[?:?]
> at
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
> ~[?:?]
> at
> com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1330)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
> ~[?:?]
> ... 67 more
> Caused by: java.net.ConnectException: Connection refused (Connection refused)
> at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_282]
> at
> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
> ~[?:1.8.0_282]
> at
> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
> ~[?:1.8.0_282]
> at
> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
> ~[?:1.8.0_282]
> at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
> ~[?:1.8.0_282]
> at java.net.Socket.connect(Socket.java:607) ~[?:1.8.0_282]
> at
> org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75)
> ~[?:?]
> at
> org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
> ~[?:?]
> at
> org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
> ~[?:?]
> at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source) ~[?:?]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_282]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
> at
> com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
> ~[?:?]
> at com.amazonaws.http.conn.$Proxy18.connect(Unknown Source) ~[?:?]
> at
> org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
> ~[?:?]
> at
> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
> ~[?:?]
> at
> org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
> ~[?:?]
> at
> org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
> ~[?:?]
> at
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
> ~[?:?]
> at
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
> ~[?:?]
> at
> com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1330)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
> ~[?:?]
> ... 67 more
> {code}
> By moving the recovery to the start of the JobManager (which happens
> asynchronously after the JobMaster has gained leadership), Flink will remain
> responsive (reporting a job in INITIALIZING state).
--
This message was sent by Atlassian Jira
(v8.3.4#803005)