[FLINK-5867] [flip-1] Support restarting only pipelined sub-regions of the ExecutionGraph on task failure
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4eb9e46e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4eb9e46e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4eb9e46e Branch: refs/heads/master Commit: 4eb9e46ee31e3f003c1a92322d13056cb4d4cfd5 Parents: b01d737 Author: shuai.xus <[email protected]> Authored: Tue Apr 18 14:15:29 2017 +0800 Committer: Stephan Ewen <[email protected]> Committed: Sat May 6 21:40:04 2017 +0200 ---------------------------------------------------------------------- .../executiongraph/failover/FailoverRegion.java | 251 ++++++++++ .../failover/FailoverStrategyLoader.java | 8 +- .../RestartPipelinedRegionStrategy.java | 206 ++++++++ .../executiongraph/ExecutionGraphTestUtils.java | 38 +- .../executiongraph/FailoverRegionTest.java | 490 +++++++++++++++++++ .../PipelinedRegionFailoverConcurrencyTest.java | 353 +++++++++++++ .../RestartPipelinedRegionStrategyTest.java | 396 +++++++++++++++ 7 files changed, 1731 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java new file mode 100644 index 0000000..b36cfcf --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.runtime.concurrent.AcceptFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * FailoverRegion manages the failover of a minimal pipeline connected sub graph. 
+ * It will change from CREATED to CANCELING and then to CANCELLED and at last to RUNNING, + */ +public class FailoverRegion { + + private static final AtomicReferenceFieldUpdater<FailoverRegion, JobStatus> STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, "state"); + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(FailoverRegion.class); + + // ------------------------------------------------------------------------ + + /** a unique id for debugging */ + private final AbstractID id = new AbstractID(); + + private final ExecutionGraph executionGraph; + + private final List<ExecutionVertex> connectedExecutionVertexes; + + /** The executor that executes the recovery action after all vertices are in a */ + private final Executor executor; + + /** Current status of the job execution */ + private volatile JobStatus state = JobStatus.RUNNING; + + + public FailoverRegion(ExecutionGraph executionGraph, Executor executor, List<ExecutionVertex> connectedExecutions) { + this.executionGraph = checkNotNull(executionGraph); + this.executor = checkNotNull(executor); + this.connectedExecutionVertexes = checkNotNull(connectedExecutions); + + LOG.debug("Created failover region {} with vertices: {}", id, connectedExecutions); + } + + public void onExecutionFail(Execution taskExecution, Throwable cause) { + // TODO: check if need to failover the preceding region + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + executionGraph.failGlobal(cause); + } + else { + cancel(taskExecution.getGlobalModVersion()); + } + } + + private void allVerticesInTerminalState(long globalModVersionOfFailover) { + while (true) { + JobStatus curStatus = this.state; + if (curStatus.equals(JobStatus.CANCELLING)) { + if (transitionState(curStatus, JobStatus.CANCELED)) { + reset(globalModVersionOfFailover); + 
break; + } + } + else { + LOG.info("FailoverRegion {} is {} when allVerticesInTerminalState.", id, state); + break; + } + } + } + + public JobStatus getState() { + return state; + } + + /** + * get all execution vertexes contained in this region + */ + public List<ExecutionVertex> getAllExecutionVertexes() { + return connectedExecutionVertexes; + } + + // Notice the region to failover, + private void failover(long globalModVersionOfFailover) { + if (!executionGraph.getRestartStrategy().canRestart()) { + executionGraph.failGlobal(new FlinkException("RestartStrategy validate fail")); + } + else { + JobStatus curStatus = this.state; + if (curStatus.equals(JobStatus.RUNNING)) { + cancel(globalModVersionOfFailover); + } + else if (curStatus.equals(JobStatus.CANCELED)) { + reset(globalModVersionOfFailover); + } + else { + LOG.info("FailoverRegion {} is {} when notified to failover.", id, state); + } + } + } + + // cancel all executions in this sub graph + private void cancel(final long globalModVersionOfFailover) { + while (true) { + JobStatus curStatus = this.state; + if (curStatus.equals(JobStatus.RUNNING)) { + if (transitionState(curStatus, JobStatus.CANCELLING)) { + + // we build a future that is complete once all vertices have reached a terminal state + final ArrayList<Future<?>> futures = new ArrayList<>(connectedExecutionVertexes.size()); + + // cancel all tasks (that still need cancelling) + for (ExecutionVertex vertex : connectedExecutionVertexes) { + futures.add(vertex.cancel()); + } + + final FutureUtils.ConjunctFuture allTerminal = FutureUtils.combineAll(futures); + allTerminal.thenAcceptAsync(new AcceptFunction<Void>() { + @Override + public void accept(Void value) { + allVerticesInTerminalState(globalModVersionOfFailover); + } + }, executor); + + break; + } + } + else { + LOG.info("FailoverRegion {} is {} when cancel.", id, state); + break; + } + } + } + + // reset all executions in this sub graph + private void reset(long globalModVersionOfFailover) { + 
try { + // reset all connected ExecutionVertexes + final Collection<CoLocationGroup> colGroups = new HashSet<>(); + final long restartTimestamp = System.currentTimeMillis(); + + for (ExecutionVertex ev : connectedExecutionVertexes) { + CoLocationGroup cgroup = ev.getJobVertex().getCoLocationGroup(); + if (cgroup != null && !colGroups.contains(cgroup)){ + cgroup.resetConstraints(); + colGroups.add(cgroup); + } + + ev.resetForNewExecution(restartTimestamp, globalModVersionOfFailover); + } + if (transitionState(JobStatus.CANCELED, JobStatus.CREATED)) { + restart(globalModVersionOfFailover); + } + else { + LOG.info("FailoverRegion {} switched from CANCELLING to CREATED fail, will fail this region again.", id); + failover(globalModVersionOfFailover); + } + } + catch (GlobalModVersionMismatch e) { + // happens when a global recovery happens concurrently to the regional recovery + // go back to a clean state + state = JobStatus.RUNNING; + } + catch (Throwable e) { + LOG.info("FailoverRegion {} reset fail, will failover again.", id); + failover(globalModVersionOfFailover); + } + } + + // restart all executions in this sub graph + private void restart(long globalModVersionOfFailover) { + try { + if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) { + // if we have checkpointed state, reload it into the executions + //TODO: checkpoint support restore part ExecutionVertex cp + /** + if (executionGraph.getCheckpointCoordinator() != null) { + executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState( + connectedExecutionVertexes, false, false); + } + */ + //TODO, use restart strategy to schedule them. 
+ //restart all connected ExecutionVertexes + for (ExecutionVertex ev : connectedExecutionVertexes) { + try { + ev.scheduleForExecution( + executionGraph.getSlotProvider(), + executionGraph.isQueuedSchedulingAllowed()); + } + catch (Throwable e) { + failover(globalModVersionOfFailover); + } + } + } + else { + LOG.info("FailoverRegion {} switched from CREATED to RUNNING fail, will fail this region again.", id); + failover(globalModVersionOfFailover); + } + } catch (Exception e) { + LOG.info("FailoverRegion {} restart failed, failover again.", id, e); + failover(globalModVersionOfFailover); + } + } + + private boolean transitionState(JobStatus current, JobStatus newState) { + if (STATE_UPDATER.compareAndSet(this, current, newState)) { + LOG.info("FailoverRegion {} switched from state {} to {}.", id, current, newState); + return true; + } + else { + return false; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java index f18a90f..8b6fa6e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java @@ -35,9 +35,12 @@ public class FailoverStrategyLoader { /** Config name for the {@link RestartAllStrategy} */ public static final String FULL_RESTART_STRATEGY_NAME = "full"; - /** Config name for the strategy that restarts individual tasks */ + /** Config name for the {@link RestartIndividualStrategy} */ public static final String INDIVIDUAL_RESTART_STRATEGY_NAME = "individual"; + /** Config name 
for the {@link RestartPipelinedRegionStrategy} */ + public static final String PIPELINED_REGION_RESTART_STRATEGY_NAME = "region"; + // ------------------------------------------------------------------------ /** @@ -59,6 +62,9 @@ public class FailoverStrategyLoader { case FULL_RESTART_STRATEGY_NAME: return new RestartAllStrategy.Factory(); + case PIPELINED_REGION_RESTART_STRATEGY_NAME: + return new RestartPipelinedRegionStrategy.Factory(); + case INDIVIDUAL_RESTART_STRATEGY_NAME: return new RestartIndividualStrategy.Factory(); http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java new file mode 100644 index 0000000..0a5baa8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionEdge; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A failover strategy that restarts regions of the ExecutionGraph. A region is defined + * by this strategy as the weakly connected component of tasks that communicate via pipelined + * data exchange. + */ +public class RestartPipelinedRegionStrategy extends FailoverStrategy { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(RestartPipelinedRegionStrategy.class); + + /** The execution graph on which this FailoverStrategy works */ + private final ExecutionGraph executionGraph; + + /** The executor used for future actions */ + private final Executor executor; + + /** Fast lookup from vertex to failover region */ + private final HashMap<ExecutionVertex, FailoverRegion> vertexToRegion; + + + /** + * Creates a new failover strategy to restart pipelined regions that works on the given + * execution graph and uses the execution graph's future executor to call restart actions. 
+ * + * @param executionGraph The execution graph on which this FailoverStrategy will work + */ + public RestartPipelinedRegionStrategy(ExecutionGraph executionGraph) { + this(executionGraph, executionGraph.getFutureExecutor()); + } + + /** + * Creates a new failover strategy to restart pipelined regions that works on the given + * execution graph and uses the given executor to call restart actions. + * + * @param executionGraph The execution graph on which this FailoverStrategy will work + * @param executor The executor used for future actions + */ + public RestartPipelinedRegionStrategy(ExecutionGraph executionGraph, Executor executor) { + this.executionGraph = checkNotNull(executionGraph); + this.executor = checkNotNull(executor); + this.vertexToRegion = new HashMap<>(); + } + + // ------------------------------------------------------------------------ + // failover implementation + // ------------------------------------------------------------------------ + + @Override + public void onTaskFailure(Execution taskExecution, Throwable cause) { + final ExecutionVertex ev = taskExecution.getVertex(); + final FailoverRegion failoverRegion = vertexToRegion.get(ev); + + if (failoverRegion == null) { + executionGraph.failGlobal(new FlinkException( + "Can not find a failover region for the execution " + ev.getTaskNameWithSubtaskIndex())); + } + else { + failoverRegion.onExecutionFail(taskExecution, cause); + } + } + + @Override + public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) { + LOG.debug("Generating failover regions for {} new job vertices", newJobVerticesTopological.size()); + generateAllFailoverRegion(newJobVerticesTopological); + } + + @Override + public String getStrategyName() { + return "Pipelined Region Failover"; + } + + // Generate all the FailoverRegion from the new added job vertexes + private void generateAllFailoverRegion(List<ExecutionJobVertex> newJobVerticesTopological) { + for (ExecutionJobVertex ejv : 
newJobVerticesTopological) { + for (ExecutionVertex ev : ejv.getTaskVertices()) { + if (getFailoverRegion(ev) != null) { + continue; + } + List<ExecutionVertex> pipelinedExecutions = new ArrayList<>(); + List<ExecutionVertex> orgExecutions = new ArrayList<>(); + orgExecutions.add(ev); + pipelinedExecutions.add(ev); + getAllPipelinedConnectedVertexes(orgExecutions, pipelinedExecutions); + + FailoverRegion region = new FailoverRegion(executionGraph, executor, pipelinedExecutions); + for (ExecutionVertex vertex : pipelinedExecutions) { + vertexToRegion.put(vertex, region); + } + } + } + } + + /** + * Get all connected executions of the original executions + * + * @param orgExecutions the original execution vertexes + * @param connectedExecutions the total connected executions + */ + private static void getAllPipelinedConnectedVertexes(List<ExecutionVertex> orgExecutions, List<ExecutionVertex> connectedExecutions) { + List<ExecutionVertex> newAddedExecutions = new ArrayList<>(); + for (ExecutionVertex ev : orgExecutions) { + // Add downstream ExecutionVertex + for (IntermediateResultPartition irp : ev.getProducedPartitions().values()) { + if (irp.getIntermediateResult().getResultType().isPipelined()) { + for (List<ExecutionEdge> consumers : irp.getConsumers()) { + for (ExecutionEdge consumer : consumers) { + ExecutionVertex cev = consumer.getTarget(); + if (!connectedExecutions.contains(cev)) { + newAddedExecutions.add(cev); + } + } + } + } + } + if (!newAddedExecutions.isEmpty()) { + connectedExecutions.addAll(newAddedExecutions); + getAllPipelinedConnectedVertexes(newAddedExecutions, connectedExecutions); + newAddedExecutions.clear(); + } + // Add upstream ExecutionVertex + int inputNum = ev.getNumberOfInputs(); + for (int i = 0; i < inputNum; i++) { + for (ExecutionEdge input : ev.getInputEdges(i)) { + if (input.getSource().getIntermediateResult().getResultType().isPipelined()) { + ExecutionVertex pev = input.getSource().getProducer(); + if 
(!connectedExecutions.contains(pev)) { + newAddedExecutions.add(pev); + } + } + } + } + if (!newAddedExecutions.isEmpty()) { + connectedExecutions.addAll(0, newAddedExecutions); + getAllPipelinedConnectedVertexes(newAddedExecutions, connectedExecutions); + newAddedExecutions.clear(); + } + } + } + + // ------------------------------------------------------------------------ + // testing + // ------------------------------------------------------------------------ + + /** + * Finds the failover region that contains the given execution vertex. + */ + @VisibleForTesting + public FailoverRegion getFailoverRegion(ExecutionVertex ev) { + return vertexToRegion.get(ev); + } + + // ------------------------------------------------------------------------ + // factory + // ------------------------------------------------------------------------ + + /** + * Factory that instantiates the RestartPipelinedRegionStrategy. + */ + public static class Factory implements FailoverStrategy.Factory { + + @Override + public FailoverStrategy create(ExecutionGraph executionGraph) { + return new RestartPipelinedRegionStrategy(executionGraph); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 0d7e389..140e984 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.FailoverRegion; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; @@ -137,6 +138,27 @@ public class ExecutionGraphTestUtils { } } + public static void waitUntilFailoverRegionState(FailoverRegion region, JobStatus status, long maxWaitMillis) + throws TimeoutException { + + checkNotNull(region); + checkNotNull(status); + checkArgument(maxWaitMillis >= 0); + + // this is a poor implementation - we may want to improve it eventually + final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : System.nanoTime() + (maxWaitMillis * 1_000_000); + + while (region.getState() != status && System.nanoTime() < deadline) { + try { + Thread.sleep(2); + } catch (InterruptedException ignored) {} + } + + if (System.nanoTime() >= deadline) { + throw new TimeoutException(); + } + } + /** * Takes all vertices in the given ExecutionGraph and switches their current * execution to RUNNING. 
@@ -320,21 +342,17 @@ public class ExecutionGraphTestUtils { @Override public Object handleMessage(Object message) { - Object result = null; - if(message instanceof SubmitTask) { + if (message instanceof SubmitTask) { SubmitTask submitTask = (SubmitTask) message; lastTDD = submitTask.tasks(); - - result = Acknowledge.get(); + return Acknowledge.get(); } else if(message instanceof CancelTask) { - CancelTask cancelTask = (CancelTask) message; - - result = Acknowledge.get(); + return Acknowledge.get(); } else if(message instanceof FailIntermediateResultPartitions) { - result = new Object(); + return new Object(); + } else { + return null; } - - return result; } } http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java new file mode 100644 index 0000000..beeefb1b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.SlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; + +import org.junit.Ignore; +import org.junit.Test; + +import java.net.URL; +import java.util.ArrayList; 
+import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilFailoverRegionState; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class FailoverRegionTest extends TestLogger { + + /** + * Tests that a job only has one failover region and can recover from task failure successfully + * @throws Exception + */ + @Test + public void testSingleRegionFailover() throws Exception { + RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy(10); + ExecutionGraph eg = createSingleRegionExecutionGraph(restartStrategy); + RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy(); + + ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev).getState()); + + ev.getCurrentExecutionAttempt().fail(new Exception("Test Exception")); + assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev).getState()); + + for (ExecutionVertex evs : eg.getAllExecutionVertices()) { + evs.getCurrentExecutionAttempt().cancelingComplete(); + } + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev).getState()); + } + + /** + * Tests that a job has server failover regions and one region failover does not influence others + * + * <pre> + * (a1) ---> (b1) -+-> (c1) ---+-> (d1) + * X / + * (a2) ---> (b2) -+-> (c2) -+ + * + * ^ ^ ^ + * | | | + * (pipelined) (blocking) (pipelined) + * + * </pre> + */ + @Test + public void testMultiRegionsFailover() throws Exception { + final JobID jobId = new JobID(); + final String jobName = "Test Job Sample Name"; + + final SlotProvider slotProvider 
= new SimpleSlotProvider(jobId, 20); + + JobVertex v1 = new JobVertex("vertex1"); + JobVertex v2 = new JobVertex("vertex2"); + JobVertex v3 = new JobVertex("vertex3"); + JobVertex v4 = new JobVertex("vertex4"); + + v1.setParallelism(2); + v2.setParallelism(2); + v3.setParallelism(2); + v4.setParallelism(1); + + v1.setInvokableClass(AbstractInvokable.class); + v2.setInvokableClass(AbstractInvokable.class); + v3.setInvokableClass(AbstractInvokable.class); + v4.setInvokableClass(AbstractInvokable.class); + + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4); + + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + jobId, + jobName, + new Configuration(), + new SerializedValue<>(new ExecutionConfig()), + AkkaUtils.getDefaultTimeout(), + new InfiniteDelayRestartStrategy(10), + new FailoverPipelinedRegionWithDirectExecutor(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + slotProvider, + ExecutionGraph.class.getClassLoader()); + + eg.attachJobGraph(ordered); + + RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy(); + + // the following two vertices are in the same failover region + ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0]; + ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0]; + + // the following two vertices are in the same failover region + ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1]; + ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1]; + + // the following vertices are in one failover region + ExecutionVertex ev31 = 
eg.getJobVertex(v3.getID()).getTaskVertices()[0]; + ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1]; + ExecutionVertex ev4 = eg.getJobVertex(v3.getID()).getTaskVertices()[0]; + + eg.scheduleForExecution(); + + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState()); + + ev21.scheduleForExecution(slotProvider, true); + ev21.getCurrentExecutionAttempt().fail(new Exception("New fail")); + assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState()); + + ev11.getCurrentExecutionAttempt().cancelingComplete(); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState()); + + ev11.getCurrentExecutionAttempt().markFinished(); + ev21.getCurrentExecutionAttempt().markFinished(); + ev22.scheduleForExecution(slotProvider, true); + ev22.getCurrentExecutionAttempt().markFinished(); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState()); + + waitUntilExecutionState(ev31.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 2000); + waitUntilExecutionState(ev32.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 2000); + + ev31.getCurrentExecutionAttempt().fail(new Exception("New fail")); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState()); + assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev31).getState()); + + ev32.getCurrentExecutionAttempt().cancelingComplete(); + 
assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState()); + } + + /** + * Tests that when a task fails and the restart strategy does not support restarting, the job goes to FAILED + * @throws Exception + */ + @Test + public void testNoManualRestart() throws Exception { + NoRestartStrategy restartStrategy = new NoRestartStrategy(); + ExecutionGraph eg = createSingleRegionExecutionGraph(restartStrategy); + + ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + + ev.fail(new Exception("Test Exception")); + + for (ExecutionVertex evs : eg.getAllExecutionVertices()) { + evs.getCurrentExecutionAttempt().cancelingComplete(); + } + assertEquals(JobStatus.FAILED, eg.getState()); + } + + /** + * Tests that when two failover regions fail over at the same time, they do not influence each other + * @throws Exception + */ + @Test + public void testMutilRegionFailoverAtSameTime() throws Exception { + Instance instance = ExecutionGraphTestUtils.getInstance( + new ActorTaskManagerGateway( + new SimpleActorGateway(TestingUtils.directExecutionContext())), + 16); + + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + scheduler.newInstanceAvailable(instance); + + final JobID jobId = new JobID(); + final String jobName = "Test Job Sample Name"; + final Configuration cfg = new Configuration(); + + JobVertex v1 = new JobVertex("vertex1"); + JobVertex v2 = new JobVertex("vertex2"); + JobVertex v3 = new JobVertex("vertex3"); + JobVertex v4 = new JobVertex("vertex4"); + + v1.setParallelism(2); + v2.setParallelism(2); + v3.setParallelism(2); + v4.setParallelism(2); + + v1.setInvokableClass(AbstractInvokable.class); + v2.setInvokableClass(AbstractInvokable.class); + v3.setInvokableClass(AbstractInvokable.class); + v4.setInvokableClass(AbstractInvokable.class); + +
v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4); + + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + jobId, + jobName, + cfg, + new SerializedValue<>(new ExecutionConfig()), + AkkaUtils.getDefaultTimeout(), + new InfiniteDelayRestartStrategy(10), + new RestartPipelinedRegionStrategy.Factory(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + scheduler, + ExecutionGraph.class.getClassLoader()); + try { + eg.attachJobGraph(ordered); + } + catch (JobException e) { + e.printStackTrace(); + fail("Job failed with exception: " + e.getMessage()); + } + eg.scheduleForExecution(); + RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy(); + + ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0]; + ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1]; + ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0]; + ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1]; + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState()); + + ev11.getCurrentExecutionAttempt().fail(new Exception("new fail")); + ev31.getCurrentExecutionAttempt().fail(new Exception("new fail")); + assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState()); + assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev31).getState()); + + ev32.getCurrentExecutionAttempt().cancelingComplete(); + waitUntilFailoverRegionState(strategy.getFailoverRegion(ev31), JobStatus.RUNNING, 1000); + + 
ev12.getCurrentExecutionAttempt().cancelingComplete(); + waitUntilFailoverRegionState(strategy.getFailoverRegion(ev11), JobStatus.RUNNING, 1000); + } + + /** + * Tests that if a task reports that the result of its preceding task has failed, + * the preceding task is considered failed as well and starts to fail over. + * TODO: as the report part is not finished yet, this case is ignored temporarily + * @throws Exception + */ + @Ignore + @Test + public void testSucceedingNoticePreceding() throws Exception { + Instance instance = ExecutionGraphTestUtils.getInstance( + new ActorTaskManagerGateway( + new SimpleActorGateway(TestingUtils.directExecutionContext())), + 14); + + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + scheduler.newInstanceAvailable(instance); + + final JobID jobId = new JobID(); + final String jobName = "Test Job Sample Name"; + final Configuration cfg = new Configuration(); + + JobVertex v1 = new JobVertex("vertex1"); + JobVertex v2 = new JobVertex("vertex2"); + + v1.setParallelism(1); + v2.setParallelism(1); + + v1.setInvokableClass(AbstractInvokable.class); + v2.setInvokableClass(AbstractInvokable.class); + + v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2)); + + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + jobId, + jobName, + cfg, + new SerializedValue<>(new ExecutionConfig()), + AkkaUtils.getDefaultTimeout(), + new InfiniteDelayRestartStrategy(10), + new FailoverPipelinedRegionWithDirectExecutor(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + scheduler, + ExecutionGraph.class.getClassLoader()); + try { + eg.attachJobGraph(ordered); + } + catch (JobException e) { + e.printStackTrace(); + fail("Job failed with exception: " + e.getMessage()); + } + eg.setScheduleMode(ScheduleMode.EAGER); + eg.scheduleForExecution(); +
RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy(); + + // ev11 is the preceding (v1) task, ev21 the succeeding (v2) task; both regions are checked below + ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0]; + ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0]; + ev21.getCurrentExecutionAttempt().fail(new Exception("Fail with v1")); + + assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev21).getState()); + assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState()); + } + + /** + * Tests that a new failure comes while the failover region is in CANCELLING + * @throws Exception + */ + @Test + public void testFailWhileCancelling() throws Exception { + RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy(); + ExecutionGraph eg = createSingleRegionExecutionGraph(restartStrategy); + RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy(); + + Iterator<ExecutionVertex> iter = eg.getAllExecutionVertices().iterator(); + ExecutionVertex ev1 = iter.next(); + ev1.getCurrentExecutionAttempt().switchToRunning(); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev1).getState()); + + ev1.getCurrentExecutionAttempt().fail(new Exception("new fail")); + assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev1).getState()); + + ExecutionVertex ev2 = iter.next(); + ev2.getCurrentExecutionAttempt().fail(new Exception("new fail")); + assertEquals(JobStatus.RUNNING, eg.getState()); + assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev1).getState()); + } + + /** + * Tests that a new failure comes while the failover region is restarting + * @throws Exception + */ + @Test + public void testFailWhileRestarting() throws Exception { + RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy(); + ExecutionGraph eg = createSingleRegionExecutionGraph(restartStrategy); + RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy(); + +
Iterator<ExecutionVertex> iter = eg.getAllExecutionVertices().iterator(); + ExecutionVertex ev1 = iter.next(); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev1).getState()); + + ev1.getCurrentExecutionAttempt().fail(new Exception("new fail")); + assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev1).getState()); + + for (ExecutionVertex evs : eg.getAllExecutionVertices()) { + evs.getCurrentExecutionAttempt().cancelingComplete(); + } + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev1).getState()); + + ev1.getCurrentExecutionAttempt().fail(new Exception("new fail")); + assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev1).getState()); + } + + private static ExecutionGraph createSingleRegionExecutionGraph(RestartStrategy restartStrategy) throws Exception { + Instance instance = ExecutionGraphTestUtils.getInstance( + new ActorTaskManagerGateway( + new SimpleActorGateway(TestingUtils.directExecutionContext())), + 14); + + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + scheduler.newInstanceAvailable(instance); + + final JobID jobId = new JobID(); + final String jobName = "Test Job Sample Name"; + final Configuration cfg = new Configuration(); + + JobVertex v1 = new JobVertex("vertex1"); + JobVertex v2 = new JobVertex("vertex2"); + JobVertex v3 = new JobVertex("vertex3"); + + v1.setParallelism(3); + v2.setParallelism(2); + v3.setParallelism(2); + + v1.setInvokableClass(AbstractInvokable.class); + v2.setInvokableClass(AbstractInvokable.class); + v3.setInvokableClass(AbstractInvokable.class); + + v3.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3)); + + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + jobId, + jobName, 
+ cfg, + new SerializedValue<>(new ExecutionConfig()), + AkkaUtils.getDefaultTimeout(), + restartStrategy, + new FailoverPipelinedRegionWithDirectExecutor(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + scheduler, + ExecutionGraph.class.getClassLoader()); + try { + eg.attachJobGraph(ordered); + } + catch (JobException e) { + e.printStackTrace(); + fail("Job failed with exception: " + e.getMessage()); + } + + eg.scheduleForExecution(); + return eg; + } + + // ------------------------------------------------------------------------ + + /** + * A factory to create a RestartPipelinedRegionStrategy that uses a + * direct (synchronous) executor for easier testing. + */ + private static class FailoverPipelinedRegionWithDirectExecutor implements Factory { + + @Override + public FailoverStrategy create(ExecutionGraph executionGraph) { + return new RestartPipelinedRegionStrategy(executionGraph, Executors.directExecutor()); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java new file mode 100644 index 0000000..635ec75 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.execution.SuppressRestartsException; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.instance.SlotProvider; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.SerializedValue; + +import org.junit.Test; + +import java.net.URL; 
+import java.util.Collections; +import java.util.concurrent.Executor; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * These tests make sure that global failover (restart all) always takes precedence over + * local recovery strategies for the {@link RestartPipelinedRegionStrategy} + * + * <p>This test must be in the package it resides in, because it uses package-private methods + * from the ExecutionGraph classes. + */ +public class PipelinedRegionFailoverConcurrencyTest { + + /** + * Tests that a cancellation concurrent to a local failover leads to a properly + * cancelled state. + */ + @Test + public void testCancelWhileInLocalFailover() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause a task failure and delay the local recovery action via the manual executor + // - cancel the job to go into cancelling + // - resume in local recovery action + // - validate that this does in fact not start a new task, because the graph as a + // whole should now be cancelled already + + final JobID jid = new JobID(); + final int parallelism = 2; + + final ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor(); + + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism); + + final ExecutionGraph graph = createSampleGraph( + jid, + new FailoverPipelinedRegionWithCustomExecutor(executor), + new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0), + slotProvider, + 2); + + final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next(); + final ExecutionVertex vertex1 = ejv.getTaskVertices()[0]; + final ExecutionVertex vertex2 = ejv.getTaskVertices()[1]; + + graph.scheduleForExecution(); + assertEquals(JobStatus.RUNNING, 
graph.getState()); + + // let one of the vertices fail - that triggers a local recovery action + vertex1.getCurrentExecutionAttempt().fail(new Exception("test failure")); + assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState()); + + // graph should still be running and the failover recovery action should be queued + assertEquals(JobStatus.RUNNING, graph.getState()); + assertEquals(1, executor.numQueuedRunnables()); + + // now cancel the job + graph.cancel(); + + assertEquals(JobStatus.CANCELLING, graph.getState()); + assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState()); + assertEquals(ExecutionState.CANCELING, vertex2.getCurrentExecutionAttempt().getState()); + + // let the recovery action continue + executor.trigger(); + + // now report that cancelling is complete for the other vertex + vertex2.getCurrentExecutionAttempt().cancelingComplete(); + + assertEquals(JobStatus.CANCELED, graph.getState()); + assertTrue(vertex1.getCurrentExecutionAttempt().getState().isTerminal()); + assertTrue(vertex2.getCurrentExecutionAttempt().getState().isTerminal()); + + // make sure all slots are recycled + assertEquals(parallelism, slotProvider.getNumberOfAvailableSlots()); + } + + /** + * Tests that a terminal global failure concurrent to a local failover + * leads to a properly failed state. 
+ */ + @Test + public void testGlobalFailureConcurrentToLocalFailover() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause a task failure and delay the local recovery action via the manual executor + // - cause a global failure + // - resume in local recovery action + // - validate that this does in fact not start a new task, because the graph as a + // whole should now be terminally failed already + + final JobID jid = new JobID(); + final int parallelism = 2; + + final ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor(); + + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism); + + final ExecutionGraph graph = createSampleGraph( + jid, + new FailoverPipelinedRegionWithCustomExecutor(executor), + new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0), + slotProvider, + 2); + + final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next(); + final ExecutionVertex vertex1 = ejv.getTaskVertices()[0]; + final ExecutionVertex vertex2 = ejv.getTaskVertices()[1]; + + graph.scheduleForExecution(); + assertEquals(JobStatus.RUNNING, graph.getState()); + + // let one of the vertices fail - that triggers a local recovery action + vertex1.getCurrentExecutionAttempt().fail(new Exception("test failure")); + assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState()); + + // graph should still be running and the failover recovery action should be queued + assertEquals(JobStatus.RUNNING, graph.getState()); + assertEquals(1, executor.numQueuedRunnables()); + + // now cause a terminal global failure (restarts are suppressed) + graph.failGlobal(new SuppressRestartsException(new Exception("test exception"))); + + assertEquals(JobStatus.FAILING, graph.getState()); + assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState()); + assertEquals(ExecutionState.CANCELING, vertex2.getCurrentExecutionAttempt().getState()); + + // let the recovery action continue +
executor.trigger(); + + // now report that cancelling is complete for the other vertex + vertex2.getCurrentExecutionAttempt().cancelingComplete(); + + assertEquals(JobStatus.FAILED, graph.getState()); + assertTrue(vertex1.getCurrentExecutionAttempt().getState().isTerminal()); + assertTrue(vertex2.getCurrentExecutionAttempt().getState().isTerminal()); + + // make sure all slots are recycled + assertEquals(parallelism, slotProvider.getNumberOfAvailableSlots()); + } + + /** + * Tests that a local failover does not try to trump a global failover. + */ + @Test + public void testGlobalRecoveryConcurrentToLocalRecovery() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause a task failure and delay the local recovery action via the manual executor + // - cause a global failure that is recovering immediately + // - resume in local recovery action + // - validate that this does in fact not cause another task restart, because the global + // recovery should already have restarted the task graph + + final JobID jid = new JobID(); + final int parallelism = 2; + + final ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor(); + + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism); + + final ExecutionGraph graph = createSampleGraph( + jid, + new FailoverPipelinedRegionWithCustomExecutor(executor), + new FixedDelayRestartStrategy(2, 0), // twice restart, no delay + slotProvider, + 2); + RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)graph.getFailoverStrategy(); + + final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next(); + final ExecutionVertex vertex1 = ejv.getTaskVertices()[0]; + final ExecutionVertex vertex2 = ejv.getTaskVertices()[1]; + + graph.scheduleForExecution(); + assertEquals(JobStatus.RUNNING, graph.getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(vertex1).getState()); + + // let one of the 
vertices fail - that triggers a local recovery action + vertex2.getCurrentExecutionAttempt().fail(new Exception("test failure")); + assertEquals(ExecutionState.FAILED, vertex2.getCurrentExecutionAttempt().getState()); + assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(vertex2).getState()); + + // graph should still be running and the failover recovery action should be queued + assertEquals(JobStatus.RUNNING, graph.getState()); + assertEquals(1, executor.numQueuedRunnables()); + + // now cause a global failure, which triggers a global recovery + graph.failGlobal(new Exception("test exception")); + + assertEquals(JobStatus.FAILING, graph.getState()); + assertEquals(ExecutionState.FAILED, vertex2.getCurrentExecutionAttempt().getState()); + assertEquals(ExecutionState.CANCELING, vertex1.getCurrentExecutionAttempt().getState()); + + // now report that cancelling is complete for the other vertex + vertex1.getCurrentExecutionAttempt().cancelingComplete(); + + waitUntilJobStatus(graph, JobStatus.RUNNING, 1000); + assertEquals(JobStatus.RUNNING, graph.getState()); + + waitUntilExecutionState(vertex1.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000); + waitUntilExecutionState(vertex2.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000); + vertex1.getCurrentExecutionAttempt().switchToRunning(); + vertex2.getCurrentExecutionAttempt().switchToRunning(); + assertEquals(ExecutionState.RUNNING, vertex1.getCurrentExecutionAttempt().getState()); + assertEquals(ExecutionState.RUNNING, vertex2.getCurrentExecutionAttempt().getState()); + + // let the recovery action continue - this should do nothing any more + executor.trigger(); + + // validate that the graph is still peachy + assertEquals(JobStatus.RUNNING, graph.getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(vertex1).getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(vertex2).getState()); + assertEquals(ExecutionState.RUNNING, vertex1.getCurrentExecutionAttempt().getState()); +
assertEquals(ExecutionState.RUNNING, vertex2.getCurrentExecutionAttempt().getState()); + assertEquals(1, vertex1.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(1, vertex2.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(1, vertex1.getCopyOfPriorExecutionsList().size()); + assertEquals(1, vertex2.getCopyOfPriorExecutionsList().size()); + + // make sure all slots are in use + assertEquals(0, slotProvider.getNumberOfAvailableSlots()); + + // validate that a task failure then can be handled by the local recovery + vertex2.getCurrentExecutionAttempt().fail(new Exception("test failure")); + assertEquals(1, executor.numQueuedRunnables()); + + // let the local recovery action continue - this should recover the vertex2 + executor.trigger(); + + waitUntilExecutionState(vertex2.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000); + vertex2.getCurrentExecutionAttempt().switchToRunning(); + + // validate that the local recovery result + assertEquals(JobStatus.RUNNING, graph.getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(vertex1).getState()); + assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(vertex2).getState()); + assertEquals(ExecutionState.RUNNING, vertex1.getCurrentExecutionAttempt().getState()); + assertEquals(ExecutionState.RUNNING, vertex2.getCurrentExecutionAttempt().getState()); + assertEquals(1, vertex1.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(2, vertex2.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(1, vertex1.getCopyOfPriorExecutionsList().size()); + assertEquals(2, vertex2.getCopyOfPriorExecutionsList().size()); + + // make sure all slots are in use + assertEquals(0, slotProvider.getNumberOfAvailableSlots()); + } + + // ------------------------------------------------------------------------ + // utilities + // ------------------------------------------------------------------------ + + private ExecutionGraph createSampleGraph( + JobID jid, + 
Factory failoverStrategy, + RestartStrategy restartStrategy, + SlotProvider slotProvider, + int parallelism) throws Exception { + + // build a simple execution graph with one job vertex and the given parallelism + final ExecutionGraph graph = new ExecutionGraph( + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + jid, + "test job", + new Configuration(), + new SerializedValue<>(new ExecutionConfig()), + Time.seconds(10), + restartStrategy, + failoverStrategy, + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + slotProvider, + getClass().getClassLoader()); + + JobVertex jv = new JobVertex("test vertex"); + jv.setInvokableClass(NoOpInvokable.class); + jv.setParallelism(parallelism); + + JobGraph jg = new JobGraph(jid, "testjob", jv); + graph.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources()); + + return graph; + } + + // ------------------------------------------------------------------------ + + private static class FailoverPipelinedRegionWithCustomExecutor implements Factory { + + private final Executor executor; + + FailoverPipelinedRegionWithCustomExecutor(Executor executor) { + this.executor = executor; + } + + @Override + public FailoverStrategy create(ExecutionGraph executionGraph) { + return new RestartPipelinedRegionStrategy(executionGraph, executor); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java new file mode 100644 index 0000000..45aabe6 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java @@ -0,0 +1,396 @@ +/* + *
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.executiongraph.failover.FailoverRegion; +import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; +import org.junit.Test; + +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static 
org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;

/**
 * Tests how {@link RestartPipelinedRegionStrategy} assigns execution vertices to
 * {@link FailoverRegion}s for job graphs that mix pipelined and blocking edges.
 *
 * <p>In the diagrams below a plain arrow is a PIPELINED edge and an arrow ending in
 * {@code |} is a BLOCKING edge.
 */
public class RestartPipelinedRegionStrategyTest {

	/**
	 * Creates a JobGraph of the following form (all edges pipelined, all-to-all):
	 *
	 * <pre>
	 *  v1--->v2-->\
	 *              \
	 *               v4 --->\
	 *        ----->/        \
	 *  v3-->/                v5
	 *       \               /
	 *        ------------->/
	 * </pre>
	 *
	 * <p>Every sampled subtask is expected to belong to the same failover region.
	 */
	@Test
	public void testSimpleFailoverRegion() throws Exception {
		JobVertex v1 = createVertex("vertex1", 5);
		JobVertex v2 = createVertex("vertex2", 7);
		JobVertex v3 = createVertex("vertex3", 2);
		JobVertex v4 = createVertex("vertex4", 11);
		JobVertex v5 = createVertex("vertex5", 4);

		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

		ExecutionGraph eg = createExecutionGraph(v1, v2, v3, v4, v5);

		FailoverRegion region1 = regionOf(eg, v1, 2);
		FailoverRegion region2 = regionOf(eg, v2, 3);
		FailoverRegion region3 = regionOf(eg, v3, 0);
		FailoverRegion region4 = regionOf(eg, v4, 4);
		FailoverRegion region5 = regionOf(eg, v5, 1);

		// All in one failover region
		assertEquals(region1, region2);
		assertEquals(region3, region2);
		assertEquals(region4, region2);
		assertEquals(region5, region2);
	}

	/**
	 * Creates a JobGraph of the following form:
	 *
	 * <pre>
	 *  v2 ------->\
	 *              \
	 *  v1---------> v4 --->|\
	 *                        \
	 *                         v5
	 *                        /
	 *  v3--------------->|/
	 * </pre>
	 *
	 * <p>v1, v2 and v4 are linked by pipelined edges; v5 receives its inputs from v4 and v3
	 * through blocking edges only.
	 */
	@Test
	public void testMultipleFailoverRegions() throws Exception {
		JobVertex v1 = createVertex("vertex1", 3);
		JobVertex v2 = createVertex("vertex2", 2);
		JobVertex v3 = createVertex("vertex3", 2);
		JobVertex v4 = createVertex("vertex4", 5);
		JobVertex v5 = createVertex("vertex5", 2);

		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
		v4.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

		ExecutionGraph eg = createExecutionGraph(v1, v2, v3, v4, v5);

		FailoverRegion region1 = regionOf(eg, v1, 1);
		FailoverRegion region2 = regionOf(eg, v2, 0);
		FailoverRegion region4 = regionOf(eg, v4, 3);
		FailoverRegion region31 = regionOf(eg, v3, 0);
		FailoverRegion region32 = regionOf(eg, v3, 1);
		FailoverRegion region51 = regionOf(eg, v5, 0);
		FailoverRegion region52 = regionOf(eg, v5, 1);

		// There should be 5 failover regions: v1, v2 and v4 share one,
		// each of the two v3 subtasks has its own, and so does each v5 subtask.
		assertEquals(region1, region2);
		assertEquals(region2, region4);
		assertFalse(region31.equals(region32));
		assertFalse(region51.equals(region52));
	}

	/**
	 * Creates a JobGraph of the following form:
	 *
	 * <pre>
	 *  v1--->v2-->\
	 *              \
	 *               v4 --->|\
	 *        ----->/         \
	 *  v3-->/                 v5
	 *       \                /
	 *        ------------->/
	 * </pre>
	 *
	 * <p>v5 consumes v4 through a blocking edge but is also connected to v3 through a
	 * pipelined edge, so every sampled subtask is expected to end up in one region.
	 */
	@Test
	public void testSingleRegionWithMixedInput() throws Exception {
		JobVertex v1 = createVertex("vertex1", 3);
		JobVertex v2 = createVertex("vertex2", 2);
		JobVertex v3 = createVertex("vertex3", 2);
		JobVertex v4 = createVertex("vertex4", 5);
		JobVertex v5 = createVertex("vertex5", 2);

		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

		ExecutionGraph eg = createExecutionGraph(v1, v2, v3, v4, v5);

		FailoverRegion region1 = regionOf(eg, v1, 1);
		FailoverRegion region2 = regionOf(eg, v2, 0);
		FailoverRegion region4 = regionOf(eg, v4, 3);
		FailoverRegion region3 = regionOf(eg, v3, 0);
		FailoverRegion region5 = regionOf(eg, v5, 1);

		// All in one failover region
		assertEquals(region1, region2);
		assertEquals(region2, region4);
		assertEquals(region3, region2);
		assertEquals(region1, region5);
	}

	/**
	 * Creates a JobGraph of the following form:
	 *
	 * <pre>
	 *  v1-->v2-->|\
	 *              \
	 *               v4
	 *              /
	 *  v3------>|/
	 * </pre>
	 *
	 * <p>v1 feeds v2 through a POINTWISE pipelined edge; v4 consumes both v2 and v3
	 * through blocking all-to-all edges.
	 */
	@Test
	public void testMultiRegionNotAllToAll() throws Exception {
		JobVertex v1 = createVertex("vertex1", 2);
		JobVertex v2 = createVertex("vertex2", 2);
		JobVertex v3 = createVertex("vertex3", 5);
		JobVertex v4 = createVertex("vertex4", 5);

		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

		ExecutionGraph eg = createExecutionGraph(v1, v2, v3, v4);

		FailoverRegion region11 = regionOf(eg, v1, 0);
		FailoverRegion region12 = regionOf(eg, v1, 1);
		FailoverRegion region21 = regionOf(eg, v2, 0);
		FailoverRegion region22 = regionOf(eg, v2, 1);
		FailoverRegion region3 = regionOf(eg, v3, 0);
		FailoverRegion region4 = regionOf(eg, v4, 3);

		// Each pointwise v1[i] -> v2[i] pipeline forms its own region, and the two
		// pipelines are distinct from each other. v3 and v4 are only connected through
		// blocking edges, so the sampled v3 and v4 subtasks must not share a region.
		assertEquals(region11, region21);
		assertEquals(region12, region22);
		assertFalse(region11.equals(region12));
		assertFalse(region3.equals(region4));
	}

	// ------------------------------------------------------------------------
	//  Utilities
	// ------------------------------------------------------------------------

	/**
	 * Creates a job vertex with the given name and parallelism, using
	 * {@link AbstractInvokable} as its invokable class.
	 *
	 * @param name the vertex name
	 * @param parallelism the parallelism to set on the vertex
	 * @return the configured vertex
	 */
	private static JobVertex createVertex(String name, int parallelism) {
		JobVertex vertex = new JobVertex(name);
		vertex.setParallelism(parallelism);
		vertex.setInvokableClass(AbstractInvokable.class);
		return vertex;
	}

	/**
	 * Builds an {@link ExecutionGraph} that uses {@link NoRestartStrategy} together with the
	 * {@link RestartPipelinedRegionStrategy} failover strategy, and attaches the given
	 * vertices to it.
	 *
	 * @param orderedVertices the job vertices, in the order in which they are passed to
	 *        {@code attachJobGraph} (the tests list producers before their consumers)
	 * @return the execution graph with the vertices attached
	 * @throws Exception if constructing the graph or its configuration fails
	 */
	private static ExecutionGraph createExecutionGraph(JobVertex... orderedVertices) throws Exception {
		final JobID jobId = new JobID();
		final String jobName = "Test Job Sample Name";
		final Configuration cfg = new Configuration();

		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(orderedVertices));

		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
		ExecutionGraph eg = new ExecutionGraph(
			TestingUtils.defaultExecutor(),
			TestingUtils.defaultExecutor(),
			jobId,
			jobName,
			cfg,
			new SerializedValue<>(new ExecutionConfig()),
			AkkaUtils.getDefaultTimeout(),
			new NoRestartStrategy(),
			new RestartPipelinedRegionStrategy.Factory(),
			Collections.<BlobKey>emptyList(),
			Collections.<URL>emptyList(),
			scheduler,
			ExecutionGraph.class.getClassLoader());
		try {
			eg.attachJobGraph(ordered);
		}
		catch (JobException e) {
			// fail() only carries a message, so print the trace first to keep the cause visible
			e.printStackTrace();
			fail("Job failed with exception: " + e.getMessage());
		}
		return eg;
	}

	/**
	 * Looks up the failover region that the graph's {@link RestartPipelinedRegionStrategy}
	 * assigned to one subtask of the given job vertex.
	 *
	 * @param eg the execution graph; its failover strategy is cast to
	 *        {@link RestartPipelinedRegionStrategy}
	 * @param vertex the job vertex whose subtask is looked up
	 * @param subtaskIndex index into the vertex's task vertices
	 * @return the failover region of that subtask
	 */
	private static FailoverRegion regionOf(ExecutionGraph eg, JobVertex vertex, int subtaskIndex) {
		RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();
		ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID());
		return strategy.getFailoverRegion(ejv.getTaskVertices()[subtaskIndex]);
	}

}
