[FLINK-5867] [flip-1] Improve performance of Pipelined Failover Region construction
This method exploits the fact that vertices are already in topological order. This closes #3773 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcfd37ca Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcfd37ca Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcfd37ca Branch: refs/heads/master Commit: dcfd37ca69380de5afbf28ad946dde90ab0de722 Parents: 166a3f8 Author: Stephan Ewen <[email protected]> Authored: Fri Apr 21 00:02:19 2017 +0200 Committer: Stephan Ewen <[email protected]> Committed: Sat May 6 21:40:05 2017 +0200 ---------------------------------------------------------------------- .../RestartPipelinedRegionStrategy.java | 170 +++++++++++++------ 1 file changed, 115 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/dcfd37ca/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java index 0a5baa8..1884d1c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java @@ -24,14 +24,16 @@ import org.apache.flink.runtime.executiongraph.ExecutionEdge; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import 
org.apache.flink.runtime.executiongraph.IntermediateResult; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.FlinkRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.concurrent.Executor; @@ -91,16 +93,20 @@ public class RestartPipelinedRegionStrategy extends FailoverStrategy { if (failoverRegion == null) { executionGraph.failGlobal(new FlinkException( - "Can not find a failover region for the execution " + ev.getTaskNameWithSubtaskIndex())); + "Can not find a failover region for the execution " + ev.getTaskNameWithSubtaskIndex(), cause)); } else { + LOG.info("Recovering task failure for {} #{} ({}) via restart of failover region", + taskExecution.getVertex().getTaskNameWithSubtaskIndex(), + taskExecution.getAttemptNumber(), + taskExecution.getAttemptId()); + failoverRegion.onExecutionFail(taskExecution, cause); } } @Override public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) { - LOG.debug("Generating failover regions for {} new job vertices", newJobVerticesTopological.size()); generateAllFailoverRegion(newJobVerticesTopological); } @@ -109,74 +115,128 @@ public class RestartPipelinedRegionStrategy extends FailoverStrategy { return "Pipelined Region Failover"; } - // Generate all the FailoverRegion from the new added job vertexes + /** + * Generate all the FailoverRegion from the new added job vertexes + */ private void generateAllFailoverRegion(List<ExecutionJobVertex> newJobVerticesTopological) { + final IdentityHashMap<ExecutionVertex, ArrayList<ExecutionVertex>> vertexToRegion = new IdentityHashMap<>(); + + // we use the map (list -> null) to imitate an IdentityHashSet (which does not exist) + final IdentityHashMap<ArrayList<ExecutionVertex>, Object> distinctRegions = new IdentityHashMap<>(); + + // this loop will worst case iterate over every edge in the 
graph (complexity is O(#edges)) + for (ExecutionJobVertex ejv : newJobVerticesTopological) { - for (ExecutionVertex ev : ejv.getTaskVertices()) { - if (getFailoverRegion(ev) != null) { - continue; - } - List<ExecutionVertex> pipelinedExecutions = new ArrayList<>(); - List<ExecutionVertex> orgExecutions = new ArrayList<>(); - orgExecutions.add(ev); - pipelinedExecutions.add(ev); - getAllPipelinedConnectedVertexes(orgExecutions, pipelinedExecutions); - - FailoverRegion region = new FailoverRegion(executionGraph, executor, pipelinedExecutions); - for (ExecutionVertex vertex : pipelinedExecutions) { - vertexToRegion.put(vertex, region); + + // currently, jobs with a co-location constraint fail as one + // we want to improve that in the future (or get rid of co-location constraints) + if (ejv.getCoLocationGroup() != null) { + makeAllOneRegion(newJobVerticesTopological); + return; + } + + // see if this JobVertex one has pipelined inputs at all + final List<IntermediateResult> inputs = ejv.getInputs(); + final int numInputs = inputs.size(); + boolean hasPipelinedInputs = false; + + for (IntermediateResult input : inputs) { + if (input.getResultType().isPipelined()) { + hasPipelinedInputs = true; + break; } } - } - } - /** - * Get all connected executions of the original executions - * - * @param orgExecutions the original execution vertexes - * @param connectedExecutions the total connected executions - */ - private static void getAllPipelinedConnectedVertexes(List<ExecutionVertex> orgExecutions, List<ExecutionVertex> connectedExecutions) { - List<ExecutionVertex> newAddedExecutions = new ArrayList<>(); - for (ExecutionVertex ev : orgExecutions) { - // Add downstream ExecutionVertex - for (IntermediateResultPartition irp : ev.getProducedPartitions().values()) { - if (irp.getIntermediateResult().getResultType().isPipelined()) { - for (List<ExecutionEdge> consumers : irp.getConsumers()) { - for (ExecutionEdge consumer : consumers) { - ExecutionVertex cev = 
consumer.getTarget(); - if (!connectedExecutions.contains(cev)) { - newAddedExecutions.add(cev); + if (hasPipelinedInputs) { + // build upon the predecessors + for (ExecutionVertex ev : ejv.getTaskVertices()) { + + // remember the region in which we are + ArrayList<ExecutionVertex> thisRegion = null; + + for (int inputNum = 0; inputNum < numInputs; inputNum++) { + if (inputs.get(inputNum).getResultType().isPipelined()) { + + for (ExecutionEdge edge : ev.getInputEdges(inputNum)) { + final ExecutionVertex predecessor = edge.getSource().getProducer(); + final ArrayList<ExecutionVertex> predecessorRegion = vertexToRegion.get(predecessor); + + if (thisRegion != null) { + // we already have a region. see if it is the same as the predecessor's region + if (predecessorRegion != thisRegion) { + + // we need to merge our region and the predecessor's region + thisRegion.addAll(predecessorRegion); + distinctRegions.remove(predecessorRegion); + + // remap the vertices from that merged region + for (ExecutionVertex inPredRegion: predecessorRegion) { + vertexToRegion.put(inPredRegion, thisRegion); + } + } + } + else if (predecessor != null) { + // first case, make this our region + thisRegion = predecessorRegion; + thisRegion.add(ev); + vertexToRegion.put(ev, thisRegion); + } + else { + // throw an uncaught exception here + // this is a bug and not a recoverable situation + throw new FlinkRuntimeException( + "bug in the logic to construct the pipelined failover regions"); + } } } } } } - if (!newAddedExecutions.isEmpty()) { - connectedExecutions.addAll(newAddedExecutions); - getAllPipelinedConnectedVertexes(newAddedExecutions, connectedExecutions); - newAddedExecutions.clear(); - } - // Add upstream ExecutionVertex - int inputNum = ev.getNumberOfInputs(); - for (int i = 0; i < inputNum; i++) { - for (ExecutionEdge input : ev.getInputEdges(i)) { - if (input.getSource().getIntermediateResult().getResultType().isPipelined()) { - ExecutionVertex pev = input.getSource().getProducer(); 
- if (!connectedExecutions.contains(pev)) { - newAddedExecutions.add(pev); - } - } + else { + // no pipelined inputs, start a new region + for (ExecutionVertex ev : ejv.getTaskVertices()) { + ArrayList<ExecutionVertex> region = new ArrayList<>(1); + region.add(ev); + vertexToRegion.put(ev, region); + distinctRegions.put(region, null); } } - if (!newAddedExecutions.isEmpty()) { - connectedExecutions.addAll(0, newAddedExecutions); - getAllPipelinedConnectedVertexes(newAddedExecutions, connectedExecutions); - newAddedExecutions.clear(); + } + + // now that we have all regions, create the failover region objects + LOG.info("Creating {} individual failover regions for job {} ({})", + executionGraph.getJobName(), executionGraph.getJobID()); + + for (List<ExecutionVertex> region : distinctRegions.keySet()) { + final FailoverRegion failoverRegion = new FailoverRegion(executionGraph, executor, region); + for (ExecutionVertex ev : region) { + this.vertexToRegion.put(ev, failoverRegion); } } } + private void makeAllOneRegion(List<ExecutionJobVertex> jobVertices) { + LOG.warn("Cannot decompose ExecutionGraph into individual failover regions due to use of " + + "Co-Location constraints (iterations). Job will fail over as one holistic unit."); + + final ArrayList<ExecutionVertex> allVertices = new ArrayList<>(); + + for (ExecutionJobVertex ejv : jobVertices) { + + // safe some incremental size growing + allVertices.ensureCapacity(allVertices.size() + ejv.getParallelism()); + + for (ExecutionVertex ev : ejv.getTaskVertices()) { + allVertices.add(ev); + } + } + + final FailoverRegion singleRegion = new FailoverRegion(executionGraph, executor, allVertices); + for (ExecutionVertex ev : allVertices) { + vertexToRegion.put(ev, singleRegion); + } + } + // ------------------------------------------------------------------------ // testing // ------------------------------------------------------------------------
