[
https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15985155#comment-15985155
]
ASF GitHub Bot commented on FLINK-6390:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/3782#discussion_r113507485
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.hooks;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Collection of methods to deal with checkpoint master hooks.
+ */
+public class MasterHooks {
+
+ //
------------------------------------------------------------------------
+ // checkpoint triggering
+ //
------------------------------------------------------------------------
+
+ /**
+ * Triggers all given master hooks and returns state objects for each
hook that
+ * produced a state.
+ *
+ * @param hooks The hooks to trigger
+ * @param checkpointId The checkpoint ID of the triggering checkpoint
+ * @param timestamp The (informational) timestamp for the triggering
checkpoint
+ * @param executor An executor that can be used for asynchronous I/O
calls
+ * @param timeout The maximum time that a hook may take to complete
+ *
+ * @return A list containing all states produced by the hooks
+ *
+ * @throws FlinkException Thrown, if the hooks throw an exception, or
the state
+ * serialization fails.
+ */
+ public static List<MasterState> triggerMasterHooks(
+ Collection<MasterTriggerRestoreHook<?>> hooks,
+ long checkpointId,
+ long timestamp,
+ Executor executor,
+ Time timeout) throws FlinkException {
+
+ final ArrayList<MasterState> states = new
ArrayList<>(hooks.size());
+
+ for (MasterTriggerRestoreHook<?> hook : hooks) {
+ MasterState state = triggerHook(hook, checkpointId,
timestamp, executor, timeout);
+ if (state != null) {
+ states.add(state);
+ }
+ }
+
+ states.trimToSize();
+ return states;
+ }
+
+ private static <T> MasterState triggerHook(
+ MasterTriggerRestoreHook<?> hook,
+ long checkpointId,
+ long timestamp,
+ Executor executor,
+ Time timeout) throws FlinkException {
+
+ @SuppressWarnings("unchecked")
+ final MasterTriggerRestoreHook<T> typedHook =
(MasterTriggerRestoreHook<T>) hook;
+
+ final String id = typedHook.getIdentifier();
+ final SimpleVersionedSerializer<T> serializer =
typedHook.createCheckpointDataSerializer();
+
+ // call the hook!
+ final Future<T> resultFuture;
+ try {
+ resultFuture =
typedHook.triggerCheckpoint(checkpointId, timestamp, executor);
+ }
+ catch (Throwable t) {
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+ throw new FlinkException("Error while triggering
checkpoint master hook '" + id + '\'', t);
+ }
+
+ // if there is a result future, wait for its completion
+ // in the future we want to make this asynchronous with futures
(no pun intended)
+ if (resultFuture == null) {
+ return null;
+ }
+ else {
+ final T result;
+ try {
+ result = resultFuture.get(timeout.getSize(),
timeout.getUnit());
+ }
+ catch (InterruptedException e) {
+ // cannot continue here - restore interrupt
status and leave
+ Thread.currentThread().interrupt();
+ throw new FlinkException("Checkpoint master
hook was interrupted");
+ }
+ catch (ExecutionException e) {
+ throw new FlinkException("Checkpoint master
hook '" + id + "' produced an exception", e.getCause());
+ }
+ catch (TimeoutException e) {
+ throw new FlinkException("Checkpoint master
hook '" + id +
+ "' did not complete in time ("
+ timeout + ')');
+ }
+
+ // if the result of the future is not null, return it
as state
+ if (result == null) {
+ return null;
+ }
+ else if (serializer != null) {
+ try {
+ final int version =
serializer.getVersion();
+ final byte[] bytes =
serializer.serialize(result);
+
+ return new MasterState(id, bytes,
version);
+ }
+ catch (Throwable t) {
+
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+ throw new FlinkException("Failed to
serialize state of master hook '" + id + '\'', t);
+ }
+ }
+ else {
+ throw new FlinkException("Checkpoint hook '" +
id + " is stateful but creates no serializer");
+ }
+ }
+ }
+
+ //
------------------------------------------------------------------------
+ // checkpoint restoring
+ //
------------------------------------------------------------------------
+
+ /**
+ * Calls the restore method of the given checkpoint master hooks and passes
the given master
+ * state to them where state with a matching name is found.
+ *
+ * <p>If state is found and no hook with the same name is found, the
method throws an
+ * exception, unless the {@code allowUnmatchedState} flag is set.
+ *
+ * @param masterHooks The hooks to call restore on
+ * @param states The state to pass to the hooks
+ * @param checkpointId The checkpoint ID of the restored checkpoint
+ * @param allowUnmatchedState True, if state without a matching hook should be dropped (and logged) instead of causing an exception
+ * @param log The logger for log messages
+ *
+ * @throws FlinkException Thrown, if the hooks throw an exception, or
the state
+ * deserialization fails.
+ */
+ public static void restoreMasterHooks(
+ final Map<String, MasterTriggerRestoreHook<?>>
masterHooks,
+ final List<MasterState> states,
+ final long checkpointId,
+ final boolean allowUnmatchedState,
+ final Logger log) throws FlinkException {
+
+ // early out
+ if (states == null || states.isEmpty() || masterHooks == null
|| masterHooks.isEmpty()) {
+ log.info("No master state to restore");
+ return;
+ }
+
+ log.info("Calling master restore hooks");
+
+ // collect the hooks
+ final LinkedHashMap<String, MasterTriggerRestoreHook<?>>
allHooks = new LinkedHashMap<>(masterHooks);
+
+ // first, deserialize all hook state
+ final ArrayList<Tuple2<MasterTriggerRestoreHook<?>, Object>>
hooksAndStates = new ArrayList<>();
+
+ for (MasterState state : states) {
+ if (state != null) {
+ final String name = state.name();
+ final MasterTriggerRestoreHook<?> hook =
allHooks.remove(name);
+
+ if (hook != null) {
+ log.debug("Found state to restore for
hook '{}'", name);
+
+ Object deserializedState =
deserializeState(state, hook);
+ hooksAndStates.add(new
Tuple2<MasterTriggerRestoreHook<?>, Object>(hook, deserializedState));
+ }
+ else if (!allowUnmatchedState) {
+ throw new IllegalStateException("Found
state '" + state.name() +
+ "' which is not resumed
by any hook.");
+ }
+ else {
+ log.info("Dropping unmatched state from
'{}'", name);
+ }
+ }
+ }
+
+ // now that all is deserialized, call the hooks
+ for (Tuple2<MasterTriggerRestoreHook<?>, Object> hookAndState :
hooksAndStates) {
+ restoreHook(hookAndState.f1, hookAndState.f0,
checkpointId);
+ }
+
+ // trigger the remaining hooks without checkpointed state
+ for (MasterTriggerRestoreHook<?> hook : allHooks.values()) {
+ restoreHook(null, hook, checkpointId);
+ }
+ }
+
+ private static <T> T deserializeState(MasterState state,
MasterTriggerRestoreHook<?> hook) throws FlinkException {
+ @SuppressWarnings("unchecked")
+ final MasterTriggerRestoreHook<T> typedHook =
(MasterTriggerRestoreHook<T>) hook;
+ final String id = hook.getIdentifier();
+
+ try {
+ final SimpleVersionedSerializer<T> deserializer =
typedHook.createCheckpointDataSerializer();
+ if (deserializer == null) {
+ throw new FlinkException("null serializer for
state of hook " + hook.getIdentifier());
+ }
+
+ return deserializer.deserialize(state.version(),
state.bytes());
+ }
+ catch (Throwable t) {
+ throw new FlinkException("Cannot deserialize state for
master hook '" + id + '\'');
--- End diff --
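The exception is swallowed here: the caught Throwable `t` is not passed as the cause to the new FlinkException, so the original error and its stack trace are lost.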
> Add Trigger Hooks to the Checkpoint Coordinator
> -----------------------------------------------
>
> Key: FLINK-6390
> URL: https://issues.apache.org/jira/browse/FLINK-6390
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Some source systems need to be notified prior to starting a checkpoint, in
> order to do preparatory work for the checkpoint.
> I propose to add an interface to allow sources to register hooks that are
> called by the checkpoint coordinator when triggering / restoring a checkpoint.
> These hooks may produce state that is stored with the checkpoint metadata.
> Envisioned interface for the hooks
> {code}
> /**
> * The interface for hooks that can be called by the checkpoint coordinator
> when triggering or
> * restoring a checkpoint. Such a hook is useful for example when preparing
> external systems for
> * taking or restoring checkpoints.
> *
> * <p>The {@link #triggerCheckpoint(long, long, Executor)} method (called
> when triggering a checkpoint)
> * can return a result (via a future) that will be stored as part of the
> checkpoint metadata.
> * When restoring a checkpoint, that stored result will be given to the
> {@link #restoreCheckpoint(long, Object)}
> * method. The hook's {@link #getIdentifier() identifier} is used to map data
> to hook in the presence
> * of multiple hooks, and when resuming a savepoint that was potentially
> created by a different job.
> * The identifier has a similar role as for example the operator UID in the
> streaming API.
> *
> * <p>The MasterTriggerRestoreHook is defined when creating the streaming
> dataflow graph. It is attached
> * to the job graph, which gets sent to the cluster for execution. To avoid
> having to make the hook
> * itself serializable, these hooks are attached to the job graph via a
> {@link MasterTriggerRestoreHook.Factory}.
> *
> * @param <T> The type of the data produced by the hook and stored as part of
> the checkpoint metadata.
> * If the hook never stores any data, this can be typed to {@code
> Void}.
> */
> public interface MasterTriggerRestoreHook<T> {
> /**
> * Gets the identifier of this hook. The identifier is used to identify
> a specific hook in the
> * presence of multiple hooks and to give it the correct checkpointed
> data upon checkpoint restoration.
> *
> * <p>The identifier should be unique between different hooks of a job,
> but deterministic/constant
> * so that upon resuming a savepoint, the hook will get the correct
> data.
> * For example, if the hook calls into another storage system and
> persists namespace/schema specific
> * information, then the name of the storage system, together with the
> namespace/schema name could
> * be an appropriate identifier.
> *
> * <p>When multiple hooks of the same name are created and attached to
> a job graph, only the first
> * one is actually used. This can be exploited to deduplicate hooks
> that would do the same thing.
> *
> * @return The identifier of the hook.
> */
> String getIdentifier();
> /**
> * This method is called by the checkpoint coordinator when
> triggering a checkpoint, prior
> * to sending the "trigger checkpoint" messages to the source tasks.
> *
> * <p>If the hook implementation wants to store data as part of the
> checkpoint, it may return
> * that data via a future, otherwise it should return null. The data is
> stored as part of
> * the checkpoint metadata under the hook's identifier (see {@link
> #getIdentifier()}).
> *
> * <p>If the action by this hook needs to be executed synchronously,
> then this method should
> * directly execute the action synchronously and block until it is
> complete. The returned future
> * (if any) would typically be a completed future.
> *
> * <p>If the action should be executed asynchronously and only needs to
> complete before the
> * checkpoint is considered completed, then the method may use the
> given executor to execute the
> * actual action and would signal its completion by completing the
> future. For hooks that do not
> * need to store data, the future would be completed with null.
> *
> * @param checkpointId The ID (logical timestamp, monotonically
> increasing) of the checkpoint
> * @param timestamp The wall clock timestamp when the checkpoint was
> triggered, for
> * info/logging purposes.
> * @param executor The executor for asynchronous actions
> *
> * @return Optionally, a future that signals when the hook has
> completed and that contains
> * data to be stored with the checkpoint.
> *
> * @throws Exception Exceptions encountered when calling the hook will
> cause the checkpoint to abort.
> */
> @Nullable
> Future<T> triggerCheckpoint(long checkpointId, long timestamp, Executor
> executor) throws Exception;
> /**
> * This method is called by the checkpoint coordinator prior to
> restoring the state of a checkpoint.
> * If the checkpoint did store data from this hook, that data will be
> passed to this method.
> *
> * @param checkpointId The ID (logical timestamp) of the restored
> checkpoint
> * @param checkpointData The data originally stored in the checkpoint
> by this hook, possibly null.
> *
> * @throws Exception Exceptions thrown while restoring the checkpoint
> will cause the restore
> * operation to fail and to possibly fall back to
> another checkpoint.
> */
> void restoreCheckpoint(long checkpointId, @Nullable T checkpointData)
> throws Exception;
> /**
> * Creates the serializer to (de)serialize the data stored by this
> hook. The serializer
> * serializes the result of the Future returned by the {@link
> #triggerCheckpoint(long, long, Executor)}
> * method, and deserializes the data stored in the checkpoint into the
> object passed to the
> * {@link #restoreCheckpoint(long, Object)} method.
> *
> * <p>If the hook never returns any data to be stored, then this method
> may return null as the
> * serializer.
> *
> * @return The serializer to (de)serialize the data stored by this hook
> */
> @Nullable
> SimpleVersionedSerializer<T> createCheckpointDataSerializer();
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)