[ 
https://issues.apache.org/jira/browse/FLINK-21067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322035#comment-17322035
 ] 

Till Rohrmann commented on FLINK-21067:
---------------------------------------

Due to this change, the logs become very noisy whenever a checkpoint cannot 
be triggered:

{code}
13:09:39,798 [    Checkpoint Timer] WARN  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Failed to 
trigger checkpoint for job 8e0ce5a96f0ea544f0d8d78051e5dc06.)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering 
task testVertex (1/2) of job 8e0ce5a96f0ea544f0d8d78051e5dc06 has not being 
executed at the moment. Aborting checkpoint. Failure reason: Not all required 
tasks are currently running.
        at 
org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at 
org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 ~[?:1.8.0_282]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
~[scala-library-2.11.12.jar:?]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
~[scala-library-2.11.12.jar:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[scala-library-2.11.12.jar:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[scala-library-2.11.12.jar:?]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
13:09:39,799 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to 
cancel task testVertex (2/2)#1 (062ee49c1a8f46af91971a8df568f878).
13:09:39,799 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - testVertex 
(2/2)#1 (062ee49c1a8f46af91971a8df568f878) switched from RECOVERING to 
CANCELING.
13:09:39,799 [  testVertex (1/2)#2] INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - testVertex 
(1/2)#2 (ee9f9783a36d9f5da170a276b606b076) switched from CANCELING to CANCELED.
13:09:39,799 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Triggering 
cancellation of task code testVertex (2/2)#1 (062ee49c1a8f46af91971a8df568f878).
13:09:39,799 [  testVertex (1/2)#2] INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task 
resources for testVertex (1/2)#2 (ee9f9783a36d9f5da170a276b606b076).
13:09:39,800 [mini-cluster-io-thread-43] INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - JobManager 
for job a79bf377367e749a64c8dd2e6d95ac38 with leader id 
b4e6e50fbae29c8f981ce43a359549b3 lost leadership.
13:09:39,800 [flink-akka.actor.default-dispatcher-5] INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Registering job manager 
821bd62f89c04ea008988072519e406c@akka://flink/user/rpc/jobmanager_6 for job 
8e0ce5a96f0ea544f0d8d78051e5dc06.
13:09:39,801 [    Checkpoint Timer] WARN  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Failed to 
trigger checkpoint for job 8e0ce5a96f0ea544f0d8d78051e5dc06.)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering 
task testVertex (1/2) of job 8e0ce5a96f0ea544f0d8d78051e5dc06 has not being 
executed at the moment. Aborting checkpoint. Failure reason: Not all required 
tasks are currently running.
        at 
org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at 
org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 ~[?:1.8.0_282]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
~[scala-library-2.11.12.jar:?]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
~[scala-library-2.11.12.jar:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[scala-library-2.11.12.jar:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[scala-library-2.11.12.jar:?]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
13:09:39,801 [flink-akka.actor.default-dispatcher-5] INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Received resource declaration for job a79bf377367e749a64c8dd2e6d95ac38: []
13:09:39,802 [flink-akka.actor.default-dispatcher-5] INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Disconnect job manager 
b4e6e50fbae29c8f981ce43a359549b3@akka://flink/user/rpc/jobmanager_5 for job 
a79bf377367e749a64c8dd2e6d95ac38 from the resource manager.
13:09:39,810 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
TaskSlot(index:1, state:ACTIVE, resource profile: 
ResourceProfile{taskHeapMemory=256.000gb (274877906944 bytes), 
taskOffHeapMemory=256.000gb (274877906944 bytes), managedMemory=20.000mb 
(20971520 bytes), networkMemory=16.000mb (16777216 bytes)}, allocationId: 
a4db47635b3702514aed35fffffc757c, jobId: a79bf377367e749a64c8dd2e6d95ac38).
13:09:39,810 [flink-akka.actor.default-dispatcher-2] INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Registered job manager 
821bd62f89c04ea008988072519e406c@akka://flink/user/rpc/jobmanager_6 for job 
8e0ce5a96f0ea544f0d8d78051e5dc06.
13:09:39,811 [flink-akka.actor.default-dispatcher-5] INFO  
org.apache.flink.runtime.jobmaster.JobMaster                 [] - JobManager 
successfully registered at ResourceManager, leader id: 
83a6df876efea6c54ca7a390eb16409c.
13:09:39,812 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to 
fail task externally testVertex (1/2)#2 (ee9f9783a36d9f5da170a276b606b076).
13:09:39,812 [flink-akka.actor.default-dispatcher-3] INFO  
org.apache.flink.runtime.taskmanager.Task                    [] - Task 
testVertex (1/2)#2 is already in state CANCELED
13:09:39,812 [flink-akka.actor.default-dispatcher-5] INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Received resource declaration for job 8e0ce5a96f0ea544f0d8d78051e5dc06: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=2}]
13:09:39,812 [    Checkpoint Timer] WARN  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Failed to 
trigger checkpoint for job 8e0ce5a96f0ea544f0d8d78051e5dc06.)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering 
task testVertex (1/2) of job 8e0ce5a96f0ea544f0d8d78051e5dc06 has not being 
executed at the moment. Aborting checkpoint. Failure reason: Not all required 
tasks are currently running.
        at 
org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at 
org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 ~[?:1.8.0_282]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
~[scala-library-2.11.12.jar:?]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
~[scala-library-2.11.12.jar:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[scala-library-2.11.12.jar:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[scala-library-2.11.12.jar:?]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
{code}

Do we really want to log this at WARN level, given that this is a normal 
case? cc [~roman_khachatryan], [~pnowojski].
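
One possible way to quiet this down, as a rough sketch only: classify the failure reason and log expected conditions at INFO without the stack trace. Everything below is hypothetical and is not Flink's actual code. The class name, the `Reason` enum, and its `expected` flag are made up for illustration, and `java.util.logging` is used only to keep the example self-contained (its `WARNING` level stands in for WARN).

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch; none of these names are Flink's actual classes. */
final class CheckpointFailureLoggingSketch {

    /** Hypothetical failure reasons; "expected" marks conditions seen in normal operation. */
    enum Reason {
        NOT_ALL_REQUIRED_TASKS_RUNNING(true),
        IO_EXCEPTION(false);

        final boolean expected;

        Reason(boolean expected) {
            this.expected = expected;
        }
    }

    private static final Logger LOG =
            Logger.getLogger(CheckpointFailureLoggingSketch.class.getName());

    /** Expected conditions map to INFO, everything else to WARNING. */
    static Level levelFor(Reason reason) {
        return reason.expected ? Level.INFO : Level.WARNING;
    }

    /** Logs a single line for expected failures and the full stack trace otherwise. */
    static void logTriggerFailure(String jobId, Reason reason, Throwable cause) {
        String message = "Failed to trigger checkpoint for job " + jobId + ": " + reason;
        if (reason.expected) {
            // Normal case (e.g. tasks still starting up): no stack trace.
            LOG.log(levelFor(reason), message);
        } else {
            // Unexpected failure: keep the stack trace for debugging.
            LOG.log(levelFor(reason), message, cause);
        }
    }

    public static void main(String[] args) {
        Exception cause = new IllegalStateException("Not all required tasks are currently running.");
        logTriggerFailure(
                "8e0ce5a96f0ea544f0d8d78051e5dc06", Reason.NOT_ALL_REQUIRED_TASKS_RUNNING, cause);
    }
}
```

With such a split, a checkpoint that cannot be triggered because tasks are still starting or being cancelled would produce one INFO line per attempt instead of a WARN plus a roughly 30-frame stack trace.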

> Modify the logic of computing tasks to trigger/wait/commit to consider 
> finished tasks
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-21067
>                 URL: https://issues.apache.org/jira/browse/FLINK-21067
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / DataStream, Runtime / Checkpointing
>            Reporter: Yun Gao
>            Assignee: Yun Gao
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.13.0
>
>
> To support checkpoints after tasks have finished, for each checkpoint we 
> would like to trigger the new "root" tasks and wait for / commit all the 
> running tasks. Thus we need to modify the logic that identifies the tasks to 
> trigger / wait for / commit.
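
To make the quoted description concrete, here is a minimal sketch of how the tasks to trigger and the tasks to wait for / commit could be computed once finished tasks are taken into account. It rests on simplifying assumptions that are not taken from the issue: tasks are plain strings, the job graph is a map from each task to its direct upstream tasks, and a task counts as a new "root" when it is not finished and all of its upstream tasks are. The names `CheckpointPlanSketch`, `Plan`, and `compute` are hypothetical and not Flink's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch; not Flink's actual checkpoint plan calculation. */
final class CheckpointPlanSketch {

    /** Result holder: which tasks to trigger and which to wait for / commit. */
    static final class Plan {
        final List<String> toTrigger;
        final List<String> toWaitAndCommit;

        Plan(List<String> toTrigger, List<String> toWaitAndCommit) {
            this.toTrigger = toTrigger;
            this.toWaitAndCommit = toWaitAndCommit;
        }
    }

    /**
     * @param tasks all tasks of the job, in any stable order
     * @param upstream direct upstream tasks per task (absent key means no inputs)
     * @param finished tasks that have already finished
     */
    static Plan compute(
            List<String> tasks, Map<String, List<String>> upstream, Set<String> finished) {
        List<String> toTrigger = new ArrayList<>();
        List<String> toWaitAndCommit = new ArrayList<>();
        for (String task : tasks) {
            if (finished.contains(task)) {
                continue; // finished tasks take no part in the checkpoint
            }
            // Every task that is still running is waited for and committed.
            toWaitAndCommit.add(task);
            // A running task whose inputs have all finished acts as a new "root".
            List<String> inputs = upstream.getOrDefault(task, Collections.<String>emptyList());
            if (finished.containsAll(inputs)) {
                toTrigger.add(task);
            }
        }
        return new Plan(toTrigger, toWaitAndCommit);
    }

    public static void main(String[] args) {
        // source -> map -> sink, with the source already finished.
        Map<String, List<String>> upstream = new HashMap<>();
        upstream.put("map", Collections.singletonList("source"));
        upstream.put("sink", Collections.singletonList("map"));
        Set<String> finished = new HashSet<>(Collections.singletonList("source"));

        List<String> tasks = new ArrayList<>();
        Collections.addAll(tasks, "source", "map", "sink");

        Plan plan = compute(tasks, upstream, finished);
        System.out.println("trigger: " + plan.toTrigger);
        System.out.println("wait/commit: " + plan.toWaitAndCommit);
    }
}
```

Note that when no task has finished, the roots under this rule are exactly the tasks without inputs, so the sketch degenerates to triggering only the sources.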



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
