[FLINK-5897] [checkpoints] Make checkpoint externalization not depend strictly on FileSystems
That is the first step towards checkpoints that can be externalized to other stores as well, like k/v stores and databases, if supported by the state backend. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b7f21d8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b7f21d8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b7f21d8 Branch: refs/heads/master Commit: 5b7f21d891b410ca0046efdaf12caf5e73deadf4 Parents: 9912de2 Author: Stephan Ewen <[email protected]> Authored: Wed Feb 22 22:18:50 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Tue Feb 28 18:59:10 2017 +0100 ---------------------------------------------------------------------- .../org/apache/flink/util/ExceptionUtils.java | 4 +- .../checkpoint/CheckpointCoordinator.java | 20 ++- .../runtime/checkpoint/CompletedCheckpoint.java | 135 +++++++++++++++---- .../checkpoint/CompletedCheckpointStore.java | 9 ++ .../runtime/checkpoint/PendingCheckpoint.java | 112 ++++++++++----- .../StandaloneCompletedCheckpointStore.java | 4 + .../ZooKeeperCompletedCheckpointStore.java | 5 + .../checkpoint/savepoint/SavepointLoader.java | 19 ++- .../checkpoint/savepoint/SavepointStore.java | 93 ++++++++++--- .../apache/flink/runtime/state/StateUtil.java | 17 +-- .../flink/runtime/jobmanager/JobManager.scala | 14 +- .../CheckpointCoordinatorFailureTest.java | 5 + .../CompletedCheckpointStoreTest.java | 2 +- .../checkpoint/CompletedCheckpointTest.java | 17 ++- .../checkpoint/PendingCheckpointTest.java | 25 ++-- .../jobmanager/JobManagerHARecoveryTest.java | 4 + .../runtime/jobmanager/JobManagerITCase.scala | 2 +- .../JobManagerHACheckpointRecoveryITCase.java | 2 +- 18 files changed, 365 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java 
---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java index fea25ff..7167a0b 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -257,7 +257,7 @@ public final class ExceptionUtils { throw (Error) t; } else { - throw new IOException(t); + throw new IOException(t.getMessage(), t); } } @@ -268,7 +268,7 @@ public final class ExceptionUtils { * @param searchType the type of exception to search for in the chain. * @return True, if the searched type is nested in the throwable, false otherwise. */ - public static boolean containsThrowable(Throwable throwable, Class searchType) { + public static boolean containsThrowable(Throwable throwable, Class<?> searchType) { if (throwable == null || searchType == null) { return false; } http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index c1c65b5..6da6f7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.TaskStateHandles; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -758,13 +759,19 @@ public 
class CheckpointCoordinator { CompletedCheckpoint completedCheckpoint = null; try { - completedCheckpoint = pendingCheckpoint.finalizeCheckpoint(); + // externalize the checkpoint if required + if (pendingCheckpoint.getProps().externalizeCheckpoint()) { + completedCheckpoint = pendingCheckpoint.finalizeCheckpointExternalized(); + } else { + completedCheckpoint = pendingCheckpoint.finalizeCheckpointNonExternalized(); + } completedCheckpointStore.addCheckpoint(completedCheckpoint); rememberRecentCheckpointId(checkpointId); dropSubsumedCheckpoints(checkpointId); - } catch (Exception exception) { + } + catch (Exception exception) { // abort the current pending checkpoint if it has not been discarded yet if (!pendingCheckpoint.isDiscarded()) { pendingCheckpoint.abortError(exception); @@ -779,8 +786,8 @@ public class CheckpointCoordinator { public void run() { try { cc.discard(); - } catch (Exception nestedException) { - LOG.warn("Could not properly discard completed checkpoint {}.", cc.getCheckpointID(), nestedException); + } catch (Throwable t) { + LOG.warn("Could not properly discard completed checkpoint {}.", cc.getCheckpointID(), t); } } }); @@ -808,11 +815,12 @@ public class CheckpointCoordinator { builder.append(", "); } // Remove last two chars ", " - builder.delete(builder.length() - 2, builder.length()); + builder.setLength(builder.length() - 2); LOG.debug(builder.toString()); } + // send the "notify complete" call to all vertices final long timestamp = completedCheckpoint.getTimestamp(); for (ExecutionVertex ev : tasksToCommitTo) { @@ -934,7 +942,7 @@ public class CheckpointCoordinator { latest.getCheckpointID(), latest.getProperties(), restoreTimestamp, - latest.getExternalPath()); + latest.getExternalPointer()); statsTracker.reportRestoredCheckpoint(restored); } http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index db86484..17ce4d5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -18,11 +18,14 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats.DiscardCallback; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,8 +38,36 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state) - * and that is considered completed. + * A CompletedCheckpoint describes a checkpoint after all required tasks acknowledged it (with their state) + * and that is considered successful. The CompletedCheckpoint class contains all the metadata of the + * checkpoint, i.e., checkpoint ID, timestamps, and the handles to all states that are part of the + * checkpoint. + * + * <h2>Size of the CompletedCheckpoint Instances</h2> + * + * In most cases, the CompletedCheckpoint objects are very small, because the handles to the checkpoint + * states are only pointers (such as file paths).
However, some state backend implementations may + * choose to store some payload data directly with the metadata (for example to avoid many small files). + * If the thresholds for such inlined data are set to large values, the memory consumption of the CompletedCheckpoint + * objects can be significant. + * + * <h2>Externalized Metadata</h2> + * + * The metadata of the CompletedCheckpoint is optionally also persisted in an external storage + * system. In that case, the checkpoint is called <i>externalized</i>. + * + * <p>Externalized checkpoints have an external pointer, which points to the metadata. For example, + * when externalizing to a file system, that pointer is the file path to the checkpoint's folder + * or the metadata file. For a state backend that stores metadata in database tables, the pointer + * could be the table name and row key. The pointer is encoded as a String. + * + * <h2>Externalized Metadata and High-availability</h2> + * + * For high availability setups, the checkpoint metadata must be stored persistently and be available + * as well. The high-availability services that store the checkpoint ground truth (meaning which checkpoints + * are the latest completed ones, and in what order) often rely on checkpoints being externalized. That + * way, those services only store pointers to the externalized metadata, rather than the complete + * metadata itself (for example, ZooKeeper's ZNode payload should ideally stay well below one megabyte). */ public class CompletedCheckpoint implements Serializable { @@ -44,8 +75,12 @@ public class CompletedCheckpoint implements Serializable { private static final long serialVersionUID = -8360248179615702014L; + // ------------------------------------------------------------------------ + + /** The ID of the job that the checkpoint belongs to */ private final JobID job; + /** The ID (logical timestamp) of the checkpoint */ private final long checkpointID; /** The timestamp when the checkpoint was triggered.
*/ @@ -60,23 +95,41 @@ public class CompletedCheckpoint implements Serializable { /** Properties for this checkpoint. */ private final CheckpointProperties props; - /** External path if persisted checkpoint; <code>null</code> otherwise. */ - private final String externalPath; + /** The state handle to the externalized meta data, if the metadata has been externalized */ + @Nullable + private final StreamStateHandle externalizedMetadata; + + /** External pointer to the completed checkpoint (for example file path) if externalized; null otherwise. */ + @Nullable + private final String externalPointer; /** Optional stats tracker callback for discard. */ @Nullable - private transient CompletedCheckpointStats.DiscardCallback discardCallback; + private transient volatile DiscardCallback discardCallback; // ------------------------------------------------------------------------ - public CompletedCheckpoint( + @VisibleForTesting + CompletedCheckpoint( JobID job, long checkpointID, long timestamp, long completionTimestamp, Map<JobVertexID, TaskState> taskStates) { - this(job, checkpointID, timestamp, completionTimestamp, taskStates, CheckpointProperties.forStandardCheckpoint(), null); + this(job, checkpointID, timestamp, completionTimestamp, taskStates, + CheckpointProperties.forStandardCheckpoint()); + } + + public CompletedCheckpoint( + JobID job, + long checkpointID, + long timestamp, + long completionTimestamp, + Map<JobVertexID, TaskState> taskStates, + CheckpointProperties props) { + + this(job, checkpointID, timestamp, completionTimestamp, taskStates, props, null, null); } public CompletedCheckpoint( @@ -86,24 +139,27 @@ public class CompletedCheckpoint implements Serializable { long completionTimestamp, Map<JobVertexID, TaskState> taskStates, CheckpointProperties props, - String externalPath) { + @Nullable StreamStateHandle externalizedMetadata, + @Nullable String externalPointer) { checkArgument(checkpointID >= 0); checkArgument(timestamp >= 0); 
checkArgument(completionTimestamp >= 0); + checkArgument((externalPointer == null) == (externalizedMetadata == null), + "external pointer without externalized metadata must be both null or both non-null"); + + checkArgument(!props.externalizeCheckpoint() || externalPointer != null, + "Checkpoint properties require externalized checkpoint, but checkpoint is not externalized"); + this.job = checkNotNull(job); this.checkpointID = checkpointID; this.timestamp = timestamp; this.duration = completionTimestamp - timestamp; this.taskStates = checkNotNull(taskStates); this.props = checkNotNull(props); - this.externalPath = externalPath; - - if (props.externalizeCheckpoint() && externalPath == null) { - throw new NullPointerException("Checkpoint properties say that the checkpoint " + - "should have been persisted, but missing external path."); - } + this.externalizedMetadata = externalizedMetadata; + this.externalPointer = externalPointer; } // ------------------------------------------------------------------------ @@ -146,10 +202,9 @@ public class CompletedCheckpoint implements Serializable { discard(); return true; } else { - if (externalPath != null) { + if (externalPointer != null) { LOG.info("Persistent checkpoint with ID {} at '{}' not discarded.", - checkpointID, - externalPath); + checkpointID, externalPointer); } return false; @@ -158,14 +213,36 @@ public class CompletedCheckpoint implements Serializable { void discard() throws Exception { try { - if (externalPath != null) { - SavepointStore.removeSavepointFile(externalPath); + // collect exceptions and continue cleanup + Exception exception = null; + + // drop the metadata, if we have some + if (externalizedMetadata != null) { + try { + externalizedMetadata.discardState(); + } + catch (Exception e) { + exception = e; + } } - StateUtil.bestEffortDiscardAllStateObjects(taskStates.values()); - } finally { + // drop the actual state + try { + StateUtil.bestEffortDiscardAllStateObjects(taskStates.values()); + } + catch 
(Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + if (exception != null) { + throw exception; + } + } + finally { taskStates.clear(); + // to be null-pointer safe, copy reference to stack + DiscardCallback discardCallback = this.discardCallback; if (discardCallback != null) { discardCallback.notifyDiscardedCheckpoint(); } @@ -190,8 +267,18 @@ public class CompletedCheckpoint implements Serializable { return taskStates.get(jobVertexID); } - public String getExternalPath() { - return externalPath; + public boolean isExternalized() { + return externalizedMetadata != null; + } + + @Nullable + public StreamStateHandle getExternalizedMetadata() { + return externalizedMetadata; + } + + @Nullable + public String getExternalPointer() { + return externalPointer; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java index d2c0f6c..e91e038 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java @@ -72,4 +72,13 @@ public interface CompletedCheckpointStore { */ int getNumberOfRetainedCheckpoints(); + /** + * This method returns whether the completed checkpoint store requires checkpoints to be + * externalized. Externalized checkpoints have their meta data persisted, which the checkpoint + * store can exploit (for example, by storing only a pointer to the persisted metadata). + * + * @return True, if the store requires that checkpoints are externalized before being added, false + * if the store stores the metadata itself.
+ */ + boolean requiresExternalizedCheckpoints(); } http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 908ff7f..2c392b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; import java.io.IOException; import java.util.HashMap; @@ -28,6 +29,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; @@ -41,7 +44,10 @@ import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +80,7 @@ public class PendingCheckpoint { /** * The checkpoint properties. If the checkpoint should be persisted - * externally, it happens in {@link #finalizeCheckpoint()}. 
+ * externally, it happens in {@link #finalizeCheckpointExternalized()}. */ private final CheckpointProperties props; @@ -203,46 +209,80 @@ public class PendingCheckpoint { return onCompletionPromise; } - public CompletedCheckpoint finalizeCheckpoint() { + public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException { synchronized (lock) { - Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet."); - - // Persist if required - String externalPath = null; - if (props.externalizeCheckpoint()) { - try { - Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values()); - externalPath = SavepointStore.storeSavepoint( - targetDirectory, - savepoint - ); - } catch (IOException e) { - LOG.error("Failed to persist checkpoint {}.",checkpointId, e); - } - } + checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet."); - CompletedCheckpoint completed = new CompletedCheckpoint( - jobId, - checkpointId, - checkpointTimestamp, - System.currentTimeMillis(), - new HashMap<>(taskStates), - props, - externalPath); + // make sure we fulfill the promise with an exception if something fails + try { + // externalize the metadata + final Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values()); - onCompletionPromise.complete(completed); + // TEMP FIX - The savepoint store is strictly typed to file systems currently + // but the checkpoints think more generic. we need to work with file handles + // here until the savepoint serializer accepts a generic stream factory - if (statsCallback != null) { - // Finalize the statsCallback and give the completed checkpoint a - // callback for discards. 
- CompletedCheckpointStats.DiscardCallback discardCallback = statsCallback.reportCompletedCheckpoint(externalPath); - completed.setDiscardCallback(discardCallback); + final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint); + final String externalPointer = metadataHandle.getFilePath().getParent().toString(); + + return finalizeInternal(metadataHandle, externalPointer); + } + catch (Throwable t) { + onCompletionPromise.completeExceptionally(t); + ExceptionUtils.rethrowIOException(t); + return null; // silence the compiler + } + } + } + + public CompletedCheckpoint finalizeCheckpointNonExternalized() { + synchronized (lock) { + checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet."); + + // make sure we fulfill the promise with an exception if something fails + try { + // finalize without external metadata + return finalizeInternal(null, null); } + catch (Throwable t) { + onCompletionPromise.completeExceptionally(t); + ExceptionUtils.rethrow(t); + return null; // silence the compiler + } + } + } - dispose(false); + @GuardedBy("lock") + private CompletedCheckpoint finalizeInternal( + @Nullable StreamStateHandle externalMetadata, + @Nullable String externalPointer) { - return completed; + assert(Thread.holdsLock(lock)); + + CompletedCheckpoint completed = new CompletedCheckpoint( + jobId, + checkpointId, + checkpointTimestamp, + System.currentTimeMillis(), + new HashMap<>(taskStates), + props, + externalMetadata, + externalPointer); + + onCompletionPromise.complete(completed); + + if (statsCallback != null) { + // Finalize the statsCallback and give the completed checkpoint a + // callback for discards. 
+ CompletedCheckpointStats.DiscardCallback discardCallback = + statsCallback.reportCompletedCheckpoint(externalPointer); + completed.setDiscardCallback(discardCallback); } + + // mark this pending checkpoint as disposed, but do NOT drop the state + dispose(false); + + return completed; } /** @@ -411,9 +451,9 @@ public class PendingCheckpoint { public void run() { try { StateUtil.bestEffortDiscardAllStateObjects(taskStates.values()); - } catch (Exception e) { - LOG.warn("Could not properly dispose the pending checkpoint " + - "{} of job {}.", checkpointId, jobId, e); + } catch (Throwable t) { + LOG.warn("Could not properly dispose the pending checkpoint {} of job {}.", + checkpointId, jobId, t); } finally { taskStates.clear(); } http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index 082bca9..a0248b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -96,4 +96,8 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt } } + @Override + public boolean requiresExternalizedCheckpoints() { + return false; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index fdd0d40..4b03cea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -125,6 +125,11 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto LOG.info("Initialized in '{}'.", checkpointsPath); } + @Override + public boolean requiresExternalizedCheckpoints() { + return true; + } + /** * Gets the latest checkpoint from ZooKeeper and removes all others. * http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java index 950a9a0..60f0287 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java @@ -19,11 +19,13 @@ package org.apache.flink.runtime.checkpoint.savepoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.checkpoint.CheckpointProperties; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.TaskState; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.StreamStateHandle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,22 +48,27 @@ public class SavepointLoader { * @param jobId The JobID of the 
job to load the savepoint for. * @param tasks Tasks that will possibly be reset * @param savepointPath The path of the savepoint to rollback to - * @param userClassLoader The user code classloader + * @param classLoader The class loader to resolve serialized classes in legacy savepoint versions. * @param allowNonRestoredState Allow to skip checkpoint state that cannot be mapped * to any job vertex in tasks. * * @throws IllegalStateException If mismatch between program and savepoint state - * @throws Exception If savepoint store failure + * @throws IOException If savepoint store failure */ public static CompletedCheckpoint loadAndValidateSavepoint( JobID jobId, Map<JobVertexID, ExecutionJobVertex> tasks, String savepointPath, - ClassLoader userClassLoader, + ClassLoader classLoader, boolean allowNonRestoredState) throws IOException { // (1) load the savepoint - Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath, userClassLoader); + final Tuple2<Savepoint, StreamStateHandle> savepointAndHandle = + SavepointStore.loadSavepointWithHandle(savepointPath, classLoader); + + final Savepoint savepoint = savepointAndHandle.f0; + final StreamStateHandle metadataHandle = savepointAndHandle.f1; + final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size()); boolean expandedToLegacyIds = false; @@ -114,10 +121,12 @@ public class SavepointLoader { // (3) convert to checkpoint so the system can fall back to it CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); - return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L, taskStates, props, savepointPath); + return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L, + taskStates, props, metadataHandle, savepointPath); } // ------------------------------------------------------------------------ + /** This class is not meant to be instantiated */ private SavepointLoader() {} } 
http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java index 95370a5..5c8ac6b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java @@ -19,12 +19,15 @@ package org.apache.flink.runtime.checkpoint.savepoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.util.FileUtils; import org.apache.flink.util.Preconditions; @@ -118,6 +121,28 @@ public class SavepointStore { * @throws IOException Failures during store are forwarded */ public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException { + // write and create the file handle + FileStateHandle metadataFileHandle = storeSavepointToHandle(directory, savepoint); + + // we return the savepoint directory path here! + // The directory path also works to resume from and is more elegant than the direct + // metadata file pointer + return metadataFileHandle.getFilePath().getParent().toString(); + } + + /** + * Stores the savepoint metadata file to a state handle. 
+ * + * @param directory Target directory to store savepoint in + * @param savepoint Savepoint to be stored + * + * @return State handle to the checkpoint metadata + * @throws IOException Failures during store are forwarded + */ + public static <T extends Savepoint> FileStateHandle storeSavepointToHandle( + String directory, + T savepoint) throws IOException { + checkNotNull(directory, "Target directory"); checkNotNull(savepoint, "Savepoint"); @@ -127,10 +152,9 @@ public class SavepointStore { final FileSystem fs = FileSystem.get(basePath.toUri()); boolean success = false; - try (FSDataOutputStream fdos = fs.create(metadataFilePath, WriteMode.NO_OVERWRITE); + try (FSDataOutputStream fdos = fs.create(metadataFilePath, WriteMode.NO_OVERWRITE); DataOutputStream dos = new DataOutputStream(fdos)) { - // Write header dos.writeInt(MAGIC_NUMBER); dos.writeInt(savepoint.getVersion()); @@ -138,7 +162,13 @@ public class SavepointStore { // Write savepoint SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint); serializer.serialize(savepoint, dos); + + // construct result handle + FileStateHandle handle = new FileStateHandle(metadataFilePath, dos.size()); + + // all good! success = true; + return handle; } finally { if (!success && fs.exists(metadataFilePath)) { @@ -147,22 +177,37 @@ public class SavepointStore { } } } - - // we return the savepoint directory path here! - // The directory path also works to resume from and is more elegant than the direct - // metadata file pointer - return basePath.toString(); } /** * Loads the savepoint at the specified path. * * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file. + * @param classLoader The class loader used to resolve serialized classes from legacy savepoint formats. 
* @return The loaded savepoint + * * @throws IOException Failures during load are forwarded */ - public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader userClassLoader) throws IOException { - Preconditions.checkNotNull(savepointFileOrDirectory, "Path"); + public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader classLoader) throws IOException { + return loadSavepointWithHandle(savepointFileOrDirectory, classLoader).f0; + } + + /** + * Loads the savepoint at the specified path. This method returns the savepoint, as well as the + * handle to the metadata. + * + * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file. + * @param classLoader The class loader used to resolve serialized classes from legacy savepoint formats. + * @return A tuple of the loaded savepoint (f0) and the handle to its metadata file (f1) + * + * @throws IOException Failures during load are forwarded + */ + public static Tuple2<Savepoint, StreamStateHandle> loadSavepointWithHandle( + String savepointFileOrDirectory, + ClassLoader classLoader) throws IOException { + + checkNotNull(savepointFileOrDirectory, "savepointFileOrDirectory"); + checkNotNull(classLoader, "classLoader"); Path path = new Path(savepointFileOrDirectory); @@ -180,11 +225,13 @@ public class SavepointStore { LOG.info("Using savepoint file in {}", path); } else { throw new IOException("Cannot find meta data file in directory " + path - + ". Please try to load the savepoint directly from the meta data file " - + "instead of the directory."); + + ".
Please try to load the savepoint directly from the meta data file " + + "instead of the directory."); } } + // load the savepoint + final Savepoint savepoint; try (DataInputStream dis = new DataInputViewStreamWrapper(fs.open(path))) { int magicNumber = dis.readInt(); @@ -192,15 +239,27 @@ public class SavepointStore { int version = dis.readInt(); SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version); - return serializer.deserialize(dis, userClassLoader); + savepoint = serializer.deserialize(dis, classLoader); } else { - throw new RuntimeException("Unexpected magic number. This is most likely " + - "caused by trying to load a Flink 1.0 savepoint. You cannot load a " + - "savepoint triggered by Flink 1.0 with this version of Flink. If it is " + - "_not_ a Flink 1.0 savepoint, this error indicates that the specified " + - "file is not a proper savepoint or the file has been corrupted."); + throw new RuntimeException("Unexpected magic number. This can have multiple reasons: " + + "(1) You are trying to load a Flink 1.0 savepoint, which is not supported by this " + + "version of Flink. (2) The file you were pointing to is not a savepoint at all. 
" + + "(3) The savepoint file has been corrupted."); } } + + // construct the stream handle to the metadata file + // we get the size best-effort + long size = 0; + try { + size = fs.getFileStatus(path).getLen(); + } + catch (Exception ignored) { + // we don't know the size, but we don't want to fail the savepoint loading for that + } + StreamStateHandle metadataHandle = new FileStateHandle(path, size); + + return new Tuple2<>(savepoint, metadataHandle); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java index c6f5c86..b250831 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FutureUtil; import java.util.concurrent.RunnableFuture; @@ -42,26 +43,22 @@ public class StateUtil { Iterable<? 
extends StateObject> handlesToDiscard) throws Exception { if (handlesToDiscard != null) { - - Exception suppressedExceptions = null; + Exception exception = null; for (StateObject state : handlesToDiscard) { if (state != null) { try { state.discardState(); - } catch (Exception ex) { - //best effort to still cleanup other states and deliver exceptions in the end - if (suppressedExceptions == null) { - suppressedExceptions = new Exception(ex); - } - suppressedExceptions.addSuppressed(ex); + } + catch (Exception ex) { + exception = ExceptionUtils.firstOrSuppressed(ex, exception); } } } - if (suppressedExceptions != null) { - throw suppressedExceptions; + if (exception != null) { + throw exception; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 21749cb..87cd4ac 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -18,7 +18,7 @@ package org.apache.flink.runtime.jobmanager -import java.io.{File, IOException} +import java.io.IOException import java.net._ import java.util.UUID import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _} @@ -50,7 +50,7 @@ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.executiongraph._ import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager} -import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID} +import org.apache.flink.runtime.jobgraph.{JobGraph, 
JobStatus} import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway @@ -77,7 +77,7 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util._ import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} -import org.apache.flink.runtime.{FlinkActor, JobException, LeaderSessionMessageFilter, LogMessages} +import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils} import org.jboss.netty.channel.ChannelException @@ -611,7 +611,7 @@ class JobManager( new BiFunction[CompletedCheckpoint, Throwable, Void] { override def apply(success: CompletedCheckpoint, cause: Throwable): Void = { if (success != null) { - val path = success.getExternalPath() + val path = success.getExternalPointer() log.info(s"Savepoint stored in $path. Now cancelling $jobId.") executionGraph.cancel() senderRef ! decorateMessage(CancellationSuccess(jobId, path)) @@ -787,11 +787,11 @@ class JobManager( new BiFunction[CompletedCheckpoint, Throwable, Void] { override def apply(success: CompletedCheckpoint, cause: Throwable): Void = { if (success != null) { - if (success.getExternalPath != null) { + if (success.getExternalPointer != null) { senderRef ! 
TriggerSavepointSuccess( jobId, success.getCheckpointID, - success.getExternalPath, + success.getExternalPointer, success.getTimestamp ) } else { @@ -1784,7 +1784,7 @@ class JobManager( case t: Throwable => log.error(s"Could not properly unregister job $jobID form the library cache.", t) } - jobManagerMetricGroup.map(_.removeJob(jobID)) + jobManagerMetricGroup.foreach(_.removeJob(jobID)) futureOption } http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index d4c3a2d..9517257 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -134,5 +134,10 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { public int getNumberOfRetainedCheckpoints() { return -1; } + + @Override + public boolean requiresExternalizedCheckpoints() { + return false; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java index 725b85f..f77c755 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java @@ -237,7 +237,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { Map<JobVertexID, TaskState> taskGroupStates, CheckpointProperties props) { - super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates, props, null); + super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates, props); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java index 0d933ff..b34e9a6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -19,9 +19,11 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -55,7 +57,9 @@ public class CompletedCheckpointTest { // Verify discard call is forwarded to state CompletedCheckpoint checkpoint = new CompletedCheckpoint( - new JobID(), 0, 0, 1, taskStates, CheckpointProperties.forStandardCheckpoint(), file.getAbsolutePath()); + new JobID(), 0, 0, 1, taskStates, CheckpointProperties.forStandardCheckpoint(), + new FileStateHandle(new Path(file.toURI()), file.length()), + file.getAbsolutePath()); 
checkpoint.discard(JobStatus.FAILED); @@ -74,7 +78,7 @@ public class CompletedCheckpointTest { boolean discardSubsumed = true; CheckpointProperties props = new CheckpointProperties(false, false, discardSubsumed, true, true, true, true); CompletedCheckpoint checkpoint = new CompletedCheckpoint( - new JobID(), 0, 0, 1, taskStates, props, null); + new JobID(), 0, 0, 1, taskStates, props); // Subsume checkpoint.subsume(); @@ -104,7 +108,9 @@ public class CompletedCheckpointTest { // Keep CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false); CompletedCheckpoint checkpoint = new CompletedCheckpoint( - new JobID(), 0, 0, 1, new HashMap<>(taskStates), props, externalPath); + new JobID(), 0, 0, 1, new HashMap<>(taskStates), props, + new FileStateHandle(new Path(file.toURI()), file.length()), + externalPath); checkpoint.discard(status); verify(state, times(0)).discardState(); @@ -113,7 +119,7 @@ public class CompletedCheckpointTest { // Discard props = new CheckpointProperties(false, false, true, true, true, true, true); checkpoint = new CompletedCheckpoint( - new JobID(), 0, 0, 1, new HashMap<>(taskStates), props, null); + new JobID(), 0, 0, 1, new HashMap<>(taskStates), props); checkpoint.discard(status); verify(state, times(1)).discardState(); @@ -135,8 +141,7 @@ public class CompletedCheckpointTest { 0, 1, new HashMap<>(taskStates), - CheckpointProperties.forStandardCheckpoint(), - null); + CheckpointProperties.forStandardCheckpoint()); CompletedCheckpointStats.DiscardCallback callback = mock(CompletedCheckpointStats.DiscardCallback.class); completed.setDiscardCallback(callback); http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 3a85c4c..6f04f39 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -24,9 +24,11 @@ import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; + import org.mockito.Mockito; import java.io.File; @@ -49,9 +51,6 @@ import static org.mockito.Mockito.verify; public class PendingCheckpointTest { - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS = new HashMap<>(); private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID(); @@ -59,6 +58,9 @@ public class PendingCheckpointTest { ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class)); } + @Rule + public final TemporaryFolder tmpFolder = new TemporaryFolder(); + /** * Tests that pending checkpoints can be subsumed iff they are forced. 
*/ @@ -96,7 +98,7 @@ public class PendingCheckpointTest { PendingCheckpoint pending = createPendingCheckpoint(persisted, tmp.getAbsolutePath()); pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); assertEquals(0, tmp.listFiles().length); - pending.finalizeCheckpoint(); + pending.finalizeCheckpointExternalized(); assertEquals(1, tmp.listFiles().length); // Ephemeral checkpoint @@ -105,7 +107,7 @@ public class PendingCheckpointTest { pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); assertEquals(1, tmp.listFiles().length); - pending.finalizeCheckpoint(); + pending.finalizeCheckpointNonExternalized(); assertEquals(1, tmp.listFiles().length); } @@ -148,7 +150,8 @@ public class PendingCheckpointTest { assertFalse(future.isDone()); pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); - pending.finalizeCheckpoint(); + assertTrue(pending.isFullyAcknowledged()); + pending.finalizeCheckpointExternalized(); assertTrue(future.isDone()); // Finalize (missing ACKs) @@ -157,7 +160,13 @@ public class PendingCheckpointTest { assertFalse(future.isDone()); try { - pending.finalizeCheckpoint(); + pending.finalizeCheckpointNonExternalized(); + fail("Did not throw expected Exception"); + } catch (IllegalStateException ignored) { + // Expected + } + try { + pending.finalizeCheckpointExternalized(); fail("Did not throw expected Exception"); } catch (IllegalStateException ignored) { // Expected @@ -233,7 +242,7 @@ public class PendingCheckpointTest { pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); verify(callback, times(1)).reportSubtaskStats(any(JobVertexID.class), any(SubtaskStateStats.class)); - pending.finalizeCheckpoint(); + pending.finalizeCheckpointNonExternalized(); verify(callback, times(1)).reportCompletedCheckpoint(any(String.class)); } http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 5a38be2..cbb077c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -483,6 +483,10 @@ public class JobManagerHARecoveryTest { return checkpoints.size(); } + @Override + public boolean requiresExternalizedCheckpoints() { + return false; + } } static class MyCheckpointRecoveryFactory implements CheckpointRecoveryFactory { http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index 60b12d2..75f1fd4 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -979,7 +979,7 @@ class JobManagerITCase(_system: ActorSystem) jobManager.tell(TriggerSavepoint(jobGraph.getJobID(), Option.apply("any")), testActor) val checkpoint = Mockito.mock(classOf[CompletedCheckpoint]) - when(checkpoint.getExternalPath).thenReturn("Expected test savepoint path") + when(checkpoint.getExternalPointer).thenReturn("Expected test savepoint path") // Succeed the promise savepointPromise.complete(checkpoint) http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java 
---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index 60a3a62..f910e49 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -167,7 +167,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism); ActorSystem testSystem = null; - JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2]; + final JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2]; LeaderRetrievalService leaderRetrievalService = null; ActorSystem taskManagerSystem = null;
