[
https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Robert Metzger updated FLINK-22483:
-----------------------------------
Description:
Recovering checkpoints (from the CompletedCheckpointStore) is a potentially
long-lasting/blocking operation, for example if the file system implementation
is retrying to connect to an unavailable storage backend.
Currently, we are calling the CompletedCheckpointStore.recover() method from
the main thread of the JobManager, making it unresponsive to any RPC call while
the recover method is blocked:
{code}
2021-04-02 20:33:31,384 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job XXX
switched from state RUNNING to RESTARTING.
com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to
minio.minio.svc:9000 [minio.minio.svc/XXXX] failed: Connection refused
(Connection refused)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[?:?]
at
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[?:?]
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062) ~[?:?]
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008) ~[?:?]
at
com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1490)
~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:905)
~[?:?]
at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:902)
~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:887)
~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:880)
~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:819)
~[?:?]
at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:818)
~[?:?]
at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
~[?:1.8.0_282]
at XXX.recover(KubernetesHaCheckpointStore.java:69)
~[vvp-flink-ha-kubernetes-flink112-1.1.0.jar:?]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1511)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1451)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at
org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:421)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at
org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:314)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
~[?:1.8.0_282]
at
java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701)
~[?:1.8.0_282]
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
~[?:1.8.0_282]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at akka.actor.Actor.aroundReceive(Actor.scala:517)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
Caused by: org.apache.http.conn.HttpHostConnectException: Connect to
minio.minio.svc:9000 [minio.minio.svc/10.115.246.236] failed: Connection
refused (Connection refused)
at
org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:156)
~[?:?]
at
org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
~[?:?]
at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source) ~[?:?]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_282]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
at
com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
~[?:?]
at com.amazonaws.http.conn.$Proxy18.connect(Unknown Source) ~[?:?]
at
org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
~[?:?]
at
org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
~[?:?]
at
org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
~[?:?]
at
org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
~[?:?]
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
~[?:?]
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
~[?:?]
at
com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1330)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
~[?:?]
... 67 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_282]
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
~[?:1.8.0_282]
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
~[?:1.8.0_282]
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
~[?:1.8.0_282]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
~[?:1.8.0_282]
at java.net.Socket.connect(Socket.java:607) ~[?:1.8.0_282]
at
org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75)
~[?:?]
at
org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
~[?:?]
at
org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
~[?:?]
at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source) ~[?:?]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_282]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
at
com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
~[?:?]
at com.amazonaws.http.conn.$Proxy18.connect(Unknown Source) ~[?:?]
at
org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
~[?:?]
at
org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
~[?:?]
at
org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
~[?:?]
at
org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
~[?:?]
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
~[?:?]
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
~[?:?]
at
com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1330)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
~[?:?]
... 67 more
{code}
By moving the recovery to the start of the JobManager (which happens
asynchronously after the JobMaster has gained leadership), Flink will remain
responsive (reporting a job in INITIALIZING state).
was:
Recovering checkpoints (from the CompletedCheckpointStore) is a potentially
blocking operation, for example if the file system implementation is retrying
to connect to an unavailable storage backend.
Currently, we are calling the CompletedCheckpointStore.recover() method from
the main thread of the JobManager, making it unresponsive to any RPC call while
the recover method is blocked.
By moving the recovery to the start of the JobManager (which happens
asynchronously after the JobMaster has gained leadership), Flink will remain
responsive (reporting a job in INITIALIZING state).
> Recover checkpoints when JobMaster gains leadership
> ---------------------------------------------------
>
> Key: FLINK-22483
> URL: https://issues.apache.org/jira/browse/FLINK-22483
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.13.0
> Reporter: Robert Metzger
> Priority: Major
> Fix For: 1.14.0
>
>
> Recovering checkpoints (from the CompletedCheckpointStore) is a potentially
> long-lasting/blocking operation, for example if the file system
> implementation is retrying to connect to an unavailable storage backend.
> Currently, we are calling the CompletedCheckpointStore.recover() method from
> the main thread of the JobManager, making it unresponsive to any RPC call
> while the recover method is blocked:
> {code}
> 2021-04-02 20:33:31,384 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job XXX
> switched from state RUNNING to RESTARTING.
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to
> minio.minio.svc:9000 [minio.minio.svc/XXXX] failed: Connection refused
> (Connection refused)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[?:?]
> at
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062)
> ~[?:?]
> at
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008)
> ~[?:?]
> at
> com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1490)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:905)
> ~[?:?]
> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:902)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:887)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:880)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:819)
> ~[?:?]
> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:818)
> ~[?:?]
> at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
> ~[?:1.8.0_282]
> at XXX.recover(KubernetesHaCheckpointStore.java:69)
> ~[vvp-flink-ha-kubernetes-flink112-1.1.0.jar:?]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1511)
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1451)
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:421)
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:314)
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
> ~[?:1.8.0_282]
> at
> java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701)
> ~[?:1.8.0_282]
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> ~[?:1.8.0_282]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> Caused by: org.apache.http.conn.HttpHostConnectException: Connect to
> minio.minio.svc:9000 [minio.minio.svc/10.115.246.236] failed: Connection
> refused (Connection refused)
> at
> org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:156)
> ~[?:?]
> at
> org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
> ~[?:?]
> at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source) ~[?:?]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_282]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
> at
> com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
> ~[?:?]
> at com.amazonaws.http.conn.$Proxy18.connect(Unknown Source) ~[?:?]
> at
> org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
> ~[?:?]
> at
> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
> ~[?:?]
> at
> org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
> ~[?:?]
> at
> org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
> ~[?:?]
> at
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
> ~[?:?]
> at
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
> ~[?:?]
> at
> com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1330)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
> ~[?:?]
> ... 67 more
> Caused by: java.net.ConnectException: Connection refused (Connection refused)
> at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_282]
> at
> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
> ~[?:1.8.0_282]
> at
> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
> ~[?:1.8.0_282]
> at
> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
> ~[?:1.8.0_282]
> at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
> ~[?:1.8.0_282]
> at java.net.Socket.connect(Socket.java:607) ~[?:1.8.0_282]
> at
> org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75)
> ~[?:?]
> at
> org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
> ~[?:?]
> at
> org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
> ~[?:?]
> at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source) ~[?:?]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_282]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
> at
> com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
> ~[?:?]
> at com.amazonaws.http.conn.$Proxy18.connect(Unknown Source) ~[?:?]
> at
> org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
> ~[?:?]
> at
> org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
> ~[?:?]
> at
> org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
> ~[?:?]
> at
> org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
> ~[?:?]
> at
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
> ~[?:?]
> at
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
> ~[?:?]
> at
> com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1330)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
> ~[?:?]
> ... 67 more
> {code}
> By moving the recovery to the start of the JobManager (which happens
> asynchronously after the JobMaster has gained leadership), Flink will remain
> responsive (reporting a job in INITIALIZING state).
--
This message was sent by Atlassian Jira
(v8.3.4#803005)