[FLINK-5867] [flip-1] Support restarting only pipelined sub-regions of the 
ExecutionGraph on task failure


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4eb9e46e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4eb9e46e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4eb9e46e

Branch: refs/heads/master
Commit: 4eb9e46ee31e3f003c1a92322d13056cb4d4cfd5
Parents: b01d737
Author: shuai.xus <[email protected]>
Authored: Tue Apr 18 14:15:29 2017 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Sat May 6 21:40:04 2017 +0200

----------------------------------------------------------------------
 .../executiongraph/failover/FailoverRegion.java | 251 ++++++++++
 .../failover/FailoverStrategyLoader.java        |   8 +-
 .../RestartPipelinedRegionStrategy.java         | 206 ++++++++
 .../executiongraph/ExecutionGraphTestUtils.java |  38 +-
 .../executiongraph/FailoverRegionTest.java      | 490 +++++++++++++++++++
 .../PipelinedRegionFailoverConcurrencyTest.java | 353 +++++++++++++
 .../RestartPipelinedRegionStrategyTest.java     | 396 +++++++++++++++
 7 files changed, 1731 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
new file mode 100644
index 0000000..b36cfcf
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * FailoverRegion manages the failover of a minimal pipeline connected sub graph.
+ * On a failure it changes from RUNNING to CANCELLING, then to CANCELED, then to CREATED,
+ * and at last back to RUNNING.
+ */
+public class FailoverRegion {
+
+       private static final AtomicReferenceFieldUpdater<FailoverRegion, 
JobStatus> STATE_UPDATER =
+                       
AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, 
"state");
+
+       /** The log object used for debugging. */
+       private static final Logger LOG = 
LoggerFactory.getLogger(FailoverRegion.class);
+
+       // 
------------------------------------------------------------------------
+
+       /** a unique id for debugging */
+       private final AbstractID id = new AbstractID();
+
+       private final ExecutionGraph executionGraph;
+
+       private final List<ExecutionVertex> connectedExecutionVertexes;
+
+       /** The executor that executes the recovery action after all vertices are in a terminal state */
+       private final Executor executor;
+
+       /** Current status of the job execution */
+       private volatile JobStatus state = JobStatus.RUNNING;
+
+
+       /**
+        * Creates a failover region over the given execution vertices.
+        *
+        * @param executionGraph the execution graph this region belongs to, must not be null
+        * @param executor the executor on which the follow-up of a cancellation is run, must not be null
+        * @param connectedExecutions the execution vertices that make up this region, must not be null
+        */
+       public FailoverRegion(
+                       ExecutionGraph executionGraph,
+                       Executor executor,
+                       List<ExecutionVertex> connectedExecutions) {
+
+               this.executionGraph = checkNotNull(executionGraph);
+               this.executor = checkNotNull(executor);
+               this.connectedExecutionVertexes = checkNotNull(connectedExecutions);
+
+               LOG.debug("Created failover region {} with vertices: {}", id, connectedExecutions);
+       }
+
+       public void onExecutionFail(Execution taskExecution, Throwable cause) {
+               // TODO: check if need to failover the preceding region
+               if (!executionGraph.getRestartStrategy().canRestart()) {
+                       // delegate the failure to a global fail that will 
check the restart strategy and not restart
+                       executionGraph.failGlobal(cause);
+               }
+               else {
+                       cancel(taskExecution.getGlobalModVersion());
+               }
+       }
+
+       /**
+        * Callback run after the combined cancellation future of this region has completed.
+        *
+        * <p>Moves the region from CANCELLING to CANCELED and then resets it. The compare-and-set
+        * is retried for as long as the region is still observed in CANCELLING; in any other
+        * state the call only logs and returns.
+        *
+        * @param globalModVersionOfFailover the global mod version the failover was started with
+        */
+       private void allVerticesInTerminalState(long globalModVersionOfFailover) {
+               JobStatus observed = this.state;
+               while (observed.equals(JobStatus.CANCELLING)) {
+                       if (transitionState(observed, JobStatus.CANCELED)) {
+                               reset(globalModVersionOfFailover);
+                               return;
+                       }
+                       // lost the race against another state change, look at the state again
+                       observed = this.state;
+               }
+
+               LOG.info("FailoverRegion {} is {} when allVerticesInTerminalState.", id, state);
+       }
+
+       /**
+        * Returns the current state of this failover region.
+        */
+       public JobStatus getState() {
+               return this.state;
+       }
+
+       /**
+        * Returns all execution vertices contained in this region.
+        *
+        * <p>The returned list is the list held by the region itself, not a copy.
+        */
+       public List<ExecutionVertex> getAllExecutionVertexes() {
+               return this.connectedExecutionVertexes;
+       }
+
+       // Notice the region to failover, 
+       private void failover(long globalModVersionOfFailover) {
+               if (!executionGraph.getRestartStrategy().canRestart()) {
+                       executionGraph.failGlobal(new 
FlinkException("RestartStrategy validate fail"));
+               }
+               else {
+                       JobStatus curStatus = this.state;
+                       if (curStatus.equals(JobStatus.RUNNING)) {
+                               cancel(globalModVersionOfFailover);
+                       }
+                       else if (curStatus.equals(JobStatus.CANCELED)) {
+                               reset(globalModVersionOfFailover);
+                       }
+                       else {
+                               LOG.info("FailoverRegion {} is {} when notified 
to failover.", id, state);
+                       }
+               }
+       }
+
+       /**
+        * Cancels all executions in this sub graph.
+        *
+        * <p>Only acts when the region can be moved from RUNNING to CANCELLING; the
+        * compare-and-set is retried while the region is still observed as RUNNING. After the
+        * transition every vertex is cancelled, and once the combined future of all
+        * cancellations completes, {@code allVerticesInTerminalState} is run on the executor.
+        * In any other state the call only logs and returns.
+        *
+        * @param globalModVersionOfFailover the global mod version the failover was started with
+        */
+       private void cancel(final long globalModVersionOfFailover) {
+               JobStatus observed = this.state;
+               while (observed.equals(JobStatus.RUNNING)) {
+                       if (transitionState(observed, JobStatus.CANCELLING)) {
+
+                               // we build a future that is complete once all vertices have reached a terminal state
+                               final ArrayList<Future<?>> cancellations = new ArrayList<>(connectedExecutionVertexes.size());
+
+                               // cancel all tasks (that still need cancelling)
+                               for (ExecutionVertex vertex : connectedExecutionVertexes) {
+                                       cancellations.add(vertex.cancel());
+                               }
+
+                               final FutureUtils.ConjunctFuture allCancelled = FutureUtils.combineAll(cancellations);
+                               allCancelled.thenAcceptAsync(new AcceptFunction<Void>() {
+                                       @Override
+                                       public void accept(Void value) {
+                                               allVerticesInTerminalState(globalModVersionOfFailover);
+                                       }
+                               }, executor);
+
+                               return;
+                       }
+                       // lost the race against another state change, look at the state again
+                       observed = this.state;
+               }
+
+               LOG.info("FailoverRegion {} is {} when cancel.", id, state);
+       }
+
+       /**
+        * Resets all executions in this sub graph so that they can be scheduled again.
+        *
+        * <p>The constraints of every co-location group used by the region are reset once, and
+        * every vertex is reset for a new execution. Afterwards the region moves from CANCELED
+        * to CREATED and is restarted. If that transition fails, or any other error occurs, the
+        * region is failed over again. A {@link GlobalModVersionMismatch} puts the region
+        * straight back into RUNNING.
+        *
+        * @param globalModVersionOfFailover the global mod version the failover was started with
+        */
+       private void reset(long globalModVersionOfFailover) {
+               try {
+                       // reset all connected ExecutionVertexes
+                       final Collection<CoLocationGroup> colGroups = new HashSet<>();
+                       final long restartTimestamp = System.currentTimeMillis();
+
+                       for (ExecutionVertex ev : connectedExecutionVertexes) {
+                               CoLocationGroup cgroup = ev.getJobVertex().getCoLocationGroup();
+                               // add() is true only the first time a group is seen, so each group is reset once
+                               if (cgroup != null && colGroups.add(cgroup)) {
+                                       cgroup.resetConstraints();
+                               }
+
+                               ev.resetForNewExecution(restartTimestamp, globalModVersionOfFailover);
+                       }
+                       if (transitionState(JobStatus.CANCELED, JobStatus.CREATED)) {
+                               restart(globalModVersionOfFailover);
+                       }
+                       else {
+                               // the attempted transition is CANCELED -> CREATED, name it correctly in the log
+                               LOG.info("FailoverRegion {} switched from CANCELED to CREATED fail, will fail this region again.", id);
+                               failover(globalModVersionOfFailover);
+                       }
+               }
+               catch (GlobalModVersionMismatch e) {
+                       // happens when a global recovery happens concurrently to the regional recovery
+                       // go back to a clean state
+                       state = JobStatus.RUNNING;
+               }
+               catch (Throwable e) {
+                       // pass the throwable as last argument so that the cause shows up in the log
+                       LOG.info("FailoverRegion {} reset fail, will failover again.", id, e);
+                       failover(globalModVersionOfFailover);
+               }
+       }
+
+       /**
+        * Restarts all executions in this sub graph.
+        *
+        * <p>Moves the region from CREATED to RUNNING and schedules every vertex for execution.
+        * A vertex that fails to schedule triggers a failover of the region; the loop still
+        * continues with the remaining vertices. If the state transition fails, or an exception
+        * escapes, the region is failed over again.
+        *
+        * @param globalModVersionOfFailover the global mod version the failover was started with
+        */
+       private void restart(long globalModVersionOfFailover) {
+               try {
+                       if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
+                               // if we have checkpointed state, reload it into the executions
+                               //TODO: checkpoint support restore part ExecutionVertex cp
+                               /**
+                               if (executionGraph.getCheckpointCoordinator() != null) {
+                                       executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
+                                                       connectedExecutionVertexes, false, false);
+                               }
+                               */
+                               //TODO, use restart strategy to schedule them.
+                               //restart all connected ExecutionVertexes
+                               for (ExecutionVertex ev : connectedExecutionVertexes) {
+                                       try {
+                                               ev.scheduleForExecution(
+                                                               executionGraph.getSlotProvider(),
+                                                               executionGraph.isQueuedSchedulingAllowed());
+                                       }
+                                       catch (Throwable e) {
+                                               // do not swallow the cause: log which vertex failed to schedule and why
+                                               LOG.info("FailoverRegion {} failed to schedule {}, failover again.",
+                                                               id, ev.getTaskNameWithSubtaskIndex(), e);
+                                               failover(globalModVersionOfFailover);
+                                       }
+                               }
+                       }
+                       else {
+                               LOG.info("FailoverRegion {} switched from CREATED to RUNNING fail, will fail this region again.", id);
+                               failover(globalModVersionOfFailover);
+                       }
+               } catch (Exception e) {
+                       LOG.info("FailoverRegion {} restart failed, failover again.", id, e);
+                       failover(globalModVersionOfFailover);
+               }
+       }
+
+       private boolean transitionState(JobStatus current, JobStatus newState) {
+               if (STATE_UPDATER.compareAndSet(this, current, newState)) {
+                       LOG.info("FailoverRegion {} switched from state {} to 
{}.", id, current, newState);
+                       return true;
+               }
+               else {
+                       return false;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
index f18a90f..8b6fa6e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
@@ -35,9 +35,12 @@ public class FailoverStrategyLoader {
        /** Config name for the {@link RestartAllStrategy} */
        public static final String FULL_RESTART_STRATEGY_NAME = "full";
 
-       /** Config name for the strategy that restarts individual tasks */
+       /** Config name for the {@link RestartIndividualStrategy} */
        public static final String INDIVIDUAL_RESTART_STRATEGY_NAME = 
"individual";
 
+       /** Config name for the {@link RestartPipelinedRegionStrategy} */
+       public static final String PIPELINED_REGION_RESTART_STRATEGY_NAME = 
"region";
+
        // 
------------------------------------------------------------------------
 
        /**
@@ -59,6 +62,9 @@ public class FailoverStrategyLoader {
                                case FULL_RESTART_STRATEGY_NAME:
                                        return new RestartAllStrategy.Factory();
 
+                               case PIPELINED_REGION_RESTART_STRATEGY_NAME:
+                                       return new 
RestartPipelinedRegionStrategy.Factory();
+
                                case INDIVIDUAL_RESTART_STRATEGY_NAME:
                                        return new 
RestartIndividualStrategy.Factory();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
new file mode 100644
index 0000000..0a5baa8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A failover strategy that restarts regions of the ExecutionGraph. A region 
is defined
+ * by this strategy as the weakly connected component of tasks that 
communicate via pipelined
+ * data exchange.
+ */
+public class RestartPipelinedRegionStrategy extends FailoverStrategy {
+
+       /** The log object used for debugging. */
+       private static final Logger LOG = 
LoggerFactory.getLogger(RestartPipelinedRegionStrategy.class);
+
+       /** The execution graph on which this FailoverStrategy works */
+       private final ExecutionGraph executionGraph;
+
+       /** The executor used for future actions */
+       private final Executor executor;
+
+       /** Fast lookup from vertex to failover region */
+       private final HashMap<ExecutionVertex, FailoverRegion> vertexToRegion;
+
+
+       /**
+        * Creates a failover strategy that restarts pipelined regions of the given execution
+        * graph. Restart actions are run on the future executor of that execution graph.
+        *
+        * @param executionGraph The execution graph on which this FailoverStrategy will work
+        */
+       public RestartPipelinedRegionStrategy(ExecutionGraph executionGraph) {
+               this(executionGraph, executionGraph.getFutureExecutor());
+       }
+
+       /**
+        * Creates a failover strategy that restarts pipelined regions of the given execution
+        * graph. Restart actions are run on the given executor.
+        *
+        * @param executionGraph The execution graph on which this FailoverStrategy will work, must not be null
+        * @param executor The executor used for future actions, must not be null
+        */
+       public RestartPipelinedRegionStrategy(ExecutionGraph executionGraph, Executor executor) {
+               this.executionGraph = checkNotNull(executionGraph);
+               this.executor = checkNotNull(executor);
+               this.vertexToRegion = new HashMap<>();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  failover implementation
+       // 
------------------------------------------------------------------------ 
+
+       @Override
+       public void onTaskFailure(Execution taskExecution, Throwable cause) {
+               final ExecutionVertex ev = taskExecution.getVertex();
+               final FailoverRegion failoverRegion = vertexToRegion.get(ev);
+
+               if (failoverRegion == null) {
+                       executionGraph.failGlobal(new FlinkException(
+                                       "Can not find a failover region for the 
execution " + ev.getTaskNameWithSubtaskIndex()));
+               }
+               else {
+                       failoverRegion.onExecutionFail(taskExecution, cause);
+               }
+       }
+
+       /**
+        * Builds the failover regions for the newly added job vertices.
+        */
+       @Override
+       public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
+               final int numNewVertices = newJobVerticesTopological.size();
+               LOG.debug("Generating failover regions for {} new job vertices", numNewVertices);
+
+               generateAllFailoverRegion(newJobVerticesTopological);
+       }
+
+       /**
+        * Returns the display name of this failover strategy.
+        */
+       @Override
+       public String getStrategyName() {
+               return "Pipelined Region Failover";
+       }
+
+       // Generate all the FailoverRegion from the new added job vertexes
+       private void generateAllFailoverRegion(List<ExecutionJobVertex> 
newJobVerticesTopological) {
+               for (ExecutionJobVertex ejv : newJobVerticesTopological) {
+                       for (ExecutionVertex ev : ejv.getTaskVertices()) {
+                               if (getFailoverRegion(ev) != null) {
+                                       continue;
+                               }
+                               List<ExecutionVertex> pipelinedExecutions = new 
ArrayList<>();
+                               List<ExecutionVertex> orgExecutions = new 
ArrayList<>();
+                               orgExecutions.add(ev);
+                               pipelinedExecutions.add(ev);
+                               getAllPipelinedConnectedVertexes(orgExecutions, 
pipelinedExecutions);
+
+                               FailoverRegion region = new 
FailoverRegion(executionGraph, executor, pipelinedExecutions);
+                               for (ExecutionVertex vertex : 
pipelinedExecutions) {
+                                       vertexToRegion.put(vertex, region);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Get all connected executions of the original executions.
+        *
+        * <p>Starting from {@code orgExecutions}, follows pipelined data exchanges downstream
+        * (consumers of produced partitions) and upstream (producers of input partitions).
+        * Newly found downstream vertices are appended to {@code connectedExecutions}, newly
+        * found upstream vertices are inserted at its front. Every vertex is added at most once.
+        *
+        * @param orgExecutions  the original execution vertexes
+        * @param connectedExecutions  the total connected executions; extended in place
+        */
+       private static void getAllPipelinedConnectedVertexes(List<ExecutionVertex> orgExecutions, List<ExecutionVertex> connectedExecutions) {
+               // track membership in a hash set: a list scan per edge is quadratic, and checking only
+               // the result list lets a vertex reached over several edges be collected more than once
+               collectPipelinedConnectedVertexes(
+                               orgExecutions, connectedExecutions, new HashSet<ExecutionVertex>(connectedExecutions));
+       }
+
+       /**
+        * Recursive worker for {@link #getAllPipelinedConnectedVertexes(List, List)}.
+        *
+        * @param orgExecutions  the execution vertexes to expand in this step
+        * @param connectedExecutions  the total connected executions; extended in place
+        * @param knownExecutions  all vertexes already contained in, or about to be added to, connectedExecutions
+        */
+       private static void collectPipelinedConnectedVertexes(
+                       List<ExecutionVertex> orgExecutions,
+                       List<ExecutionVertex> connectedExecutions,
+                       Set<ExecutionVertex> knownExecutions) {
+
+               List<ExecutionVertex> newAddedExecutions = new ArrayList<>();
+               for (ExecutionVertex ev : orgExecutions) {
+                       // Add downstream ExecutionVertex
+                       for (IntermediateResultPartition irp : ev.getProducedPartitions().values()) {
+                               if (irp.getIntermediateResult().getResultType().isPipelined()) {
+                                       for (List<ExecutionEdge> consumers : irp.getConsumers()) {
+                                               for (ExecutionEdge consumer : consumers) {
+                                                       ExecutionVertex cev = consumer.getTarget();
+                                                       // add() is false for an already known vertex
+                                                       if (knownExecutions.add(cev)) {
+                                                               newAddedExecutions.add(cev);
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+                       if (!newAddedExecutions.isEmpty()) {
+                               connectedExecutions.addAll(newAddedExecutions);
+                               collectPipelinedConnectedVertexes(newAddedExecutions, connectedExecutions, knownExecutions);
+                               newAddedExecutions.clear();
+                       }
+                       // Add upstream ExecutionVertex
+                       int inputNum = ev.getNumberOfInputs();
+                       for (int i = 0; i < inputNum; i++) {
+                               for (ExecutionEdge input : ev.getInputEdges(i)) {
+                                       if (input.getSource().getIntermediateResult().getResultType().isPipelined()) {
+                                               ExecutionVertex pev = input.getSource().getProducer();
+                                               if (knownExecutions.add(pev)) {
+                                                       newAddedExecutions.add(pev);
+                                               }
+                                       }
+                               }
+                       }
+                       if (!newAddedExecutions.isEmpty()) {
+                               // producers go to the front of the list
+                               connectedExecutions.addAll(0, newAddedExecutions);
+                               collectPipelinedConnectedVertexes(newAddedExecutions, connectedExecutions, knownExecutions);
+                               newAddedExecutions.clear();
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  testing
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Finds the failover region that contains the given execution vertex.
+        *
+        * @param ev the execution vertex to look up
+        * @return the region registered for the vertex, or null if there is none
+        */
+       @VisibleForTesting
+       public FailoverRegion getFailoverRegion(ExecutionVertex ev) {
+               final FailoverRegion region = vertexToRegion.get(ev);
+               return region;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  factory
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Factory that instantiates the RestartPipelinedRegionStrategy.
+        */
+       public static class Factory implements FailoverStrategy.Factory {
+
+               /**
+                * Creates a RestartPipelinedRegionStrategy that works on the given execution graph.
+                */
+               @Override
+               public FailoverStrategy create(ExecutionGraph executionGraph) {
+                       final FailoverStrategy strategy = new RestartPipelinedRegionStrategy(executionGraph);
+                       return strategy;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 0d7e389..140e984 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
@@ -137,6 +138,27 @@ public class ExecutionGraphTestUtils {
                }
        }
 
+       public static void waitUntilFailoverRegionState(FailoverRegion region, 
JobStatus status, long maxWaitMillis)
+                       throws TimeoutException {
+
+               checkNotNull(region);
+               checkNotNull(status);
+               checkArgument(maxWaitMillis >= 0);
+
+               // this is a poor implementation - we may want to improve it 
eventually
+               final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : 
System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+               while (region.getState() != status && System.nanoTime() < 
deadline) {
+                       try {
+                               Thread.sleep(2);
+                       } catch (InterruptedException ignored) {}
+               }
+
+               if (System.nanoTime() >= deadline) {
+                       throw new TimeoutException();
+               }
+       }
+
        /**
         * Takes all vertices in the given ExecutionGraph and switches their 
current
         * execution to RUNNING.
@@ -320,21 +342,17 @@ public class ExecutionGraphTestUtils {
 
                @Override
                public Object handleMessage(Object message) {
-                       Object result = null;
-                       if(message instanceof SubmitTask) {
+                       if (message instanceof SubmitTask) {
                                SubmitTask submitTask = (SubmitTask) message;
                                lastTDD = submitTask.tasks();
-
-                               result = Acknowledge.get();
+                               return Acknowledge.get();
                        } else if(message instanceof CancelTask) {
-                               CancelTask cancelTask = (CancelTask) message;
-
-                               result = Acknowledge.get();
+                               return Acknowledge.get();
                        } else if(message instanceof 
FailIntermediateResultPartitions) {
-                               result = new Object();
+                               return new Object();
+                       } else {
+                               return null;
                        }
-
-                       return result;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
new file mode 100644
index 0000000..beeefb1b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import 
org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilFailoverRegionState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FailoverRegionTest extends TestLogger {
+
+       /**
+        * Checks the life cycle of a job that forms a single failover region: the region is
+        * RUNNING after scheduling, switches to CANCELLING once a task fails, and is RUNNING
+        * again after every execution has completed its cancellation.
+        *
+        * @throws Exception on any unexpected error during the test
+        */
+       @Test
+       public void testSingleRegionFailover() throws Exception {
+               final ExecutionGraph graph = createSingleRegionExecutionGraph(new InfiniteDelayRestartStrategy(10));
+               final RestartPipelinedRegionStrategy failoverStrategy =
+                               (RestartPipelinedRegionStrategy) graph.getFailoverStrategy();
+
+               final ExecutionVertex firstVertex = graph.getAllExecutionVertices().iterator().next();
+               assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(firstVertex).getState());
+
+               // failing one task moves its region to CANCELLING
+               firstVertex.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
+               assertEquals(JobStatus.CANCELLING, failoverStrategy.getFailoverRegion(firstVertex).getState());
+
+               // acknowledge the cancellation of every task in the graph
+               final Iterator<ExecutionVertex> vertices = graph.getAllExecutionVertices().iterator();
+               while (vertices.hasNext()) {
+                       vertices.next().getCurrentExecutionAttempt().cancelingComplete();
+               }
+               assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(firstVertex).getState());
+       }
+
+       /**
+        * Tests that a job has several failover regions and that the failover of one region
+        * does not influence the others.
+        *
+        * <pre>
+        *     (a1) ---> (b1) -+-> (c1) ---+-> (d1)
+        *                     X          /
+        *     (a2) ---> (b2) -+-> (c2) -+
+        *
+        *           ^         ^         ^
+        *           |         |         |
+        *     (pipelined) (blocking) (pipelined)
+        *
+        * </pre>
+        *
+        * @throws Exception on any unexpected error during the test
+        */
+       @Test
+       public void testMultiRegionsFailover() throws Exception {
+               final JobID jobId = new JobID();
+               final String jobName = "Test Job Sample Name";
+
+               final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20);
+
+               JobVertex v1 = new JobVertex("vertex1");
+               JobVertex v2 = new JobVertex("vertex2");
+               JobVertex v3 = new JobVertex("vertex3");
+               JobVertex v4 = new JobVertex("vertex4");
+
+               v1.setParallelism(2);
+               v2.setParallelism(2);
+               v3.setParallelism(2);
+               v4.setParallelism(1);
+
+               v1.setInvokableClass(AbstractInvokable.class);
+               v2.setInvokableClass(AbstractInvokable.class);
+               v3.setInvokableClass(AbstractInvokable.class);
+               v4.setInvokableClass(AbstractInvokable.class);
+
+               v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+               v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+               v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+               List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
+
+               ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.defaultExecutor(),
+                               TestingUtils.defaultExecutor(),
+                               jobId,
+                               jobName,
+                               new Configuration(),
+                               new SerializedValue<>(new ExecutionConfig()),
+                               AkkaUtils.getDefaultTimeout(),
+                               new InfiniteDelayRestartStrategy(10),
+                               new FailoverPipelinedRegionWithDirectExecutor(),
+                               Collections.<BlobKey>emptyList(),
+                               Collections.<URL>emptyList(),
+                               slotProvider,
+                               ExecutionGraph.class.getClassLoader());
+
+               eg.attachJobGraph(ordered);
+
+               RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();
+
+               // the following two vertices are in the same failover region
+               ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+               ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+
+               // the following two vertices are in the same failover region
+               ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+               ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+
+               // the following vertices are in one failover region
+               ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+               ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+               // the single subtask of vertex4 (d1); it must be looked up through v4, not v3
+               ExecutionVertex ev4 = eg.getJobVertex(v4.getID()).getTaskVertices()[0];
+
+               eg.scheduleForExecution();
+
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+
+               // fail a task of the first region: only that region may leave RUNNING
+               ev21.scheduleForExecution(slotProvider, true);
+               ev21.getCurrentExecutionAttempt().fail(new Exception("New fail"));
+               assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState());
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
+
+               ev11.getCurrentExecutionAttempt().cancelingComplete();
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
+
+               // finish the tasks in front of the blocking edge
+               ev11.getCurrentExecutionAttempt().markFinished();
+               ev21.getCurrentExecutionAttempt().markFinished();
+               ev22.scheduleForExecution(slotProvider, true);
+               ev22.getCurrentExecutionAttempt().markFinished();
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
+
+               waitUntilExecutionState(ev31.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 2000);
+               waitUntilExecutionState(ev32.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 2000);
+
+               // fail a task of the last region: the upstream regions must stay RUNNING
+               ev31.getCurrentExecutionAttempt().fail(new Exception("New fail"));
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
+               assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev31).getState());
+               // d1 is connected to c1/c2 through a pipelined edge and therefore shares their region
+               assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev4).getState());
+
+               ev32.getCurrentExecutionAttempt().cancelingComplete();
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
+               assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev4).getState());
+       }
+
+       /**
+        * Checks that the job ends up FAILED when a task fails while the configured
+        * restart strategy is a {@link NoRestartStrategy}.
+        *
+        * @throws Exception on any unexpected error during the test
+        */
+       @Test
+       public void testNoManualRestart() throws Exception {
+               final ExecutionGraph graph = createSingleRegionExecutionGraph(new NoRestartStrategy());
+
+               final ExecutionVertex failingVertex = graph.getAllExecutionVertices().iterator().next();
+               failingVertex.fail(new Exception("Test Exception"));
+
+               // acknowledge the cancellation of every task in the graph
+               final Iterator<ExecutionVertex> vertices = graph.getAllExecutionVertices().iterator();
+               while (vertices.hasNext()) {
+                       vertices.next().getCurrentExecutionAttempt().cancelingComplete();
+               }
+
+               assertEquals(JobStatus.FAILED, graph.getState());
+       }
+
+       /**
+        * Tests that when two failover regions fail over at the same time, they do not
+        * influence each other: both turn CANCELLING and each one returns to RUNNING once
+        * the cancellation in that region has completed.
+        *
+        * @throws Exception on any unexpected error during the test
+        */
+       @Test
+       public void testMutilRegionFailoverAtSameTime() throws Exception {
+               final Instance taskManager = ExecutionGraphTestUtils.getInstance(
+                               new ActorTaskManagerGateway(new SimpleActorGateway(TestingUtils.directExecutionContext())),
+                               16);
+
+               final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+               scheduler.newInstanceAvailable(taskManager);
+
+               final JobVertex v1 = new JobVertex("vertex1");
+               final JobVertex v2 = new JobVertex("vertex2");
+               final JobVertex v3 = new JobVertex("vertex3");
+               final JobVertex v4 = new JobVertex("vertex4");
+               final List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
+
+               // all four vertices share the same parallelism and invokable
+               for (JobVertex vertex : ordered) {
+                       vertex.setParallelism(2);
+                       vertex.setInvokableClass(AbstractInvokable.class);
+               }
+
+               v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+               v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+               v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+               final ExecutionGraph graph = new ExecutionGraph(
+                               TestingUtils.defaultExecutor(),
+                               TestingUtils.defaultExecutor(),
+                               new JobID(),
+                               "Test Job Sample Name",
+                               new Configuration(),
+                               new SerializedValue<>(new ExecutionConfig()),
+                               AkkaUtils.getDefaultTimeout(),
+                               new InfiniteDelayRestartStrategy(10),
+                               new RestartPipelinedRegionStrategy.Factory(),
+                               Collections.<BlobKey>emptyList(),
+                               Collections.<URL>emptyList(),
+                               scheduler,
+                               ExecutionGraph.class.getClassLoader());
+               try {
+                       graph.attachJobGraph(ordered);
+               }
+               catch (JobException e) {
+                       e.printStackTrace();
+                       fail("Job failed with exception: " + e.getMessage());
+               }
+               graph.scheduleForExecution();
+
+               final RestartPipelinedRegionStrategy failoverStrategy =
+                               (RestartPipelinedRegionStrategy) graph.getFailoverStrategy();
+
+               final ExecutionVertex[] sourceTasks = graph.getJobVertex(v1.getID()).getTaskVertices();
+               final ExecutionVertex ev11 = sourceTasks[0];
+               final ExecutionVertex ev12 = sourceTasks[1];
+
+               final ExecutionVertex[] thirdVertexTasks = graph.getJobVertex(v3.getID()).getTaskVertices();
+               final ExecutionVertex ev31 = thirdVertexTasks[0];
+               final ExecutionVertex ev32 = thirdVertexTasks[1];
+
+               assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(ev11).getState());
+               assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(ev31).getState());
+
+               // fail one task in each of the two regions
+               ev11.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+               ev31.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+               assertEquals(JobStatus.CANCELLING, failoverStrategy.getFailoverRegion(ev11).getState());
+               assertEquals(JobStatus.CANCELLING, failoverStrategy.getFailoverRegion(ev31).getState());
+
+               ev32.getCurrentExecutionAttempt().cancelingComplete();
+               waitUntilFailoverRegionState(failoverStrategy.getFailoverRegion(ev31), JobStatus.RUNNING, 1000);
+
+               ev12.getCurrentExecutionAttempt().cancelingComplete();
+               waitUntilFailoverRegionState(failoverStrategy.getFailoverRegion(ev11), JobStatus.RUNNING, 1000);
+       }
+
+       /**
+        * Tests that if a task reports that the result of its preceding task is failed,
+        * the preceding task is considered failed as well and starts to fail over.
+        *
+        * <p>TODO: as the report part is not finished yet, this case is ignored temporarily.
+        *
+        * @throws Exception on any unexpected error during the test
+        */
+       @Ignore
+       @Test
+       public void testSucceedingNoticePreceding() throws Exception {
+               Instance instance = ExecutionGraphTestUtils.getInstance(
+                               new ActorTaskManagerGateway(new SimpleActorGateway(TestingUtils.directExecutionContext())),
+                               14);
+
+               Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+               scheduler.newInstanceAvailable(instance);
+
+               final JobID jobId = new JobID();
+               final String jobName = "Test Job Sample Name";
+               final Configuration cfg = new Configuration();
+
+               JobVertex v1 = new JobVertex("vertex1");
+               JobVertex v2 = new JobVertex("vertex2");
+
+               v1.setParallelism(1);
+               v2.setParallelism(1);
+
+               v1.setInvokableClass(AbstractInvokable.class);
+               v2.setInvokableClass(AbstractInvokable.class);
+
+               v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+               List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
+
+               ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.defaultExecutor(),
+                               TestingUtils.defaultExecutor(),
+                               jobId,
+                               jobName,
+                               cfg,
+                               new SerializedValue<>(new ExecutionConfig()),
+                               AkkaUtils.getDefaultTimeout(),
+                               new InfiniteDelayRestartStrategy(10),
+                               new FailoverPipelinedRegionWithDirectExecutor(),
+                               Collections.<BlobKey>emptyList(),
+                               Collections.<URL>emptyList(),
+                               scheduler,
+                               ExecutionGraph.class.getClassLoader());
+               try {
+                       eg.attachJobGraph(ordered);
+               }
+               catch (JobException e) {
+                       e.printStackTrace();
+                       fail("Job failed with exception: " + e.getMessage());
+               }
+               eg.setScheduleMode(ScheduleMode.EAGER);
+               eg.scheduleForExecution();
+               RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();
+
+               // ev11 is the preceding task (vertex1), ev21 the succeeding task (vertex2);
+               // they are separated by a blocking edge
+               ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+               ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+               ev21.getCurrentExecutionAttempt().fail(new Exception("Fail with v1"));
+
+               assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev21).getState());
+               assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState());
+       }
+
+       /**
+        * Checks what happens when a second task fails while its failover region is still
+        * CANCELLING: the region stays CANCELLING and the job itself stays RUNNING.
+        *
+        * @throws Exception on any unexpected error during the test
+        */
+       @Test
+       public void testFailWhileCancelling() throws Exception {
+               final ExecutionGraph graph = createSingleRegionExecutionGraph(new InfiniteDelayRestartStrategy());
+               final RestartPipelinedRegionStrategy failoverStrategy =
+                               (RestartPipelinedRegionStrategy) graph.getFailoverStrategy();
+
+               final Iterator<ExecutionVertex> vertices = graph.getAllExecutionVertices().iterator();
+
+               final ExecutionVertex firstVertex = vertices.next();
+               firstVertex.getCurrentExecutionAttempt().switchToRunning();
+               assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(firstVertex).getState());
+
+               // first failure: the region starts cancelling
+               firstVertex.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+               assertEquals(JobStatus.CANCELLING, failoverStrategy.getFailoverRegion(firstVertex).getState());
+
+               // second failure arrives while the region is still cancelling
+               final ExecutionVertex secondVertex = vertices.next();
+               secondVertex.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+               assertEquals(JobStatus.RUNNING, graph.getState());
+               assertEquals(JobStatus.CANCELLING, failoverStrategy.getFailoverRegion(firstVertex).getState());
+       }
+
+       /**
+        * Checks that a failover region which has been restarted (it is RUNNING again after
+        * all cancellations completed) switches to CANCELLING once more when another failure
+        * is reported.
+        *
+        * @throws Exception on any unexpected error during the test
+        */
+       @Test
+       public void testFailWhileRestarting() throws Exception {
+               final ExecutionGraph graph = createSingleRegionExecutionGraph(new InfiniteDelayRestartStrategy());
+               final RestartPipelinedRegionStrategy failoverStrategy =
+                               (RestartPipelinedRegionStrategy) graph.getFailoverStrategy();
+
+               final ExecutionVertex firstVertex = graph.getAllExecutionVertices().iterator().next();
+               assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(firstVertex).getState());
+
+               // first failure: the region starts cancelling
+               firstVertex.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+               assertEquals(JobStatus.CANCELLING, failoverStrategy.getFailoverRegion(firstVertex).getState());
+
+               // acknowledge the cancellation of every task in the graph
+               final Iterator<ExecutionVertex> vertices = graph.getAllExecutionVertices().iterator();
+               while (vertices.hasNext()) {
+                       vertices.next().getCurrentExecutionAttempt().cancelingComplete();
+               }
+               assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(firstVertex).getState());
+
+               // second failure, reported against the vertex's new current execution attempt
+               firstVertex.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+               assertEquals(JobStatus.CANCELLING, failoverStrategy.getFailoverRegion(firstVertex).getState());
+       }
+
+       /**
+        * Builds and schedules an execution graph of three job vertices in which vertex3
+        * consumes vertex1 and vertex2 through pipelined all-to-all connections.
+        *
+        * @param restartStrategy the restart strategy handed to the execution graph
+        * @return the execution graph, after {@code scheduleForExecution()} has been called
+        * @throws Exception on any unexpected error while creating or scheduling the graph
+        */
+       private static ExecutionGraph createSingleRegionExecutionGraph(RestartStrategy restartStrategy) throws Exception {
+               final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+               scheduler.newInstanceAvailable(
+                               ExecutionGraphTestUtils.getInstance(
+                                               new ActorTaskManagerGateway(new SimpleActorGateway(TestingUtils.directExecutionContext())),
+                                               14));
+
+               final JobVertex v1 = new JobVertex("vertex1");
+               v1.setParallelism(3);
+               v1.setInvokableClass(AbstractInvokable.class);
+
+               final JobVertex v2 = new JobVertex("vertex2");
+               v2.setParallelism(2);
+               v2.setInvokableClass(AbstractInvokable.class);
+
+               final JobVertex v3 = new JobVertex("vertex3");
+               v3.setParallelism(2);
+               v3.setInvokableClass(AbstractInvokable.class);
+
+               // vertex3 reads from both other vertices over pipelined edges
+               v3.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+               v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+               final List<JobVertex> topologicallySorted = new ArrayList<>(Arrays.asList(v1, v2, v3));
+
+               final ExecutionGraph graph = new ExecutionGraph(
+                               TestingUtils.defaultExecutor(),
+                               TestingUtils.defaultExecutor(),
+                               new JobID(),
+                               "Test Job Sample Name",
+                               new Configuration(),
+                               new SerializedValue<>(new ExecutionConfig()),
+                               AkkaUtils.getDefaultTimeout(),
+                               restartStrategy,
+                               new FailoverPipelinedRegionWithDirectExecutor(),
+                               Collections.<BlobKey>emptyList(),
+                               Collections.<URL>emptyList(),
+                               scheduler,
+                               ExecutionGraph.class.getClassLoader());
+               try {
+                       graph.attachJobGraph(topologicallySorted);
+               }
+               catch (JobException e) {
+                       e.printStackTrace();
+                       fail("Job failed with exception: " + e.getMessage());
+               }
+
+               graph.scheduleForExecution();
+               return graph;
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Factory for a {@link RestartPipelinedRegionStrategy} that is given a direct
+        * (synchronous) executor, which makes the failover steps easier to test.
+        */
+       private static class FailoverPipelinedRegionWithDirectExecutor implements Factory {
+
+               @Override
+               public FailoverStrategy create(ExecutionGraph executionGraph) {
+                       final FailoverStrategy regionStrategy =
+                                       new RestartPipelinedRegionStrategy(executionGraph, Executors.directExecutor());
+                       return regionStrategy;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
new file mode 100644
index 0000000..635ec75
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import 
org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
+import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * These tests make sure that global failover (restart all) always takes 
precedence over
+ * local recovery strategies for the {@link RestartPipelinedRegionStrategy}.
+ * 
+ * <p>This test must be in the package it resides in, because it uses 
package-private methods
+ * from the ExecutionGraph classes.
+ */
+public class PipelinedRegionFailoverConcurrencyTest {
+
+       /**
+        * Tests that a cancellation concurrent to a local failover leads to a 
properly
+        * cancelled state.
+        */
+       @Test
+       public void testCancelWhileInLocalFailover() throws Exception {
+
+               // the logic in this test is as follows:
+               //  - start a job
+               //  - cause a task failure and delay the local recovery action 
via the manual executor
+               //  - cancel the job to go into cancelling
+               //  - resume in local recovery action
+               //  - validate that this does in fact not start a new task, 
because the graph as a
+               //    whole should now be cancelled already
+
+               final JobID jid = new JobID();
+               final int parallelism = 2;
+
+               final ManuallyTriggeredDirectExecutor executor = new 
ManuallyTriggeredDirectExecutor();
+
+               final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, parallelism);
+
+               final ExecutionGraph graph = createSampleGraph(
+                               jid,
+                               new 
FailoverPipelinedRegionWithCustomExecutor(executor),
+                               new 
FixedDelayRestartStrategy(Integer.MAX_VALUE, 0),
+                               slotProvider,
+                               2);
+
+               final ExecutionJobVertex ejv = 
graph.getVerticesTopologically().iterator().next();
+               final ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
+               final ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
+
+               graph.scheduleForExecution();
+               assertEquals(JobStatus.RUNNING, graph.getState());
+
+               // let one of the vertices fail - that triggers a local 
recovery action
+               vertex1.getCurrentExecutionAttempt().fail(new Exception("test 
failure"));
+               assertEquals(ExecutionState.FAILED, 
vertex1.getCurrentExecutionAttempt().getState());
+
+               // graph should still be running and the failover recovery 
action should be queued
+               assertEquals(JobStatus.RUNNING, graph.getState());
+               assertEquals(1, executor.numQueuedRunnables());
+
+               // now cancel the job
+               graph.cancel();
+
+               assertEquals(JobStatus.CANCELLING, graph.getState());
+               assertEquals(ExecutionState.FAILED, 
vertex1.getCurrentExecutionAttempt().getState());
+               assertEquals(ExecutionState.CANCELING, 
vertex2.getCurrentExecutionAttempt().getState());
+
+               // let the recovery action continue
+               executor.trigger();
+
+               // now report that cancelling is complete for the other vertex
+               vertex2.getCurrentExecutionAttempt().cancelingComplete();
+
+               assertEquals(JobStatus.CANCELED, graph.getState());
+               
assertTrue(vertex1.getCurrentExecutionAttempt().getState().isTerminal());
+               
assertTrue(vertex2.getCurrentExecutionAttempt().getState().isTerminal());
+
+               // make sure all slots are recycled
+               assertEquals(parallelism, 
slotProvider.getNumberOfAvailableSlots());
+       }
+
+       /**
+        * Tests that a terminal global failure concurrent to a local failover
+        * leads to a properly failed state.
+        */
+       @Test
+       public void testGlobalFailureConcurrentToLocalFailover() throws 
Exception {
+
+               // the logic in this test is as follows:
+               //  - start a job
+               //  - cause a task failure and delay the local recovery action 
via the manual executor
+               //  - cause a global failure
+               //  - resume in local recovery action
+               //  - validate that this does in fact not start a new task, 
because the graph as a
+               //    whole should now be terminally failed already
+
+               final JobID jid = new JobID();
+               final int parallelism = 2;
+
+               final ManuallyTriggeredDirectExecutor executor = new 
ManuallyTriggeredDirectExecutor();
+
+               final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, parallelism);
+
+               final ExecutionGraph graph = createSampleGraph(
+                               jid,
+                               new 
FailoverPipelinedRegionWithCustomExecutor(executor),
+                               new 
FixedDelayRestartStrategy(Integer.MAX_VALUE, 0),
+                               slotProvider,
+                               2);
+
+               final ExecutionJobVertex ejv = 
graph.getVerticesTopologically().iterator().next();
+               final ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
+               final ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
+
+               graph.scheduleForExecution();
+               assertEquals(JobStatus.RUNNING, graph.getState());
+
+               // let one of the vertices fail - that triggers a local 
recovery action
+               vertex1.getCurrentExecutionAttempt().fail(new Exception("test 
failure"));
+               assertEquals(ExecutionState.FAILED, 
vertex1.getCurrentExecutionAttempt().getState());
+
+               // graph should still be running and the failover recovery 
action should be queued
+               assertEquals(JobStatus.RUNNING, graph.getState());
+               assertEquals(1, executor.numQueuedRunnables());
+
+               // now cause a terminal global failure (restarts are suppressed)
+               graph.failGlobal(new SuppressRestartsException(new 
Exception("test exception")));
+
+               assertEquals(JobStatus.FAILING, graph.getState());
+               assertEquals(ExecutionState.FAILED, 
vertex1.getCurrentExecutionAttempt().getState());
+               assertEquals(ExecutionState.CANCELING, 
vertex2.getCurrentExecutionAttempt().getState());
+
+               // let the recovery action continue
+               executor.trigger();
+
+               // now report that cancelling is complete for the other vertex
+               vertex2.getCurrentExecutionAttempt().cancelingComplete();
+
+               assertEquals(JobStatus.FAILED, graph.getState());
+               
assertTrue(vertex1.getCurrentExecutionAttempt().getState().isTerminal());
+               
assertTrue(vertex2.getCurrentExecutionAttempt().getState().isTerminal());
+
+               // make sure all slots are recycled
+               assertEquals(parallelism, 
slotProvider.getNumberOfAvailableSlots());
+       }
+
+       /**
+        * Tests that a local failover does not try to trump a global failover.
+        */
+       @Test
+       public void testGlobalRecoveryConcurrentToLocalRecovery() throws 
Exception {
+
+               // the logic in this test is as follows:
+               //  - start a job
+               //  - cause a task failure and delay the local recovery action 
via the manual executor
+               //  - cause a global failure that is recovering immediately
+               //  - resume in local recovery action
+               //  - validate that this does in fact not cause another task 
restart, because the global
+               //    recovery should already have restarted the task graph
+
+               final JobID jid = new JobID();
+               final int parallelism = 2;
+
+               final ManuallyTriggeredDirectExecutor executor = new 
ManuallyTriggeredDirectExecutor();
+
+               final SimpleSlotProvider slotProvider = new 
SimpleSlotProvider(jid, parallelism);
+
+               final ExecutionGraph graph = createSampleGraph(
+                               jid,
+                               new 
FailoverPipelinedRegionWithCustomExecutor(executor),
+                               new FixedDelayRestartStrategy(2, 0), // twice 
restart, no delay
+                               slotProvider,
+                               2);
+               RestartPipelinedRegionStrategy strategy = 
(RestartPipelinedRegionStrategy)graph.getFailoverStrategy();
+
+               final ExecutionJobVertex ejv = 
graph.getVerticesTopologically().iterator().next();
+               final ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
+               final ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
+
+               graph.scheduleForExecution();
+               assertEquals(JobStatus.RUNNING, graph.getState());
+               assertEquals(JobStatus.RUNNING, 
strategy.getFailoverRegion(vertex1).getState());
+
+               // let one of the vertices fail - that triggers a local 
recovery action
+               vertex2.getCurrentExecutionAttempt().fail(new Exception("test 
failure"));
+               assertEquals(ExecutionState.FAILED, 
vertex2.getCurrentExecutionAttempt().getState());
+               assertEquals(JobStatus.CANCELLING, 
strategy.getFailoverRegion(vertex2).getState());
+
+               // graph should still be running and the failover recovery 
action should be queued
+               assertEquals(JobStatus.RUNNING, graph.getState());
+               assertEquals(1, executor.numQueuedRunnables());
+
+               // now cause a global failure (which the restart strategy recovers from)
+               graph.failGlobal(new Exception("test exception"));
+
+               assertEquals(JobStatus.FAILING, graph.getState());
+               assertEquals(ExecutionState.FAILED, 
vertex2.getCurrentExecutionAttempt().getState());
+               assertEquals(ExecutionState.CANCELING, 
vertex1.getCurrentExecutionAttempt().getState());
+
+               // now report that cancelling is complete for the other vertex
+               vertex1.getCurrentExecutionAttempt().cancelingComplete();
+
+               waitUntilJobStatus(graph, JobStatus.RUNNING, 1000);
+               assertEquals(JobStatus.RUNNING, graph.getState());
+
+               waitUntilExecutionState(vertex1.getCurrentExecutionAttempt(), 
ExecutionState.DEPLOYING, 1000);
+               waitUntilExecutionState(vertex2.getCurrentExecutionAttempt(), 
ExecutionState.DEPLOYING, 1000);
+               vertex1.getCurrentExecutionAttempt().switchToRunning();
+               vertex2.getCurrentExecutionAttempt().switchToRunning();
+               assertEquals(ExecutionState.RUNNING, 
vertex1.getCurrentExecutionAttempt().getState());
+               assertEquals(ExecutionState.RUNNING, 
vertex2.getCurrentExecutionAttempt().getState());
+
+               // let the recovery action continue - this should do nothing 
any more
+               executor.trigger();
+
+               // validate that the graph is still peachy
+               assertEquals(JobStatus.RUNNING, graph.getState());
+               assertEquals(JobStatus.RUNNING, 
strategy.getFailoverRegion(vertex1).getState());
+               assertEquals(JobStatus.RUNNING, 
strategy.getFailoverRegion(vertex2).getState());
+               assertEquals(ExecutionState.RUNNING, 
vertex1.getCurrentExecutionAttempt().getState());
+               assertEquals(ExecutionState.RUNNING, 
vertex2.getCurrentExecutionAttempt().getState());
+               assertEquals(1, 
vertex1.getCurrentExecutionAttempt().getAttemptNumber());
+               assertEquals(1, 
vertex2.getCurrentExecutionAttempt().getAttemptNumber());
+               assertEquals(1, vertex1.getCopyOfPriorExecutionsList().size());
+               assertEquals(1, vertex2.getCopyOfPriorExecutionsList().size());
+
+               // make sure all slots are in use
+               assertEquals(0, slotProvider.getNumberOfAvailableSlots());
+
+               // validate that a task failure then can be handled by the 
local recovery
+               vertex2.getCurrentExecutionAttempt().fail(new Exception("test 
failure"));
+               assertEquals(1, executor.numQueuedRunnables());
+
+               // let the local recovery action continue - this should recover 
the vertex2
+               executor.trigger();
+
+               waitUntilExecutionState(vertex2.getCurrentExecutionAttempt(), 
ExecutionState.DEPLOYING, 1000);
+               vertex2.getCurrentExecutionAttempt().switchToRunning();
+
+               // validate the local recovery result
+               assertEquals(JobStatus.RUNNING, graph.getState());
+               assertEquals(JobStatus.RUNNING, 
strategy.getFailoverRegion(vertex1).getState());
+               assertEquals(JobStatus.RUNNING, 
strategy.getFailoverRegion(vertex2).getState());
+               assertEquals(ExecutionState.RUNNING, 
vertex1.getCurrentExecutionAttempt().getState());
+               assertEquals(ExecutionState.RUNNING, 
vertex2.getCurrentExecutionAttempt().getState());
+               assertEquals(1, 
vertex1.getCurrentExecutionAttempt().getAttemptNumber());
+               assertEquals(2, 
vertex2.getCurrentExecutionAttempt().getAttemptNumber());
+               assertEquals(1, vertex1.getCopyOfPriorExecutionsList().size());
+               assertEquals(2, vertex2.getCopyOfPriorExecutionsList().size());
+
+               // make sure all slots are in use
+               assertEquals(0, slotProvider.getNumberOfAvailableSlots());
+       }
+
+       // 
------------------------------------------------------------------------
+       //  utilities
+       // 
------------------------------------------------------------------------
+
+       private ExecutionGraph createSampleGraph(
+                       JobID jid,
+                       Factory failoverStrategy,
+                       RestartStrategy restartStrategy,
+                       SlotProvider slotProvider,
+                       int parallelism) throws Exception {
+
+               // build a simple execution graph with one job vertex and 
the given parallelism
+               final ExecutionGraph graph = new ExecutionGraph(
+                               TestingUtils.defaultExecutor(),
+                               TestingUtils.defaultExecutor(),
+                               jid,
+                               "test job",
+                               new Configuration(),
+                               new SerializedValue<>(new ExecutionConfig()),
+                               Time.seconds(10),
+                               restartStrategy,
+                               failoverStrategy,
+                               Collections.<BlobKey>emptyList(),
+                               Collections.<URL>emptyList(),
+                               slotProvider,
+                               getClass().getClassLoader());
+
+               JobVertex jv = new JobVertex("test vertex");
+               jv.setInvokableClass(NoOpInvokable.class);
+               jv.setParallelism(parallelism);
+
+               JobGraph jg = new JobGraph(jid, "testjob", jv);
+               
graph.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
+
+               return graph;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static class FailoverPipelinedRegionWithCustomExecutor 
implements Factory {
+
+               private final Executor executor;
+
+               FailoverPipelinedRegionWithCustomExecutor(Executor executor) {
+                       this.executor = executor;
+               }
+
+               @Override
+               public FailoverStrategy create(ExecutionGraph executionGraph) {
+                       return new 
RestartPipelinedRegionStrategy(executionGraph, executor);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
new file mode 100644
index 0000000..45aabe6
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
+import 
org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class RestartPipelinedRegionStrategyTest {
+
+       /**
+        * Creates a JobGraph of the following form:
+        * 
+        * <pre>
+        *  v1--->v2-->\
+        *              \
+        *               v4 --->\
+        *        ----->/        \
+        *  v3-->/                v5
+        *       \               /
+        *        ------------->/
+        * </pre>
+        */
+       @Test
+       public void testSimpleFailoverRegion() throws Exception {
+               
+               final JobID jobId = new JobID();
+               final String jobName = "Test Job Sample Name";
+               final Configuration cfg = new Configuration();
+               
+               JobVertex v1 = new JobVertex("vertex1");
+               JobVertex v2 = new JobVertex("vertex2");
+               JobVertex v3 = new JobVertex("vertex3");
+               JobVertex v4 = new JobVertex("vertex4");
+               JobVertex v5 = new JobVertex("vertex5");
+               
+               v1.setParallelism(5);
+               v2.setParallelism(7);
+               v3.setParallelism(2);
+               v4.setParallelism(11);
+               v5.setParallelism(4);
+
+               v1.setInvokableClass(AbstractInvokable.class);
+               v2.setInvokableClass(AbstractInvokable.class);
+               v3.setInvokableClass(AbstractInvokable.class);
+               v4.setInvokableClass(AbstractInvokable.class);
+               v5.setInvokableClass(AbstractInvokable.class);
+
+               v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
+               v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
+               v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
+               v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
+               v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
+               
+               List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
+
+        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
+               ExecutionGraph eg = new ExecutionGraph(
+                       TestingUtils.defaultExecutor(),
+                       TestingUtils.defaultExecutor(),
+                       jobId, 
+                       jobName, 
+                       cfg,
+                       new SerializedValue<>(new ExecutionConfig()),
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy(),
+            new RestartPipelinedRegionStrategy.Factory(),
+            Collections.<BlobKey>emptyList(),
+            Collections.<URL>emptyList(),
+            scheduler,
+            ExecutionGraph.class.getClassLoader());
+               try {
+                       eg.attachJobGraph(ordered);
+               }
+               catch (JobException e) {
+                       e.printStackTrace();
+                       fail("Job failed with exception: " + e.getMessage());
+               }
+
+        RestartPipelinedRegionStrategy strategy = 
(RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
+        ExecutionJobVertex ejv1 = eg.getJobVertex(v1.getID());
+        ExecutionJobVertex ejv2 = eg.getJobVertex(v2.getID());
+        ExecutionJobVertex ejv3 = eg.getJobVertex(v3.getID());
+        ExecutionJobVertex ejv4 = eg.getJobVertex(v4.getID());
+        ExecutionJobVertex ejv5 = eg.getJobVertex(v5.getID());
+        FailoverRegion region1 = 
strategy.getFailoverRegion(ejv1.getTaskVertices()[2]);
+        FailoverRegion region2 = 
strategy.getFailoverRegion(ejv2.getTaskVertices()[3]);
+        FailoverRegion region3 = 
strategy.getFailoverRegion(ejv3.getTaskVertices()[0]);
+        FailoverRegion region4 = 
strategy.getFailoverRegion(ejv4.getTaskVertices()[4]);
+        FailoverRegion region5 = 
strategy.getFailoverRegion(ejv5.getTaskVertices()[1]);
+
+        assertEquals(region1, region2);
+        assertEquals(region3, region2);
+        assertEquals(region4, region2);
+        assertEquals(region5, region2);
+       }
+
+    /**
+     * Creates a JobGraph of the following form:
+     *
+     * <pre>
+     *  v2 ------->\
+     *              \
+     *  v1---------> v4 --->|\
+     *                        \
+     *                        v5
+     *                       /
+     *  v3--------------->|/
+     * </pre>
+     */
+       @Test
+       public void testMultipleFailoverRegions() throws Exception {
+               final JobID jobId = new JobID();
+               final String jobName = "Test Job Sample Name";
+               final Configuration cfg = new Configuration();
+
+        JobVertex v1 = new JobVertex("vertex1");
+        JobVertex v2 = new JobVertex("vertex2");
+        JobVertex v3 = new JobVertex("vertex3");
+        JobVertex v4 = new JobVertex("vertex4");
+        JobVertex v5 = new JobVertex("vertex5");
+
+        v1.setParallelism(3);
+        v2.setParallelism(2);
+        v3.setParallelism(2);
+        v4.setParallelism(5);
+        v5.setParallelism(2);
+
+        v1.setInvokableClass(AbstractInvokable.class);
+        v2.setInvokableClass(AbstractInvokable.class);
+        v3.setInvokableClass(AbstractInvokable.class);
+        v4.setInvokableClass(AbstractInvokable.class);
+        v5.setInvokableClass(AbstractInvokable.class);
+
+        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
+        v4.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
+        v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+        v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+
+        List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, 
v2, v3, v4, v5));
+
+        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
+               ExecutionGraph eg = new ExecutionGraph(
+                       TestingUtils.defaultExecutor(),
+                       TestingUtils.defaultExecutor(),
+                       jobId, 
+                       jobName, 
+                       cfg,
+                       new SerializedValue<>(new ExecutionConfig()),
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy(),
+            new RestartPipelinedRegionStrategy.Factory(),
+            Collections.<BlobKey>emptyList(),
+            Collections.<URL>emptyList(),
+            scheduler,
+            ExecutionGraph.class.getClassLoader());
+               try {
+                       eg.attachJobGraph(ordered);
+               }
+               catch (JobException e) {
+                       e.printStackTrace();
+                       fail("Job failed with exception: " + e.getMessage());
+               }
+
+        // Not a single failover region: the BLOCKING edges into v5 split the graph
+        RestartPipelinedRegionStrategy strategy = 
(RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
+        ExecutionJobVertex ejv1 = eg.getJobVertex(v1.getID());
+        ExecutionJobVertex ejv2 = eg.getJobVertex(v2.getID());
+        ExecutionJobVertex ejv3 = eg.getJobVertex(v3.getID());
+        ExecutionJobVertex ejv4 = eg.getJobVertex(v4.getID());
+        ExecutionJobVertex ejv5 = eg.getJobVertex(v5.getID());
+        FailoverRegion region1 = 
strategy.getFailoverRegion(ejv1.getTaskVertices()[1]);
+        FailoverRegion region2 = 
strategy.getFailoverRegion(ejv2.getTaskVertices()[0]);
+        FailoverRegion region4 = 
strategy.getFailoverRegion(ejv4.getTaskVertices()[3]);
+        FailoverRegion region31 = 
strategy.getFailoverRegion(ejv3.getTaskVertices()[0]);
+        FailoverRegion region32 = 
strategy.getFailoverRegion(ejv3.getTaskVertices()[1]);
+        FailoverRegion region51 = 
strategy.getFailoverRegion(ejv5.getTaskVertices()[0]);
+        FailoverRegion region52 = 
strategy.getFailoverRegion(ejv5.getTaskVertices()[1]);
+
+        // There should be 5 failover regions: v1, v2 and v4 share one, v3 has two, v5 
has two
+        assertEquals(region1, region2);
+        assertEquals(region2, region4);
+        assertFalse(region31.equals(region32));
+        assertFalse(region51.equals(region52));
+       }
+
+    /**
+     * Creates a JobGraph of the following form:
+     *
+     * <pre>
+     *  v1--->v2-->\
+     *              \
+     *               v4 --->|\
+     *        ----->/        \
+     *  v3-->/                v5
+     *       \               /
+     *        ------------->/
+     * </pre>
+     */
+       @Test
+       public void testSingleRegionWithMixedInput() throws Exception {
+               final JobID jobId = new JobID();
+               final String jobName = "Test Job Sample Name";
+               final Configuration cfg = new Configuration();
+
+        JobVertex v1 = new JobVertex("vertex1");
+        JobVertex v2 = new JobVertex("vertex2");
+        JobVertex v3 = new JobVertex("vertex3");
+        JobVertex v4 = new JobVertex("vertex4");
+        JobVertex v5 = new JobVertex("vertex5");
+
+        v1.setParallelism(3);
+        v2.setParallelism(2);
+        v3.setParallelism(2);
+        v4.setParallelism(5);
+        v5.setParallelism(2);
+
+        v1.setInvokableClass(AbstractInvokable.class);
+        v2.setInvokableClass(AbstractInvokable.class);
+        v3.setInvokableClass(AbstractInvokable.class);
+        v4.setInvokableClass(AbstractInvokable.class);
+        v5.setInvokableClass(AbstractInvokable.class);
+
+        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
+        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
+        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
+        v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
+        v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+
+        List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, 
v2, v3, v4, v5));
+
+        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
+               ExecutionGraph eg = new ExecutionGraph(
+                       TestingUtils.defaultExecutor(),
+                       TestingUtils.defaultExecutor(),
+                       jobId, 
+                       jobName, 
+                       cfg,
+                       new SerializedValue<>(new ExecutionConfig()),
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy(),
+            new RestartPipelinedRegionStrategy.Factory(),
+            Collections.<BlobKey>emptyList(),
+            Collections.<URL>emptyList(),
+            scheduler,
+            ExecutionGraph.class.getClassLoader());
+               try {
+                       eg.attachJobGraph(ordered);
+               }
+               catch (JobException e) {
+                       e.printStackTrace();
+                       fail("Job failed with exception: " + e.getMessage());
+               }
+
+        // All in one failover region
+        RestartPipelinedRegionStrategy strategy = 
(RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
+        ExecutionJobVertex ejv1 = eg.getJobVertex(v1.getID());
+        ExecutionJobVertex ejv2 = eg.getJobVertex(v2.getID());
+        ExecutionJobVertex ejv3 = eg.getJobVertex(v3.getID());
+        ExecutionJobVertex ejv4 = eg.getJobVertex(v4.getID());
+        ExecutionJobVertex ejv5 = eg.getJobVertex(v5.getID());
+        FailoverRegion region1 = 
strategy.getFailoverRegion(ejv1.getTaskVertices()[1]);
+        FailoverRegion region2 = 
strategy.getFailoverRegion(ejv2.getTaskVertices()[0]);
+        FailoverRegion region4 = 
strategy.getFailoverRegion(ejv4.getTaskVertices()[3]);
+        FailoverRegion region3 = 
strategy.getFailoverRegion(ejv3.getTaskVertices()[0]);
+        FailoverRegion region5 = 
strategy.getFailoverRegion(ejv5.getTaskVertices()[1]);
+
+        assertEquals(region1, region2);
+        assertEquals(region2, region4);
+        assertEquals(region3, region2);
+        assertEquals(region1, region5);
+    }
+
+    /**
+     * Creates a JobGraph of the following form:
+     *
+     * <pre>
+     *  v1-->v2-->|\
+     *              \
+     *               v4
+     *             /
+     *  v3------>/
+     * </pre>
+     */
+       @Test
+       public void testMultiRegionNotAllToAll() throws Exception {
+               final JobID jobId = new JobID();
+               final String jobName = "Test Job Sample Name";
+               final Configuration cfg = new Configuration();
+
+        JobVertex v1 = new JobVertex("vertex1");
+        JobVertex v2 = new JobVertex("vertex2");
+        JobVertex v3 = new JobVertex("vertex3");
+        JobVertex v4 = new JobVertex("vertex4");
+        JobVertex v5 = new JobVertex("vertex5");
+
+        v1.setParallelism(2);
+        v2.setParallelism(2);
+        v3.setParallelism(5);
+        v4.setParallelism(5);
+
+        v1.setInvokableClass(AbstractInvokable.class);
+        v2.setInvokableClass(AbstractInvokable.class);
+        v3.setInvokableClass(AbstractInvokable.class);
+        v4.setInvokableClass(AbstractInvokable.class);
+
+        v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
ResultPartitionType.PIPELINED);
+        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+
+        List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, 
v2, v3, v4));
+
+        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
+        ExecutionGraph eg = new ExecutionGraph(
+                       TestingUtils.defaultExecutor(),
+                       TestingUtils.defaultExecutor(),
+                       jobId, 
+                       jobName, 
+                       cfg,
+                       new SerializedValue<>(new ExecutionConfig()),
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy(),
+            new RestartPipelinedRegionStrategy.Factory(),
+            Collections.<BlobKey>emptyList(),
+            Collections.<URL>emptyList(),
+            scheduler,
+            ExecutionGraph.class.getClassLoader());
+               try {
+                       eg.attachJobGraph(ordered);
+               }
+               catch (JobException e) {
+                       e.printStackTrace();
+                       fail("Job failed with exception: " + e.getMessage());
+               }
+
+        // All in one failover region
+        RestartPipelinedRegionStrategy strategy = 
(RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
+        ExecutionJobVertex ejv1 = eg.getJobVertex(v1.getID());
+        ExecutionJobVertex ejv2 = eg.getJobVertex(v2.getID());
+        ExecutionJobVertex ejv3 = eg.getJobVertex(v3.getID());
+        ExecutionJobVertex ejv4 = eg.getJobVertex(v4.getID());
+        FailoverRegion region11 = 
strategy.getFailoverRegion(ejv1.getTaskVertices()[0]);
+        FailoverRegion region12 = 
strategy.getFailoverRegion(ejv1.getTaskVertices()[1]);
+        FailoverRegion region21 = 
strategy.getFailoverRegion(ejv2.getTaskVertices()[0]);
+        FailoverRegion region22 = 
strategy.getFailoverRegion(ejv2.getTaskVertices()[1]);
+        FailoverRegion region3 = 
strategy.getFailoverRegion(ejv3.getTaskVertices()[0]);
+        FailoverRegion region4 = 
strategy.getFailoverRegion(ejv4.getTaskVertices()[3]);
+
+        //There should be 3 failover regions. v1 v2 in two, v3 and v4 in one
+        assertEquals(region11, region21);
+        assertEquals(region12, region22);
+        assertFalse(region11.equals(region12));
+        assertFalse(region3.equals(region4));
+       }
+
+}

Reply via email to