[ 
https://issues.apache.org/jira/browse/FLINK-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718491#comment-16718491
 ] 

ASF GitHub Bot commented on FLINK-4810:
---------------------------------------

ramkrish86 closed pull request #3334: FLINK-4810 Checkpoint Coordinator should 
fail ExecutionGraph after "n" unsuccessful checkpoints
URL: https://github.com/apache/flink/pull/3334
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0592e3d9aea..9f453d0f2c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -132,6 +132,8 @@
 
        /** The maximum number of checkpoints that may be in progress at the 
same time */
        private final int maxConcurrentCheckpointAttempts;
+       /** The maximum number of unsuccessful checkpoints */
+       private final int maxFailedCheckpoints;
 
        /** The timer that handles the checkpoint timeouts and triggers 
periodic checkpoints */
        private final Timer timer;
@@ -142,6 +144,9 @@
        /** The number of consecutive failed trigger attempts */
        private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new 
AtomicInteger(0);
 
+       /** The number of consecutive failed checkpoints */
+       private final AtomicInteger numFailedCheckpoints = new AtomicInteger(0);
+
        private ScheduledTrigger currentPeriodicTrigger;
 
        /** The timestamp (via {@link System#nanoTime()}) when the last 
checkpoint completed */
@@ -163,6 +168,23 @@
        private CheckpointStatsTracker statsTracker;
 
        // 
