This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a8b72220154d3349b5157d02aab78bf8e3b381dd
Author: Gary Yao <[email protected]>
AuthorDate: Wed Jun 19 10:58:17 2019 +0200

    [FLINK-12883][runtime] Extract computation of pipelined regions.
---
 .../failover/flip1/PipelinedRegionComputeUtil.java | 117 +++++++++++++++++++++
 .../flip1/RestartPipelinedRegionStrategy.java      |  75 +------------
 2 files changed, 119 insertions(+), 73 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
new file mode 100644
index 0000000..291c894
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Utility for computing pipelined regions.
+ */
+public final class PipelinedRegionComputeUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PipelinedRegionComputeUtil.class);
+
+	public static Set<Set<FailoverVertex>> computePipelinedRegions(final FailoverTopology topology) {
+		// currently we let a job with co-location constraints fail as one region
+		// putting co-located vertices in the same region with each other can be a future improvement
+		if (topology.containsCoLocationConstraints()) {
+			return uniqueRegions(buildOneRegionForAllVertices(topology));
+		}
+
+		// we use the map (list -> null) to imitate an IdentityHashSet (which does not exist)
+		// this helps to optimize the building performance as it uses reference equality
+		final Map<FailoverVertex, Set<FailoverVertex>> vertexToRegion = new IdentityHashMap<>();
+
+		// iterate all the vertices which are topologically sorted
+		for (FailoverVertex vertex : topology.getFailoverVertices()) {
+			Set<FailoverVertex> currentRegion = new HashSet<>(1);
+			currentRegion.add(vertex);
+			vertexToRegion.put(vertex, currentRegion);
+
+			for (FailoverEdge inputEdge : vertex.getInputEdges()) {
+				if (inputEdge.getResultPartitionType().isPipelined()) {
+					final FailoverVertex producerVertex = inputEdge.getSourceVertex();
+					final Set<FailoverVertex> producerRegion = vertexToRegion.get(producerVertex);
+
+					if (producerRegion == null) {
+						throw new IllegalStateException("Producer task " + producerVertex.getExecutionVertexName()
+							+ " failover region is null while calculating failover region for the consumer task "
+							+ vertex.getExecutionVertexName() + ". This should be a failover region building bug.");
+					}
+
+					// check if it is the same as the producer region, if so skip the merge
+					// this check can significantly reduce compute complexity in All-to-All PIPELINED edge case
+					if (currentRegion != producerRegion) {
+						// merge current region and producer region
+						// merge the smaller region into the larger one to reduce the cost
+						final Set<FailoverVertex> smallerSet;
+						final Set<FailoverVertex> largerSet;
+						if (currentRegion.size() < producerRegion.size()) {
+							smallerSet = currentRegion;
+							largerSet = producerRegion;
+						} else {
+							smallerSet = producerRegion;
+							largerSet = currentRegion;
+						}
+						for (FailoverVertex v : smallerSet) {
+							vertexToRegion.put(v, largerSet);
+						}
+						largerSet.addAll(smallerSet);
+						currentRegion = largerSet;
+					}
+				}
+			}
+		}
+
+		return uniqueRegions(vertexToRegion);
+	}
+
+	private static Map<FailoverVertex, Set<FailoverVertex>> buildOneRegionForAllVertices(final FailoverTopology topology) {
+		LOG.warn("Cannot decompose the topology into individual failover regions due to use of " +
+			"Co-Location constraints (iterations). Job will fail over as one holistic unit.");
+
+		final Map<FailoverVertex, Set<FailoverVertex>> vertexToRegion = new IdentityHashMap<>();
+
+		final Set<FailoverVertex> allVertices = new HashSet<>();
+		for (FailoverVertex vertex : topology.getFailoverVertices()) {
+			allVertices.add(vertex);
+			vertexToRegion.put(vertex, allVertices);
+		}
+		return vertexToRegion;
+	}
+
+	private static Set<Set<FailoverVertex>> uniqueRegions(final Map<FailoverVertex, Set<FailoverVertex>> vertexToRegion) {
+		// find out all the distinct regions
+		final IdentityHashMap<Set<FailoverVertex>, Object> distinctRegions = new IdentityHashMap<>();
+		for (Set<FailoverVertex> regionVertices : vertexToRegion.values()) {
+			distinctRegions.put(regionVertices, null);
+		}
+		return distinctRegions.keySet();
+	}
+
+	private PipelinedRegionComputeUtil() {
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
index 8e06327..58d1675 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
@@ -94,66 +94,10 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
 	// ------------------------------------------------------------------------
 
 	private void buildFailoverRegions() {
-		// currently we let a job with co-location constraints fail as one region
-		// putting co-located vertices in the same region with each other can be a future improvement
-		if (topology.containsCoLocationConstraints()) {
-			buildOneRegionForAllVertices();
-			return;
-		}
-
-		// we use the map (list -> null) to imitate an IdentityHashSet (which does not exist)
-		// this helps to optimize the building performance as it uses reference equality
-		final IdentityHashMap<FailoverVertex, HashSet<FailoverVertex>> vertexToRegion = new IdentityHashMap<>();
-
-		// iterate all the vertices which are topologically sorted
-		for (FailoverVertex vertex : topology.getFailoverVertices()) {
-			HashSet<FailoverVertex> currentRegion = new HashSet<>(1);
-			currentRegion.add(vertex);
-			vertexToRegion.put(vertex, currentRegion);
-
-			for (FailoverEdge inputEdge : vertex.getInputEdges()) {
-				if (inputEdge.getResultPartitionType().isPipelined()) {
-					final FailoverVertex producerVertex = inputEdge.getSourceVertex();
-					final HashSet<FailoverVertex> producerRegion = vertexToRegion.get(producerVertex);
-
-					if (producerRegion == null) {
-						throw new IllegalStateException("Producer task " + producerVertex.getExecutionVertexName()
-							+ " failover region is null while calculating failover region for the consumer task "
-							+ vertex.getExecutionVertexName() + ". This should be a failover region building bug.");
-					}
-
-					// check if it is the same as the producer region, if so skip the merge
-					// this check can significantly reduce compute complexity in All-to-All PIPELINED edge case
-					if (currentRegion != producerRegion) {
-						// merge current region and producer region
-						// merge the smaller region into the larger one to reduce the cost
-						final HashSet<FailoverVertex> smallerSet;
-						final HashSet<FailoverVertex> largerSet;
-						if (currentRegion.size() < producerRegion.size()) {
-							smallerSet = currentRegion;
-							largerSet = producerRegion;
-						} else {
-							smallerSet = producerRegion;
-							largerSet = currentRegion;
-						}
-						for (FailoverVertex v : smallerSet) {
-							vertexToRegion.put(v, largerSet);
-						}
-						largerSet.addAll(smallerSet);
-						currentRegion = largerSet;
-					}
-				}
-			}
-		}
-
-		// find out all the distinct regions
-		final IdentityHashMap<HashSet<FailoverVertex>, Object> distinctRegions = new IdentityHashMap<>();
-		for (HashSet<FailoverVertex> regionVertices : vertexToRegion.values()) {
-			distinctRegions.put(regionVertices, null);
-		}
+		final Set<Set<FailoverVertex>> distinctRegions = PipelinedRegionComputeUtil.computePipelinedRegions(topology);
 
 		// creating all the failover regions and register them
-		for (HashSet<FailoverVertex> regionVertices : distinctRegions.keySet()) {
+		for (Set<FailoverVertex> regionVertices : distinctRegions) {
 			LOG.debug("Creating a failover region with {} vertices.", regionVertices.size());
 			final FailoverRegion failoverRegion = new FailoverRegion(regionVertices);
 			regions.put(failoverRegion, null);
@@ -165,21 +109,6 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
 		LOG.info("Created {} failover regions.", regions.size());
 	}
 
-	private void buildOneRegionForAllVertices() {
-		LOG.warn("Cannot decompose the topology into individual failover regions due to use of " +
-			"Co-Location constraints (iterations). Job will fail over as one holistic unit.");
-
-		final Set<FailoverVertex> allVertices = new HashSet<>();
-		for (FailoverVertex vertex : topology.getFailoverVertices()) {
-			allVertices.add(vertex);
-		}
-
-		final FailoverRegion region = new FailoverRegion(allVertices);
-		regions.put(region, null);
-		for (FailoverVertex vertex : topology.getFailoverVertices()) {
-			vertexToRegionMap.put(vertex.getExecutionVertexID(), region);
-		}
-	}
-
 	// ------------------------------------------------------------------------
 
 	//  task failure handling
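
For context on the extracted algorithm: the merge in computePipelinedRegions is a union-by-size over sets of vertices connected through pipelined edges. Below is a minimal standalone sketch of that idea, using plain String vertex names and a hand-written edge map in place of the FailoverTopology/FailoverVertex interfaces; the class, vertex names, and edges are illustrative only and are not part of this commit.

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RegionMergeSketch {

	public static void main(String[] args) {
		// vertices in topological order (illustrative names)
		List<String> vertices = Arrays.asList("A", "B", "C", "D");

		// pipelined inputs per consumer; blocking edges are simply not listed
		Map<String, List<String>> pipelinedInputs = new HashMap<>();
		pipelinedInputs.put("B", Collections.singletonList("A"));
		pipelinedInputs.put("C", Collections.singletonList("B"));
		// "D" has only blocking inputs, so it stays in its own region

		Map<String, Set<String>> vertexToRegion = new HashMap<>();
		for (String vertex : vertices) {
			Set<String> currentRegion = new HashSet<>(Collections.singleton(vertex));
			vertexToRegion.put(vertex, currentRegion);

			for (String producer : pipelinedInputs.getOrDefault(vertex, Collections.emptyList())) {
				Set<String> producerRegion = vertexToRegion.get(producer);
				if (currentRegion != producerRegion) {
					// merge the smaller region into the larger one, as in the patch
					Set<String> smaller = currentRegion.size() < producerRegion.size() ? currentRegion : producerRegion;
					Set<String> larger = smaller == currentRegion ? producerRegion : currentRegion;
					for (String v : smaller) {
						vertexToRegion.put(v, larger);
					}
					larger.addAll(smaller);
					currentRegion = larger;
				}
			}
		}

		// deduplicate regions by reference, mirroring the IdentityHashMap trick in the patch
		Set<Set<String>> regions = Collections.newSetFromMap(new IdentityHashMap<>());
		regions.addAll(vertexToRegion.values());
		System.out.println(regions); // two regions: [A, B, C] and [D]
	}
}

Running the sketch prints two regions, [A, B, C] and [D]: vertices connected only by blocking edges end up in separate failover regions, while pipelined chains collapse into one.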
