[ 
https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15985155#comment-15985155
 ] 

ASF GitHub Bot commented on FLINK-6390:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3782#discussion_r113507485
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
 ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint.hooks;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.io.SimpleVersionedSerializer;
    +import org.apache.flink.runtime.checkpoint.MasterState;
    +import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +import org.slf4j.Logger;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeoutException;
    +
    +/**
    + * Collection of methods to deal with checkpoint master hooks.
    + */
    +public class MasterHooks {
    +
    +   // 
------------------------------------------------------------------------
    +   //  checkpoint triggering
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Triggers all given master hooks and returns state objects for each 
hook that
    +    * produced a state.
    +    * 
    +    * @param hooks The hooks to trigger
    +    * @param checkpointId The checkpoint ID of the triggering checkpoint
    +    * @param timestamp The (informational) timestamp for the triggering 
checkpoint 
    +    * @param executor An executor that can be used for asynchronous I/O 
calls
    +    * @param timeout The maximum time that a hook may take to complete
    +    * 
    +    * @return A list containing all states produced by the hooks
    +    * 
    +    * @throws FlinkException Thrown, if the hooks throw an exception, or 
the state+
    +    *                        deserialization fails.
    +    */
    +   public static List<MasterState> triggerMasterHooks(
    +                   Collection<MasterTriggerRestoreHook<?>> hooks,
    +                   long checkpointId,
    +                   long timestamp,
    +                   Executor executor,
    +                   Time timeout) throws FlinkException {
    +
    +           final ArrayList<MasterState> states = new 
ArrayList<>(hooks.size());
    +
    +           for (MasterTriggerRestoreHook<?> hook : hooks) {
    +                   MasterState state = triggerHook(hook, checkpointId, 
timestamp, executor, timeout);
    +                   if (state != null) {
    +                           states.add(state);
    +                   }
    +           }
    +
    +           states.trimToSize();
    +           return states;
    +   }
    +
    +   private static <T> MasterState triggerHook(
    +                   MasterTriggerRestoreHook<?> hook,
    +                   long checkpointId,
    +                   long timestamp,
    +                   Executor executor,
    +                   Time timeout) throws FlinkException {
    +
    +           @SuppressWarnings("unchecked")
    +           final MasterTriggerRestoreHook<T> typedHook = 
(MasterTriggerRestoreHook<T>) hook;
    +
    +           final String id = typedHook.getIdentifier();
    +           final SimpleVersionedSerializer<T> serializer = 
typedHook.createCheckpointDataSerializer();
    +
    +           // call the hook!
    +           final Future<T> resultFuture;
    +           try {
    +                   resultFuture = 
typedHook.triggerCheckpoint(checkpointId, timestamp, executor);
    +           }
    +           catch (Throwable t) {
    +                   ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
    +                   throw new FlinkException("Error while triggering 
checkpoint master hook '" + id + '\'', t);
    +           }
    +
    +           // is there is a result future, wait for its completion
    +           // in the future we want to make this asynchronous with futures 
(no pun intended)
    +           if (resultFuture == null) {
    +                   return null;
    +           }
    +           else {
    +                   final T result;
    +                   try {
    +                           result = resultFuture.get(timeout.getSize(), 
timeout.getUnit());
    +                   }
    +                   catch (InterruptedException e) {
    +                           // cannot continue here - restore interrupt 
status and leave
    +                           Thread.currentThread().interrupt();
    +                           throw new FlinkException("Checkpoint master 
hook was interrupted");
    +                   }
    +                   catch (ExecutionException e) {
    +                           throw new FlinkException("Checkpoint master 
hook '" + id + "' produced an exception", e.getCause());
    +                   }
    +                   catch (TimeoutException e) {
    +                           throw new FlinkException("Checkpoint master 
hook '" + id +
    +                                           "' did not complete in time (" 
+ timeout + ')');
    +                   }
    +
    +                   // if the result of the future is not null, return it 
as state
    +                   if (result == null) {
    +                           return null;
    +                   }
    +                   else if (serializer != null) {
    +                           try {
    +                                   final int version = 
serializer.getVersion();
    +                                   final byte[] bytes = 
serializer.serialize(result);
    +
    +                                   return new MasterState(id, bytes, 
version);
    +                           }
    +                           catch (Throwable t) {
    +                                   
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
    +                                   throw new FlinkException("Failed to 
serialize state of master hook '" + id + '\'', t);
    +                           }
    +                   }
    +                   else {
    +                           throw new FlinkException("Checkpoint hook '" + 
id + " is stateful but creates no serializer");
    +                   }
    +           }
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  checkpoint restoring
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Calls the restore method given checkpoint master hooks and passes 
the given master
    +    * state to them where state with a matching name is found. 
    +    * 
    +    * <p>If state is found and no hook with the same name is found, the 
method throws an
    +    * exception, unless the {@code allowUnmatchedState} flag is set.
    +    *     
    +    * @param masterHooks The hooks to call restore on
    +    * @param states The state to pass to the hooks
    +    * @param checkpointId The checkpoint ID of the restored checkpoint
    +    * @param allowUnmatchedState True, 
    +    * @param log The logger for log messages
    +    * 
    +    * @throws FlinkException Thrown, if the hooks throw an exception, or 
the state+
    +    *                        deserialization fails.
    +    */
    +   public static void restoreMasterHooks(
    +                   final Map<String, MasterTriggerRestoreHook<?>> 
masterHooks,
    +                   final List<MasterState> states,
    +                   final long checkpointId,
    +                   final boolean allowUnmatchedState,
    +                   final Logger log) throws FlinkException {
    +
    +           // early out
    +           if (states == null || states.isEmpty() || masterHooks == null 
|| masterHooks.isEmpty()) {
    +                   log.info("No master state to restore");
    +                   return;
    +           }
    +
    +           log.info("Calling master restore hooks");
    +
    +           // collect the hooks
    +           final LinkedHashMap<String, MasterTriggerRestoreHook<?>> 
allHooks = new LinkedHashMap<>(masterHooks);
    +
    +           // first, deserialize all hook state
    +           final ArrayList<Tuple2<MasterTriggerRestoreHook<?>, Object>> 
hooksAndStates = new ArrayList<>();
    +
    +           for (MasterState state : states) {
    +                   if (state != null) {
    +                           final String name = state.name();
    +                           final MasterTriggerRestoreHook<?> hook = 
allHooks.remove(name);
    +
    +                           if (hook != null) {
    +                                   log.debug("Found state to restore for 
hook '{}'", name);
    +
    +                                   Object deserializedState = 
deserializeState(state, hook);
    +                                   hooksAndStates.add(new 
Tuple2<MasterTriggerRestoreHook<?>, Object>(hook, deserializedState));
    +                           }
    +                           else if (!allowUnmatchedState) {
    +                                   throw new IllegalStateException("Found 
state '" + state.name() +
    +                                                   "' which is not resumed 
by any hook.");
    +                           }
    +                           else {
    +                                   log.info("Dropping unmatched state from 
'{}'", name);
    +                           }
    +                   }
    +           }
    +
    +           // now that all is deserialized, call the hooks 
    +           for (Tuple2<MasterTriggerRestoreHook<?>, Object> hookAndState : 
hooksAndStates) {
    +                   restoreHook(hookAndState.f1, hookAndState.f0, 
checkpointId);
    +           }
    +
    +           // trigger the remaining hooks without checkpointed state
    +           for (MasterTriggerRestoreHook<?> hook : allHooks.values()) {
    +                   restoreHook(null, hook, checkpointId);
    +           }
    +   }
    +
    +   private static <T> T deserializeState(MasterState state, 
MasterTriggerRestoreHook<?> hook) throws FlinkException {
    +           @SuppressWarnings("unchecked")
    +           final MasterTriggerRestoreHook<T> typedHook = 
(MasterTriggerRestoreHook<T>) hook;
    +           final String id = hook.getIdentifier();
    +
    +           try {
    +                   final SimpleVersionedSerializer<T> deserializer = 
typedHook.createCheckpointDataSerializer();
    +                   if (deserializer == null) {
    +                           throw new FlinkException("null serializer for 
state of hook " + hook.getIdentifier());
    +                   }
    +
    +                   return deserializer.deserialize(state.version(), 
state.bytes());
    +           }
    +           catch (Throwable t) {
    +                   throw new FlinkException("Cannot deserialize state for 
master hook '" + id + '\'');
    --- End diff --
    
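    The exception is swallowed here: the caught Throwable `t` is not passed as the cause to the new FlinkException, so the original error and its stack trace are lost.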


> Add Trigger Hooks to the Checkpoint Coordinator
> -----------------------------------------------
>
>                 Key: FLINK-6390
>                 URL: https://issues.apache.org/jira/browse/FLINK-6390
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.3.0
>
>
> Some source systems need to be notified prior to starting a checkpoint, in 
> order to do preparatory work for the checkpoint.
> I propose to add an interface to allow sources to register hooks that are 
> called by the checkpoint coordinator when triggering / restoring a checkpoint.
> These hooks may produce state that is stored with the checkpoint metadata.
> Envisioned interface for the hooks
> {code}
> /**
>  * The interface for hooks that can be called by the checkpoint coordinator 
> when triggering or
>  * restoring a checkpoint. Such a hook is useful for example when preparing 
> external systems for
>  * taking or restoring checkpoints.
>  * 
>  * <p>The {@link #triggerCheckpoint(long, long, Executor)} method (called 
> when triggering a checkpoint)
>  * can return a result (via a future) that will be stored as part of the 
> checkpoint metadata.
>  * When restoring a checkpoint, that stored result will be given to the 
> {@link #restoreCheckpoint(long, Object)}
>  * method. The hook's {@link #getIdentifier() identifier} is used to map data 
> to hook in the presence
>  * of multiple hooks, and when resuming a savepoint that was potentially 
> created by a different job.
>  * The identifier has a similar role as for example the operator UID in the 
> streaming API.
>  * 
>  * <p>The MasterTriggerRestoreHook is defined when creating the streaming 
> dataflow graph. It is attached
>  * to the job graph, which gets sent to the cluster for execution. To avoid 
> having to make the hook
>  * itself serializable, these hooks are attached to the job graph via a 
> {@link MasterTriggerRestoreHook.Factory}.
>  * 
>  * @param <T> The type of the data produced by the hook and stored as part of 
> the checkpoint metadata.
>  *            If the hook never stores any data, this can be typed to {@code 
> Void}.
>  */
> public interface MasterTriggerRestoreHook<T> {
>       /**
>        * Gets the identifier of this hook. The identifier is used to identify 
> a specific hook in the
>        * presence of multiple hooks and to give it the correct checkpointed 
> data upon checkpoint restoration.
>        * 
>        * <p>The identifier should be unique between different hooks of a job, 
> but deterministic/constant
>        * so that upon resuming a savepoint, the hook will get the correct 
> data.
>        * For example, if the hook calls into another storage system and 
> persists namespace/schema specific
>        * information, then the name of the storage system, together with the 
> namespace/schema name could
>        * be an appropriate identifier.
>        * 
>        * <p>When multiple hooks of the same name are created and attached to 
> a job graph, only the first
>        * one is actually used. This can be exploited to deduplicate hooks 
> that would do the same thing.
>        * 
>        * @return The identifier of the hook. 
>        */
>       String getIdentifier();
>       /**
>        * This method is called by the checkpoint coordinator prior when 
> triggering a checkpoint, prior
>        * to sending the "trigger checkpoint" messages to the source tasks.
>        * 
>        * <p>If the hook implementation wants to store data as part of the 
> checkpoint, it may return
>        * that data via a future, otherwise it should return null. The data is 
> stored as part of
>        * the checkpoint metadata under the hooks identifier (see {@link 
> #getIdentifier()}).
>        * 
>        * <p>If the action by this hook needs to be executed synchronously, 
> then this method should
>        * directly execute the action synchronously and block until it is 
> complete. The returned future
>        * (if any) would typically be a completed future.
>        * 
>        * <p>If the action should be executed asynchronously and only needs to 
> complete before the
>        * checkpoint is considered completed, then the method may use the 
> given executor to execute the
>        * actual action and would signal its completion by completing the 
> future. For hooks that do not
>        * need to store data, the future would be completed with null.
>        * 
>        * @param checkpointId The ID (logical timestamp, monotonously 
> increasing) of the checkpoint
>        * @param timestamp The wall clock timestamp when the checkpoint was 
> triggered, for
>        *                  info/logging purposes. 
>        * @param executor The executor for asynchronous actions
>        * 
>        * @return Optionally, a future that signals when the hook has 
> completed and that contains
>        *         data to be stored with the checkpoint.
>        * 
>        * @throws Exception Exceptions encountered when calling the hook will 
> cause the checkpoint to abort.
>        */
>       @Nullable
>       Future<T> triggerCheckpoint(long checkpointId, long timestamp, Executor 
> executor) throws Exception;
>       /**
>        * This method is called by the checkpoint coordinator prior to 
> restoring the state of a checkpoint.
>        * If the checkpoint did store data from this hook, that data will be 
> passed to this method. 
>        * 
>        * @param checkpointId The The ID (logical timestamp) of the restored 
> checkpoint
>        * @param checkpointData The data originally stored in the checkpoint 
> by this hook, possibly null. 
>        * 
>        * @throws Exception Exceptions thrown while restoring the checkpoint 
> will cause the restore
>        *                   operation to fail and to possibly fall back to 
> another checkpoint. 
>        */
>       void restoreCheckpoint(long checkpointId, @Nullable T checkpointData) 
> throws Exception;
>       /**
>        * Creates a the serializer to (de)serializes the data stored by this 
> hook. The serializer
>        * serializes the result of the Future returned by the {@link 
> #triggerCheckpoint(long, long, Executor)}
>        * method, and deserializes the data stored in the checkpoint into the 
> object passed to the
>        * {@link #restoreCheckpoint(long, Object)} method. 
>        * 
>        * <p>If the hook never returns any data to be stored, then this method 
> may return null as the
>        * serializer.
>        * 
>        * @return The serializer to (de)serializes the data stored by this hook
>        */
>       @Nullable
>       SimpleVersionedSerializer<T> createCheckpointDataSerializer();
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to