Repository: flink
Updated Branches:
  refs/heads/master d5f2647af -> 7e9557afe


[FLINK-6027] [checkpoints] Suppress (and log) exceptions thrown by the 
subsuming of completed checkpoints

This closes #3521


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ccf4c4f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ccf4c4f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ccf4c4f

Branch: refs/heads/master
Commit: 7ccf4c4f97452b5340b5b610a371f74c219a8286
Parents: d5f2647
Author: xiaogang.sxg <[email protected]>
Authored: Mon Mar 13 17:03:42 2017 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Mon Mar 20 13:21:31 2017 +0100

----------------------------------------------------------------------
 .../StandaloneCompletedCheckpointStore.java     |  6 +-
 .../ZooKeeperCompletedCheckpointStore.java      |  6 +-
 .../StandaloneCompletedCheckpointStoreTest.java | 36 ++++++++++++
 .../ZooKeeperCompletedCheckpointStoreTest.java  | 61 ++++++++++++++++++++
 4 files changed, 107 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ccf4c4f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 6c752f2..6eb5242 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -64,7 +64,11 @@ public class StandaloneCompletedCheckpointStore implements 
CompletedCheckpointSt
        public void addCheckpoint(CompletedCheckpoint checkpoint) throws 
Exception {
                checkpoints.add(checkpoint);
                if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
-                       checkpoints.remove().subsume();
+                       try {
+                               checkpoints.remove().subsume();
+                       } catch (Exception e) {
+                               LOG.warn("Fail to subsume the old checkpoint.", 
e);
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7ccf4c4f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 1319c27..af7bcc4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -188,7 +188,11 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
 
                // Everything worked, let's remove a previous checkpoint if 
necessary.
                while (checkpointStateHandles.size() > 
maxNumberOfCheckpointsToRetain) {
-                       removeSubsumed(checkpointStateHandles.removeFirst());
+                       try {
+                               
removeSubsumed(checkpointStateHandles.removeFirst());
+                       } catch (Exception e) {
+                               LOG.warn("Failed to subsume the old 
checkpoint", e);
+                       }
                }
 
                LOG.debug("Added {} to {}.", checkpoint, path);

http://git-wip-us.apache.org/repos/asf/flink/blob/7ccf4c4f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index 4e9366e..cc7b2d0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -21,8 +21,15 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.List;
+
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.mock;
 
 /**
  * Tests for basic {@link CompletedCheckpointStore} contract.
@@ -70,4 +77,33 @@ public class StandaloneCompletedCheckpointStoreTest extends 
CompletedCheckpointS
                assertEquals(0, store.getNumberOfRetainedCheckpoints());
                assertTrue(checkpoint.isDiscarded());
        }
+       
+       /**
+        * Tests that the checkpoint does not exist in the store when we fail 
to add
+        * it into the store (i.e., when the method throws an exception).
+        */
+       @Test
+       public void testAddCheckpointWithFailedRemove() throws Exception {
+               
+               final int numCheckpointsToRetain = 1;
+               CompletedCheckpointStore store = 
createCompletedCheckpoints(numCheckpointsToRetain);
+               
+               for (long i = 0; i <= numCheckpointsToRetain; ++i) {
+                       CompletedCheckpoint checkpointToAdd = 
mock(CompletedCheckpoint.class);
+                       doReturn(i).when(checkpointToAdd).getCheckpointID();
+                       doThrow(new 
IOException()).when(checkpointToAdd).subsume();
+                       
+                       try {
+                               store.addCheckpoint(checkpointToAdd);
+                               
+                               // The checkpoint should be in the store if we 
successfully add it into the store.
+                               List<CompletedCheckpoint> addedCheckpoints = 
store.getAllCheckpoints();
+                               
assertTrue(addedCheckpoints.contains(checkpointToAdd));
+                       } catch (Exception e) {
+                               // The checkpoint should not be in the store if 
any exception is thrown.
+                               List<CompletedCheckpoint> addedCheckpoints = 
store.getAllCheckpoints();
+                               
assertFalse(addedCheckpoints.contains(checkpointToAdd));
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ccf4c4f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 66956e6..aa2ec85 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -45,6 +45,8 @@ import java.util.List;
 import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -54,6 +56,8 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 @RunWith(PowerMockRunner.class)
@@ -180,4 +184,61 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
                // check that we have discarded the state handles which could 
not be retrieved
                verify(failingRetrievableStateHandle, times(2)).discardState();
        }
+       
+       /**
+        * Tests that the checkpoint does not exist in the store when we fail 
to add
+        * it into the store (i.e., when the method throws an exception).
+        */
+       @Test
+       public void testAddCheckpointWithFailedRemove() throws Exception {
+               final CuratorFramework client = mock(CuratorFramework.class, 
Mockito.RETURNS_DEEP_STUBS);
+               final RetrievableStateStorageHelper<CompletedCheckpoint> 
storageHelperMock = mock(RetrievableStateStorageHelper.class);
+               
+               ZooKeeperStateHandleStore<CompletedCheckpoint> 
zookeeperStateHandleStoreMock = 
+                       spy(new ZooKeeperStateHandleStore<>(client, 
storageHelperMock, Executors.directExecutor()));
+               
whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);
+               
+               doAnswer(new 
Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
+                       @Override
+                       public RetrievableStateHandle<CompletedCheckpoint> 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                               CompletedCheckpoint checkpoint = 
(CompletedCheckpoint)invocationOnMock.getArguments()[1];
+                               
+                               RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle = mock(RetrievableStateHandle.class);
+                               
when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);
+                               
+                               return retrievableStateHandle;
+                       }
+               }).when(zookeeperStateHandleStoreMock).add(anyString(), 
any(CompletedCheckpoint.class));
+               
+               doThrow(new 
Exception()).when(zookeeperStateHandleStoreMock).remove(anyString(), 
any(BackgroundCallback.class));
+               
+               final int numCheckpointsToRetain = 1;
+               final String checkpointsPath = "foobar";
+               final RetrievableStateStorageHelper<CompletedCheckpoint> 
stateSotrage = mock(RetrievableStateStorageHelper.class);
+               
+               ZooKeeperCompletedCheckpointStore 
zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
+                       numCheckpointsToRetain,
+                       client,
+                       checkpointsPath,
+                       stateSotrage,
+                       Executors.directExecutor());
+               
+               
+               for (long i = 0; i <= numCheckpointsToRetain; ++i) {
+                       CompletedCheckpoint checkpointToAdd = 
mock(CompletedCheckpoint.class);
+                       doReturn(i).when(checkpointToAdd).getCheckpointID();
+                       
+                       try {
+                               
zooKeeperCompletedCheckpointStore.addCheckpoint(checkpointToAdd);
+                               
+                               // The checkpoint should be in the store if we 
successfully add it into the store.
+                               List<CompletedCheckpoint> addedCheckpoints = 
zooKeeperCompletedCheckpointStore.getAllCheckpoints();
+                               
assertTrue(addedCheckpoints.contains(checkpointToAdd));
+                       } catch (Exception e) {
+                               // The checkpoint should not be in the store if 
any exception is thrown.
+                               List<CompletedCheckpoint> addedCheckpoints = 
zooKeeperCompletedCheckpointStore.getAllCheckpoints();
+                               
assertFalse(addedCheckpoints.contains(checkpointToAdd));
+                       }
+               }
+       }
 }

Reply via email to