Repository: flink Updated Branches: refs/heads/master b93396c5a -> 040356391
[FLINK-6640] Ensure registration of shared state happens before externalizing a checkpoint Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04035639 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04035639 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04035639 Branch: refs/heads/master Commit: 040356391f621858199948b15160b1e382152cc1 Parents: b93396c Author: Stefan Richter <[email protected]> Authored: Fri May 19 16:46:06 2017 +0200 Committer: Stefan Richter <[email protected]> Committed: Sat May 20 15:45:45 2017 +0200 ---------------------------------------------------------------------- .../AbstractCompletedCheckpointStore.java | 43 -------------- .../checkpoint/CheckpointCoordinator.java | 17 +++--- .../runtime/checkpoint/CompletedCheckpoint.java | 6 +- .../checkpoint/CompletedCheckpointStore.java | 5 +- .../runtime/checkpoint/PendingCheckpoint.java | 1 - .../StandaloneCompletedCheckpointStore.java | 11 ++-- .../ZooKeeperCompletedCheckpointStore.java | 26 ++++----- .../runtime/state/SharedStateRegistry.java | 16 +++++- .../CheckpointCoordinatorFailureTest.java | 3 +- .../CompletedCheckpointStoreTest.java | 33 +++++++---- .../checkpoint/CompletedCheckpointTest.java | 14 ++--- .../StandaloneCompletedCheckpointStoreTest.java | 21 +++---- ...ZooKeeperCompletedCheckpointStoreITCase.java | 60 ++++++++++++-------- .../ZooKeeperCompletedCheckpointStoreTest.java | 4 +- .../RecoverableCompletedCheckpointStore.java | 13 ++--- .../JobManagerHACheckpointRecoveryITCase.java | 37 +++++++----- 16 files changed, 156 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java deleted file mode 100644 index bf70501..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint; - -import org.apache.flink.runtime.state.SharedStateRegistry; - -import java.util.concurrent.Executor; - -/** - * This is the base class that provides implementation of some aspects common for all - * {@link CompletedCheckpointStore}s. - */ -public abstract class AbstractCompletedCheckpointStore implements CompletedCheckpointStore { - - /** - * Registry for shared states. - */ - protected final SharedStateRegistry sharedStateRegistry; - - public AbstractCompletedCheckpointStore() { - this.sharedStateRegistry = new SharedStateRegistry(); - } - - public AbstractCompletedCheckpointStore(Executor asyncIOExecutor) { - this.sharedStateRegistry = new SharedStateRegistry(asyncIOExecutor); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 0bbf6b5..e224780 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -40,11 +40,11 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -173,6 +173,9 @@ public class CheckpointCoordinator { @Nullable private CheckpointStatsTracker statsTracker; + /** Registry that tracks state which is shared across (incremental) checkpoints */ + private final SharedStateRegistry sharedStateRegistry; + // -------------------------------------------------------------------------------------------- public CheckpointCoordinator( @@ -226,6 +229,7 @@ public class CheckpointCoordinator { this.completedCheckpointStore = checkNotNull(completedCheckpointStore); this.checkpointDirectory = checkpointDirectory; this.executor = checkNotNull(executor); + this.sharedStateRegistry = new SharedStateRegistry(executor); this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS); this.masterHooks = new HashMap<>(); @@ -836,6 +840,10 @@ public class CheckpointCoordinator { final long checkpointId = pendingCheckpoint.getCheckpointId(); final CompletedCheckpoint completedCheckpoint; + // As a first step to complete the checkpoint, we register its state with the registry + Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates(); + sharedStateRegistry.registerAll(operatorStates.values()); + try { try { // externalize the checkpoint if required @@ -1003,7 +1011,7 @@ public class CheckpointCoordinator { } // Recover the checkpoints - completedCheckpointStore.recover(); + completedCheckpointStore.recover(sharedStateRegistry); // restore from the latest checkpoint CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(); @@ -1120,11 +1128,6 @@ public class CheckpointCoordinator { return completedCheckpointStore; } -// @VisibleForTesting -// SharedStateRegistry getSharedStateRegistry() { -// return sharedStateRegistry; -// } - public CheckpointIDCounter getCheckpointIdCounter() { return checkpointIdCounter; } http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index b382080..7c3edee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -178,7 +178,7 @@ public class CompletedCheckpoint implements Serializable { doDiscard(); } - public boolean discardOnSubsume(SharedStateRegistry sharedStateRegistry) throws Exception { + public boolean discardOnSubsume() throws Exception { if (props.discardOnSubsumed()) { doDiscard(); @@ -188,7 +188,7 @@ public class CompletedCheckpoint implements Serializable { return false; } - public boolean discardOnShutdown(JobStatus jobStatus, SharedStateRegistry sharedStateRegistry) throws Exception { + public boolean discardOnShutdown(JobStatus jobStatus) throws Exception { if (jobStatus == JobStatus.FINISHED && props.discardOnJobFinished() || jobStatus == JobStatus.CANCELED && props.discardOnJobCancelled() || @@ -290,7 +290,7 @@ public class CompletedCheckpoint implements Serializable { * * @param sharedStateRegistry The registry where shared states are registered */ - public void registerSharedStates(SharedStateRegistry sharedStateRegistry) { + public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateRegistry) { sharedStateRegistry.registerAll(operatorStates.values()); } http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java index 82193b5..45d407e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.SharedStateRegistry; import java.util.List; @@ -32,8 +33,10 @@ public interface CompletedCheckpointStore { * * <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest * available checkpoint. + * + * @param sharedStateRegistry the shared state registry to register recovered states. */ - void recover() throws Exception; + void recover(SharedStateRegistry sharedStateRegistry) throws Exception; /** * Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints. http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 370032a..0633fec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -35,7 +35,6 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index 233cfc8..fbb0198 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +33,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; /** * {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#NONE}. */ -public class StandaloneCompletedCheckpointStore extends AbstractCompletedCheckpointStore { +public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore { private static final Logger LOG = LoggerFactory.getLogger(StandaloneCompletedCheckpointStore.class); @@ -56,21 +57,19 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompletedCheckpo } @Override - public void recover() throws Exception { + public void recover(SharedStateRegistry sharedStateRegistry) throws Exception { // Nothing to do } @Override public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { - - checkpoint.registerSharedStates(sharedStateRegistry); checkpoints.addLast(checkpoint); if (checkpoints.size() > maxNumberOfCheckpointsToRetain) { try { CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst(); - checkpointToSubsume.discardOnSubsume(sharedStateRegistry); + checkpointToSubsume.discardOnSubsume(); } catch (Exception e) { LOG.warn("Fail to subsume the old checkpoint.", e); } @@ -103,7 +102,7 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompletedCheckpo LOG.info("Shutting down"); for (CompletedCheckpoint checkpoint : checkpoints) { - checkpoint.discardOnShutdown(jobStatus, sharedStateRegistry); + checkpoint.discardOnShutdown(jobStatus); } } finally { checkpoints.clear(); http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 4c3c1ff..469c1b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -64,7 +64,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * checkpoints is consistent. Currently, after recovery we start out with only a single * checkpoint to circumvent those situations. */ -public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpointStore { +public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore { private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); @@ -102,8 +102,6 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage, Executor executor) throws Exception { - super(executor); - checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint."); checkNotNull(stateStorage, "State storage"); @@ -139,13 +137,14 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi * that the history of checkpoints is consistent. */ @Override - public void recover() throws Exception { + public void recover(SharedStateRegistry sharedStateRegistry) throws Exception { LOG.info("Recovering checkpoints from ZooKeeper."); // Clear local handles in order to prevent duplicates on // recovery. The local handles should reflect the state // of ZooKeeper. completedCheckpoints.clear(); + sharedStateRegistry.clear(); // Get all there is first List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints; @@ -171,7 +170,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle); if (completedCheckpoint != null) { // Re-register all shared states in the checkpoint. - completedCheckpoint.registerSharedStates(sharedStateRegistry); + completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry); completedCheckpoints.add(completedCheckpoint); } } catch (Exception e) { @@ -195,9 +194,6 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi final String path = checkpointIdToPath(checkpoint.getCheckpointID()); - // First, register all shared states in the checkpoint to consolidates placeholder. - checkpoint.registerSharedStates(sharedStateRegistry); - // Now add the new one. If it fails, we don't want to loose existing data. checkpointsInZooKeeper.addAndLock(path, checkpoint); @@ -206,7 +202,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi // Everything worked, let's remove a previous checkpoint if necessary. while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) { try { - removeSubsumed(completedCheckpoints.removeFirst(), sharedStateRegistry); + removeSubsumed(completedCheckpoints.removeFirst()); } catch (Exception e) { LOG.warn("Failed to subsume the old checkpoint", e); } @@ -248,7 +244,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi for (CompletedCheckpoint checkpoint : completedCheckpoints) { try { - removeShutdown(checkpoint, jobStatus, sharedStateRegistry); + removeShutdown(checkpoint, jobStatus); } catch (Exception e) { LOG.error("Failed to discard checkpoint.", e); } @@ -274,8 +270,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi // ------------------------------------------------------------------------ private void removeSubsumed( - final CompletedCheckpoint completedCheckpoint, - final SharedStateRegistry sharedStateRegistry) throws Exception { + final CompletedCheckpoint completedCheckpoint) throws Exception { if(completedCheckpoint == null) { return; @@ -287,7 +282,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException { if (value != null) { try { - completedCheckpoint.discardOnSubsume(sharedStateRegistry); + completedCheckpoint.discardOnSubsume(); } catch (Exception e) { throw new FlinkException("Could not discard the completed checkpoint on subsume.", e); } @@ -302,8 +297,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi private void removeShutdown( final CompletedCheckpoint completedCheckpoint, - final JobStatus jobStatus, - final SharedStateRegistry sharedStateRegistry) throws Exception { + final JobStatus jobStatus) throws Exception { if(completedCheckpoint == null) { return; @@ -313,7 +307,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi @Override public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException { try { - completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry); + completedCheckpoint.discardOnShutdown(jobStatus); } catch (Exception e) { throw new FlinkException("Could not discard the completed checkpoint on subsume.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java index a5e0f84..949839b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java @@ -29,10 +29,13 @@ import java.util.Objects; import java.util.concurrent.Executor; /** + * This registry manages state that is shared across (incremental) checkpoints, and is responsible + * for deleting shared state that is no longer used in any valid checkpoint. + * * A {@code SharedStateRegistry} will be deployed in the - * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to - * maintain the reference count of {@link StreamStateHandle}s which are shared - * among different incremental checkpoints. + * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to + * maintain the reference count of {@link StreamStateHandle}s by a key that (logically) identifies + * them. */ public class SharedStateRegistry { @@ -247,4 +250,11 @@ public class SharedStateRegistry { } } } + + /** + * Clears the registry. + */ + public void clear() { + registeredStates.clear(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index 6e20be3..344b340 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -135,7 +136,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore { @Override - public void recover() throws Exception { + public void recover(SharedStateRegistry sharedStateRegistry) throws Exception { throw new UnsupportedOperationException("Not implemented."); } http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java index fb5d7c3..1fe4e65 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java @@ -64,6 +64,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { */ @Test public void testAddAndGetLatestCheckpoint() throws Exception { + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4); // Empty state @@ -71,7 +72,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { assertEquals(0, checkpoints.getAllCheckpoints().size()); TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] { - createCheckpoint(0), createCheckpoint(1) }; + createCheckpoint(0, sharedStateRegistry), createCheckpoint(1, sharedStateRegistry) }; // Add and get latest checkpoints.addCheckpoint(expected[0]); @@ -89,11 +90,12 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { */ @Test public void testAddCheckpointMoreThanMaxRetained() throws Exception { + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1); TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] { - createCheckpoint(0), createCheckpoint(1), - createCheckpoint(2), createCheckpoint(3) + createCheckpoint(0, sharedStateRegistry), createCheckpoint(1, sharedStateRegistry), + createCheckpoint(2, sharedStateRegistry), createCheckpoint(3, sharedStateRegistry) }; // Add checkpoints @@ -134,11 +136,12 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { */ @Test public void testGetAllCheckpoints() throws Exception { + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4); TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] { - createCheckpoint(0), createCheckpoint(1), - createCheckpoint(2), createCheckpoint(3) + createCheckpoint(0, sharedStateRegistry), createCheckpoint(1, sharedStateRegistry), + createCheckpoint(2, sharedStateRegistry), createCheckpoint(3, sharedStateRegistry) }; for (TestCompletedCheckpoint checkpoint : expected) { @@ -159,11 +162,12 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { */ @Test public void testDiscardAllCheckpoints() throws Exception { + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4); TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] { - createCheckpoint(0), createCheckpoint(1), - createCheckpoint(2), createCheckpoint(3) + createCheckpoint(0, sharedStateRegistry), createCheckpoint(1, sharedStateRegistry), + createCheckpoint(2, sharedStateRegistry), createCheckpoint(3, sharedStateRegistry) }; for (TestCompletedCheckpoint checkpoint : expected) { @@ -187,7 +191,10 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { // --------------------------------------------------------------------------------------------- - protected TestCompletedCheckpoint createCheckpoint(int id) throws IOException { + protected TestCompletedCheckpoint createCheckpoint( + int id, + SharedStateRegistry sharedStateRegistry) throws IOException { + int numberOfStates = 4; CheckpointProperties props = CheckpointProperties.forStandardCheckpoint(); @@ -204,6 +211,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { operatorState.putState(i, subtaskState); } + operatorState.registerSharedStates(sharedStateRegistry); + return new TestCompletedCheckpoint(new JobID(), id, 0, operatorGroupState, props); } @@ -251,8 +260,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { } @Override - public boolean discardOnSubsume(SharedStateRegistry sharedStateRegistry) throws Exception { - if (super.discardOnSubsume(sharedStateRegistry)) { + public boolean discardOnSubsume() throws Exception { + if (super.discardOnSubsume()) { discard(); return true; } else { @@ -261,8 +270,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { } @Override - public boolean discardOnShutdown(JobStatus jobStatus, SharedStateRegistry sharedStateRegistry) throws Exception { - if (super.discardOnShutdown(jobStatus, sharedStateRegistry)) { + public boolean discardOnShutdown(JobStatus jobStatus) throws Exception { + if (super.discardOnShutdown(jobStatus)) { discard(); return true; } else { http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java index 0bbb961..4846879 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -67,7 +67,7 @@ public class CompletedCheckpointTest { new FileStateHandle(new Path(file.toURI()), file.length()), file.getAbsolutePath()); - checkpoint.discardOnShutdown(JobStatus.FAILED, new SharedStateRegistry()); + checkpoint.discardOnShutdown(JobStatus.FAILED); assertEquals(false, file.exists()); } @@ -93,11 +93,11 @@ public class CompletedCheckpointTest { null); SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - checkpoint.registerSharedStates(sharedStateRegistry); + checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry); verify(state, times(1)).registerSharedStates(sharedStateRegistry); // Subsume - checkpoint.discardOnSubsume(sharedStateRegistry); + checkpoint.discardOnSubsume(); verify(state, times(1)).discardState(); } @@ -132,9 +132,9 @@ public class CompletedCheckpointTest { externalPath); SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - checkpoint.registerSharedStates(sharedStateRegistry); + checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry); - checkpoint.discardOnShutdown(status, sharedStateRegistry); + checkpoint.discardOnShutdown(status); verify(state, times(0)).discardState(); assertEquals(true, file.exists()); @@ -148,7 +148,7 @@ public class CompletedCheckpointTest { null, null); - checkpoint.discardOnShutdown(status, sharedStateRegistry); + checkpoint.discardOnShutdown(status); verify(state, times(1)).discardState(); } } @@ -176,7 +176,7 @@ public class CompletedCheckpointTest { CompletedCheckpointStats.DiscardCallback callback = mock(CompletedCheckpointStats.DiscardCallback.class); completed.setDiscardCallback(callback); - completed.discardOnShutdown(JobStatus.FINISHED, new SharedStateRegistry()); + completed.discardOnShutdown(JobStatus.FINISHED); verify(callback, times(1)).notifyDiscardedCheckpoint(); } http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java index be94762..6f3c60b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java @@ -30,7 +30,6 @@ import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; import static org.powermock.api.mockito.PowerMockito.doReturn; import static org.powermock.api.mockito.PowerMockito.doThrow; import static org.powermock.api.mockito.PowerMockito.mock; @@ -41,7 +40,7 @@ import static org.powermock.api.mockito.PowerMockito.mock; public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointStoreTest { @Override - protected AbstractCompletedCheckpointStore createCompletedCheckpoints( + protected CompletedCheckpointStore createCompletedCheckpoints( int maxNumberOfCheckpointsToRetain) throws Exception { return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain); @@ -52,13 +51,14 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS */ @Test public void testShutdownDiscardsCheckpoints() throws Exception { - AbstractCompletedCheckpointStore store = createCompletedCheckpoints(1); - TestCompletedCheckpoint checkpoint = createCheckpoint(0); + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); + CompletedCheckpointStore store = createCompletedCheckpoints(1); + TestCompletedCheckpoint checkpoint = createCheckpoint(0, sharedStateRegistry); Collection<OperatorState> operatorStates = checkpoint.getOperatorStates().values(); store.addCheckpoint(checkpoint); assertEquals(1, store.getNumberOfRetainedCheckpoints()); - verifyCheckpointRegistered(operatorStates, store.sharedStateRegistry); + verifyCheckpointRegistered(operatorStates, sharedStateRegistry); store.shutdown(JobStatus.FINISHED); assertEquals(0, store.getNumberOfRetainedCheckpoints()); @@ -72,13 +72,14 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS */ @Test public void testSuspendDiscardsCheckpoints() throws Exception { - AbstractCompletedCheckpointStore store = createCompletedCheckpoints(1); - TestCompletedCheckpoint checkpoint = createCheckpoint(0); + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); + CompletedCheckpointStore store = createCompletedCheckpoints(1); + TestCompletedCheckpoint checkpoint = createCheckpoint(0, sharedStateRegistry); Collection<OperatorState> taskStates = checkpoint.getOperatorStates().values(); store.addCheckpoint(checkpoint); assertEquals(1, store.getNumberOfRetainedCheckpoints()); - verifyCheckpointRegistered(taskStates, store.sharedStateRegistry); + verifyCheckpointRegistered(taskStates, sharedStateRegistry); store.shutdown(JobStatus.SUSPENDED); assertEquals(0, store.getNumberOfRetainedCheckpoints()); @@ -92,7 +93,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS */ @Test public void testAddCheckpointWithFailedRemove() throws Exception { - + final int numCheckpointsToRetain = 1; CompletedCheckpointStore store = createCompletedCheckpoints(numCheckpointsToRetain); @@ -100,7 +101,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class); doReturn(i).when(checkpointToAdd).getCheckpointID(); doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates(); - doThrow(new IOException()).when(checkpointToAdd).discardOnSubsume(any(SharedStateRegistry.class)); + doThrow(new IOException()).when(checkpointToAdd).discardOnSubsume(); try { store.addCheckpoint(checkpointToAdd); http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index 44c802b..81ee4f9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; import org.apache.zookeeper.data.Stat; @@ -34,7 +35,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; @@ -82,10 +82,14 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint */ @Test public void testRecover() throws Exception { - AbstractCompletedCheckpointStore checkpoints = createCompletedCheckpoints(3); - TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] { - createCheckpoint(0), createCheckpoint(1), createCheckpoint(2) + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); + CompletedCheckpointStore checkpoints = createCompletedCheckpoints(3); + + TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[]{ + createCheckpoint(0, sharedStateRegistry), + createCheckpoint(1, sharedStateRegistry), + createCheckpoint(2, sharedStateRegistry) }; // Add multiple checkpoints @@ -93,16 +97,17 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint checkpoints.addCheckpoint(expected[1]); checkpoints.addCheckpoint(expected[2]); - verifyCheckpointRegistered(expected[0].getOperatorStates().values(), checkpoints.sharedStateRegistry); - verifyCheckpointRegistered(expected[1].getOperatorStates().values(), checkpoints.sharedStateRegistry); - verifyCheckpointRegistered(expected[2].getOperatorStates().values(), checkpoints.sharedStateRegistry); + verifyCheckpointRegistered(expected[0].getOperatorStates().values(), sharedStateRegistry); + verifyCheckpointRegistered(expected[1].getOperatorStates().values(), sharedStateRegistry); + verifyCheckpointRegistered(expected[2].getOperatorStates().values(), sharedStateRegistry); // All three should be in ZK assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size()); assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints()); // Recover - checkpoints.recover(); + sharedStateRegistry.clear(); + checkpoints.recover(sharedStateRegistry); assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size()); assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints()); @@ -111,7 +116,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint List<CompletedCheckpoint> expectedCheckpoints = new ArrayList<>(3); expectedCheckpoints.add(expected[1]); expectedCheckpoints.add(expected[2]); - expectedCheckpoints.add(createCheckpoint(3)); + expectedCheckpoints.add(createCheckpoint(3, sharedStateRegistry)); checkpoints.addCheckpoint(expectedCheckpoints.get(2)); @@ -120,7 +125,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint assertEquals(expectedCheckpoints, actualCheckpoints); for (CompletedCheckpoint actualCheckpoint : actualCheckpoints) { - verifyCheckpointRegistered(actualCheckpoint.getOperatorStates().values(), checkpoints.sharedStateRegistry); + verifyCheckpointRegistered(actualCheckpoint.getOperatorStates().values(), sharedStateRegistry); } } @@ -131,8 +136,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint public void testShutdownDiscardsCheckpoints() throws Exception { CuratorFramework client = ZOOKEEPER.getClient(); + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); CompletedCheckpointStore store = createCompletedCheckpoints(1); - TestCompletedCheckpoint checkpoint = createCheckpoint(0); + TestCompletedCheckpoint checkpoint = createCheckpoint(0, sharedStateRegistry); store.addCheckpoint(checkpoint); assertEquals(1, store.getNumberOfRetainedCheckpoints()); @@ -142,7 +148,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint assertEquals(0, store.getNumberOfRetainedCheckpoints()); assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()))); - store.recover(); + sharedStateRegistry.clear(); + store.recover(sharedStateRegistry); assertEquals(0, store.getNumberOfRetainedCheckpoints()); } @@ -156,8 +163,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint public void testSuspendKeepsCheckpoints() throws Exception { CuratorFramework client = ZOOKEEPER.getClient(); + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); CompletedCheckpointStore store = createCompletedCheckpoints(1); - TestCompletedCheckpoint checkpoint = createCheckpoint(0); + TestCompletedCheckpoint checkpoint = createCheckpoint(0, sharedStateRegistry); store.addCheckpoint(checkpoint); assertEquals(1, store.getNumberOfRetainedCheckpoints()); @@ -174,7 +182,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint assertEquals("The checkpoint node should not be locked.", 0, stat.getNumChildren()); // Recover again - store.recover(); + sharedStateRegistry.clear(); + store.recover(sharedStateRegistry); CompletedCheckpoint recovered = store.getLatestCheckpoint(); assertEquals(checkpoint, recovered); @@ -188,18 +197,20 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint @Test public void testLatestCheckpointRecovery() throws Exception { final int numCheckpoints = 3; - AbstractCompletedCheckpointStore checkpointStore = createCompletedCheckpoints(numCheckpoints); + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); + CompletedCheckpointStore checkpointStore = createCompletedCheckpoints(numCheckpoints); List<CompletedCheckpoint> checkpoints = new ArrayList<>(numCheckpoints); - checkpoints.add(createCheckpoint(9)); - checkpoints.add(createCheckpoint(10)); - checkpoints.add(createCheckpoint(11)); + checkpoints.add(createCheckpoint(9, sharedStateRegistry)); + checkpoints.add(createCheckpoint(10, sharedStateRegistry)); + checkpoints.add(createCheckpoint(11, sharedStateRegistry)); for (CompletedCheckpoint checkpoint : checkpoints) { checkpointStore.addCheckpoint(checkpoint); } - checkpointStore.recover(); + sharedStateRegistry.clear(); + checkpointStore.recover(sharedStateRegistry); CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint(); @@ -220,13 +231,16 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint ZooKeeperCompletedCheckpointStore zkCheckpointStore1 = createCompletedCheckpoints(numberOfCheckpoints); ZooKeeperCompletedCheckpointStore zkCheckpointStore2 = createCompletedCheckpoints(numberOfCheckpoints); - TestCompletedCheckpoint completedCheckpoint = createCheckpoint(1); + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); + + TestCompletedCheckpoint completedCheckpoint = createCheckpoint(1, sharedStateRegistry); // complete the first checkpoint zkCheckpointStore1.addCheckpoint(completedCheckpoint); // recover the checkpoint by a different checkpoint store - zkCheckpointStore2.recover(); + sharedStateRegistry.clear(); + zkCheckpointStore2.recover(sharedStateRegistry); CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint(); assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint); @@ -237,7 +251,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint // complete another checkpoint --> this should remove the first checkpoint from the store // because the number of retained checkpoints == 1 - TestCompletedCheckpoint completedCheckpoint2 = createCheckpoint(2); + TestCompletedCheckpoint completedCheckpoint2 = createCheckpoint(2, sharedStateRegistry); zkCheckpointStore1.addCheckpoint(completedCheckpoint2); List<CompletedCheckpoint> allCheckpoints = zkCheckpointStore1.getAllCheckpoints(); @@ -251,7 +265,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint // check that we have not discarded the first completed checkpoint assertFalse(recoveredTestCheckpoint.isDiscarded()); - TestCompletedCheckpoint completedCheckpoint3 = createCheckpoint(3); + TestCompletedCheckpoint completedCheckpoint3 = createCheckpoint(3, sharedStateRegistry); // this should release the last lock on completedCheckoint and thus discard it zkCheckpointStore2.addCheckpoint(completedCheckpoint3); http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index 7d22d8e..23cc8c8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -27,6 +27,7 @@ import org.apache.curator.utils.EnsurePath; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.flink.util.TestLogger; @@ -158,7 +159,8 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { stateStorage, Executors.directExecutor()); - zooKeeperCompletedCheckpointStore.recover(); + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); + zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry); CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint(); http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java index 2251e46..a0c4412 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java @@ -18,9 +18,10 @@ package org.apache.flink.runtime.testutils; -import org.apache.flink.runtime.checkpoint.AbstractCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.state.SharedStateRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +33,7 @@ import java.util.List; * A checkpoint store, which supports shutdown and suspend. You can use this to test HA * as long as the factory always returns the same store instance. */ -public class RecoverableCompletedCheckpointStore extends AbstractCompletedCheckpointStore { +public class RecoverableCompletedCheckpointStore implements CompletedCheckpointStore { private static final Logger LOG = LoggerFactory.getLogger(RecoverableCompletedCheckpointStore.class); @@ -41,26 +42,24 @@ public class RecoverableCompletedCheckpointStore extends AbstractCompletedCheckp private final ArrayDeque<CompletedCheckpoint> suspended = new ArrayDeque<>(2); @Override - public void recover() throws Exception { + public void recover(SharedStateRegistry sharedStateRegistry) throws Exception { checkpoints.addAll(suspended); suspended.clear(); for (CompletedCheckpoint checkpoint : checkpoints) { - checkpoint.registerSharedStates(sharedStateRegistry); + checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry); } } @Override public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { - checkpoint.registerSharedStates(sharedStateRegistry); - checkpoints.addLast(checkpoint); if (checkpoints.size() > 1) { CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst(); - checkpointToSubsume.discardOnSubsume(sharedStateRegistry); + checkpointToSubsume.discardOnSubsume(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index f9af603..33c3454 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -62,6 +62,7 @@ import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; @@ -147,6 +148,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { private static final int Parallelism = 8; private static CountDownLatch CompletedCheckpointsLatch = new CountDownLatch(4); + private static CountDownLatch CompletedCheckpointsLatch2 = new CountDownLatch(6); private static AtomicLongArray RecoveredStates = new AtomicLongArray(Parallelism); @@ -171,8 +173,8 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { final String fileStateBackendPath = FileStateBackendBasePath.getAbsoluteFile().toString(); Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig( - zooKeeperQuorum, - fileStateBackendPath); + zooKeeperQuorum, + fileStateBackendPath); config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2); @@ -188,7 +190,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { try { // Test actor system testActorSystem = AkkaUtils.createActorSystem(new Configuration(), - new Some<>(new Tuple2<String, Object>("localhost", 0))); + new Some<>(new Tuple2<String, Object>("localhost", 0))); // The job managers jobManagerProcess[0] = new JobManagerProcess(0, config); @@ -204,7 +206,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { // The task manager taskManagerSystem = AkkaUtils.createActorSystem( - config, Option.apply(new Tuple2<String, Object>("localhost", 0))); + config, Option.apply(new Tuple2<String, Object>("localhost", 0))); TaskManager.startTaskManagerComponentsAndActor( config, ResourceID.generate(), @@ -223,7 +225,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { // Get the leader ref ActorRef leaderRef = AkkaUtils.getActorRef( - leaderAddress, testActorSystem, testDeadline.timeLeft()); + leaderAddress, testActorSystem, testDeadline.timeLeft()); ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId); // Who's the boss? @@ -248,10 +250,10 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { // Wait for the job to be running JobManagerActorTestUtils.waitForJobStatus( - jobGraph.getJobID(), - JobStatus.RUNNING, - leader, - testDeadline.timeLeft()); + jobGraph.getJobID(), + JobStatus.RUNNING, + leader, + testDeadline.timeLeft()); // Remove all files FileUtils.deleteDirectory(FileStateBackendBasePath); @@ -268,7 +270,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { if (output != null) { if (output.contains("Failed to recover job") && - output.contains("java.io.FileNotFoundException")) { + output.contains("java.io.FileNotFoundException")) { success = true; break; @@ -352,10 +354,12 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { try { Configuration config = new Configuration(); + config.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS, retainedCheckpoints); config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots); + config.setString(ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, temporaryFolder.newFolder().toString()); String tmpFolderString = temporaryFolder.newFolder().toString(); @@ -372,6 +376,8 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(Parallelism); env.enableCheckpointing(checkpointingInterval); + env.getCheckpointConfig() + .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //TODO parameterize env.setStateBackend(stateBackend); @@ -427,7 +433,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { * A checkpointed source, which emits elements from 0 to a configured number. */ public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long> - implements ListCheckpointed<Tuple2<Long, Integer>> { + implements ListCheckpointed<Tuple2<Long, Integer>> { private static final Logger LOG = LoggerFactory.getLogger(CheckpointedSequenceSource.class); @@ -465,8 +471,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { --repeat; current = 0; } else { - ctx.collect(LastElement); - return; + isRunning = false; } } @@ -475,6 +480,11 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { Thread.sleep(50); } } + + CompletedCheckpointsLatch2.await(); + synchronized (ctx.getCheckpointLock()) { + ctx.collect(LastElement); + } } @Override @@ -563,6 +573,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { public void notifyCheckpointComplete(long checkpointId) throws Exception { LOG.debug("Checkpoint {} completed.", checkpointId); CompletedCheckpointsLatch.countDown(); + CompletedCheckpointsLatch2.countDown(); } }
