[ https://issues.apache.org/jira/browse/FLINK-7783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212351#comment-16212351 ]
ASF GitHub Bot commented on FLINK-7783:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4863#discussion_r145905621
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java ---
@@ -183,11 +179,120 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
// check that we did not discard any of the state handles which were retrieved
verify(retrievableStateHandle1, never()).discardState();
verify(retrievableStateHandle2, never()).discardState();
+ }
+
+ /**
+ * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
+ * and ignores those which cannot be retrieved via their state handles.
+ */
+ @Test
+ public void testCheckpointRecoveryWithBrokenCheckpoints() throws Exception {
+ final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZooKeeper = new ArrayList<>(4);
+
+ final CompletedCheckpoint completedCheckpoint1 = mock(CompletedCheckpoint.class);
+ when(completedCheckpoint1.getCheckpointID()).thenReturn(1L);
+ final CompletedCheckpoint completedCheckpoint2 = mock(CompletedCheckpoint.class);
+ when(completedCheckpoint2.getCheckpointID()).thenReturn(2L);
+
+ final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
+ expectedCheckpointIds.add(1L);
+ expectedCheckpointIds.add(2L);
+
+ final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
+ when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception"));
+
+ final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 = mock(RetrievableStateHandle.class);
+ when(retrievableStateHandle1.retrieveState()).thenReturn(completedCheckpoint1);
+
+ final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle2 = mock(RetrievableStateHandle.class);
+ when(retrievableStateHandle2.retrieveState()).thenReturn(completedCheckpoint2);
+
+ checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/a_failing1"));
+ checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/b_foobar1"));
+ checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/c_failing2"));
+ checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/d_foobar2"));
+
+ final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
+ final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
+
+ ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
+ whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
+ doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
+
+ final int numCheckpointsToRetain = 1;
+
+ // Mocking for the delete operation on the CuratorFramework client
+ // It assures that the callback is executed synchronously
+
+ final EnsurePath ensurePathMock = mock(EnsurePath.class);
--- End diff --
I agree with Stefan that this test looks really hard to maintain. I think
the underlying problem is that `ZooKeeperCompletedCheckpointStore` internally
uses a hardwired `ZooKeeperStateHandleStore`. I guess it would be better if we
had a proper interface which can be passed to the
`ZooKeeperCompletedCheckpointStore`. Moreover, I think the current design has
a poor separation of concerns with respect to how
`ZooKeeperCompletedCheckpointStore` and `ZooKeeperStateHandleStore` work
together. My gut feeling is that `ZooKeeperCompletedCheckpointStore` does not
have to know anything about ZooKeeper at all. Fixing this should make the
components easier to test as well.
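
To illustrate the kind of interface injection suggested above, here is a
minimal, self-contained sketch. All names in it (`StateHandle`,
`StateHandleStore`, `InjectedCheckpointStore`, `InjectionSketch`) are
hypothetical and are not existing Flink classes; checkpoints are reduced to
their `Long` IDs to keep the example short. It only shows the idea: the
checkpoint store receives its handle store through the constructor, so a test
can pass a plain in-memory fake instead of mocking ZooKeeper and Curator.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical abstraction of a retrievable state handle (not a Flink type).
interface StateHandle<T> {
    T retrieveState() throws IOException;
}

// Hypothetical interface hiding where the handles are stored (ZooKeeper or elsewhere).
interface StateHandleStore<T> {
    List<StateHandle<T>> getAllSortedByName();
}

// Hypothetical stand-in for the checkpoint store; it knows nothing about ZooKeeper.
final class InjectedCheckpointStore {
    private final StateHandleStore<Long> handleStore;

    // The handle store is passed in rather than constructed internally.
    InjectedCheckpointStore(StateHandleStore<Long> handleStore) {
        this.handleStore = handleStore;
    }

    // Returns the IDs of all readable checkpoints; unreadable handles are skipped.
    List<Long> recover() {
        List<Long> recovered = new ArrayList<>();
        for (StateHandle<Long> handle : handleStore.getAllSortedByName()) {
            try {
                recovered.add(handle.retrieveState());
            } catch (IOException e) {
                // Assumption for this sketch: a failing handle is ignored, not deleted.
            }
        }
        return recovered;
    }
}

final class InjectionSketch {
    // Mirrors the scenario of the test above with an in-memory fake, no mocking framework.
    static List<Long> demo() {
        StateHandle<Long> failing = () -> { throw new IOException("Test exception"); };
        StateHandle<Long> first = () -> 1L;
        StateHandle<Long> second = () -> 2L;
        StateHandleStore<Long> fakeStore = () -> Arrays.asList(failing, first, failing, second);
        return new InjectedCheckpointStore(fakeStore).recover();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2]
    }
}
```

With such a seam, the broken-checkpoint scenario needs neither `whenNew` nor a
deep-stubbed `CuratorFramework`; whether the real store should expose exactly
this shape is a design question for the PR.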
> Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
> ------------------------------------------------------------------------------
>
> Key: FLINK-7783
> URL: https://issues.apache.org/jira/browse/FLINK-7783
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Currently, we always delete checkpoint handles if they (or the data from the
> DFS) cannot be read:
> https://github.com/apache/flink/blob/91a4b276171afb760bfff9ccf30593e648e91dfb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L180
> This can lead to problems in case the DFS is temporarily not available, i.e.
> we could inadvertently delete all checkpoints even though they are still valid.
> A user reported this problem on the mailing list:
> https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E