yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285325598
##########
File path: flink-end-to-end-tests/flink-state-evolution-test/src/main/java/org/apache/flink/test/StatefulStreamingJob.java
##########
@@ -150,6 +150,7 @@ public static void main(String[] args) throws Exception {
 env.setRestartStrategy(RestartStrategies.noRestart());
 env.enableCheckpointing(1000L);
 env.getConfig().disableGenericTypes();
+env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

Review comment:
After removing all the settings of the tolerable checkpoint failure number, this case is the first one that makes the Travis build fail.

Error detail:
```
2019-05-17 15:43:08,068 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 66 @ 1558107788056 for job dd4f07c1f866b7afff42759d7a35de3f.
2019-05-17 15:43:08,207 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 66 by task e65ff0e6f0ff8e14a1b53410aa8bb019 of job dd4f07c1f866b7afff42759d7a35de3f.
2019-05-17 15:43:08,208 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 66 of job dd4f07c1f866b7afff42759d7a35de3f.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: Custom Source (1/1) was not running
	at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1150)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-05-17 15:43:08,213 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (dd4f07c1f866b7afff42759d7a35de3f) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$enableCheckpointing$0(ExecutionGraph.java:540)
	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpoint(CheckpointCoordinator.java:1401)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1333)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:758)
	at org.apache.flink.runtime.jobmaster.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:556)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-05-17 15:43:08,224 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (e65ff0e6f0ff8e14a1b53410aa8bb019) switched from RUNNING to CANCELING.
2019-05-17 15:43:08,238 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (1/1) (b6eca113b5bb384dd099e20a14250dcb) switched from RUNNING to CANCELING.
2019-05-17 15:43:08,324 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (e65ff0e6f0ff8e14a1b53410aa8bb019) switched from CANCELING to CANCELED.
2019-05-17 15:43:08,345 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (1/1) (b6eca113b5bb384dd099e20a14250dcb) switched from CANCELING to CANCELED.
2019-05-17 15:43:08,353 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job Flink Streaming Job (dd4f07c1f866b7afff42759d7a35de3f) if no longer possible.
2019-05-17 15:43:08,356 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (dd4f07c1f866b7afff42759d7a35de3f) switched from state FAILING to FAILED.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$enableCheckpointing$0(ExecutionGraph.java:540)
	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpoint(CheckpointCoordinator.java:1401)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1333)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:758)
	at org.apache.flink.runtime.jobmaster.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:556)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-05-17 15:43:08,358 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Could not restart the job Flink Streaming Job (dd4f07c1f866b7afff42759d7a35de3f) because the restart strategy prevented it.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$enableCheckpointing$0(ExecutionGraph.java:540)
	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpoint(CheckpointCoordinator.java:1401)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1333)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:758)
	at org.apache.flink.runtime.jobmaster.LegacyScheduler.lambda$declineCheckpoint$2(LegacyScheduler.java:556)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

Log details: https://api.travis-ci.org/v3/job/533773072/log.txt

Reason analysis:

1. The job does not allow restarts:
   ```
   env.setRestartStrategy(RestartStrategies.noRestart());
   ```
2. The default tolerable checkpoint failure number is 0.
3. The source task was not yet running when it received the trigger message, so it responded with a decline message (`CheckpointDeclineTaskNotReadyException`).
4. After the coordinator receives the decline message, it calls `discardCheckpoint`, which then calls the new `failPendingCheckpoint` method introduced in this PR. That method counts the failure and compares the count with the tolerable number (0 here), and therefore fails the job.

Step 4 is the key difference from the old implementation. Before this PR, `discardCheckpoint` only called the `PendingCheckpoint#abortXX` method, which cannot trigger a job failure.

To reproduce the exceptions of the other cases, I am going to temporarily restore the tolerable failure number setting. Because this case runs during the compile phase on Travis, its failure stopped all the other tests.
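Steps 2 to 4 boil down to a consecutive-failure counter that is compared against the tolerable number. The following is a minimal sketch under stated assumptions, not the PR's code: `ToleranceCounterSketch` and its methods are hypothetical names rather than Flink's `CheckpointFailureManager` API, a plain `RuntimeException` stands in for `FlinkRuntimeException`, and it assumes that every declined or failed checkpoint is counted while a completed checkpoint resets the counter.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * Hypothetical, simplified model of the failure counting described in steps 2-4.
 * NOT Flink's CheckpointFailureManager: names and the counting rule are assumptions.
 */
class ToleranceCounterSketch {

    private final int tolerableFailureNumber;
    // Stand-in for the "fail the job" callback (assumption).
    private final Consumer<RuntimeException> failJobCallback;
    private final AtomicInteger continuousFailures = new AtomicInteger(0);

    ToleranceCounterSketch(int tolerableFailureNumber, Consumer<RuntimeException> failJobCallback) {
        if (tolerableFailureNumber < 0) {
            throw new IllegalArgumentException("tolerableFailureNumber must be >= 0");
        }
        this.tolerableFailureNumber = tolerableFailureNumber;
        this.failJobCallback = failJobCallback;
    }

    /** Assumed to be called for every declined or failed checkpoint, whatever the reason. */
    void handleCheckpointFailure() {
        // With a tolerance of 0, the very first failure already exceeds the threshold.
        if (continuousFailures.incrementAndGet() > tolerableFailureNumber) {
            failJobCallback.accept(
                new RuntimeException("Exceeded checkpoint tolerable failure threshold."));
        }
    }

    /** Assumed: a completed checkpoint resets the count of consecutive failures. */
    void handleCheckpointSuccess() {
        continuousFailures.set(0);
    }

    int continuousFailures() {
        return continuousFailures.get();
    }

    public static void main(String[] args) {
        // Tolerance 0 (the default from step 2): the first decline fails the job.
        ToleranceCounterSketch strict = new ToleranceCounterSketch(0,
            e -> System.out.println("strict: job failed: " + e.getMessage()));
        strict.handleCheckpointFailure(); // prints "strict: job failed: Exceeded checkpoint tolerable failure threshold."

        // Tolerance 3 (the value set in the diff): three consecutive failures are tolerated.
        ToleranceCounterSketch lenient = new ToleranceCounterSketch(3,
            e -> System.out.println("lenient: job failed: " + e.getMessage()));
        for (int i = 0; i < 3; i++) {
            lenient.handleCheckpointFailure(); // tolerated, nothing printed
        }
        lenient.handleCheckpointSuccess();     // counter back to 0
        lenient.handleCheckpointFailure();     // tolerated again (count is 1)
        System.out.println("lenient consecutive failures: " + lenient.continuousFailures()); // prints 1
    }
}
```

Because the job also uses `RestartStrategies.noRestart()`, reaching the threshold in this model is terminal, which is consistent with the log ending in FAILED.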