Repository: flink
Updated Branches:
  refs/heads/release-1.3.3-rc1 [created] 3c40656a7


[FLINK-7783] Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f69bdf20
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f69bdf20
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f69bdf20

Branch: refs/heads/release-1.3.3-rc1
Commit: f69bdf207b92ca47a5ce3e29f6ec7193ed17ec72
Parents: 100558b
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Sun Oct 22 11:40:43 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Mon Mar 12 18:40:17 2018 +0800

----------------------------------------------------------------------
 .../runtime/checkpoint/CompletedCheckpoint.java |  24 +++++
 .../ZooKeeperCompletedCheckpointStore.java      | 105 ++++++++++++-------
 .../zookeeper/ZooKeeperStateHandleStore.java    |   3 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  35 +++++--
 4 files changed, 118 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f69bdf20/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 76d1580..01718e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -303,4 +303,28 @@ public class CompletedCheckpoint implements Serializable {
        public String toString() {
                return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
        }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               CompletedCheckpoint that = (CompletedCheckpoint) o;
+
+               if (checkpointID != that.checkpointID) {
+                       return false;
+               }
+               return job.equals(that.job);
+       }
+
+       @Override
+       public int hashCode() {
+               int result = job.hashCode();
+               result = 31 * result + (int) (checkpointID ^ (checkpointID >>> 32));
+               return result;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f69bdf20/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 88dd0d4..73598e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -70,16 +70,20 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
 
-       /** Curator ZooKeeper client */
+       /** Curator ZooKeeper client. */
        private final CuratorFramework client;
 
-       /** Completed checkpoints in ZooKeeper */
+       /** Completed checkpoints in ZooKeeper. */
        private final ZooKeeperStateHandleStore<CompletedCheckpoint> 
checkpointsInZooKeeper;
 
        /** The maximum number of checkpoints to retain (at least 1). */
        private final int maxNumberOfCheckpointsToRetain;
 
-       /** Local completed checkpoints. */
+       /**
+        * Local copy of the completed checkpoints in ZooKeeper. This is 
restored from ZooKeeper
+        * when recovering and is maintained in parallel to the state in 
ZooKeeper during normal
+        * operations.
+        */
        private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
 
        /**
@@ -122,7 +126,7 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                this.checkpointsInZooKeeper = new 
ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
 
                this.completedCheckpoints = new 
ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
-               
+
                LOG.info("Initialized in '{}'.", checkpointsPath);
        }
 
@@ -142,11 +146,6 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
        public void recover() throws Exception {
                LOG.info("Recovering checkpoints from ZooKeeper.");
 
-               // Clear local handles in order to prevent duplicates on
-               // recovery. The local handles should reflect the state
-               // of ZooKeeper.
-               completedCheckpoints.clear();
-
                // Get all there is first
                List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> initialCheckpoints;
                while (true) {
@@ -163,22 +162,59 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
 
                LOG.info("Found {} checkpoints in ZooKeeper.", 
numberOfInitialCheckpoints);
 
-               for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String> checkpointStateHandle : initialCheckpoints) {
+               // Try and read the state handles from storage. We try until we 
either successfully read
+               // all of them or when we reach a stable state, i.e. when we 
successfully read the same set
+               // of checkpoints in two tries. We do it like this to protect 
against transient outages
+               // of the checkpoint store (for example a DFS): if the DFS 
comes online midway through
+               // reading a set of checkpoints we would run the risk of 
reading only a partial set
+               // of checkpoints while we could in fact read the other 
checkpoints as well if we retried.
+               // Waiting until a stable state protects against this while 
also being resilient against
+               // checkpoints being actually unreadable.
+               //
+               // These considerations are also important in the scope of 
incremental checkpoints, where
+               // we use ref-counting for shared state handles and might 
accidentally delete shared state
+               // of checkpoints that we don't read due to transient storage 
outages.
+               List<CompletedCheckpoint> lastTryRetrievedCheckpoints = new 
ArrayList<>(numberOfInitialCheckpoints);
+               List<CompletedCheckpoint> retrievedCheckpoints = new 
ArrayList<>(numberOfInitialCheckpoints);
+               do {
+                       LOG.info("Trying to fetch {} checkpoints from 
storage.", numberOfInitialCheckpoints);
 
-                       CompletedCheckpoint completedCheckpoint = null;
+                       lastTryRetrievedCheckpoints.clear();
+                       
lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints);
 
-                       try {
-                               completedCheckpoint = 
retrieveCompletedCheckpoint(checkpointStateHandle);
-                               if (completedCheckpoint != null) {
-                                       
completedCheckpoints.add(completedCheckpoint);
-                               }
-                       } catch (Exception e) {
-                               LOG.warn("Could not retrieve checkpoint. 
Removing it from the completed " +
-                                       "checkpoint store.", e);
+                       retrievedCheckpoints.clear();
+
+                       for 
(Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> 
checkpointStateHandle : initialCheckpoints) {
 
-                               // remove the checkpoint with broken state 
handle
-                               
removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
+                               CompletedCheckpoint completedCheckpoint = null;
+
+                               try {
+                                       completedCheckpoint = 
retrieveCompletedCheckpoint(checkpointStateHandle);
+                                       if (completedCheckpoint != null) {
+                                               
retrievedCheckpoints.add(completedCheckpoint);
+                                       }
+                               } catch (Exception e) {
+                                       LOG.warn("Could not retrieve 
checkpoint, not adding to list of recovered checkpoints.", e);
+                               }
                        }
+
+               } while (retrievedCheckpoints.size() != 
numberOfInitialCheckpoints &&
+                       
!lastTryRetrievedCheckpoints.equals(retrievedCheckpoints));
+
+               // Clear local handles in order to prevent duplicates on
+               // recovery. The local handles should reflect the state
+               // of ZooKeeper.
+               completedCheckpoints.clear();
+               completedCheckpoints.addAll(retrievedCheckpoints);
+
+               if (completedCheckpoints.isEmpty() && 
numberOfInitialCheckpoints > 0) {
+                       throw new FlinkException(
+                               "Could not read any of the " + 
numberOfInitialCheckpoints + " from storage.");
+               } else if (completedCheckpoints.size() != 
numberOfInitialCheckpoints) {
+                       LOG.warn(
+                               "Could only fetch {} of {} checkpoints from 
storage.",
+                               completedCheckpoints.size(),
+                               numberOfInitialCheckpoints);
                }
        }
 
@@ -190,7 +226,7 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
        @Override
        public void addCheckpoint(final CompletedCheckpoint checkpoint) throws 
Exception {
                checkNotNull(checkpoint, "Checkpoint");
-               
+
                final String path = 
checkpointIdToPath(checkpoint.getCheckpointID());
 
                // Now add the new one. If it fails, we don't want to loose 
existing data.
@@ -268,10 +304,13 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
 
        // 
------------------------------------------------------------------------
 
+       /**
+        * Removes a subsumed checkpoint from ZooKeeper and drops the state.
+        */
        private void removeSubsumed(
                final CompletedCheckpoint completedCheckpoint) throws Exception 
{
 
-               if(completedCheckpoint == null) {
+               if (completedCheckpoint == null) {
                        return;
                }
 
@@ -294,11 +333,14 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                        action);
        }
 
+       /**
+        * Removes a checkpoint from ZooKeeper because of Job shutdown and 
drops the state.
+        */
        private void removeShutdown(
                        final CompletedCheckpoint completedCheckpoint,
                        final JobStatus jobStatus) throws Exception {
 
-               if(completedCheckpoint == null) {
+               if (completedCheckpoint == null) {
                        return;
                }
 
@@ -318,21 +360,6 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                        removeAction);
        }
 
-       private void removeBrokenStateHandle(
-               final String pathInZooKeeper,
-               final RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle) throws Exception {
-               checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, new 
ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-                       @Override
-                       public void apply(@Nullable 
RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-                               try {
-                                       retrievableStateHandle.discardState();
-                               } catch (Exception e) {
-                                       throw new FlinkException("Could not 
discard state handle.", e);
-                               }
-                       }
-               });
-       }
-
        /**
         * Convert a checkpoint id into a ZooKeeper path.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/f69bdf20/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index a548f1d..dc3f7d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -326,7 +326,8 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
 
        /**
         * Gets all available state handles from ZooKeeper sorted by name 
(ascending) and locks the
-        * respective state nodes.
+        * respective state nodes. The result tuples contain the retrieved 
state and the path to the
+        * node in ZooKeeper.
         *
         * <p>If there is a concurrent modification, the operation is retried 
until it succeeds.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/f69bdf20/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 3171f1f..bee245c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.TestLogger;
@@ -44,6 +45,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.Executor;
@@ -57,7 +59,6 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.doAnswer;
@@ -85,10 +86,25 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
        public void testCheckpointRecovery() throws Exception {
                final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> checkpointsInZooKeeper = new ArrayList<>(4);
 
-               final CompletedCheckpoint completedCheckpoint1 = 
mock(CompletedCheckpoint.class);
-               when(completedCheckpoint1.getCheckpointID()).thenReturn(1L);
-               final CompletedCheckpoint completedCheckpoint2 = 
mock(CompletedCheckpoint.class);
-               when(completedCheckpoint2.getCheckpointID()).thenReturn(2L);
+               final CompletedCheckpoint completedCheckpoint1 = new 
CompletedCheckpoint(
+                       new JobID(),
+                       1L,
+                       1L,
+                       1L,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null, null);
+
+               final CompletedCheckpoint completedCheckpoint2 = new 
CompletedCheckpoint(
+                       new JobID(),
+                       2L,
+                       2L,
+                       2L,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null, null);
 
                final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
                expectedCheckpointIds.add(1L);
@@ -180,12 +196,13 @@ public class ZooKeeperCompletedCheckpointStoreTest 
extends TestLogger {
 
                assertEquals(expectedCheckpointIds, actualCheckpointIds);
 
-               // check that we did not discard any of the state handles which 
were retrieved
+               // check that we did not discard any of the state handles
                verify(retrievableStateHandle1, never()).discardState();
                verify(retrievableStateHandle2, never()).discardState();
 
-               // check that we have discarded the state handles which could 
not be retrieved
-               verify(failingRetrievableStateHandle, times(2)).discardState();
+               // Make sure that we also didn't discard any of the broken 
handles. Only when checkpoints
+               // are subsumed should they be discarded.
+               verify(failingRetrievableStateHandle, never()).discardState();
        }
        
        /**

Reply via email to