[
https://issues.apache.org/jira/browse/FLINK-21067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322035#comment-17322035
]
Till Rohrmann commented on FLINK-21067:
---------------------------------------
Due to this change, the logs have become very noisy whenever a checkpoint
cannot be triggered:
{code}
13:09:39,798 [ Checkpoint Timer] WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to
trigger checkpoint for job 8e0ce5a96f0ea544f0d8d78051e5dc06.)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering
task testVertex (1/2) of job 8e0ce5a96f0ea544f0d8d78051e5dc06 has not being
executed at the moment. Aborting checkpoint. Failure reason: Not all required
tasks are currently running.
at
org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
~[?:1.8.0_282]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
~[scala-library-2.11.12.jar:?]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
~[scala-library-2.11.12.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[scala-library-2.11.12.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[scala-library-2.11.12.jar:?]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
13:09:39,799 [flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.taskmanager.Task [] - Attempting to
cancel task testVertex (2/2)#1 (062ee49c1a8f46af91971a8df568f878).
13:09:39,799 [flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.taskmanager.Task [] - testVertex
(2/2)#1 (062ee49c1a8f46af91971a8df568f878) switched from RECOVERING to
CANCELING.
13:09:39,799 [ testVertex (1/2)#2] INFO
org.apache.flink.runtime.taskmanager.Task [] - testVertex
(1/2)#2 (ee9f9783a36d9f5da170a276b606b076) switched from CANCELING to CANCELED.
13:09:39,799 [flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.taskmanager.Task [] - Triggering
cancellation of task code testVertex (2/2)#1 (062ee49c1a8f46af91971a8df568f878).
13:09:39,799 [ testVertex (1/2)#2] INFO
org.apache.flink.runtime.taskmanager.Task [] - Freeing task
resources for testVertex (1/2)#2 (ee9f9783a36d9f5da170a276b606b076).
13:09:39,800 [mini-cluster-io-thread-43] INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - JobManager
for job a79bf377367e749a64c8dd2e6d95ac38 with leader id
b4e6e50fbae29c8f981ce43a359549b3 lost leadership.
13:09:39,800 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering job manager
821bd62f89c04ea008988072519e406c@akka://flink/user/rpc/jobmanager_6 for job
8e0ce5a96f0ea544f0d8d78051e5dc06.
13:09:39,801 [ Checkpoint Timer] WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to
trigger checkpoint for job 8e0ce5a96f0ea544f0d8d78051e5dc06.)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering
task testVertex (1/2) of job 8e0ce5a96f0ea544f0d8d78051e5dc06 has not being
executed at the moment. Aborting checkpoint. Failure reason: Not all required
tasks are currently running.
at
org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
~[?:1.8.0_282]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
~[scala-library-2.11.12.jar:?]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
~[scala-library-2.11.12.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[scala-library-2.11.12.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[scala-library-2.11.12.jar:?]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
13:09:39,801 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Received resource declaration for job a79bf377367e749a64c8dd2e6d95ac38: []
13:09:39,802 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Disconnect job manager
b4e6e50fbae29c8f981ce43a359549b3@akka://flink/user/rpc/jobmanager_5 for job
a79bf377367e749a64c8dd2e6d95ac38 from the resource manager.
13:09:39,810 [flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot
TaskSlot(index:1, state:ACTIVE, resource profile:
ResourceProfile{taskHeapMemory=256.000gb (274877906944 bytes),
taskOffHeapMemory=256.000gb (274877906944 bytes), managedMemory=20.000mb
(20971520 bytes), networkMemory=16.000mb (16777216 bytes)}, allocationId:
a4db47635b3702514aed35fffffc757c, jobId: a79bf377367e749a64c8dd2e6d95ac38).
13:09:39,810 [flink-akka.actor.default-dispatcher-2] INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registered job manager
821bd62f89c04ea008988072519e406c@akka://flink/user/rpc/jobmanager_6 for job
8e0ce5a96f0ea544f0d8d78051e5dc06.
13:09:39,811 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.jobmaster.JobMaster [] - JobManager
successfully registered at ResourceManager, leader id:
83a6df876efea6c54ca7a390eb16409c.
13:09:39,812 [flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.taskmanager.Task [] - Attempting to
fail task externally testVertex (1/2)#2 (ee9f9783a36d9f5da170a276b606b076).
13:09:39,812 [flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.taskmanager.Task [] - Task
testVertex (1/2)#2 is already in state CANCELED
13:09:39,812 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Received resource declaration for job 8e0ce5a96f0ea544f0d8d78051e5dc06:
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
numberOfRequiredSlots=2}]
13:09:39,812 [ Checkpoint Timer] WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to
trigger checkpoint for job 8e0ce5a96f0ea544f0d8d78051e5dc06.)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering
task testVertex (1/2) of job 8e0ce5a96f0ea544f0d8d78051e5dc06 has not being
executed at the moment. Aborting checkpoint. Failure reason: Not all required
tasks are currently running.
at
org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
~[?:1.8.0_282]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
~[scala-library-2.11.12.jar:?]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
~[scala-library-2.11.12.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[scala-library-2.11.12.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[scala-library-2.11.12.jar:?]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[akka-actor_2.11-2.5.21.jar:2.5.21]
{code}
Do we really want to log this at WARN level, given that this is a normal case
(e.g. not all required tasks are running yet)? cc
[~roman_khachatryan], [~pnowojski].
> Modify the logic of computing tasks to trigger/wait/commit to consider
> finished tasks
> -------------------------------------------------------------------------------------
>
> Key: FLINK-21067
> URL: https://issues.apache.org/jira/browse/FLINK-21067
> Project: Flink
> Issue Type: Sub-task
> Components: API / DataStream, Runtime / Checkpointing
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.13.0
>
>
> To support checkpoints after tasks have finished, for each checkpoint we would
> like to trigger the new "root" tasks, and wait for / commit to all the running
> tasks. Thus we need to modify the logic that identifies the tasks to trigger /
> wait for / commit.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)