[
https://issues.apache.org/jira/browse/FLINK-20672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17783892#comment-17783892
]
Zakelly Lan commented on FLINK-20672:
-------------------------------------
[~yunta] I'm afraid this is still valid, since the I/O executors use the
FatalExitExceptionHandler (see [initialization
code|https://github.com/apache/flink/pull/16946/files#diff-2d970a09502b5a55d74c67dabafa932a18848d4a600b9b33f4afb584328dc0f0R178]).
I ran an experiment by throwing a RuntimeException inside the lambda that sends
the abort messages:
{code:java}
// send notification of aborted checkpoints asynchronously.
executor.execute(
        () -> {
            // send the "abort checkpoint" messages to necessary vertices.
            // ...
            LOG.info("Simulate something wrong.");
            throw new RuntimeException("something wrong");
        });
{code}
When a checkpoint then expired, the JM exited with this log:
{code:java}
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired
before completing.
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2323)
[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_372]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[?:1.8.0_372]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_372]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
[?:1.8.0_372]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_372]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_372]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372]
2023-11-08 13:58:13,693 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Simulate
something wrong.
2023-11-08 13:58:13,694 ERROR org.apache.flink.util.FatalExitExceptionHandler
[] - FATAL: Thread 'jobmanager-io-thread-1' produced an uncaught
exception. Stopping the process...
java.lang.RuntimeException: something wrong
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$sendAbortedMessages$12(CheckpointCoordinator.java:1605)
~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_372]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_372]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372]
2023-11-08 13:58:13,722 ERROR org.apache.flink.util.FatalExitExceptionHandler
(Thread dump omitted)
2023-11-08 13:58:13,724 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting
YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics
Cluster entrypoint has been closed externally..
2023-11-08 13:58:13,725 INFO org.apache.flink.runtime.blob.BlobServer
[] - Stopped BLOB server at 0.0.0.0:38077
2023-11-08 13:58:13,727 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting
down rest endpoint.
2023-11-08 13:58:13,736 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing
cache directory /tmp/flink-web-318212a0-3faa-4fe8-92df-ed445943d4e0/flink-web-ui
2023-11-08 13:58:13,736 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
http://10.0.3.79:38897 lost leadership
2023-11-08 13:58:13,736 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down
complete.
2023-11-08 13:58:13,736 INFO
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
[] - Closing components.
2023-11-08 13:58:13,737 INFO
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] -
Stopping JobDispatcherLeaderProcess.
2023-11-08 13:58:13,737 INFO
org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] -
Stopping resource manager service.
2023-11-08 13:58:13,739 INFO
org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Stopping
dispatcher pekko.tcp://flink@xxxxxxxxxx:44267/user/rpc/dispatcher_0.
2023-11-08 13:58:13,739 INFO
org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Stopping all
currently running jobs of dispatcher
pekko.tcp://xxxxxxxxxx:44267/user/rpc/dispatcher_0.
2023-11-08 13:58:13,743 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Stopping the JobMaster for job 'testing6'
(bf8a2c9a2628fedd369e1c50451f66ea).
2023-11-08 13:58:13,744 INFO
org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Job
bf8a2c9a2628fedd369e1c50451f66ea reached terminal state SUSPENDED.
2023-11-08 13:58:13,745 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job testing6
(bf8a2c9a2628fedd369e1c50451f66ea) switched from state RUNNING to SUSPENDED.
org.apache.flink.util.FlinkException: Scheduler is being stopped.
at
org.apache.flink.runtime.scheduler.SchedulerBase.closeAsync(SchedulerBase.java:656)
~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at
org.apache.flink.runtime.jobmaster.JobMaster.stopScheduling(JobMaster.java:1082)
~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at
org.apache.flink.runtime.jobmaster.JobMaster.stopJobExecution(JobMaster.java:1045)
~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at
org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:451)
~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:239)
~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StartedState.lambda$terminate$0(PekkoRpcActor.java:574)
~[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StartedState.terminate(PekkoRpcActor.java:573)
~[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:196)
~[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at
org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
[?:1.8.0_372]
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
[?:1.8.0_372]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
[?:1.8.0_372]
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
[?:1.8.0_372]
{code}
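To illustrate the mechanism outside Flink, here is a minimal, self-contained sketch. It assumes nothing beyond the JDK: the class name, the thread name, and the recording handler are hypothetical stand-ins (the handler records the error instead of stopping the process, as FatalExitExceptionHandler would). It shows that an exception thrown from a task passed to execute() reaches the thread's uncaught exception handler, while the same task wrapped in a try/catch does not. The try/catch is only one possible mitigation, not necessarily the fix chosen for this issue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class UncaughtHandlerSketch {

    /**
     * Runs a failing task via execute() and returns the error seen by the
     * thread's uncaught exception handler, or null if the handler never ran.
     */
    static Throwable handlerSaw(boolean guardTask) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // Hypothetical stand-in for a fatal handler: record instead of exiting.
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "sketch-io-thread");
            t.setUncaughtExceptionHandler((thread, error) -> {
                seen.set(error);
                done.countDown();
            });
            return t;
        };

        Runnable failing = () -> {
            throw new RuntimeException("something wrong");
        };
        // Guarded variant: the failure is caught inside the task itself.
        Runnable guarded = () -> {
            try {
                failing.run();
            } catch (RuntimeException e) {
                System.err.println("Task failed, not escalating: " + e.getMessage());
            } finally {
                done.countDown();
            }
        };

        ExecutorService executor = Executors.newSingleThreadExecutor(factory);
        try {
            executor.execute(guardTask ? guarded : failing);
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded, handler saw: " + handlerSaw(false));
        System.out.println("guarded, handler saw: " + handlerSaw(true));
    }
}
```

Note that this applies to execute(); with submit(), the exception would be captured in the returned Future and the handler would not be invoked.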
> notifyCheckpointAborted RPC failure can fail JM
> -----------------------------------------------
>
> Key: FLINK-20672
> URL: https://issues.apache.org/jira/browse/FLINK-20672
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.11.3, 1.12.0
> Reporter: Roman Khachatryan
> Assignee: Zakelly Lan
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor,
> pull-request-available
>
> Introduced in FLINK-8871, aborted RPC notifications are done asynchronously:
>
> {code}
> private void sendAbortedMessages(long checkpointId, long timeStamp) {
>     // send notification of aborted checkpoints asynchronously.
>     executor.execute(() -> {
>         // send the "abort checkpoint" messages to necessary vertices.
>         // ..
>     });
> }
> {code}
> However, the executor that eventually executes this request is created as
> follows:
> {code}
> final ScheduledExecutorService futureExecutor =
>         Executors.newScheduledThreadPool(
>                 Hardware.getNumberCPUCores(),
>                 new ExecutorThreadFactory("jobmanager-future"));
> {code}
> ExecutorThreadFactory uses an UncaughtExceptionHandler that exits the JVM on error.
> cc: [~yunta]