[ 
https://issues.apache.org/jira/browse/FLINK-7783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212351#comment-16212351
 ] 

ASF GitHub Bot commented on FLINK-7783:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4863#discussion_r145905621
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java ---
    @@ -183,11 +179,120 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
                // check that we did not discard any of the state handles which were retrieved
                verify(retrievableStateHandle1, never()).discardState();
                verify(retrievableStateHandle2, never()).discardState();
    +   }
    +
    +   /**
    +    * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
    +    * and ignores those which cannot be retrieved via their state handles.
    +    */
    +   @Test
    +   public void testCheckpointRecoveryWithBrokenCheckpoints() throws Exception {
    +           final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZooKeeper = new ArrayList<>(4);
    +
    +           final CompletedCheckpoint completedCheckpoint1 = mock(CompletedCheckpoint.class);
    +           when(completedCheckpoint1.getCheckpointID()).thenReturn(1L);
    +           final CompletedCheckpoint completedCheckpoint2 = mock(CompletedCheckpoint.class);
    +           when(completedCheckpoint2.getCheckpointID()).thenReturn(2L);
    +
    +           final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
    +           expectedCheckpointIds.add(1L);
    +           expectedCheckpointIds.add(2L);
    +
    +           final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
    +           when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception"));
    +
    +           final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 = mock(RetrievableStateHandle.class);
    +           when(retrievableStateHandle1.retrieveState()).thenReturn(completedCheckpoint1);
    +
    +           final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle2 = mock(RetrievableStateHandle.class);
    +           when(retrievableStateHandle2.retrieveState()).thenReturn(completedCheckpoint2);
    +
    +           checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/a_failing1"));
    +           checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/b_foobar1"));
    +           checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/c_failing2"));
    +           checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/d_foobar2"));
    +
    +           final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
    +           final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = mock(RetrievableStateStorageHelper.class);
    +
    +           ZooKeeperStateHandleStore<CompletedCheckpoint> zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor()));
    +           whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
    +           doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
    +
    +           final int numCheckpointsToRetain = 1;
    +
    +           // Mocking for the delete operation on the CuratorFramework client
    +           // It assures that the callback is executed synchronously
    +
    +           final EnsurePath ensurePathMock = mock(EnsurePath.class);
    --- End diff --
    
    I agree with Stefan that this test looks really hard to maintain. I think
the underlying problem is that `ZooKeeperCompletedCheckpointStore` internally
uses a hardwired `ZooKeeperStateHandleStore`. I guess it would be better if we
had a proper interface which can be passed to the
`ZooKeeperCompletedCheckpointStore`. Moreover, I think the current design is a
bad separation of concerns with respect to how
`ZooKeeperCompletedCheckpointStore` and `ZooKeeperStateHandleStore` work
together. My gut feeling is that `ZooKeeperCompletedCheckpointStore` does not
have to know anything about `ZooKeeper` at all. Fixing this should make the
components easier to test as well.
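
To make the suggestion concrete, here is a minimal sketch of what such an interface could look like. It is an illustration under assumptions, not a proposal for the actual Flink API: the names `Handle`, `HandleStore`, `CheckpointStore` and `RecoverySketch` are hypothetical, and a checkpoint is reduced to its `Long` ID for brevity. The checkpoint store only sees the interface, so a test can pass in a plain in-memory implementation instead of mocking Curator and intercepting constructors.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RecoverySketch {

    /** Hypothetical handle; retrieval may fail, e.g. while the DFS is unavailable. */
    interface Handle<T> {
        T retrieve() throws IOException;
    }

    /** Hypothetical storage abstraction; a ZooKeeper-backed class would be one implementation. */
    interface HandleStore<T> {
        List<Handle<T>> getAllSortedByName() throws Exception;
    }

    /** Checkpoint store that depends only on the interface and knows nothing about ZooKeeper. */
    static final class CheckpointStore {
        private final HandleStore<Long> handleStore;
        private final List<Long> recoveredIds = new ArrayList<>();

        CheckpointStore(HandleStore<Long> handleStore) {
            this.handleStore = handleStore;
        }

        /** Recovers all readable checkpoints; unreadable ones are skipped, not removed. */
        void recover() throws Exception {
            recoveredIds.clear();
            for (Handle<Long> handle : handleStore.getAllSortedByName()) {
                try {
                    recoveredIds.add(handle.retrieve());
                } catch (IOException e) {
                    // The failure may be transient, so the handle is left in the store.
                }
            }
        }

        List<Long> getRecoveredIds() {
            return new ArrayList<>(recoveredIds);
        }
    }

    /** Mirrors the scenario of the test in the diff: two broken and two readable handles. */
    static List<Long> demo() throws Exception {
        Handle<Long> failing = () -> { throw new IOException("Test exception"); };
        Handle<Long> first = () -> 1L;
        Handle<Long> second = () -> 2L;
        List<Handle<Long>> handles = Arrays.asList(failing, first, failing, second);

        // In-memory fake of the storage layer; no mocking framework is needed.
        HandleStore<Long> inMemoryStore = () -> handles;

        CheckpointStore store = new CheckpointStore(inMemoryStore);
        store.recover();
        return store.getRecoveredIds();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

With the storage layer behind an interface, the scenario from the test above (two failing and two readable handles) needs no `whenNew`, `spy` or deep stubs. The ZooKeeper-specific behaviour could then be tested separately against the concrete implementation.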


> Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-7783
>                 URL: https://issues.apache.org/jira/browse/FLINK-7783
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> Currently, we always delete checkpoint handles if they (or the data from the 
> DFS) cannot be read: 
> https://github.com/apache/flink/blob/91a4b276171afb760bfff9ccf30593e648e91dfb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L180
> This can lead to problems in case the DFS is temporarily not available, i.e. 
> we could inadvertently
> delete all checkpoints even though they are still valid.
> A user reported this problem on the mailing list: 
> https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
