Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4863#discussion_r145905621
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
---
@@ -183,11 +179,120 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
// check that we did not discard any of the state handles which were retrieved
verify(retrievableStateHandle1, never()).discardState();
verify(retrievableStateHandle2, never()).discardState();
+ }
+
+ /**
+ * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
+ * and ignores those which cannot be retrieved via their state handles.
+ */
+ @Test
+ public void testCheckpointRecoveryWithBrokenCheckpoints() throws Exception {
+ final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZooKeeper = new ArrayList<>(4);
+
+ final CompletedCheckpoint completedCheckpoint1 = mock(CompletedCheckpoint.class);
+ when(completedCheckpoint1.getCheckpointID()).thenReturn(1L);
+ final CompletedCheckpoint completedCheckpoint2 = mock(CompletedCheckpoint.class);
+ when(completedCheckpoint2.getCheckpointID()).thenReturn(2L);
+
+ final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
+ expectedCheckpointIds.add(1L);
+ expectedCheckpointIds.add(2L);
+
+ final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
+ when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception"));
+
+ final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 = mock(RetrievableStateHandle.class);
+ when(retrievableStateHandle1.retrieveState()).thenReturn(completedCheckpoint1);
+
+ final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle2 = mock(RetrievableStateHandle.class);
+ when(retrievableStateHandle2.retrieveState()).thenReturn(completedCheckpoint2);
+
+ checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/a_failing1"));
+ checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/b_foobar1"));
+ checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/c_failing2"));
+ checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/d_foobar2"));
+
+ final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
+ final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
+
+ ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
+ whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
+ doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
+
+ final int numCheckpointsToRetain = 1;
+
+ // Mocking for the delete operation on the CuratorFramework client
+ // It assures that the callback is executed synchronously
+
+ final EnsurePath ensurePathMock = mock(EnsurePath.class);
--- End diff --
I agree with Stefan that this test looks really hard to maintain. I think
the underlying problem is that `ZooKeeperCompletedCheckpointStore` internally
uses a hardwired `ZooKeeperStateHandleStore`, so the test has to intercept its
construction with `whenNew` and deep-stub the `CuratorFramework` client. I guess
it would be better if we had a proper interface which can be passed to the
`ZooKeeperCompletedCheckpointStore`. Moreover, I think the current design is a
bad separation of concerns with respect to how
`ZooKeeperCompletedCheckpointStore` and `ZooKeeperStateHandleStore` work
together. My gut feeling is that `ZooKeeperCompletedCheckpointStore` does not
have to know anything about `ZooKeeper` at all. Fixing this should make the
components easier to test as well.
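
A minimal sketch of what such an interface could look like. All names here (`Handle`, `StateHandleStore`, `CheckpointStoreSketch`, `InjectionSketch`) are hypothetical and are not existing Flink types, and checkpoints are reduced to their IDs to keep the example self-contained. It only illustrates the idea of passing the handle store in through the constructor, so a test can supply a plain in-memory implementation instead of mocking `CuratorFramework`:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// All names below are hypothetical and only illustrate the idea.

/** Stand-in for a retrievable state handle: loading may fail. */
interface Handle<T> {
    T retrieve() throws IOException;
}

/** The only capability the checkpoint store needs from its backing store. */
interface StateHandleStore<T> {
    List<Handle<T>> getAllSortedByName();
}

/** Checkpoint store that depends on the interface instead of on ZooKeeper. */
class CheckpointStoreSketch {
    private final StateHandleStore<Long> handleStore;

    CheckpointStoreSketch(StateHandleStore<Long> handleStore) {
        this.handleStore = handleStore;
    }

    /** Returns the IDs of all checkpoints whose handles could be read. */
    List<Long> recover() {
        List<Long> recovered = new ArrayList<>();
        for (Handle<Long> handle : handleStore.getAllSortedByName()) {
            try {
                recovered.add(handle.retrieve());
            } catch (IOException e) {
                // Broken handle: skip this checkpoint and keep going.
            }
        }
        return recovered;
    }
}

class InjectionSketch {
    /** Same scenario as the test in the diff, without a mocking framework. */
    static List<Long> demo() {
        Handle<Long> failing = () -> { throw new IOException("Test exception"); };
        Handle<Long> first = () -> 1L;
        Handle<Long> second = () -> 2L;

        List<Handle<Long>> handles = new ArrayList<>();
        handles.add(failing);
        handles.add(first);
        handles.add(failing);
        handles.add(second);

        // An in-memory store is just a lambda returning the prepared list.
        StateHandleStore<Long> inMemoryStore = () -> handles;
        return new CheckpointStoreSketch(inMemoryStore).recover();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2]
    }
}
```

With this shape, a ZooKeeper-backed implementation of the interface would be tested on its own, and the checkpoint store test would no longer need `whenNew` or deep stubs.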
---