[FLINK-5158] [ckPtCoord] Handle exceptions from CompletedCheckpointStore in 
CheckpointCoordinator

Handle exceptions from the CompletedCheckpointStore properly in the 
CheckpointCoordinator. If adding a completed checkpoint to the store fails, 
the completed checkpoint is now properly cleaned up, and subsequent 
checkpoints are still triggered.

Fix failing SavepointCoordinatorTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b734d7b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b734d7b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b734d7b

Branch: refs/heads/release-1.1
Commit: 4b734d7b8726200e5293c32f2cb9e8c77db4d378
Parents: d314bc5
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Nov 24 18:16:28 2016 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Dec 1 18:00:53 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 143 +++++++++++-------
 .../runtime/checkpoint/CheckpointException.java |  35 +++++
 .../runtime/checkpoint/PendingCheckpoint.java   |  16 +--
 .../CheckpointCoordinatorFailureTest.java       | 144 +++++++++++++++++++
 .../checkpoint/CheckpointCoordinatorTest.java   |   4 +-
 5 files changed, 275 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b734d7b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0d09922..74e6d08 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -671,24 +671,17 @@ public class CheckpointCoordinator {
         *
         * @throws Exception If the checkpoint cannot be added to the completed 
checkpoint store.
         */
-       public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) 
throws Exception {
+       public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) 
throws CheckpointException {
                if (shutdown || message == null) {
                        return false;
                }
                if (!job.equals(message.getJob())) {
-                       LOG.error("Received AcknowledgeCheckpoint message for 
wrong job: {}", message);
+                       LOG.error("Received wrong AcknowledgeCheckpoint message 
for job {}: {}", job, message);
                        return false;
                }
 
                final long checkpointId = message.getCheckpointId();
 
-               CompletedCheckpoint completed = null;
-               PendingCheckpoint checkpoint;
-
-               // Flag indicating whether the ack message was for a known 
pending
-               // checkpoint.
-               boolean isPendingCheckpoint;
-
                synchronized (lock) {
                        // we need to check inside the lock for being shutdown 
as well, otherwise we
                        // get races and invalid error log messages
@@ -696,45 +689,16 @@ public class CheckpointCoordinator {
                                return false;
                        }
 
-                       checkpoint = pendingCheckpoints.get(checkpointId);
+                       final PendingCheckpoint checkpoint = 
pendingCheckpoints.get(checkpointId);
 
                        if (checkpoint != null && !checkpoint.isDiscarded()) {
-                               isPendingCheckpoint = true;
 
                                switch 
(checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), 
message.getStateSize(), null)) {
                                        case SUCCESS:
 
                                                if 
(checkpoint.isFullyAcknowledged()) {
-
-                                                       
lastCheckpointCompletionNanos = System.nanoTime();
-                                                       completed = 
checkpoint.finalizeCheckpoint();
-
-                                                       
completedCheckpointStore.addCheckpoint(completed);
-
-                                                       LOG.info("Completed 
checkpoint " + checkpointId + " (in " +
-                                                               
completed.getDuration() + " ms)");
-
-                                                       if 
(LOG.isDebugEnabled()) {
-                                                               StringBuilder 
builder = new StringBuilder();
-                                                               
builder.append("Checkpoint state: ");
-                                                               for (TaskState 
state : completed.getTaskStates().values()) {
-                                                                       
builder.append(state);
-                                                                       
builder.append(", ");
-                                                               }
-                                                               // Remove last 
two chars ", "
-                                                               
builder.delete(builder.length() - 2, builder.length());
-
-                                                               
LOG.debug(builder.toString());
-                                                       }
-
-                                                       
pendingCheckpoints.remove(checkpointId);
-                                                       
rememberRecentCheckpointId(checkpointId);
-
-                                                       
dropSubsumedCheckpoints(completed.getTimestamp());
-
-                                                       
onFullyAcknowledgedCheckpoint(completed);
-
-                                                       triggerQueuedRequests();
+                                                       
completePendingCheckpoint(checkpoint);
+                                                       
                                                }
                                                break;
                                        case DUPLICATE:
@@ -757,6 +721,8 @@ public class CheckpointCoordinator {
 
                                                
discardState(message.getState());
                                }
+
+                               return true;
                        }
                        else if (checkpoint != null) {
                                // this should not happen
@@ -764,39 +730,108 @@ public class CheckpointCoordinator {
                                                "Received message for discarded 
but non-removed checkpoint " + checkpointId);
                        }
                        else {
+                               boolean wasPendingCheckpoint;
+
                                // message is for an unknown checkpoint, or 
comes too late (checkpoint disposed)
                                if 
(recentPendingCheckpoints.contains(checkpointId)) {
-                                       isPendingCheckpoint = true;
+                                       wasPendingCheckpoint = true;
                                        LOG.warn("Received late message for now 
expired checkpoint attempt {}.", checkpointId);
                                }
                                else {
                                        LOG.debug("Received message for an 
unknown checkpoint {}.", checkpointId);
-                                       isPendingCheckpoint = false;
+                                       wasPendingCheckpoint = false;
                                }
 
                                // try to discard the state so that we don't 
have lingering state lying around
                                discardState(message.getState());
+
+                               return wasPendingCheckpoint;
+                       }
+               }
+       }
+
+       /**
+        * Try to complete the given pending checkpoint.
+        *
+        * Important: This method should only be called in the checkpoint lock 
scope.
+        *
+        * @param pendingCheckpoint to complete
+        * @throws CheckpointException if the completion failed
+        */
+       private void completePendingCheckpoint(PendingCheckpoint 
pendingCheckpoint) throws CheckpointException {
+               // we have to be called in the checkpoint lock scope
+               assert(Thread.holdsLock(lock));
+
+               final long checkpointId = pendingCheckpoint.getCheckpointId();
+               CompletedCheckpoint completedCheckpoint = null;
+
+               try {
+                       completedCheckpoint = 
pendingCheckpoint.finalizeCheckpoint();                   
+
+                       
completedCheckpointStore.addCheckpoint(completedCheckpoint);
+
+                       rememberRecentCheckpointId(checkpointId);
+                       
dropSubsumedCheckpoints(completedCheckpoint.getTimestamp());
+
+                       onFullyAcknowledgedCheckpoint(completedCheckpoint);
+               } catch (Exception exception) {
+                       // abort the current pending checkpoint if it has not 
been discarded yet
+                       if (!pendingCheckpoint.isDiscarded()) {
+                               pendingCheckpoint.discard(userClassLoader);
+                       }
+
+                       if (completedCheckpoint != null) {
+                               // we failed to store the completed checkpoint. 
Let's clean up
+                               final CompletedCheckpoint cc = 
completedCheckpoint;
+
+                               executor.execute(new Runnable() {
+                                       @Override
+                                       public void run() {
+                                               try {
+                                                       
cc.discard(userClassLoader);
+                                               } catch (Exception 
nestedException) {
+                                                       LOG.warn("Could not 
properly discard completed checkpoint {}.", cc.getCheckpointID(), 
nestedException);
+                                               }
+                                       }
+                               });
                        }
+
+                       throw new CheckpointException("Could not complete the 
pending checkpoint " + checkpointId + '.', exception);
+               } finally {
+                       pendingCheckpoints.remove(checkpointId);
+
+                       triggerQueuedRequests();
                }
+               
+               lastCheckpointCompletionNanos = System.nanoTime();
+
+               LOG.info("Completed checkpoint {} (in {} ms).", checkpointId, 
completedCheckpoint.getDuration());
 
-               // send the confirmation messages to the necessary targets. we 
do this here
-               // to be outside the lock scope
-               if (completed != null) {
-                       final long timestamp = completed.getTimestamp();
+               if (LOG.isDebugEnabled()) {
+                       StringBuilder builder = new StringBuilder();
+                       builder.append("Checkpoint state: ");
+                       for (TaskState state : 
completedCheckpoint.getTaskStates().values()) {
+                               builder.append(state);
+                               builder.append(", ");
+                       }
+                       // Remove last two chars ", "
+                       builder.delete(builder.length() - 2, builder.length());
+
+                       LOG.debug(builder.toString());
+               }
 
-                       for (ExecutionVertex ev : tasksToCommitTo) {
-                               Execution ee = ev.getCurrentExecutionAttempt();
-                               if (ee != null) {
+               final long timestamp = completedCheckpoint.getTimestamp();
+
+               for (ExecutionVertex ev : tasksToCommitTo) {
+                       Execution ee = ev.getCurrentExecutionAttempt();
+                       if (ee != null) {
                                        ExecutionAttemptID attemptId = 
ee.getAttemptId();
                                        NotifyCheckpointComplete notifyMessage 
= new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
                                        
ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
-                               }
                        }
-
-                       statsTracker.onCompletedCheckpoint(completed);
                }
 
-               return isPendingCheckpoint;
+               statsTracker.onCompletedCheckpoint(completedCheckpoint);
        }
 
        private void rememberRecentCheckpointId(long id) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4b734d7b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
new file mode 100644
index 0000000..707878c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * Base class for checkpoint related exceptions.
+ */
+public class CheckpointException extends Exception {
+
+       private static final long serialVersionUID = -4341865597039002540L;
+
+       public CheckpointException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public CheckpointException(String message) {
+               super(message);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b734d7b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 22ba9f2..6f185bd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -129,12 +129,10 @@ public class PendingCheckpoint {
                return discarded;
        }
        
-       public CompletedCheckpoint finalizeCheckpoint() throws Exception {
+       public CompletedCheckpoint finalizeCheckpoint() {
                synchronized (lock) {
-                       if (discarded) {
-                               throw new IllegalStateException("pending 
checkpoint is discarded");
-                       }
-                       if (notYetAcknowledgedTasks.isEmpty()) {
+                       Preconditions.checkState(isFullyAcknowledged(), 
"Pending checkpoint has not been fully acknowledged yet.");
+
                                CompletedCheckpoint completed =  new 
CompletedCheckpoint(
                                        jobId,
                                        checkpointId,
@@ -144,10 +142,6 @@ public class PendingCheckpoint {
                                dispose(null, false);
                                
                                return completed;
-                       }
-                       else {
-                               throw new IllegalStateException("Cannot 
complete checkpoint while not all tasks are acknowledged");
-                       }
                }
        }
        
@@ -237,10 +231,9 @@ public class PendingCheckpoint {
 
        private void dispose(final ClassLoader userClassLoader, boolean 
releaseState) {
                synchronized (lock) {
-                       discarded = true;
                        numAcknowledgedTasks = -1;
                        try {
-                               if (releaseState) {
+                               if (!discarded && releaseState) {
                                        executor.execute(new Runnable() {
                                                @Override
                                                public void run() {
@@ -257,6 +250,7 @@ public class PendingCheckpoint {
 
                                }
                        } finally {
+                               discarded = true;
                                taskStates.clear();
                                notYetAcknowledgedTasks.clear();
                                acknowledgedTasks.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/4b734d7b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
new file mode 100644
index 0000000..e74bbd8
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.util.TestExecutors;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(PendingCheckpoint.class)
+public class CheckpointCoordinatorFailureTest extends TestLogger {
+
+       /**
+        * Tests that a failure while storing a completed checkpoint in the 
completed checkpoint store
+        * will properly fail the originating pending checkpoint and clean up 
the completed checkpoint.
+        */
+       @Test
+       public void testFailingCompletedCheckpointStoreAdd() throws Exception {
+               JobID jid = new JobID();
+
+               final ExecutionAttemptID executionAttemptId = new 
ExecutionAttemptID();
+               final ExecutionVertex vertex = 
CheckpointCoordinatorTest.mockExecutionVertex(executionAttemptId);
+
+               final long triggerTimestamp = 1L;
+
+               // set up the coordinator and validate the initial state
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jid,
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       42,
+                       new ExecutionVertex[]{vertex},
+                       new ExecutionVertex[]{vertex},
+                       new ExecutionVertex[]{vertex},
+                       getClass().getClassLoader(),
+                       new StandaloneCheckpointIDCounter(),
+                       new FailingCompletedCheckpointStore(),
+                       null,
+                       new DisabledCheckpointStatsTracker(),
+                       TestExecutors.directExecutor());
+
+               coord.triggerCheckpoint(triggerTimestamp);
+
+               assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+               PendingCheckpoint pendingCheckpoint = 
coord.getPendingCheckpoints().values().iterator().next();
+
+               assertFalse(pendingCheckpoint.isDiscarded());
+
+               final long checkpointId 
=coord.getPendingCheckpoints().keySet().iterator().next();
+
+               AcknowledgeCheckpoint acknowledgeMessage = new 
AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId);
+
+               CompletedCheckpoint completedCheckpoint = 
mock(CompletedCheckpoint.class);
+               
PowerMockito.whenNew(CompletedCheckpoint.class).withAnyArguments().thenReturn(completedCheckpoint);
+
+               try {
+                       coord.receiveAcknowledgeMessage(acknowledgeMessage);
+                       fail("Expected a checkpoint exception because the 
completed checkpoint store could not " +
+                               "store the completed checkpoint.");
+               } catch (CheckpointException e) {
+                       // ignore because we expected this exception
+               }
+
+               // make sure that the pending checkpoint has been discarded 
after we could not complete it
+               assertTrue(pendingCheckpoint.isDiscarded());
+
+               
verify(completedCheckpoint).discard(getClass().getClassLoader());
+       }
+
+       private static final class FailingCompletedCheckpointStore implements 
CompletedCheckpointStore {
+
+               @Override
+               public void recover() throws Exception {
+                       throw new UnsupportedOperationException("Not 
implemented.");
+               }
+
+               @Override
+               public void addCheckpoint(CompletedCheckpoint checkpoint) 
throws Exception {
+                       throw new Exception("The failing completed checkpoint 
store failed again... :-(");
+               }
+
+               @Override
+               public CompletedCheckpoint getLatestCheckpoint() throws 
Exception {
+                       throw new UnsupportedOperationException("Not 
implemented.");
+               }
+
+               @Override
+               public void shutdown() throws Exception {
+                       throw new UnsupportedOperationException("Not 
implemented.");
+               }
+
+               @Override
+               public void suspend() throws Exception {
+                       throw new UnsupportedOperationException("Not 
implemented.");
+               }
+
+               @Override
+               public List<CompletedCheckpoint> getAllCheckpoints() throws 
Exception {
+                       throw new UnsupportedOperationException("Not 
implemented.");
+               }
+
+               @Override
+               public int getNumberOfRetainedCheckpoints() {
+                       return -1;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b734d7b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 9159711..d02e48f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1500,7 +1500,7 @@ public class CheckpointCoordinatorTest {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID 
attemptID) {
+       static ExecutionVertex mockExecutionVertex(ExecutionAttemptID 
attemptID) {
                return mockExecutionVertex(attemptID, 1);
        }
 
@@ -1508,7 +1508,7 @@ public class CheckpointCoordinatorTest {
                return mockExecutionVertex(attemptId, ExecutionState.RUNNING, 
parallelism);
        }
 
-       private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID 
attemptID, 
+       private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID 
attemptID,
                                                                                
                                ExecutionState state, ExecutionState ... 
successiveStates) {
                return mockExecutionVertex(attemptID, state, 1, 
successiveStates);
        }

Reply via email to