[FLINK-6390] [checkpoints] Add API for checkpoints that are triggered via external systems
This includes - A interface for hooks that are called by the checkpoint coordinator to trigger/restore a checkpoint - A source extension that triggers the operator checkpoints and barrier injection on certain events Because this changes the checkpoint metadata format, the commit introduces a new metadata format version. This closes #3782 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90ca4381 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90ca4381 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90ca4381 Branch: refs/heads/master Commit: 90ca438106e63c5032ee2ad27e54e9f573eac386 Parents: 6bdaf1e Author: Stephan Ewen <[email protected]> Authored: Mon Mar 27 17:20:47 2017 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Apr 26 21:56:45 2017 +0200 ---------------------------------------------------------------------- .../core/io/SimpleVersionedSerializer.java | 80 ++++ .../java/org/apache/flink/util/StringUtils.java | 7 + .../checkpoint/savepoint/SavepointV0.java | 10 + .../savepoint/SavepointV0Serializer.java | 12 +- .../checkpoint/CheckpointCoordinator.java | 80 +++- .../checkpoint/CheckpointDeclineReason.java | 4 +- .../runtime/checkpoint/CompletedCheckpoint.java | 31 +- .../flink/runtime/checkpoint/MasterState.java | 62 +++ .../checkpoint/MasterTriggerRestoreHook.java | 140 ++++++ .../runtime/checkpoint/PendingCheckpoint.java | 23 +- .../runtime/checkpoint/hooks/MasterHooks.java | 273 +++++++++++ .../runtime/checkpoint/savepoint/Savepoint.java | 6 + .../checkpoint/savepoint/SavepointLoader.java | 3 +- .../savepoint/SavepointSerializers.java | 7 +- .../checkpoint/savepoint/SavepointV1.java | 36 +- .../savepoint/SavepointV1Serializer.java | 76 +-- .../checkpoint/savepoint/SavepointV2.java | 91 ++++ .../savepoint/SavepointV2Serializer.java | 468 +++++++++++++++++++ .../runtime/executiongraph/ExecutionGraph.java | 9 + .../executiongraph/ExecutionGraphBuilder.java | 
18 + .../tasks/JobCheckpointingSettings.java | 48 +- .../CheckpointCoordinatorMasterHooksTest.java | 421 +++++++++++++++++ .../CompletedCheckpointStoreTest.java | 2 +- .../checkpoint/CompletedCheckpointTest.java | 24 +- ...ExecutionGraphCheckpointCoordinatorTest.java | 1 + .../savepoint/CheckpointTestUtils.java | 184 ++++++++ .../savepoint/SavepointLoaderTest.java | 2 +- .../savepoint/SavepointStoreTest.java | 27 +- .../savepoint/SavepointV1SerializerTest.java | 17 +- .../checkpoint/savepoint/SavepointV1Test.java | 157 ------- .../savepoint/SavepointV2SerializerTest.java | 148 ++++++ .../checkpoint/savepoint/SavepointV2Test.java | 68 +++ .../ArchivedExecutionGraphTest.java | 2 + .../api/checkpoint/ExternallyInducedSource.java | 75 +++ .../checkpoint/WithMasterCheckpointHook.java | 38 ++ .../FunctionMasterCheckpointHookFactory.java | 45 ++ .../api/graph/StreamingJobGraphGenerator.java | 28 +- .../runtime/tasks/SourceStreamTask.java | 56 +++ .../WithMasterCheckpointHookConfigTest.java | 189 ++++++++ .../runtime/io/StreamRecordWriterTest.java | 5 - .../SourceExternalCheckpointTriggerTest.java | 171 +++++++ .../runtime/tasks/StreamTaskTestHarness.java | 7 +- .../test/checkpointing/SavepointITCase.java | 4 +- 43 files changed, 2874 insertions(+), 281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java new file mode 100644 index 0000000..6c061a5 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.io; + +import java.io.IOException; + +/** + * A simple serializer interface for versioned serialization. + * + * <p>The serializer has a version (returned by {@link #getVersion()}) which can be attached + * to the serialized data. When the serializer evolves, the version can be used to identify + * with which prior version the data was serialized. + * + * <pre>{@code + * MyType someObject = ...; + * SimpleVersionedSerializer<MyType> serializer = ...; + * + * byte[] serializedData = serializer.serialize(someObject); + * int version = serializer.getVersion(); + * + * MyType deserialized = serializer.deserialize(version, serializedData); + * + * byte[] someOldData = ...; + * int oldVersion = ...; + * MyType deserializedOldObject = serializer.deserialize(oldVersion, someOldData); + * + * }</pre> + * + * @param <E> The data type serialized / deserialized by this serializer. + */ +public interface SimpleVersionedSerializer<E> extends Versioned { + + /** + * Gets the version with which this serializer serializes. + * + * @return The version of the serialization schema. + */ + @Override + int getVersion(); + + /** + * Serializes the given object. 
The serialization is assumed to correspond to the + * current serialization version (as returned by {@link #getVersion()}). + * + * + * @param obj The object to serialize. + * @return The serialized data (bytes). + * + * @throws IOException Thrown, if the serialization fails. + */ + byte[] serialize(E obj) throws IOException; + + /** + * De-serializes the given data (bytes) which was serialized with the scheme of the + * indicated version. + * + * @param version The version in which the data was serialized + * @param serialized The serialized data + * @return The deserialized object + * + * @throws IOException Thrown, if the deserialization fails. + */ + E deserialize(int version, byte[] serialized) throws IOException; +} http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-core/src/main/java/org/apache/flink/util/StringUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java index b84f602..abd6ba6 100644 --- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java @@ -309,6 +309,13 @@ public final class StringUtils { } } + /** + * Checks if the string is null, empty, or contains only whitespace characters. + * A whitespace character is defined via {@link Character#isWhitespace(char)}. + * + * @param str The string to check + * @return True, if the string is null or blank, false otherwise.
+ */ public static boolean isNullOrWhitespaceOnly(String str) { if (str == null || str.length() == 0) { return true; http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java index 1c51a69..f3ec1cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java @@ -19,6 +19,7 @@ package org.apache.flink.migration.runtime.checkpoint.savepoint; import org.apache.flink.migration.runtime.checkpoint.TaskState; +import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; import org.apache.flink.util.Preconditions; @@ -58,6 +59,15 @@ public class SavepointV0 implements Savepoint { @Override public Collection<org.apache.flink.runtime.checkpoint.TaskState> getTaskStates() { + // since checkpoints are never deserialized into this format, + // this method should never be called + throw new UnsupportedOperationException(); + } + + @Override + public Collection<MasterState> getMasterStates() { + // since checkpoints are never deserialized into this format, + // this method should never be called throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java index 4739033..d285906 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java @@ -34,7 +34,7 @@ import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState; import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList; import org.apache.flink.migration.util.SerializedValue; import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -68,7 +68,7 @@ import java.util.Map; * don't rely on any involved Java classes to stay the same. 
*/ @SuppressWarnings("deprecation") -public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> { +public class SavepointV0Serializer implements SavepointSerializer<SavepointV2> { public static final SavepointV0Serializer INSTANCE = new SavepointV0Serializer(); private static final StreamStateHandle SIGNAL_0 = new ByteStreamStateHandle("SIGNAL_0", new byte[]{0}); @@ -81,12 +81,12 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> { @Override - public void serialize(SavepointV1 savepoint, DataOutputStream dos) throws IOException { + public void serialize(SavepointV2 savepoint, DataOutputStream dos) throws IOException { throw new UnsupportedOperationException("This serializer is read-only and only exists for backwards compatibility"); } @Override - public SavepointV1 deserialize(DataInputStream dis, ClassLoader userClassLoader) throws IOException { + public SavepointV2 deserialize(DataInputStream dis, ClassLoader userClassLoader) throws IOException { long checkpointId = dis.readLong(); @@ -165,7 +165,7 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> { return serializedValue; } - private SavepointV1 convertSavepoint( + private SavepointV2 convertSavepoint( List<TaskState> taskStates, ClassLoader userClassLoader, long checkpointID) throws Exception { @@ -176,7 +176,7 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> { newTaskStates.add(convertTaskState(taskState, userClassLoader, checkpointID)); } - return new SavepointV1(checkpointID, newTaskStates); + return new SavepointV2(checkpointID, newTaskStates); } private org.apache.flink.runtime.checkpoint.TaskState convertTaskState( http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 256321e..23a38d4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -20,7 +20,9 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.checkpoint.hooks.MasterHooks; import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader; import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; import org.apache.flink.runtime.concurrent.ApplyFunction; @@ -39,7 +41,10 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -138,6 +143,9 @@ public class CheckpointCoordinator { /** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */ private final ScheduledThreadPoolExecutor timer; + /** The master checkpoint hooks executed by this checkpoint coordinator */ + private final HashMap<String, MasterTriggerRestoreHook<?>> masterHooks; + /** Actor that receives status updates from the execution graph this coordinator works for */ private JobStatusListener jobStatusListener; @@ -220,6 +228,7 @@ public class CheckpointCoordinator { this.executor = checkNotNull(executor); 
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS); + this.masterHooks = new HashMap<>(); this.timer = new ScheduledThreadPoolExecutor(1, new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer")); @@ -245,6 +254,45 @@ public class CheckpointCoordinator { } } + // -------------------------------------------------------------------------------------------- + // Configuration + // -------------------------------------------------------------------------------------------- + + /** + * Adds the given master hook to the checkpoint coordinator. This method does nothing, if + * the checkpoint coordinator already contained a hook with the same ID (as defined via + * {@link MasterTriggerRestoreHook#getIdentifier()}). + * + * @param hook The hook to add. + * @return True, if the hook was added, false if the checkpoint coordinator already + * contained a hook with the same ID. + */ + public boolean addMasterHook(MasterTriggerRestoreHook<?> hook) { + checkNotNull(hook); + + final String id = hook.getIdentifier(); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(id), "The hook has a null or empty id"); + + synchronized (lock) { + if (!masterHooks.containsKey(id)) { + masterHooks.put(id, hook); + return true; + } + else { + return false; + } + } + } + + /** + * Gets the number of currently register master hooks. + */ + public int getNumberOfRegisteredMasterHooks() { + synchronized (lock) { + return masterHooks.size(); + } + } + /** * Sets the checkpoint stats tracker. 
* @@ -492,6 +540,20 @@ public class CheckpointCoordinator { checkpoint.setStatsCallback(callback); } + // trigger the master hooks for the checkpoint + try { + List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(), + checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout)); + + for (MasterState s : masterStates) { + checkpoint.addMasterState(s); + } + } + catch (FlinkException e) { + checkpoint.abortError(e); + return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); + } + // schedule the timer that will clean up the expired checkpoints final Runnable canceller = new Runnable() { @Override @@ -962,13 +1024,25 @@ public class CheckpointCoordinator { LOG.info("Restoring from latest valid checkpoint: {}.", latest); + // re-assign the task states + final Map<JobVertexID, TaskState> taskStates = latest.getTaskStates(); StateAssignmentOperation stateAssignmentOperation = new StateAssignmentOperation(LOG, tasks, taskStates, allowNonRestoredState); - stateAssignmentOperation.assignStates(); + // call master hooks for restore + + MasterHooks.restoreMasterHooks( + masterHooks, + latest.getMasterHookStates(), + latest.getCheckpointID(), + allowNonRestoredState, + LOG); + + // update metrics + if (statsTracker != null) { long restoreTimestamp = System.currentTimeMillis(); RestoredCheckpointStats restored = new RestoredCheckpointStats( @@ -1022,9 +1096,9 @@ public class CheckpointCoordinator { return restoreLatestCheckpointedState(tasks, true, allowNonRestored); } - // -------------------------------------------------------------------------------------------- + // ------------------------------------------------------------------------ // Accessors - // -------------------------------------------------------------------------------------------- + // ------------------------------------------------------------------------ public int getNumberOfPendingCheckpoints() { return this.pendingCheckpoints.size(); 
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java index 60fe657..41c50cc0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java @@ -36,7 +36,9 @@ public enum CheckpointDeclineReason { NOT_ALL_REQUIRED_TASKS_RUNNING("Not all required tasks are currently running."), - EXCEPTION("An Exception occurred while triggering the checkpoint."); + EXCEPTION("An Exception occurred while triggering the checkpoint."), + + EXPIRED("The checkpoint expired before triggering was complete"); // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index 79fc31f..bb49b45 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -27,11 +27,16 @@ import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
javax.annotation.Nullable; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import static org.apache.flink.util.Preconditions.checkArgument; @@ -90,11 +95,14 @@ public class CompletedCheckpoint implements Serializable { private final long duration; /** States of the different task groups belonging to this checkpoint */ - private final Map<JobVertexID, TaskState> taskStates; + private final HashMap<JobVertexID, TaskState> taskStates; /** Properties for this checkpoint. */ private final CheckpointProperties props; + /** States that were created by a hook on the master (in the checkpoint coordinator) */ + private final Collection<MasterState> masterHookStates; + /** The state handle to the externalized meta data, if the metadata has been externalized */ @Nullable private final StreamStateHandle externalizedMetadata; @@ -118,6 +126,7 @@ public class CompletedCheckpoint implements Serializable { Map<JobVertexID, TaskState> taskStates) { this(job, checkpointID, timestamp, completionTimestamp, taskStates, + Collections.<MasterState>emptyList(), CheckpointProperties.forStandardCheckpoint()); } @@ -127,9 +136,11 @@ public class CompletedCheckpoint implements Serializable { long timestamp, long completionTimestamp, Map<JobVertexID, TaskState> taskStates, + @Nullable Collection<MasterState> masterHookStates, CheckpointProperties props) { - this(job, checkpointID, timestamp, completionTimestamp, taskStates, props, null, null); + this(job, checkpointID, timestamp, completionTimestamp, taskStates, + masterHookStates, props, null, null); } public CompletedCheckpoint( @@ -138,6 +149,7 @@ public class CompletedCheckpoint implements Serializable { long timestamp, long completionTimestamp, Map<JobVertexID, TaskState> taskStates, + @Nullable Collection<MasterState> masterHookStates, CheckpointProperties props, @Nullable StreamStateHandle externalizedMetadata, 
@Nullable String externalPointer) { @@ -156,7 +168,14 @@ public class CompletedCheckpoint implements Serializable { this.checkpointID = checkpointID; this.timestamp = timestamp; this.duration = completionTimestamp - timestamp; - this.taskStates = checkNotNull(taskStates); + + // we create copies here, to make sure we have no shared mutable + // data structure with the "outside world" + this.taskStates = new HashMap<>(checkNotNull(taskStates)); + this.masterHookStates = masterHookStates == null || masterHookStates.isEmpty() ? + Collections.<MasterState>emptyList() : + new ArrayList<>(masterHookStates); + this.props = checkNotNull(props); this.externalizedMetadata = externalizedMetadata; this.externalPointer = externalPointer; @@ -228,13 +247,17 @@ public class CompletedCheckpoint implements Serializable { } public Map<JobVertexID, TaskState> getTaskStates() { - return taskStates; + return Collections.unmodifiableMap(taskStates); } public TaskState getTaskState(JobVertexID jobVertexID) { return taskStates.get(jobVertexID); } + public Collection<MasterState> getMasterHookStates() { + return Collections.unmodifiableCollection(masterHookStates); + } + public boolean isExternalized() { return externalizedMetadata != null; } http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterState.java new file mode 100644 index 0000000..2d09fdb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterState.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import java.util.Arrays; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Simple encapsulation of state generated by checkpoint coordinator. + */ +public class MasterState implements java.io.Serializable { + + private static final long serialVersionUID = 1L; + + private final String name; + private final byte[] bytes; + private final int version; + + public MasterState(String name, byte[] bytes, int version) { + this.name = checkNotNull(name); + this.bytes = checkNotNull(bytes); + this.version = version; + } + + // ------------------------------------------------------------------------ + + public String name() { + return name; + } + + public byte[] bytes() { + return bytes; + } + + public int version() { + return version; + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "name: " + name + " ; version: " + version + " ; bytes: " + Arrays.toString(bytes); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java new file mode 100644 index 0000000..e77ed57 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.concurrent.Future; + +import javax.annotation.Nullable; +import java.util.concurrent.Executor; + +/** + * The interface for hooks that can be called by the checkpoint coordinator when triggering or + * restoring a checkpoint. Such a hook is useful for example when preparing external systems for + * taking or restoring checkpoints. + * + * <p>The {@link #triggerCheckpoint(long, long, Executor)} method (called when triggering a checkpoint) + * can return a result (via a future) that will be stored as part of the checkpoint metadata. + * When restoring a checkpoint, that stored result will be given to the {@link #restoreCheckpoint(long, Object)} + * method. 
The hook's {@link #getIdentifier() identifier} is used to map data to the hook in the presence + * of multiple hooks, and when resuming a savepoint that was potentially created by a different job. + * The identifier has a similar role to, for example, the operator UID in the streaming API. + * + * <p>The MasterTriggerRestoreHook is defined when creating the streaming dataflow graph. It is attached + * to the job graph, which gets sent to the cluster for execution. To avoid having to make the hook + * itself serializable, these hooks are attached to the job graph via a {@link MasterTriggerRestoreHook.Factory}. + * + * @param <T> The type of the data produced by the hook and stored as part of the checkpoint metadata. + * If the hook never stores any data, this can be typed to {@code Void}. + */ +public interface MasterTriggerRestoreHook<T> { + + /** + * Gets the identifier of this hook. The identifier is used to identify a specific hook in the + * presence of multiple hooks and to give it the correct checkpointed data upon checkpoint restoration. + * + * <p>The identifier should be unique between different hooks of a job, but deterministic/constant + * so that upon resuming a savepoint, the hook will get the correct data. + * For example, if the hook calls into another storage system and persists namespace/schema specific + * information, then the name of the storage system, together with the namespace/schema name could + * be an appropriate identifier. + * + * <p>When multiple hooks of the same name are created and attached to a job graph, only the first + * one is actually used. This can be exploited to deduplicate hooks that would do the same thing. + * + * @return The identifier of the hook. + */ + String getIdentifier(); + + /** + * This method is called by the checkpoint coordinator when triggering a checkpoint, prior + * to sending the "trigger checkpoint" messages to the source tasks.
+ * + * <p>If the hook implementation wants to store data as part of the checkpoint, it may return + * that data via a future, otherwise it should return null. The data is stored as part of + * the checkpoint metadata under the hook's identifier (see {@link #getIdentifier()}). + * + * <p>If the action by this hook needs to be executed synchronously, then this method should + * directly execute the action synchronously and block until it is complete. The returned future + * (if any) would typically be a completed future. + * + * <p>If the action should be executed asynchronously and only needs to complete before the + * checkpoint is considered completed, then the method may use the given executor to execute the + * actual action and would signal its completion by completing the future. For hooks that do not + * need to store data, the future would be completed with null. + * + * @param checkpointId The ID (logical timestamp, monotonically increasing) of the checkpoint + * @param timestamp The wall clock timestamp when the checkpoint was triggered, for + * info/logging purposes. + * @param executor The executor for asynchronous actions + * + * @return Optionally, a future that signals when the hook has completed and that contains + * data to be stored with the checkpoint. + * + * @throws Exception Exceptions encountered when calling the hook will cause the checkpoint to abort. + */ + @Nullable + Future<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception; + + /** + * This method is called by the checkpoint coordinator prior to restoring the state of a checkpoint. + * If the checkpoint did store data from this hook, that data will be passed to this method. + * + * @param checkpointId The ID (logical timestamp) of the restored checkpoint + * @param checkpointData The data originally stored in the checkpoint by this hook, possibly null.
+ * + * @throws Exception Exceptions thrown while restoring the checkpoint will cause the restore + * operation to fail and to possibly fall back to another checkpoint. + */ + void restoreCheckpoint(long checkpointId, @Nullable T checkpointData) throws Exception; + + /** + * Creates the serializer that (de)serializes the data stored by this hook. The serializer + * serializes the result of the Future returned by the {@link #triggerCheckpoint(long, long, Executor)} + * method, and deserializes the data stored in the checkpoint into the object passed to the + * {@link #restoreCheckpoint(long, Object)} method. + * + * <p>If the hook never returns any data to be stored, then this method may return null as the + * serializer. + * + * @return The serializer that (de)serializes the data stored by this hook + */ + @Nullable + SimpleVersionedSerializer<T> createCheckpointDataSerializer(); + + // ------------------------------------------------------------------------ + // factory + // ------------------------------------------------------------------------ + + /** + * A factory to instantiate a {@code MasterTriggerRestoreHook}. + * + * <p>The hooks are defined when creating the streaming dataflow graph and are attached + * to the job graph, which gets sent to the cluster for execution. To avoid having to make + * the hook implementation serializable, a serializable hook factory is actually attached to the + * job graph instead of the hook implementation itself. + */ + interface Factory extends java.io.Serializable { + + /** + * Instantiates the {@code MasterTriggerRestoreHook}.
+ */ + <V> MasterTriggerRestoreHook<V> create(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 900331b..ce97edc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -41,8 +41,10 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; @@ -89,6 +91,8 @@ public class PendingCheckpoint { private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks; + private final List<MasterState> masterState; + /** Set of acknowledged tasks */ private final Set<ExecutionAttemptID> acknowledgedTasks; @@ -143,6 +147,7 @@ public class PendingCheckpoint { this.executor = Preconditions.checkNotNull(executor); this.taskStates = new HashMap<>(); + 
this.masterState = new ArrayList<>(); this.acknowledgedTasks = new HashSet<>(verticesToConfirm.size()); this.onCompletionPromise = new FlinkCompletableFuture<>(); } @@ -256,7 +261,7 @@ public class PendingCheckpoint { // make sure we fulfill the promise with an exception if something fails try { // externalize the metadata - final Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values()); + final Savepoint savepoint = new SavepointV2(checkpointId, taskStates.values(), masterState); // TEMP FIX - The savepoint store is strictly typed to file systems currently // but the checkpoints think more generic. we need to work with file handles @@ -321,7 +326,8 @@ public class PendingCheckpoint { checkpointId, checkpointTimestamp, System.currentTimeMillis(), - new HashMap<>(taskStates), + taskStates, + masterState, props, externalMetadata, externalPointer); @@ -345,6 +351,17 @@ public class PendingCheckpoint { } /** + * Adds a master state (state generated on the checkpoint coordinator) to + * the pending checkpoint. The state is handed to the completed checkpoint and, if externalized, written into its metadata. + * + * @param state The state to add, must not be null + */ + public void addMasterState(MasterState state) { + checkNotNull(state); + masterState.add(state); + } + + /** * Acknowledges the task with the given execution attempt id and the given subtask state.
* * @param executionAttemptId of the acknowledged task http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java new file mode 100644 index 0000000..409019e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint.hooks; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.checkpoint.MasterState; +import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeoutException; + +/** + * Collection of methods to deal with checkpoint master hooks. + */ +public final class MasterHooks { + + // ------------------------------------------------------------------------ + // checkpoint triggering + // ------------------------------------------------------------------------ + + /** + * Triggers all given master hooks and returns state objects for each hook that + * produced a state. + * + * @param hooks The hooks to trigger + * @param checkpointId The checkpoint ID of the triggering checkpoint + * @param timestamp The (informational) timestamp for the triggering checkpoint + * @param executor An executor that can be used for asynchronous I/O calls + * @param timeout The maximum time that a hook may take to complete + * + * @return A list containing all states produced by the hooks + * + * @throws FlinkException Thrown, if the hooks throw an exception, or the state + * serialization fails.
+ */ + public static List<MasterState> triggerMasterHooks( + Collection<MasterTriggerRestoreHook<?>> hooks, + long checkpointId, + long timestamp, + Executor executor, + Time timeout) throws FlinkException { + + final ArrayList<MasterState> states = new ArrayList<>(hooks.size()); + + for (MasterTriggerRestoreHook<?> hook : hooks) { + MasterState state = triggerHook(hook, checkpointId, timestamp, executor, timeout); + if (state != null) { + states.add(state); + } + } + + states.trimToSize(); + return states; + } + + private static <T> MasterState triggerHook( + MasterTriggerRestoreHook<?> hook, + long checkpointId, + long timestamp, + Executor executor, + Time timeout) throws FlinkException { + + @SuppressWarnings("unchecked") + final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook; + + final String id = typedHook.getIdentifier(); + final SimpleVersionedSerializer<T> serializer = typedHook.createCheckpointDataSerializer(); + + // call the hook! + final Future<T> resultFuture; + try { + resultFuture = typedHook.triggerCheckpoint(checkpointId, timestamp, executor); + } + catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + throw new FlinkException("Error while triggering checkpoint master hook '" + id + '\'', t); + } + + // if there is a result future, wait for its completion + // in the future we want to make this asynchronous with futures (no pun intended) + if (resultFuture == null) { + return null; + } + else { + final T result; + try { + result = resultFuture.get(timeout.getSize(), timeout.getUnit()); + } + catch (InterruptedException e) { + // cannot continue here - restore interrupt status and leave + Thread.currentThread().interrupt(); + throw new FlinkException("Checkpoint master hook '" + id + "' was interrupted", e); + } + catch (ExecutionException e) { + throw new FlinkException("Checkpoint master hook '" + id + "' produced an exception", e.getCause()); + } + catch (TimeoutException e) { + throw new FlinkException("Checkpoint
master hook '" + id + + "' did not complete in time (" + timeout + ')'); + } + + // if the result of the future is not null, return it as state + if (result == null) { + return null; + } + else if (serializer != null) { + try { + final int version = serializer.getVersion(); + final byte[] bytes = serializer.serialize(result); + + return new MasterState(id, bytes, version); + } + catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + throw new FlinkException("Failed to serialize state of master hook '" + id + '\'', t); + } + } + else { + throw new FlinkException("Checkpoint hook '" + id + "' is stateful but creates no serializer"); + } + } + } + + // ------------------------------------------------------------------------ + // checkpoint restoring + // ------------------------------------------------------------------------ + + /** + * Calls the restore method of the given checkpoint master hooks and passes the given master + * state to them where state with a matching name is found; hooks without matching state receive {@code null}. + * + * <p>If state is found and no hook with the same name is found, the method throws an + * {@link IllegalStateException}, unless the {@code allowUnmatchedState} flag is set. + * + * @param masterHooks The hooks to call restore on + * @param states The state to pass to the hooks + * @param checkpointId The checkpoint ID of the restored checkpoint + * @param allowUnmatchedState True, to drop (and log) state that matches no hook; false, to fail in that case + * @param log The logger for log messages + * + * @throws FlinkException Thrown, if the hooks throw an exception, or the state + * deserialization fails.
+ */ + public static void restoreMasterHooks( + final Map<String, MasterTriggerRestoreHook<?>> masterHooks, + final Collection<MasterState> states, + final long checkpointId, + final boolean allowUnmatchedState, + final Logger log) throws FlinkException { + + // early out, only if there is neither state to restore nor any hook to call + if ((states == null || states.isEmpty()) && (masterHooks == null || masterHooks.isEmpty())) { + log.info("No master state to restore"); + return; + } + + log.info("Calling master restore hooks"); + + // collect the hooks; hooks that find no matching state are restored with 'null' further below + final LinkedHashMap<String, MasterTriggerRestoreHook<?>> allHooks = masterHooks == null ? new LinkedHashMap<String, MasterTriggerRestoreHook<?>>() : new LinkedHashMap<String, MasterTriggerRestoreHook<?>>(masterHooks); + final Collection<MasterState> statesToRestore = states == null ? new ArrayList<MasterState>(0) : states; + // first, deserialize all hook state + final ArrayList<Tuple2<MasterTriggerRestoreHook<?>, Object>> hooksAndStates = new ArrayList<>(); + + for (MasterState state : statesToRestore) { + if (state != null) { + final String name = state.name(); + final MasterTriggerRestoreHook<?> hook = allHooks.remove(name); + + if (hook != null) { + log.debug("Found state to restore for hook '{}'", name); + + Object deserializedState = deserializeState(state, hook); + hooksAndStates.add(new Tuple2<MasterTriggerRestoreHook<?>, Object>(hook, deserializedState)); + } + else if (!allowUnmatchedState) { + throw new IllegalStateException("Found state '" + state.name() + + "' which is not resumed by any hook."); + } + else { + log.info("Dropping unmatched state from '{}'", name); + } + } + } + + // now that all is deserialized, call the hooks + for (Tuple2<MasterTriggerRestoreHook<?>, Object> hookAndState : hooksAndStates) { + restoreHook(hookAndState.f1, hookAndState.f0, checkpointId); + } + + // trigger the remaining hooks without checkpointed state + for (MasterTriggerRestoreHook<?> hook : allHooks.values()) { + restoreHook(null, hook, checkpointId); + } + } + + private static <T> T deserializeState(MasterState state, MasterTriggerRestoreHook<?> hook) throws FlinkException { + @SuppressWarnings("unchecked") + final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook; + final String
id = hook.getIdentifier(); + + try { + final SimpleVersionedSerializer<T> deserializer = typedHook.createCheckpointDataSerializer(); + if (deserializer == null) { + throw new FlinkException("null serializer for state of hook " + hook.getIdentifier()); + } + return deserializer.deserialize(state.version(), state.bytes()); + } + catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + throw new FlinkException("Cannot deserialize state for master hook '" + id + '\'', t); + } + } + + private static <T> void restoreHook( + final Object state, + final MasterTriggerRestoreHook<?> hook, + final long checkpointId) throws FlinkException { + + @SuppressWarnings("unchecked") + final T typedState = (T) state; + + @SuppressWarnings("unchecked") + final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook; + + try { + typedHook.restoreCheckpoint(checkpointId, typedState); + } + catch (FlinkException e) { + throw e; + } + catch (Throwable t) { + // catch all here, including Errors that may come from dependency and classpath issues + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + throw new FlinkException("Error while calling restoreCheckpoint on checkpoint hook '" + + hook.getIdentifier() + '\'', t); + } + } + + // ------------------------------------------------------------------------ + + /** This class is not meant to be instantiated. */ + private MasterHooks() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java index baad05f..79ec596 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java @@
-20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint.savepoint; import org.apache.flink.core.io.Versioned; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.TaskState; import java.util.Collection; @@ -58,6 +59,11 @@ public interface Savepoint extends Versioned { Collection<TaskState> getTaskStates(); /** + * Gets the checkpointed states generated by the master. + */ + Collection<MasterState> getMasterStates(); + + /** * Disposes the savepoint. */ void dispose() throws Exception; http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java index 60f0287..8ee38da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.TaskState; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.StreamStateHandle; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -122,7 +123,7 @@ public class SavepointLoader { // (3) convert to checkpoint so the system can fall back to it CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L, - taskStates, props, metadataHandle, savepointPath); + taskStates, savepoint.getMasterStates(), props, metadataHandle, savepointPath); } // 
------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java index 3155d60..c1fcf4f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint.savepoint; +import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0; import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer; import org.apache.flink.util.Preconditions; @@ -30,14 +31,16 @@ import java.util.Map; public class SavepointSerializers { - private static final int SAVEPOINT_VERSION_0 = 0; private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(2); static { - SERIALIZERS.put(SAVEPOINT_VERSION_0, SavepointV0Serializer.INSTANCE); + SERIALIZERS.put(SavepointV0.VERSION, SavepointV0Serializer.INSTANCE); SERIALIZERS.put(SavepointV1.VERSION, SavepointV1Serializer.INSTANCE); + SERIALIZERS.put(SavepointV2.VERSION, SavepointV2Serializer.INSTANCE); } + // ------------------------------------------------------------------------ + /** * Returns the {@link SavepointSerializer} for the given savepoint. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java index 5976bbf..196c870 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint.savepoint; +import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.TaskState; import org.apache.flink.util.Preconditions; @@ -60,36 +61,21 @@ public class SavepointV1 implements Savepoint { } @Override - public void dispose() throws Exception { - for (TaskState taskState : taskStates) { - taskState.discardState(); - } - taskStates.clear(); - } - - @Override - public String toString() { - return "Savepoint(version=" + VERSION + ")"; + public Collection<MasterState> getMasterStates() { + // since checkpoints are never deserialized into this format, + // this method should never be called + throw new UnsupportedOperationException(); } @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - - if (o == null || getClass() != o.getClass()) { - return false; - } - - SavepointV1 that = (SavepointV1) o; - return checkpointId == that.checkpointId && getTaskStates().equals(that.getTaskStates()); + public void dispose() throws Exception { + // since checkpoints are never deserialized into this format, + // this method should never be called + throw new UnsupportedOperationException(); } @Override - public int hashCode() { - int result = (int) (checkpointId ^ (checkpointId >>> 32)); - result = 31 * result + 
taskStates.hashCode(); - return result; + public String toString() { + return "Savepoint(version=" + VERSION + ")"; } } http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java index 44461d8..ae9f4a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint.savepoint; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.checkpoint.TaskState; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -37,18 +38,19 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; /** - * Serializer for {@link SavepointV1} instances. - * <p> - * <p>In contrast to previous savepoint versions, this serializer makes sure - * that no default Java serialization is used for serialization. Therefore, we - * don't rely on any involved Java classes to stay the same. + * Deserializer for checkpoints written in format {@code 1} (Flink 1.2.x format) + * + * <p>In contrast to the previous versions, this serializer makes sure that no Java + * serialization is used for serialization. Therefore, we don't rely on any involved + * classes to stay the same. 
*/ -class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { +class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { private static final byte NULL_HANDLE = 0; private static final byte BYTE_STREAM_STATE_HANDLE = 1; @@ -63,39 +65,12 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { } @Override - public void serialize(SavepointV1 savepoint, DataOutputStream dos) throws IOException { - try { - dos.writeLong(savepoint.getCheckpointId()); - - Collection<TaskState> taskStates = savepoint.getTaskStates(); - dos.writeInt(taskStates.size()); - - for (TaskState taskState : savepoint.getTaskStates()) { - // Vertex ID - dos.writeLong(taskState.getJobVertexID().getLowerPart()); - dos.writeLong(taskState.getJobVertexID().getUpperPart()); - - // Parallelism - int parallelism = taskState.getParallelism(); - dos.writeInt(parallelism); - dos.writeInt(taskState.getMaxParallelism()); - dos.writeInt(taskState.getChainLength()); - - // Sub task states - Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates(); - dos.writeInt(subtaskStateMap.size()); - for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) { - dos.writeInt(entry.getKey()); - serializeSubtaskState(entry.getValue(), dos); - } - } - } catch (Exception e) { - throw new IOException(e); - } + public void serialize(SavepointV2 savepoint, DataOutputStream dos) throws IOException { + throw new UnsupportedOperationException("This serializer is read-only and only exists for backwards compatibility"); } @Override - public SavepointV1 deserialize(DataInputStream dis, ClassLoader cl) throws IOException { + public SavepointV2 deserialize(DataInputStream dis, ClassLoader cl) throws IOException { long checkpointId = dis.readLong(); // Task states @@ -122,7 +97,34 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { } } - return new SavepointV1(checkpointId, taskStates); + return new SavepointV2(checkpointId, 
taskStates, Collections.<MasterState>emptyList()); + } + + public void serializeOld(SavepointV1 savepoint, DataOutputStream dos) throws IOException { + dos.writeLong(savepoint.getCheckpointId()); + + Collection<TaskState> taskStates = savepoint.getTaskStates(); + dos.writeInt(taskStates.size()); + + for (TaskState taskState : savepoint.getTaskStates()) { + // Vertex ID + dos.writeLong(taskState.getJobVertexID().getLowerPart()); + dos.writeLong(taskState.getJobVertexID().getUpperPart()); + + // Parallelism + int parallelism = taskState.getParallelism(); + dos.writeInt(parallelism); + dos.writeInt(taskState.getMaxParallelism()); + dos.writeInt(taskState.getChainLength()); + + // Sub task states + Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates(); + dos.writeInt(subtaskStateMap.size()); + for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) { + dos.writeInt(entry.getKey()); + serializeSubtaskState(entry.getValue(), dos); + } + } } private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java new file mode 100644 index 0000000..100982d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.savepoint; + +import org.apache.flink.runtime.checkpoint.MasterState; +import org.apache.flink.runtime.checkpoint.TaskState; + +import java.util.Collection; +import java.util.Collections; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The persistent checkpoint metadata, format version 2. + * This format was introduced with Flink 1.3.0 and, in addition to the task states, holds the master states. + */ +public class SavepointV2 implements Savepoint { + + /** The savepoint version.
*/ + public static final int VERSION = 2; + + /** The checkpoint ID */ + private final long checkpointId; + + /** The task states */ + private final Collection<TaskState> taskStates; + + /** The states generated by the CheckpointCoordinator */ + private final Collection<MasterState> masterStates; + + /** Creates the metadata for the given task states, with an empty collection of master states. */ + public SavepointV2(long checkpointId, Collection<TaskState> taskStates) { + this(checkpointId, taskStates, Collections.<MasterState>emptyList()); + } + + public SavepointV2(long checkpointId, Collection<TaskState> taskStates, Collection<MasterState> masterStates) { + this.checkpointId = checkpointId; + this.taskStates = checkNotNull(taskStates, "taskStates"); + this.masterStates = checkNotNull(masterStates, "masterStates"); + } + + @Override + public int getVersion() { + return VERSION; + } + + @Override + public long getCheckpointId() { + return checkpointId; + } + + @Override + public Collection<TaskState> getTaskStates() { + return taskStates; + } + + @Override + public Collection<MasterState> getMasterStates() { + return masterStates; + } + + @Override + public void dispose() throws Exception { + for (TaskState taskState : taskStates) { + taskState.discardState(); + } + taskStates.clear(); + masterStates.clear(); + } + + @Override + public String toString() { + return "Checkpoint Metadata (version=" + VERSION + ')'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java new file mode 100644 index 0000000..307ea16 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java @@ -0,0 +1,468 @@ +/* + * Licensed to
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.savepoint; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.MasterState; +import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.runtime.checkpoint.TaskState; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeOffsets; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import 
java.util.Map;
+
+/**
+ * (De)serializer for checkpoint metadata format version 2.
+ *
+ * <p>This format version adds master states (the states produced by the checkpoint
+ * coordinator's master hooks) to the checkpoint metadata. They are stored between the
+ * checkpoint ID and the operator (task) states.
+ *
+ * <p>Basic checkpoint metadata layout:
+ * <pre>
+ *  +--------------+---------------+-----------------+
+ *  | checkpointID | master states | operator states |
+ *  +--------------+---------------+-----------------+
+ *
+ *  Master state:
+ *  +--------------+---------------------+---------+------+---------------+
+ *  | magic number | num remaining bytes | version | name | payload bytes |
+ *  +--------------+---------------------+---------+------+---------------+
+ * </pre>
+ *
+ * <p>The master states and the operator states are each preceded by their count,
+ * written as a 4-byte int.
+ */
+class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
+
+	/** Random magic number for consistency checks */
+	private static final int MASTER_STATE_MAGIC_NUMBER = 0xc96b1696;
+
+	private static final byte NULL_HANDLE = 0;
+	private static final byte BYTE_STREAM_STATE_HANDLE = 1;
+	private static final byte FILE_STREAM_STATE_HANDLE = 2;
+	private static final byte KEY_GROUPS_HANDLE = 3;
+	private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
+
+	/** The singleton instance of the serializer */
+	public static final SavepointV2Serializer INSTANCE = new SavepointV2Serializer();
+
+	// ------------------------------------------------------------------------
+
+	/** Singleton, not meant to be instantiated */
+	private SavepointV2Serializer() {}
+
+	// ------------------------------------------------------------------------
+	//  (De)serialization entry points
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Writes the checkpoint ID, then the master states, then the task states.
+	 *
+	 * @param checkpointMetadata the metadata to write
+	 * @param dos the stream to write to
+	 * @throws IOException if writing to the stream fails
+	 */
+	@Override
+	public void serialize(SavepointV2 checkpointMetadata, DataOutputStream dos) throws IOException {
+		// first: checkpoint ID
+		dos.writeLong(checkpointMetadata.getCheckpointId());
+
+		// second: master state
+		final Collection<MasterState> masterStates = checkpointMetadata.getMasterStates();
+		dos.writeInt(masterStates.size());
+		for (MasterState ms : masterStates) {
+			serializeMasterState(ms, dos);
+		}
+
+		// third: task states
+		final Collection<TaskState> taskStates = checkpointMetadata.getTaskStates();
+		dos.writeInt(taskStates.size());
+
+		// iterate exactly the collection whose size was written above, so that the
+		// written count and the written entries cannot disagree
+		for (TaskState taskState : taskStates) {
+			// Vertex ID
+			dos.writeLong(taskState.getJobVertexID().getLowerPart());
+			dos.writeLong(taskState.getJobVertexID().getUpperPart());
+
+			// Parallelism
+			int parallelism = taskState.getParallelism();
+			dos.writeInt(parallelism);
+			dos.writeInt(taskState.getMaxParallelism());
+			dos.writeInt(taskState.getChainLength());
+
+			// Sub task states
+			Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
+			dos.writeInt(subtaskStateMap.size());
+			for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
+				dos.writeInt(entry.getKey());
+				serializeSubtaskState(entry.getValue(), dos);
+			}
+		}
+	}
+
+	/**
+	 * Reads metadata in the layout produced by {@link #serialize(SavepointV2, DataOutputStream)}.
+	 *
+	 * @param dis the stream to read from
+	 * @param cl the user code class loader (not used by this format)
+	 * @return the deserialized checkpoint metadata
+	 * @throws IOException if reading fails or the data is corrupt (negative IDs, counts or lengths,
+	 *                     wrong magic numbers, trailing bytes in a master state)
+	 */
+	@Override
+	public SavepointV2 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
+		// first: checkpoint ID
+		final long checkpointId = dis.readLong();
+		if (checkpointId < 0) {
+			throw new IOException("invalid checkpoint ID: " + checkpointId);
+		}
+
+		// second: master state
+		final List<MasterState> masterStates;
+		final int numMasterStates = dis.readInt();
+
+		if (numMasterStates == 0) {
+			masterStates = Collections.emptyList();
+		}
+		else if (numMasterStates > 0) {
+			masterStates = new ArrayList<>(numMasterStates);
+			for (int i = 0; i < numMasterStates; i++) {
+				masterStates.add(deserializeMasterState(dis));
+			}
+		}
+		else {
+			throw new IOException("invalid number of master states: " + numMasterStates);
+		}
+
+		// third: task states
+		final int numTaskStates = readNonNegativeInt(dis, "number of task states");
+		final ArrayList<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+		for (int i = 0; i < numTaskStates; i++) {
+			// arguments are evaluated left to right: lower part first, as written by serialize()
+			JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
+			int parallelism = dis.readInt();
+			int maxParallelism = dis.readInt();
+			int chainLength = dis.readInt();
+
+			// Add task state
+			TaskState taskState = new TaskState(jobVertexId, parallelism, maxParallelism, chainLength);
+			taskStates.add(taskState);
+
+			// Sub task states
+			int numSubTaskStates = readNonNegativeInt(dis, "number of subtask states");
+
+			for (int j = 0; j < numSubTaskStates; j++) {
+				int subtaskIndex = dis.readInt();
+				SubtaskState subtaskState = deserializeSubtaskState(dis);
+				taskState.putState(subtaskIndex, subtaskState);
+			}
+		}
+
+		return new SavepointV2(checkpointId, taskStates, masterStates);
+	}
+
+	// ------------------------------------------------------------------------
+	//  master state (de)serialization methods
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Writes a master state as magic number, length of the remaining bytes, and then
+	 * (version, name, payload length, payload).
+	 */
+	private static void serializeMasterState(MasterState state, DataOutputStream dos) throws IOException {
+		// magic number for error detection
+		dos.writeInt(MASTER_STATE_MAGIC_NUMBER);
+
+		// for safety, we serialize first into an array and then write the array and its
+		// length into the checkpoint, so that the reader consumes exactly these bytes
+		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		try (DataOutputStream out = new DataOutputStream(baos)) {
+			out.writeInt(state.version());
+			out.writeUTF(state.name());
+
+			final byte[] bytes = state.bytes();
+			out.writeInt(bytes.length);
+			out.write(bytes, 0, bytes.length);
+		}
+
+		final byte[] data = baos.toByteArray();
+		dos.writeInt(data.length);
+		dos.write(data, 0, data.length);
+	}
+
+	/**
+	 * Reads a master state written by {@link #serializeMasterState(MasterState, DataOutputStream)}.
+	 *
+	 * @throws IOException if the magic number does not match, a length is invalid, or the
+	 *                     length-prefixed section contains trailing bytes
+	 */
+	private static MasterState deserializeMasterState(DataInputStream dis) throws IOException {
+		final int magicNumber = dis.readInt();
+		if (magicNumber != MASTER_STATE_MAGIC_NUMBER) {
+			throw new IOException("incorrect magic number in master state byte sequence");
+		}
+
+		final int numBytes = dis.readInt();
+		if (numBytes <= 0) {
+			throw new IOException("found zero or negative length for master state bytes");
+		}
+
+		final byte[] data = new byte[numBytes];
+		dis.readFully(data);
+
+		final DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
+
+		final int version = in.readInt();
+		final String name = in.readUTF();
+
+		// the payload must fit into the already length-checked section; validating before
+		// allocating avoids huge allocations from a corrupt length field
+		final int payloadLength = in.readInt();
+		if (payloadLength < 0 || payloadLength > in.available()) {
+			throw new IOException("invalid length for master state payload: " + payloadLength);
+		}
+
+		final byte[] bytes = new byte[payloadLength];
+		in.readFully(bytes);
+
+		// check that the data is not corrupt
+		if (in.read() != -1) {
+			throw new IOException("found trailing bytes in master state");
+		}
+
+		return new MasterState(name, bytes, version);
+	}
+
+	// ------------------------------------------------------------------------
+	//  task state (de)serialization methods
+	// ------------------------------------------------------------------------
+
+	private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
+
+		// placeholder for the former duration field; deserializeSubtaskState reads and discards it
+		dos.writeLong(-1);
+
+		ChainedStateHandle<StreamStateHandle> nonPartitionableState = subtaskState.getLegacyOperatorState();
+
+		int len = nonPartitionableState != null ? nonPartitionableState.getLength() : 0;
+		dos.writeInt(len);
+		for (int i = 0; i < len; ++i) {
+			StreamStateHandle stateHandle = nonPartitionableState.get(i);
+			serializeStreamStateHandle(stateHandle, dos);
+		}
+
+		ChainedStateHandle<OperatorStateHandle> operatorStateBackend = subtaskState.getManagedOperatorState();
+
+		len = operatorStateBackend != null ? operatorStateBackend.getLength() : 0;
+		dos.writeInt(len);
+		for (int i = 0; i < len; ++i) {
+			OperatorStateHandle stateHandle = operatorStateBackend.get(i);
+			serializeOperatorStateHandle(stateHandle, dos);
+		}
+
+		ChainedStateHandle<OperatorStateHandle> operatorStateFromStream = subtaskState.getRawOperatorState();
+
+		len = operatorStateFromStream != null ? operatorStateFromStream.getLength() : 0;
+		dos.writeInt(len);
+		for (int i = 0; i < len; ++i) {
+			OperatorStateHandle stateHandle = operatorStateFromStream.get(i);
+			serializeOperatorStateHandle(stateHandle, dos);
+		}
+
+		KeyedStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
+		serializeKeyedStateHandle(keyedStateBackend, dos);
+
+		KeyedStateHandle keyedStateStream = subtaskState.getRawKeyedState();
+		serializeKeyedStateHandle(keyedStateStream, dos);
+	}
+
+	private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
+		// Duration field has been removed from SubtaskState; the placeholder is read and ignored
+		long ignoredDuration = dis.readLong();
+
+		int len = readNonNegativeInt(dis, "number of legacy operator state handles");
+		List<StreamStateHandle> nonPartitionableState = new ArrayList<>(len);
+		for (int i = 0; i < len; ++i) {
+			StreamStateHandle streamStateHandle = deserializeStreamStateHandle(dis);
+			nonPartitionableState.add(streamStateHandle);
+		}
+
+		len = readNonNegativeInt(dis, "number of managed operator state handles");
+		List<OperatorStateHandle> operatorStateBackend = new ArrayList<>(len);
+		for (int i = 0; i < len; ++i) {
+			OperatorStateHandle streamStateHandle = deserializeOperatorStateHandle(dis);
+			operatorStateBackend.add(streamStateHandle);
+		}
+
+		len = readNonNegativeInt(dis, "number of raw operator state handles");
+		List<OperatorStateHandle> operatorStateStream = new ArrayList<>(len);
+		for (int i = 0; i < len; ++i) {
+			OperatorStateHandle streamStateHandle = deserializeOperatorStateHandle(dis);
+			operatorStateStream.add(streamStateHandle);
+		}
+
+		KeyedStateHandle keyedStateBackend = deserializeKeyedStateHandle(dis);
+
+		KeyedStateHandle keyedStateStream = deserializeKeyedStateHandle(dis);
+
+		ChainedStateHandle<StreamStateHandle> nonPartitionableStateChain =
+				new ChainedStateHandle<>(nonPartitionableState);
+
+		ChainedStateHandle<OperatorStateHandle> operatorStateBackendChain =
+				new ChainedStateHandle<>(operatorStateBackend);
+
+		ChainedStateHandle<OperatorStateHandle> operatorStateStreamChain =
+				new ChainedStateHandle<>(operatorStateStream);
+
+		return new SubtaskState(
+				nonPartitionableStateChain,
+				operatorStateBackendChain,
+				operatorStateStreamChain,
+				keyedStateBackend,
+				keyedStateStream);
+	}
+
+	private static void serializeKeyedStateHandle(
+			KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+		if (stateHandle == null) {
+			dos.writeByte(NULL_HANDLE);
+		} else if (stateHandle instanceof KeyGroupsStateHandle) {
+			KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) stateHandle;
+
+			dos.writeByte(KEY_GROUPS_HANDLE);
+			dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
+			dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
+			for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
+				dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
+			}
+			serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
+		} else {
+			throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
+		}
+	}
+
+	private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
+		final int type = dis.readByte();
+		if (NULL_HANDLE == type) {
+			return null;
+		} else if (KEY_GROUPS_HANDLE == type) {
+			int startKeyGroup = dis.readInt();
+			int numKeyGroups = readNonNegativeInt(dis, "number of key groups");
+			KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
+			long[] offsets = new long[numKeyGroups];
+			for (int i = 0; i < numKeyGroups; ++i) {
+				offsets[i] = dis.readLong();
+			}
+			KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(
+					keyGroupRange, offsets);
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
+			return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+		} else {
+			throw new IllegalStateException("Reading invalid KeyedStateHandle, type: " + type);
+		}
+	}
+
+	private static void serializeOperatorStateHandle(
+			OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+		if (stateHandle != null) {
+			dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
+			Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap =
+					stateHandle.getStateNameToPartitionOffsets();
+			dos.writeInt(partitionOffsetsMap.size());
+			for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : partitionOffsetsMap.entrySet()) {
+				dos.writeUTF(entry.getKey());
+
+				OperatorStateHandle.StateMetaInfo stateMetaInfo = entry.getValue();
+
+				// NOTE: the format stores the enum ordinal, so the declaration order of
+				// OperatorStateHandle.Mode is part of the on-disk format
+				int mode = stateMetaInfo.getDistributionMode().ordinal();
+				dos.writeByte(mode);
+
+				long[] offsets = stateMetaInfo.getOffsets();
+				dos.writeInt(offsets.length);
+				for (long offset : offsets) {
+					dos.writeLong(offset);
+				}
+			}
+			serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
+		} else {
+			dos.writeByte(NULL_HANDLE);
+		}
+	}
+
+	private static OperatorStateHandle deserializeOperatorStateHandle(
+			DataInputStream dis) throws IOException {
+
+		final int type = dis.readByte();
+		if (NULL_HANDLE == type) {
+			return null;
+		} else if (PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
+			final OperatorStateHandle.Mode[] modes = OperatorStateHandle.Mode.values();
+
+			int mapSize = readNonNegativeInt(dis, "number of operator state partitions");
+			Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>(mapSize);
+			for (int i = 0; i < mapSize; ++i) {
+				String key = dis.readUTF();
+
+				int modeOrdinal = dis.readByte();
+				if (modeOrdinal < 0 || modeOrdinal >= modes.length) {
+					throw new IOException("invalid operator state distribution mode: " + modeOrdinal);
+				}
+				OperatorStateHandle.Mode mode = modes[modeOrdinal];
+
+				long[] offsets = new long[readNonNegativeInt(dis, "number of operator state offsets")];
+				for (int j = 0; j < offsets.length; ++j) {
+					offsets[j] = dis.readLong();
+				}
+
+				OperatorStateHandle.StateMetaInfo metaInfo =
+						new OperatorStateHandle.StateMetaInfo(offsets, mode);
+				offsetsMap.put(key, metaInfo);
+			}
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
+			return new OperatorStateHandle(offsetsMap, stateHandle);
+		} else {
+			throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
+		}
+	}
+
+	private static void serializeStreamStateHandle(
+			StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+		if (stateHandle == null) {
+			dos.writeByte(NULL_HANDLE);
+
+		} else if (stateHandle instanceof FileStateHandle) {
+			dos.writeByte(FILE_STREAM_STATE_HANDLE);
+			FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
+			dos.writeLong(stateHandle.getStateSize());
+			dos.writeUTF(fileStateHandle.getFilePath().toString());
+
+		} else if (stateHandle instanceof ByteStreamStateHandle) {
+			dos.writeByte(BYTE_STREAM_STATE_HANDLE);
+			ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle) stateHandle;
+			dos.writeUTF(byteStreamStateHandle.getHandleName());
+			// fetch the data once so that the written length and bytes always match
+			byte[] internalData = byteStreamStateHandle.getData();
+			dos.writeInt(internalData.length);
+			dos.write(internalData);
+
+		} else {
+			throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
+		}
+
+		dos.flush();
+	}
+
+	private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+		// readByte (like the other handle readers) signals truncated input with an EOFException
+		final int type = dis.readByte();
+		if (NULL_HANDLE == type) {
+			return null;
+		} else if (FILE_STREAM_STATE_HANDLE == type) {
+			long size = dis.readLong();
+			String pathString = dis.readUTF();
+			return new FileStateHandle(new Path(pathString), size);
+		} else if (BYTE_STREAM_STATE_HANDLE == type) {
+			String handleName = dis.readUTF();
+			int numBytes = readNonNegativeInt(dis, "length of byte stream state");
+			byte[] data = new byte[numBytes];
+			dis.readFully(data);
+			return new ByteStreamStateHandle(handleName, data);
+		} else {
+			throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Reads a 4-byte count or length field and rejects negative values. The serializer
+	 * only ever writes collection sizes and array lengths here, so a negative value means
+	 * the metadata is corrupt.
+	 *
+	 * @param dis the stream to read from
+	 * @param fieldDescription a description of the field, used in the error message
+	 * @return the non-negative value read
+	 * @throws IOException if reading fails or the value is negative
+	 */
+	private static int readNonNegativeInt(DataInputStream dis, String fieldDescription) throws IOException {
+		final int value = dis.readInt();
+		if (value < 0) {
+			throw new IOException("invalid " + fieldDescription + ": " + value);
+		}
+		return value;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 29b9806..23ed99d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook; import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.FutureUtils; @@ -360,6 +361,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive List<ExecutionJobVertex> verticesToTrigger, List<ExecutionJobVertex> verticesToWaitFor, List<ExecutionJobVertex> verticesToCommitTo, + List<MasterTriggerRestoreHook<?>> masterHooks, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, String checkpointDir, @@ -395,6 +397,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive checkpointDir, ioExecutor); + // register the master hooks on the checkpoint coordinator + for (MasterTriggerRestoreHook<?> hook : masterHooks) { + if (!checkpointCoordinator.addMasterHook(hook)) { + LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier()); + } + } + checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker); // interval of max long value indicates disable periodic checkpoint, http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index a10c62e..b40817f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge; @@ -51,6 +52,7 @@ import org.slf4j.Logger; import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -230,6 +232,21 @@ public class ExecutionGraphBuilder { } } + // instantiate the user-defined checkpoint hooks + + final MasterTriggerRestoreHook.Factory[] hookFactories = snapshotSettings.getMasterHooks(); + final List<MasterTriggerRestoreHook<?>> hooks; + + if (hookFactories == null || hookFactories.length == 0) { + hooks = Collections.emptyList(); + } + else { + hooks = new ArrayList<>(hookFactories.length); + for (MasterTriggerRestoreHook.Factory factory : hookFactories) { + hooks.add(factory.create()); + } + } + executionGraph.enableCheckpointing( snapshotSettings.getCheckpointInterval(), snapshotSettings.getCheckpointTimeout(), @@ -239,6 +256,7 @@ public class ExecutionGraphBuilder { triggerVertices, ackVertices, confirmVertices, + hooks, checkpointIdCounter, completedCheckpoints, externalizedCheckpointsDir,
