Repository: flink Updated Branches: refs/heads/master d5f2647af -> 7e9557afe
[FLINK-6027] [checkpoints] Suppress (and log) exceptions thrown by the subsuming of completed checkpoints This closes #3521 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ccf4c4f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ccf4c4f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ccf4c4f Branch: refs/heads/master Commit: 7ccf4c4f97452b5340b5b610a371f74c219a8286 Parents: d5f2647 Author: xiaogang.sxg <[email protected]> Authored: Mon Mar 13 17:03:42 2017 +0800 Committer: Stephan Ewen <[email protected]> Committed: Mon Mar 20 13:21:31 2017 +0100 ---------------------------------------------------------------------- .../StandaloneCompletedCheckpointStore.java | 6 +- .../ZooKeeperCompletedCheckpointStore.java | 6 +- .../StandaloneCompletedCheckpointStoreTest.java | 36 ++++++++++++ .../ZooKeeperCompletedCheckpointStoreTest.java | 61 ++++++++++++++++++++ 4 files changed, 107 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7ccf4c4f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index 6c752f2..6eb5242 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -64,7 +64,11 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { checkpoints.add(checkpoint); if
(checkpoints.size() > maxNumberOfCheckpointsToRetain) { - checkpoints.remove().subsume(); + try { + checkpoints.remove().subsume(); + } catch (Exception e) { + LOG.warn("Fail to subsume the old checkpoint.", e); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ccf4c4f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 1319c27..af7bcc4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -188,7 +188,11 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto // Everything worked, let's remove a previous checkpoint if necessary. 
while (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) { - removeSubsumed(checkpointStateHandles.removeFirst()); + try { + removeSubsumed(checkpointStateHandles.removeFirst()); + } catch (Exception e) { + LOG.warn("Failed to subsume the old checkpoint", e); + } } LOG.debug("Added {} to {}.", checkpoint, path); http://git-wip-us.apache.org/repos/asf/flink/blob/7ccf4c4f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java index 4e9366e..cc7b2d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java @@ -21,8 +21,15 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobStatus; import org.junit.Test; +import java.io.IOException; +import java.util.List; + import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.powermock.api.mockito.PowerMockito.doReturn; +import static org.powermock.api.mockito.PowerMockito.doThrow; +import static org.powermock.api.mockito.PowerMockito.mock; /** * Tests for basic {@link CompletedCheckpointStore} contract. @@ -70,4 +77,33 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS assertEquals(0, store.getNumberOfRetainedCheckpoints()); assertTrue(checkpoint.isDiscarded()); } + + /** + * Tests that the checkpoint does not exist in the store when we fail to add + * it into the store (i.e., there exists an exception thrown by the method). 
+ */ + @Test + public void testAddCheckpointWithFailedRemove() throws Exception { + + final int numCheckpointsToRetain = 1; + CompletedCheckpointStore store = createCompletedCheckpoints(numCheckpointsToRetain); + + for (long i = 0; i <= numCheckpointsToRetain; ++i) { + CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class); + doReturn(i).when(checkpointToAdd).getCheckpointID(); + doThrow(new IOException()).when(checkpointToAdd).subsume(); + + try { + store.addCheckpoint(checkpointToAdd); + + // The checkpoint should be in the store if we successfully add it into the store. + List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints(); + assertTrue(addedCheckpoints.contains(checkpointToAdd)); + } catch (Exception e) { + // The checkpoint should not be in the store if any exception is thrown. + List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints(); + assertFalse(addedCheckpoints.contains(checkpointToAdd)); + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ccf4c4f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index 66956e6..aa2ec85 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -45,6 +45,8 @@ import java.util.List; import java.util.concurrent.Executor; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; 
import static org.mockito.Mockito.doReturn; @@ -54,6 +56,8 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.doThrow; import static org.powermock.api.mockito.PowerMockito.whenNew; @RunWith(PowerMockRunner.class) @@ -180,4 +184,61 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { // check that we have discarded the state handles which could not be retrieved verify(failingRetrievableStateHandle, times(2)).discardState(); } + + /** + * Tests that the checkpoint does not exist in the store when we fail to add + * it into the store (i.e., there exists an exception thrown by the method). + */ + @Test + public void testAddCheckpointWithFailedRemove() throws Exception { + final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS); + final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class); + + ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock = + spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor())); + whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock); + + doAnswer(new Answer<RetrievableStateHandle<CompletedCheckpoint>>() { + @Override + public RetrievableStateHandle<CompletedCheckpoint> answer(InvocationOnMock invocationOnMock) throws Throwable { + CompletedCheckpoint checkpoint = (CompletedCheckpoint)invocationOnMock.getArguments()[1]; + + RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle = mock(RetrievableStateHandle.class); + when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint); + + return retrievableStateHandle; + } + }).when(zookeeperStateHandleStoreMock).add(anyString(), 
any(CompletedCheckpoint.class)); + + doThrow(new Exception()).when(zookeeperStateHandleStoreMock).remove(anyString(), any(BackgroundCallback.class)); + + final int numCheckpointsToRetain = 1; + final String checkpointsPath = "foobar"; + final RetrievableStateStorageHelper<CompletedCheckpoint> stateSotrage = mock(RetrievableStateStorageHelper.class); + + ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore( + numCheckpointsToRetain, + client, + checkpointsPath, + stateSotrage, + Executors.directExecutor()); + + + for (long i = 0; i <= numCheckpointsToRetain; ++i) { + CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class); + doReturn(i).when(checkpointToAdd).getCheckpointID(); + + try { + zooKeeperCompletedCheckpointStore.addCheckpoint(checkpointToAdd); + + // The checkpoint should be in the store if we successfully add it into the store. + List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints(); + assertTrue(addedCheckpoints.contains(checkpointToAdd)); + } catch (Exception e) { + // The checkpoint should not be in the store if any exception is thrown. + List<CompletedCheckpoint> addedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints(); + assertFalse(addedCheckpoints.contains(checkpointToAdd)); + } + } + } }
