GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/3772

    [FLINK-5869] [flip-1] Introduce abstraction for FailoverStrategy

    This PR has two sets of changes that I could not pull apart into separate pull requests.
    
    # (1) Termination Futures
    
    Prior to this change, the `ExecutionGraph` decided when cancellation and finishing were complete by tracking how many `ExecutionJobVertex` instances were in a terminal state.
    
    This abstraction is too inflexible to track when subregions of the graph are in a terminal state. To fix that, this change introduces a *termination future* on the `Execution`. By building conjunct futures over the termination futures, any observer can track when an arbitrary set of vertices has reached a terminal state.
    
    The `ExecutionGraph` now also uses that model to track when cancellation of all vertices during failover is complete.
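    A minimal sketch of the termination-future idea, assuming plain `java.util.concurrent.CompletableFuture`. `SketchExecution` and `TerminationFutures.allTerminal` are hypothetical names for illustration, not the classes or methods this PR adds; the conjunct future is modeled here with `CompletableFuture.allOf`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for an Execution (not Flink's class).
// It exposes a termination future that completes once the execution
// reaches a terminal state.
final class SketchExecution {

    enum State { RUNNING, FINISHED, CANCELED, FAILED }

    private final CompletableFuture<State> terminationFuture = new CompletableFuture<>();
    private State state = State.RUNNING;

    CompletableFuture<State> getTerminationFuture() {
        return terminationFuture;
    }

    synchronized State getState() {
        return state;
    }

    // Moves to a terminal state exactly once; later calls are ignored and return false.
    synchronized boolean transitionToTerminal(State terminal) {
        if (terminal == State.RUNNING) {
            throw new IllegalArgumentException("RUNNING is not a terminal state");
        }
        if (state != State.RUNNING) {
            return false;
        }
        state = terminal;
        terminationFuture.complete(terminal);
        return true;
    }
}

// Hypothetical helper that builds a conjunct future over a set of executions.
final class TerminationFutures {

    private TerminationFutures() {}

    // Completes once every execution in the given region is in a terminal state.
    static CompletableFuture<Void> allTerminal(List<SketchExecution> region) {
        CompletableFuture<?>[] futures = region.stream()
                .map(SketchExecution::getTerminationFuture)
                .toArray(CompletableFuture<?>[]::new);
        return CompletableFuture.allOf(futures);
    }
}
```

    An observer, for example a failover that must wait until a region is fully canceled before restarting it, could chain its next step with `allTerminal(region).thenRun(...)`, independently of how many vertices the region contains.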
    
    # (2) Local Failover and FailoverStrategy
    
    The `ExecutionGraph` now supports *local failover* and *global failover*. Quoting from the JavaDocs:
    
      - **Global failover** aborts the task executions for all vertices and restarts the whole data flow graph from the last completed checkpoint. Global failover is considered the *fallback strategy* that is used when a local failover is unsuccessful, or when an issue is found in the state of the `ExecutionGraph` that could mark it as inconsistent (caused by a bug).
    
      - **Local failover** is triggered when an individual vertex execution (a task) fails. The local failover is coordinated by the `FailoverStrategy`. A local failover typically attempts to restart as little as possible, but as much as necessary.
    
      - Between local and global failover, the global failover always takes precedence, because it is the core mechanism that the `ExecutionGraph` relies on to restore consistency. To guard that, the `ExecutionGraph` maintains a **global modification version**, which is incremented with every global failover (and other global actions, like job cancellation or terminal failure). Local failover is always scoped by the modification version that the execution graph had when the failover was triggered. If a new global modification version is reached during local failover (meaning there is a concurrent global failover), the failover strategy has to yield to the global failover.
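    The version guard can be sketched as follows. This is a simplified illustration under assumed names (`VersionedGraph`, `completeLocalFailover`), not the actual `ExecutionGraph` code: a local failover remembers the version it started under and gives up if a global action has bumped the version in the meantime.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified graph that models only the version guard
// between global and local failover.
final class VersionedGraph {

    private final AtomicLong globalModVersion = new AtomicLong(1L);
    private int globalRestarts;
    private int localRestarts;

    // Read by a local failover when it is triggered, to scope its work.
    long getGlobalModVersion() {
        return globalModVersion.get();
    }

    // Every global action (global failover, cancellation, terminal failure)
    // bumps the version, invalidating any local failover still in flight.
    synchronized void failGlobally() {
        globalModVersion.incrementAndGet();
        globalRestarts++;
    }

    // Applies a local failover only if no global action happened since it
    // was triggered; otherwise the local failover yields and returns false.
    synchronized boolean completeLocalFailover(long expectedVersion) {
        if (globalModVersion.get() != expectedVersion) {
            return false;
        }
        localRestarts++;
        return true;
    }

    synchronized int getGlobalRestarts() {
        return globalRestarts;
    }

    synchronized int getLocalRestarts() {
        return localRestarts;
    }
}
```

    Checking and applying under the same lock means a global failover can never interleave between the version check and the local restart, which is what lets the global failover take precedence.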
    
    ## Failover Strategies
    
    How exactly local failover happens is the concern of a pluggable `FailoverStrategy`.
    
      - The default failover strategy simply triggers a global failover.
      - The pull request introduces a very simple *restart individual* failover strategy that restarts tasks independently if they have no connections to other tasks.
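    A sketch of how a pluggable strategy could sit behind task-failure handling. All names here (`SketchFailoverStrategy`, `FailoverTarget`, the `"full"` and `"individual"` names, and the `load` helper standing in for loading the strategy from the configuration) are hypothetical, not Flink's actual signatures; falling back to global failover for connected tasks is an assumption of this sketch.

```java
import java.util.List;

// Hypothetical callbacks into the graph; not Flink's actual API.
interface FailoverTarget {
    void failGlobally(Throwable cause);

    void restartTask(String taskName);
}

// Hypothetical, simplified strategy base class.
abstract class SketchFailoverStrategy {

    // Invoked when a single task execution fails; decides what to restart.
    abstract void onTaskFailure(String taskName, List<String> connectedTasks, Throwable cause);

    abstract String getStrategyName();
}

// Default behavior: every task failure escalates to a global failover.
final class RestartAllStrategy extends SketchFailoverStrategy {

    private final FailoverTarget target;

    RestartAllStrategy(FailoverTarget target) {
        this.target = target;
    }

    @Override
    void onTaskFailure(String taskName, List<String> connectedTasks, Throwable cause) {
        target.failGlobally(cause);
    }

    @Override
    String getStrategyName() {
        return "full";
    }
}

// Restarts a failed task on its own if it has no connections to other tasks.
// Falling back to global failover otherwise is an assumption of this sketch.
final class RestartIndividualStrategy extends SketchFailoverStrategy {

    private final FailoverTarget target;

    RestartIndividualStrategy(FailoverTarget target) {
        this.target = target;
    }

    @Override
    void onTaskFailure(String taskName, List<String> connectedTasks, Throwable cause) {
        if (connectedTasks.isEmpty()) {
            target.restartTask(taskName);
        } else {
            target.failGlobally(cause);
        }
    }

    @Override
    String getStrategyName() {
        return "individual";
    }
}

// Hypothetical name-based loader, standing in for reading the strategy from the configuration.
final class FailoverStrategies {

    private FailoverStrategies() {}

    static SketchFailoverStrategy load(String name, FailoverTarget target) {
        switch (name) {
            case "full":
                return new RestartAllStrategy(target);
            case "individual":
                return new RestartIndividualStrategy(target);
            default:
                throw new IllegalArgumentException("Unknown failover strategy: " + name);
        }
    }
}
```

    Keeping the strategy behind a small abstract class means the graph only reports task failures; how much gets restarted is decided entirely by whichever strategy is configured.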
    
    # Tests
    
    This pull request adds new tests for:
    
      - The termination future abstraction
      - The global modification version handling
      - Proper handling of concurrent local and global failover
    
    The pull request rewrites various existing tests. This was necessary because those tests used Mockito very heavily and re-built or white-box tested specific behavior that was affected by the changes.
    
    The changes to the tests introduce simple ways to bring up a functional `ExecutionGraph` and walk it through its state transitions. That way, the tests rely minimally on mocking and exercise the real `ExecutionGraph`, rather than a mock that is expected to behave similarly to the real class.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink flip-1-basics

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3772.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3772
    
----
commit ef7fd9964c1c74feb4641e57a138c54558b2449c
Author: Stephan Ewen <se...@apache.org>
Date:   2017-03-21T18:13:34Z

    [FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to ExecutionGraph
    
      - Rename 'ExecutionGraph.fail()' to 'ExecutionGraph.failGlobally()' to differentiate from fine grained failures/recovery
      - Add base class for FailoverStrategy
      - Add default implementation (restart all tasks)
      - Add logic to load the failover strategy from the configuration

commit c04a8a312098fddce14e392b8d9dbf396b1df3f3
Author: Stephan Ewen <se...@apache.org>
Date:   2017-03-29T20:49:54Z

    [FLINK-6340] [flip-1] Add a termination future to the Execution

----

