[ https://issues.apache.org/jira/browse/FLINK-2976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15046756#comment-15046756 ]

ASF GitHub Bot commented on FLINK-2976:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1434#discussion_r46940511
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
    @@ -440,4 +478,226 @@ private void configureExecutionRetryDelay() {
                long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
                jobGraph.setExecutionRetryDelay(executionRetryDelay);
        }
    +
    +   // ------------------------------------------------------------------------
    +
    +   /**
    +    * Returns a map with a hash for each {@link StreamNode} of the {@link
    +    * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
    +    * identify nodes across job submissions if they didn't change.
    +    *
    +    * <p>The complete {@link StreamGraph} is traversed. The hash is either
    +    * computed from the transformation's user-specified id (see
    +    * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
    +    *
    +    * <p>The generated hash is deterministic with respect to:
    +    * <ul>
    +    * <li>node-local properties (like parallelism, UDF, node ID),
    +    * <li>chained output nodes, and
    +    * <li>input nodes hashes
    +    * </ul>
    +    *
    +    * @return A map from {@link StreamNode#id} to hash as 16-byte array.
    +    */
    +   private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
    +           // The hash function used to generate the hash
    +           final HashFunction hashFunction = Hashing.murmur3_128(0);
    +           final Map<Integer, byte[]> hashes = new HashMap<>();
    +
    +           Set<Integer> visited = new HashSet<>();
    +           Queue<StreamNode> remaining = new ArrayDeque<>();
    +
    +           // We need to make the source order deterministic. This depends on the
    +           // ordering of the sources in the Environment, e.g. if a source X is
    +           // added before source Y, X will have a lower ID than Y (assigned by a
    +           // static counter).
    +           List<Integer> sources = new ArrayList<>();
    +           for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
    +                   sources.add(sourceNodeId);
    +           }
    +
    +           Collections.sort(sources);
    +
    +           // Traverse the graph in a breadth-first manner. Keep in mind that
    +           // the graph is not a tree and multiple paths to nodes can exist.
    +
    +           // Start with source nodes
    +           for (Integer sourceNodeId : sources) {
    +                   remaining.add(streamGraph.getStreamNode(sourceNodeId));
    +                   visited.add(sourceNodeId);
    +           }
    +
    +           StreamNode currentNode;
    +           while ((currentNode = remaining.poll()) != null) {
    +                   // Generate the hash code. Because multiple paths exist to each
    +                   // node, we might not have all required inputs available to
    +                   // generate the hash code.
    +                   if (generateNodeHash(currentNode, hashFunction, hashes)) {
    +                           // Add the child nodes
    +                           for (StreamEdge outEdge : currentNode.getOutEdges()) {
    --- End diff --
    
    How is the order of the out edges defined? Do they also follow the creation
order? If so, changing the order in which intermediate operators are created
will render the jobs incompatible, even though the topology stays the same.
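
    To make the concern concrete, here is a minimal, self-contained sketch. It
is not the code from the pull request: the class `TraversalOrderSketch`, its
methods, and the string node names are hypothetical, MD5 stands in for
`Hashing.murmur3_128(0)`, and the node hash is derived only from the
breadth-first visit position (a stand-in for any order-dependent node-local
property). Input hashes and UDFs are deliberately left out. The
`sortChildren` flag is likewise an assumption, analogous to the
`Collections.sort(sources)` call in the diff.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

class TraversalOrderSketch {

    /** Builds source -> {first, second} -> sink, keeping the given edge order. */
    static Map<String, List<String>> diamond(String first, String second) {
        Map<String, List<String>> outEdges = new HashMap<>();
        outEdges.put("source", Arrays.asList(first, second));
        outEdges.put(first, Arrays.asList("sink"));
        outEdges.put(second, Arrays.asList("sink"));
        return outEdges;
    }

    /** Breadth-first traversal; each node's hash depends on its visit position. */
    static Map<String, byte[]> hashNodes(
            String source, Map<String, List<String>> outEdges, boolean sortChildren) {
        Map<String, byte[]> hashes = new HashMap<>();
        Set<String> visited = new HashSet<>();
        Queue<String> remaining = new ArrayDeque<>();
        remaining.add(source);
        visited.add(source);

        String current;
        while ((current = remaining.poll()) != null) {
            // Assumption: the visit position is the only hash input in this sketch.
            hashes.put(current, md5("position-" + hashes.size()));

            List<String> children = new ArrayList<>(
                    outEdges.getOrDefault(current, Collections.<String>emptyList()));
            if (sortChildren) {
                // Hypothetical mitigation: order children by a stable key.
                Collections.sort(children);
            }
            for (String child : children) {
                if (visited.add(child)) {
                    remaining.add(child);
                }
            }
        }
        return hashes;
    }

    static byte[] md5(String input) {
        try {
            return MessageDigest.getInstance("MD5")
                    .digest(input.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Same topology, but "map" and "filter" are attached in a different order.
        Map<String, List<String>> a = diamond("map", "filter");
        Map<String, List<String>> b = diamond("filter", "map");
        System.out.println(Arrays.equals(
                hashNodes("source", a, false).get("map"),
                hashNodes("source", b, false).get("map")));
        System.out.println(Arrays.equals(
                hashNodes("source", a, true).get("map"),
                hashNodes("source", b, true).get("map")));
    }
}
```

    With the edges taken in creation order, `map` is visited at a different
position in the two graphs and its hash differs. With the children sorted by a
stable key, both graphs yield the same hash for `map`.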


> Save and load checkpoints manually
> ----------------------------------
>
>                 Key: FLINK-2976
>                 URL: https://issues.apache.org/jira/browse/FLINK-2976
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Runtime
>    Affects Versions: 0.10.0
>            Reporter: Ufuk Celebi
>             Fix For: 1.0.0
>
>
> Currently, all checkpointed state is bound to a job. After the job finishes 
> all state is lost. In case of an HA cluster, jobs can live longer than the 
> cluster, but they still suffer from the same issue when they finish.
> Multiple users have requested the feature to manually save a checkpoint in 
> order to resume from it at a later point. This is especially important for 
> production environments. As an example, consider upgrading your existing 
> production Flink program. Currently, you lose all the state of your program.
> With the proposed mechanism, it will be possible to save a checkpoint, stop
> and update your program, and then continue your program with the checkpoint.
> The required operations can be simple:
> saveCheckpoint(JobID) => checkpointID: long
> loadCheckpoint(JobID, long) => void
> For the initial version, I would apply the following restriction:
> - The topology needs to stay the same (JobGraph parallelism, etc.)
> A user can configure this behaviour via the environment like the 
> checkpointing interval. Furthermore, the user can trigger the save operation 
> via the command line at arbitrary times and load a checkpoint when submitting 
> a job, e.g.
> bin/flink checkpoint <JobID> => checkpointID: long 
> and
> bin/flink run --loadCheckpoint JobID [latest saved checkpoint]
> bin/flink run --loadCheckpoint (JobID,long) [specific saved checkpoint]
> As far as I can tell, the required mechanisms are similar to the ones 
> implemented for JobManager high availability. We need to make sure to persist 
> the CompletedCheckpoint instances as a pointer to the checkpoint state and to 
> *not* remove saved checkpoint state.
> On the client side, we need to give the job and its vertices the same IDs to 
> allow mapping the checkpoint state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