--------------------------------------------------------------------------------------------
+       public CheckpointCoordinator(
+               JobID job,
+               long baseInterval,
+               long checkpointTimeout,
+               long minPauseBetweenCheckpoints,
+               int maxConcurrentCheckpointAttempts,
+               ExternalizedCheckpointSettings externalizeSettings,
+               ExecutionVertex[] tasksToTrigger,
+               ExecutionVertex[] tasksToWaitFor,
+               ExecutionVertex[] tasksToCommitTo,
+               CheckpointIDCounter checkpointIDCounter,
+               CompletedCheckpointStore completedCheckpointStore,
+               String checkpointDirectory,
+               Executor executor) {
+               this(job, baseInterval, checkpointTimeout, 
minPauseBetweenCheckpoints, maxConcurrentCheckpointAttempts, 0, 
externalizeSettings, tasksToTrigger, tasksToWaitFor, tasksToCommitTo,
+                       checkpointIDCounter, completedCheckpointStore, 
checkpointDirectory, executor);
+       }
 
        public CheckpointCoordinator(
                        JobID job,
@@ -170,6 +192,7 @@ public CheckpointCoordinator(
                        long checkpointTimeout,
                        long minPauseBetweenCheckpoints,
                        int maxConcurrentCheckpointAttempts,
+                       int maxFailedCheckpoints,
                        ExternalizedCheckpointSettings externalizeSettings,
                        ExecutionVertex[] tasksToTrigger,
                        ExecutionVertex[] tasksToWaitFor,
@@ -184,6 +207,7 @@ public CheckpointCoordinator(
                checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must 
be larger than zero");
                checkArgument(minPauseBetweenCheckpoints >= 0, 
"minPauseBetweenCheckpoints must be >= 0");
                checkArgument(maxConcurrentCheckpointAttempts >= 1, 
"maxConcurrentCheckpointAttempts must be >= 1");
+               checkArgument(maxFailedCheckpoints >= 0, "maxFailedCheckpoints 
must be >= 0");
 
                if (externalizeSettings.externalizeCheckpoints() && 
checkpointDirectory == null) {
                        throw new IllegalStateException("CheckpointConfig says 
to persist periodic " +
@@ -207,6 +231,7 @@ public CheckpointCoordinator(
                this.checkpointTimeout = checkpointTimeout;
                this.minPauseBetweenCheckpointsNanos = 
minPauseBetweenCheckpoints * 1_000_000;
                this.maxConcurrentCheckpointAttempts = 
maxConcurrentCheckpointAttempts;
+               this.maxFailedCheckpoints = maxFailedCheckpoints;
                this.tasksToTrigger = checkNotNull(tasksToTrigger);
                this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
                this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
@@ -461,6 +486,9 @@ CheckpointTriggerResult triggerCheckpoint(
                        catch (Throwable t) {
                                int numUnsuccessful = 
numUnsuccessfulCheckpointsTriggers.incrementAndGet();
                                LOG.warn("Failed to trigger checkpoint (" + 
numUnsuccessful + " consecutive failed attempts so far)", t);
+                               if(numUnsuccessful > maxFailedCheckpoints) {
+                                       return failExecution(tasksToCommitTo);
+                               }
                                return new 
CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
                        }
 
@@ -577,18 +605,35 @@ else if (!props.forceCheckpoint()) {
                                if (!checkpoint.isDiscarded()) {
                                        checkpoint.abortError(new 
Exception("Failed to trigger checkpoint"));
                                }
+                               if(numUnsuccessful > maxFailedCheckpoints) {
+                                       return failExecution(tasksToCommitTo);
+                               }
                                return new 
CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
                        }
 
                } // end trigger lock
        }
 
+       private CheckpointTriggerResult failExecution(ExecutionVertex[] 
executions) {
+               if (currentPeriodicTrigger != null) {
+                       currentPeriodicTrigger.cancel();
+                       currentPeriodicTrigger = null;
+               }
+               for (ExecutionVertex executionVertex : executions) {
+                       // fail the graph
+                       if(executionVertex != null) {
+                               executionVertex.getExecutionGraph().fail(new 
Throwable("The number of max unsuccessful checkpoints attempts exhausted"));
+                       }
+               }
+               return new 
CheckpointTriggerResult(CheckpointDeclineReason.MAX_FAILED_ATTEMPTS_EXHAUSTED);
+       }
+
        /**
         * Receives a {@link DeclineCheckpoint} message for a pending 
checkpoint.
         *
         * @param message Checkpoint decline from the task manager
         */
-       public void receiveDeclineMessage(DeclineCheckpoint message) {
+       public void receiveDeclineMessage(DeclineCheckpoint message) throws 
CheckpointException {
                if (shutdown || message == null) {
                        return;
                }
@@ -618,7 +663,6 @@ public void receiveDeclineMessage(DeclineCheckpoint 
message) {
                                pendingCheckpoints.remove(checkpointId);
                                checkpoint.abortDeclined();
                                rememberRecentCheckpointId(checkpointId);
-
                                // we don't have to schedule another 
"dissolving" checkpoint any more because the
                                // cancellation barriers take care of breaking 
downstream alignments
                                // we only need to make sure that suspended 
queued requests are resumed
@@ -632,11 +676,24 @@ public void receiveDeclineMessage(DeclineCheckpoint 
message) {
                                }
 
                                if (!haveMoreRecentPending) {
-                                       triggerQueuedRequests();
+                                       int numFailed = 
this.numFailedCheckpoints.incrementAndGet();
+                                       if(numFailed > maxFailedCheckpoints) {
+                                               failExecution(tasksToCommitTo);
+                                               throw new CheckpointException(
+                                                       "Max failed checkpoints 
attempts exhausted");
+                                       } else {
+                                               triggerQueuedRequests();
+                                       }
                                }
                        }
                        else if (checkpoint != null) {
                                // this should not happen
+                               int numFailed = 
this.numFailedCheckpoints.incrementAndGet();
+                               if(numFailed > maxFailedCheckpoints) {
+                                       failExecution(tasksToCommitTo);
+                                       throw new CheckpointException(
+                                               "Max failed checkpoints 
attempts exhausted");
+                               }
                                throw new IllegalStateException(
                                                "Received message for discarded 
but non-removed checkpoint " + checkpointId);
                        }
@@ -795,7 +852,11 @@ public void run() {
                                        }
                                });
                        }
-
+                       int numFailed = 
this.numFailedCheckpoints.incrementAndGet();
+                       if(numFailed > maxFailedCheckpoints) {
+                               failExecution(tasksToCommitTo);
+                               throw new CheckpointException("Max failed 
checkpoints attempts exhausted "+ '.', exception);
+                       }
                        throw new CheckpointException("Could not complete the 
pending checkpoint " + checkpointId + '.', exception);
                } finally {
                        pendingCheckpoints.remove(checkpointId);
@@ -832,6 +893,8 @@ public void run() {
                                ee.notifyCheckpointComplete(checkpointId, 
timestamp);
                        }
                }
+               // reset it - we are sure that we got a successful checkpoint
+               this.numFailedCheckpoints.set(0);
        }
 
        private void rememberRecentCheckpointId(long id) {
@@ -1027,6 +1090,7 @@ public void stopCheckpointScheduler() {
 
                        pendingCheckpoints.clear();
                        numUnsuccessfulCheckpointsTriggers.set(0);
+                       numFailedCheckpoints.set(0);
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
index 60fe657c851..34f12d1e72e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
@@ -36,7 +36,9 @@
 
        NOT_ALL_REQUIRED_TASKS_RUNNING("Not all required tasks are currently 
running."),
 
-       EXCEPTION("An Exception occurred while triggering the checkpoint.");
+       EXCEPTION("An Exception occurred while triggering the checkpoint."),
+
+       MAX_FAILED_ATTEMPTS_EXHAUSTED("The number of max failed checkpoints 
attempts exhausted.");
 
        // 
------------------------------------------------------------------------
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 6c9dbaf79c7..b0eb2aa9549 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -61,7 +61,7 @@
  */
 public class PendingCheckpoint {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointCoordinator.class);
+       private static final Logger LOG = 
LoggerFactory.getLogger(PendingCheckpoint.class);
 
        private final Object lock = new Object();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 5fa40fced37..fa29398a48a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -358,6 +358,7 @@ public void enableCheckpointing(
                        long checkpointTimeout,
                        long minPauseBetweenCheckpoints,
                        int maxConcurrentCheckpoints,
+                       int maxFailedCheckpoints,
                        ExternalizedCheckpointSettings externalizeSettings,
                        List<ExecutionJobVertex> verticesToTrigger,
                        List<ExecutionJobVertex> verticesToWaitFor,
@@ -396,6 +397,7 @@ public void enableCheckpointing(
                        checkpointTimeout,
                        minPauseBetweenCheckpoints,
                        maxConcurrentCheckpoints,
+                       maxFailedCheckpoints,
                        externalizeSettings,
                        tasksToTrigger,
                        tasksToWaitFor,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index ec7103c3c2c..a8954b45f86 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -224,6 +224,7 @@ public static ExecutionGraph buildGraph(
                                        snapshotSettings.getCheckpointTimeout(),
                                        
snapshotSettings.getMinPauseBetweenCheckpoints(),
                                        
snapshotSettings.getMaxConcurrentCheckpoints(),
+                                       
snapshotSettings.getMaxFailedCheckpoints(),
                                        
snapshotSettings.getExternalizedCheckpointSettings(),
                                        triggerVertices,
                                        ackVertices,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
index 233aa8887de..214acd78e30 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
@@ -65,6 +65,23 @@
         */
        private final boolean isExactlyOnce;
 
+       private final int maxFailedCheckpoints;
+
+       public JobSnapshottingSettings(
+               List<JobVertexID> verticesToTrigger,
+               List<JobVertexID> verticesToAcknowledge,
+               List<JobVertexID> verticesToConfirm,
+               long checkpointInterval,
+               long checkpointTimeout,
+               long minPauseBetweenCheckpoints,
+               int maxConcurrentCheckpoints,
+               ExternalizedCheckpointSettings externalizedCheckpointSettings,
+               @Nullable StateBackend defaultStateBackend,
+               boolean isExactlyOnce) {
+               this(verticesToTrigger, verticesToAcknowledge, 
verticesToConfirm, checkpointInterval, checkpointTimeout,
+                       minPauseBetweenCheckpoints, maxConcurrentCheckpoints, 
0, externalizedCheckpointSettings, defaultStateBackend, isExactlyOnce);
+       }
+
        public JobSnapshottingSettings(
                        List<JobVertexID> verticesToTrigger,
                        List<JobVertexID> verticesToAcknowledge,
@@ -73,6 +90,7 @@ public JobSnapshottingSettings(
                        long checkpointTimeout,
                        long minPauseBetweenCheckpoints,
                        int maxConcurrentCheckpoints,
+                       int maxFailedCheckpoints,
                        ExternalizedCheckpointSettings 
externalizedCheckpointSettings,
                        @Nullable StateBackend defaultStateBackend,
                        boolean isExactlyOnce) {
@@ -93,6 +111,7 @@ public JobSnapshottingSettings(
                this.externalizedCheckpointSettings = 
requireNonNull(externalizedCheckpointSettings);
                this.defaultStateBackend = defaultStateBackend;
                this.isExactlyOnce = isExactlyOnce;
+               this.maxFailedCheckpoints = maxFailedCheckpoints;
        }
        
        // 
--------------------------------------------------------------------------------------------
@@ -138,14 +157,16 @@ public boolean isExactlyOnce() {
                return isExactlyOnce;
        }
 
+       public int getMaxFailedCheckpoints() { return maxFailedCheckpoints; }
+
        // 
--------------------------------------------------------------------------------------------
        
        @Override
        public String toString() {
                return String.format("SnapshotSettings: interval=%d, 
timeout=%d, pause-between=%d, " +
-                                               "maxConcurrent=%d, trigger=%s, 
ack=%s, commit=%s",
+                                               "maxConcurrent=%d, trigger=%s, 
ack=%s, commit=%s, maxFailedCheckpoints=%s",
                                                checkpointInterval, 
checkpointTimeout,
                                                minPauseBetweenCheckpoints, 
maxConcurrentCheckpoints,
-                                               verticesToTrigger, 
verticesToAcknowledge, verticesToConfirm);
+                                               verticesToTrigger, 
verticesToAcknowledge, verticesToConfirm, maxFailedCheckpoints);
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 16913705efe..0f2268fb632 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -27,10 +27,7 @@
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.*;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
@@ -274,12 +271,14 @@ public void testTriggerAndDeclineCheckpointSimple() {
                        ExecutionVertex vertex2 = 
mockExecutionVertex(attemptID2);
 
                        // set up the coordinator and validate the initial state
+                       // set max fail attempts as 3
                        CheckpointCoordinator coord = new CheckpointCoordinator(
                                jid,
                                600000,
                                600000,
                                0,
                                Integer.MAX_VALUE,
+                               2,
                                ExternalizedCheckpointSettings.none(),
                                new ExecutionVertex[] { vertex1, vertex2 },
                                new ExecutionVertex[] { vertex1, vertex2 },
@@ -354,6 +353,132 @@ public void testTriggerAndDeclineCheckpointSimple() {
                }
        }
 
+       /**
+        * This test triggers a checkpoint and then sends a decline checkpoint 
message from
+        * one of the tasks. We receive multiple decline messages and as a 
result
+        * we fail the executiongraph by throwing a CheckpointException
+        */
+       @Test
+       public void testTriggerAndMulitpleDeclineCheckpointFailure() {
+               try {
+                       final JobID jid = new JobID();
+                       final long timestamp = System.currentTimeMillis();
+
+                       // create some mock Execution vertices that receive the 
checkpoint trigger messages
+                       final ExecutionAttemptID attemptID1 = new 
ExecutionAttemptID();
+                       final ExecutionAttemptID attemptID2 = new 
ExecutionAttemptID();
+                       ExecutionVertex vertex1 = 
mockExecutionVertex(attemptID1);
+                       ExecutionVertex vertex2 = 
mockExecutionVertex(attemptID2);
+
+                       // set up the coordinator and validate the initial state
+                       // set max fail attempts as 3
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               2,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new ExecutionVertex[] { vertex1, vertex2 },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
+                               Executors.directExecutor());
+
+                       assertEquals(0, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       // trigger the first checkpoint. this should succeed
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+                       // validate that we have a pending checkpoint
+                       assertEquals(1, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       long checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+                       PendingCheckpoint checkpoint = 
coord.getPendingCheckpoints().get(checkpointId);
+
+                       assertNotNull(checkpoint);
+                       assertEquals(checkpointId, 
checkpoint.getCheckpointId());
+                       assertEquals(timestamp, 
checkpoint.getCheckpointTimestamp());
+                       assertEquals(jid, checkpoint.getJobId());
+                       assertEquals(2, 
checkpoint.getNumberOfNonAcknowledgedTasks());
+                       assertEquals(0, 
checkpoint.getNumberOfAcknowledgedTasks());
+                       assertEquals(0, checkpoint.getTaskStates().size());
+                       assertFalse(checkpoint.isDiscarded());
+                       assertFalse(checkpoint.isFullyAcknowledged());
+
+                       // check that the vertices received the trigger 
checkpoint message
+                       
verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(eq(checkpointId),
 eq(timestamp), any(CheckpointOptions.class));
+                       
verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(eq(checkpointId),
 eq(timestamp), any(CheckpointOptions.class));
+
+                       CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, 0L);
+
+                       // acknowledge from one of the tasks
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+                       assertEquals(1, 
checkpoint.getNumberOfAcknowledgedTasks());
+                       assertEquals(1, 
checkpoint.getNumberOfNonAcknowledgedTasks());
+                       assertFalse(checkpoint.isDiscarded());
+                       assertFalse(checkpoint.isFullyAcknowledged());
+
+                       // acknowledge the same task again (should not matter)
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
+                       assertFalse(checkpoint.isDiscarded());
+                       assertFalse(checkpoint.isFullyAcknowledged());
+
+
+                       // decline checkpoint from the other task, this should 
cancel the checkpoint
+                       // and trigger a new one
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId));
+                       assertTrue(checkpoint.isDiscarded());
+
+                       // validate that we have no new pending checkpoint
+                       assertEquals(0, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       // trigger the first checkpoint. this should succeed
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+                       // validate that we have a pending checkpoint
+                       assertEquals(1, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+                       checkpoint = 
coord.getPendingCheckpoints().get(checkpointId);
+
+                       // decline checkpoint from the other task. Still the 
task should not fail
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId));
+                       assertTrue(checkpoint.isDiscarded());
+
+                       // trigger the first checkpoint. this should succeed
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+                       // validate that we have a pending checkpoint
+                       assertEquals(1, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+                       checkpoint = 
coord.getPendingCheckpoints().get(checkpointId);
+
+                       // decline checkpoint from the other task. This should 
fail as already we have failed
+                       // 2 times
+                       try {
+                               coord.receiveDeclineMessage(new 
DeclineCheckpoint(jid, attemptID1, checkpointId));
+                               fail("Should have thrown checkpointException");
+                       } catch(CheckpointException e) {
+                               // exception should be generated
+                       }
+                       coord.shutdown(JobStatus.FINISHED);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
        /**
         * This test triggers two checkpoints and then sends a decline message 
from one of the tasks
         * for the first checkpoint. This should discard the first checkpoint 
while not triggering
@@ -476,6 +601,164 @@ public void testTriggerAndDeclineCheckpointComplex() {
                }
        }
 
+       /**
+        * Ensures that, although we receive multiple decline messages, the first
+        * one is not counted as a failure because there are already checkpoints in
+        * the queue. From the second one onwards, each decline message is counted
+        * as a failure, and finally the execution graph is failed by throwing a
+        * CheckpointException.
+        */
+       @Test
+       public void 
testTriggerAndDeclineCheckpointComplexWithMultipleDeclineMessage() {
+               try {
+                       final JobID jid = new JobID();
+                       final long timestamp = System.currentTimeMillis();
+
+                       // create some mock Execution vertices that receive the 
checkpoint trigger messages
+                       final ExecutionAttemptID attemptID1 = new 
ExecutionAttemptID();
+                       final ExecutionAttemptID attemptID2 = new 
ExecutionAttemptID();
+                       ExecutionVertex vertex1 = 
mockExecutionVertex(attemptID1);
+                       ExecutionVertex vertex2 = 
mockExecutionVertex(attemptID2);
+
+                       // set up the coordinator and validate the initial state
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               2,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[]{vertex1, vertex2},
+                               new ExecutionVertex[]{vertex1, vertex2},
+                               new ExecutionVertex[]{vertex1, vertex2},
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
+                               Executors.directExecutor());
+
+                       assertEquals(0, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       // trigger the first checkpoint. this should succeed
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+                       // trigger second checkpoint, should also succeed
+                       assertTrue(coord.triggerCheckpoint(timestamp + 2, 
false));
+
+                       // validate that we have two pending checkpoints
+                       assertEquals(2, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       Iterator<Map.Entry<Long, PendingCheckpoint>> it = 
coord.getPendingCheckpoints().entrySet().iterator();
+                       long checkpoint1Id = it.next().getKey();
+                       long checkpoint2Id = it.next().getKey();
+                       PendingCheckpoint checkpoint1 = 
coord.getPendingCheckpoints().get(checkpoint1Id);
+                       PendingCheckpoint checkpoint2 = 
coord.getPendingCheckpoints().get(checkpoint2Id);
+
+                       assertNotNull(checkpoint1);
+                       assertEquals(checkpoint1Id, 
checkpoint1.getCheckpointId());
+                       assertEquals(timestamp, 
checkpoint1.getCheckpointTimestamp());
+                       assertEquals(jid, checkpoint1.getJobId());
+                       assertEquals(2, 
checkpoint1.getNumberOfNonAcknowledgedTasks());
+                       assertEquals(0, 
checkpoint1.getNumberOfAcknowledgedTasks());
+                       assertEquals(0, checkpoint1.getTaskStates().size());
+                       assertFalse(checkpoint1.isDiscarded());
+                       assertFalse(checkpoint1.isFullyAcknowledged());
+
+                       assertNotNull(checkpoint2);
+                       assertEquals(checkpoint2Id, 
checkpoint2.getCheckpointId());
+                       assertEquals(timestamp + 2, 
checkpoint2.getCheckpointTimestamp());
+                       assertEquals(jid, checkpoint2.getJobId());
+                       assertEquals(2, 
checkpoint2.getNumberOfNonAcknowledgedTasks());
+                       assertEquals(0, 
checkpoint2.getNumberOfAcknowledgedTasks());
+                       assertEquals(0, checkpoint2.getTaskStates().size());
+                       assertFalse(checkpoint2.isDiscarded());
+                       assertFalse(checkpoint2.isFullyAcknowledged());
+
+                       // check that the vertices received the trigger 
checkpoint message
+                       {
+                               verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), 
any(CheckpointOptions.class));
+                               verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), 
any(CheckpointOptions.class));
+                       }
+
+                       // check that the vertices received the trigger 
checkpoint message for the second checkpoint
+                       {
+                               verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), 
any(CheckpointOptions.class));
+                               verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), 
any(CheckpointOptions.class));
+                       }
+
+                       // decline checkpoint from one of the tasks, this 
should cancel the checkpoint
+                       // and trigger a new one
+                       // decline once. But this should not count because we 
already have another checkpoint
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id));
+                       assertTrue(checkpoint1.isDiscarded());
+
+                       // validate that we have only one pending checkpoint 
left
+                       assertEquals(1, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       // validate that it is the same second checkpoint from 
earlier
+                       long checkpointIdNew = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+                       PendingCheckpoint checkpointNew = 
coord.getPendingCheckpoints().get(checkpointIdNew);
+                       assertEquals(checkpoint2Id, checkpointIdNew);
+                       // trigger a third checkpoint. this should succeed
+                       assertTrue(coord.triggerCheckpoint(timestamp + 4, 
false));
+
+                       // validate that we have two pending checkpoints
+                       assertEquals(2, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       checkpoint2Id = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+                       PendingCheckpoint checkpoint = 
coord.getPendingCheckpoints().get(checkpoint2Id);
+
+                       // decline checkpoint from the other task. Still the 
task should not fail
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint2Id));
+                       assertTrue(checkpoint.isDiscarded());
+
+                       // trigger a fourth checkpoint. this should succeed
+                       assertTrue(coord.triggerCheckpoint(timestamp + 6, 
false));
+
+                       // validate that we have two pending checkpoints
+                       assertEquals(2, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       checkpoint2Id = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+                       checkpoint = 
coord.getPendingCheckpoints().get(checkpoint2Id);
+
+                       // decline this checkpoint as well. This should still not fail
+                       // because the first decline was not counted.
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint2Id));
+                       assertTrue(checkpoint.isDiscarded());
+
+                       // trigger a fifth checkpoint. this should succeed
+                       assertTrue(coord.triggerCheckpoint(timestamp + 8, 
false));
+
+                       // validate that we have two pending checkpoints
+                       assertEquals(2, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+                       checkpoint2Id = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+                       checkpoint = 
coord.getPendingCheckpoints().get(checkpoint2Id);
+
+                       // decline this checkpoint as well. This should still not fail
+                       // because the first decline was not counted.
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint2Id));
+                       assertTrue(checkpoint.isDiscarded());
+                       checkpoint2Id = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+                       try {
+                               coord.receiveDeclineMessage(new 
DeclineCheckpoint(jid, attemptID1, checkpoint2Id));
+                               fail("Should have thrown checkpointException");
+                       } catch (CheckpointException e) {
+                               // exception should be generated
+                       }
+
+                       coord.shutdown(JobStatus.FINISHED);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
        @Test
        public void testTriggerAndConfirmSimpleCheckpoint() {
                try {
@@ -1026,6 +1309,7 @@ public void testStateCleanupForLateOrUnknownMessages() 
throws Exception {
                        20000L,
                        0L,
                        1,
+                       2,
                        ExternalizedCheckpointSettings.none(),
                        new ExecutionVertex[] { triggerVertex },
                        new ExecutionVertex[] {triggerVertex, ackVertex1, 
ackVertex2},
@@ -2632,7 +2916,7 @@ private static ExecutionVertex mockExecutionVertex(
                ExecutionState ... successiveStates) {
 
                ExecutionVertex vertex = mock(ExecutionVertex.class);
-
+               ExecutionGraph graph = mock(ExecutionGraph.class);
                final Execution exec = spy(new Execution(
                        mock(Executor.class),
                        vertex,
@@ -2642,11 +2926,11 @@ private static ExecutionVertex mockExecutionVertex(
                ));
                when(exec.getAttemptId()).thenReturn(attemptID);
                when(exec.getState()).thenReturn(state, successiveStates);
-
                when(vertex.getJobvertexId()).thenReturn(jobVertexID);
                when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
                
when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
                when(vertex.getMaxParallelism()).thenReturn(maxParallelism);
+               when(vertex.getExecutionGraph()).thenReturn(graph);
 
                return vertex;
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 98b4c4deeeb..f3b2de6138d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -113,6 +113,7 @@ private ExecutionGraph 
createExecutionGraphAndEnableCheckpointing(
                                100,
                                100,
                                1,
+                               0,
                                ExternalizedCheckpointSettings.none(),
                                Collections.<ExecutionJobVertex>emptyList(),
                                Collections.<ExecutionJobVertex>emptyList(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 077ab53ac16..5087b3d45e4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -124,6 +124,7 @@ public static void setupExecutionGraph() throws Exception {
                        100,
                        100,
                        1,
+                       0,
                        ExternalizedCheckpointSettings.none(),
                        Collections.<ExecutionJobVertex>emptyList(),
                        Collections.<ExecutionJobVertex>emptyList(),
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index eb7833a636e..9b54138b4f0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -47,6 +47,8 @@
        /** The default limit of concurrently happening checkpoints: one */
        public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
 
+       public static final int DEFAULT_MAX_UNSUCCESSFUL_CHECKPOINTS = 0;
+
        // 
------------------------------------------------------------------------
 
        /** Checkpointing mode (exactly-once vs. at-least-once). */
@@ -70,6 +72,8 @@
        /** Cleanup behaviour for persistent checkpoints. */
        private ExternalizedCheckpointCleanup externalizedCheckpointCleanup;
 
+       /** The maximum number of unsuccessful checkpoints. */
+       private int maxFailedCheckpoints = DEFAULT_MAX_UNSUCCESSFUL_CHECKPOINTS;
        // 
------------------------------------------------------------------------
 
        /**
@@ -204,6 +208,17 @@ public void setMaxConcurrentCheckpoints(int 
maxConcurrentCheckpoints) {
                this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
        }
 
+       public void setMaxFailedCheckpoints(int maxFailedCheckpoints) {
+               if(maxFailedCheckpoints < 0 ) {
+                       throw new IllegalArgumentException("The maximum number 
of unsuccessful checkpoint attempts must be at least 0.");
+               }
+               this.maxFailedCheckpoints = maxFailedCheckpoints;
+       }
+
+       public int getMaxFailedCheckpoints() {
+               return this.maxFailedCheckpoints;
+       }
+
        /**
         * Checks whether checkpointing is forced, despite currently 
non-checkpointable iteration feedback.
         * 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 003eff90914..f84cebe6bb3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -537,7 +537,7 @@ private void configureCheckpointing() {
                JobSnapshottingSettings settings = new JobSnapshottingSettings(
                                triggerVertices, ackVertices, commitVertices, 
interval,
                                cfg.getCheckpointTimeout(), 
cfg.getMinPauseBetweenCheckpoints(),
-                               cfg.getMaxConcurrentCheckpoints(),
+                               cfg.getMaxConcurrentCheckpoints(), 
cfg.getMaxFailedCheckpoints(),
                                externalizedCheckpointSettings,
                                streamGraph.getStateBackend(),
                                isExactlyOnce);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful 
> checkpoints
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-4810
>                 URL: https://issues.apache.org/jira/browse/FLINK-4810
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Stephan Ewen
>            Priority: Major
>              Labels: pull-request-available
>
> The Checkpoint coordinator should track the number of consecutive 
> unsuccessful checkpoints.
> If more than {{n}} (configured value) checkpoints fail in a row, it should 
> call {{fail()}} on the execution graph to trigger a recovery.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to