This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8b72220154d3349b5157d02aab78bf8e3b381dd
Author: Gary Yao <[email protected]>
AuthorDate: Wed Jun 19 10:58:17 2019 +0200

    [FLINK-12883][runtime] Extract computation of pipelined regions.
---
 .../failover/flip1/PipelinedRegionComputeUtil.java | 117 +++++++++++++++++++++
 .../flip1/RestartPipelinedRegionStrategy.java      |  75 +------------
 2 files changed, 119 insertions(+), 73 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
new file mode 100644
index 0000000..291c894
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Utility for computing pipelined regions.
+ *
+ * <p>{@link #computePipelinedRegions(FailoverTopology)} groups the vertices of
+ * a topology into regions as follows.
+ * <ul>
+ *   <li>If the topology contains co-location constraints, all vertices are
+ *       put into one single region and a warning is logged.</li>
+ *   <li>Otherwise every vertex starts in a region of its own, which is then
+ *       merged with the region of the producer of each pipelined input edge
+ *       (the smaller set is always merged into the larger one).</li>
+ * </ul>
+ *
+ * <p>An {@link IllegalStateException} is thrown if the producer of a pipelined
+ * input edge has no region yet when its consumer is visited.
+ *
+ * <p>The returned regions are distinct by reference rather than by
+ * {@code equals()}: the result is the key set of an {@link IdentityHashMap}.
+ *
+ * <p>The class is final and has a private constructor; it only offers static
+ * methods.
+ */
+public final class PipelinedRegionComputeUtil {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(PipelinedRegionComputeUtil.class);
+
+       public static Set<Set<FailoverVertex>> computePipelinedRegions(final 
FailoverTopology topology) {
+               // currently we let a job with co-location constraints fail as 
one region
+               // putting co-located vertices in the same region with each 
other can be a future improvement
+               if (topology.containsCoLocationConstraints()) {
+                       return 
uniqueRegions(buildOneRegionForAllVertices(topology));
+               }
+
+               // vertex -> region; uniqueRegions() later imitates an IdentityHashSet 
(which does not exist)
+               // the IdentityHashMap speeds up the building as it uses 
reference equality
+               final Map<FailoverVertex, Set<FailoverVertex>> vertexToRegion = 
new IdentityHashMap<>();
+
+               // iterate all the vertices, which must be topologically sorted (producers before consumers)
+               for (FailoverVertex vertex : topology.getFailoverVertices()) {
+                       Set<FailoverVertex> currentRegion = new HashSet<>(1);
+                       currentRegion.add(vertex);
+                       vertexToRegion.put(vertex, currentRegion);
+
+                       for (FailoverEdge inputEdge : vertex.getInputEdges()) {
+                               if 
(inputEdge.getResultPartitionType().isPipelined()) {
+                                       final FailoverVertex producerVertex = 
inputEdge.getSourceVertex();
+                                       final Set<FailoverVertex> 
producerRegion = vertexToRegion.get(producerVertex);
+
+                                       if (producerRegion == null) {
+                                               throw new 
IllegalStateException("Producer task " + producerVertex.getExecutionVertexName()
+                                                       + " failover region is 
null while calculating failover region for the consumer task "
+                                                       + 
vertex.getExecutionVertexName() + ". This should be a failover region building 
bug.");
+                                       }
+
+                                       // check if it is the same as the 
producer region, if so skip the merge
+                                       // this check can significantly reduce 
compute complexity in All-to-All PIPELINED edge case
+                                       if (currentRegion != producerRegion) {
+                                               // merge current region and 
producer region
+                                               // merge the smaller region 
into the larger one to reduce the cost
+                                               final Set<FailoverVertex> 
smallerSet;
+                                               final Set<FailoverVertex> 
largerSet;
+                                               if (currentRegion.size() < 
producerRegion.size()) {
+                                                       smallerSet = 
currentRegion;
+                                                       largerSet = 
producerRegion;
+                                               } else {
+                                                       smallerSet = 
producerRegion;
+                                                       largerSet = 
currentRegion;
+                                               }
+                                               for (FailoverVertex v : 
smallerSet) {
+                                                       vertexToRegion.put(v, 
largerSet);
+                                               }
+                                               largerSet.addAll(smallerSet);
+                                               currentRegion = largerSet;
+                                       }
+                               }
+                       }
+               }
+
+               return uniqueRegions(vertexToRegion);
+       }
+
+       private static Map<FailoverVertex, Set<FailoverVertex>> 
buildOneRegionForAllVertices(final FailoverTopology topology) {
+               LOG.warn("Cannot decompose the topology into individual 
failover regions due to use of " +
+                       "Co-Location constraints (iterations). Job will fail 
over as one holistic unit.");
+
+               final Map<FailoverVertex, Set<FailoverVertex>> vertexToRegion = 
new IdentityHashMap<>();
+
+               final Set<FailoverVertex> allVertices = new HashSet<>();
+               for (FailoverVertex vertex : topology.getFailoverVertices()) {
+                       allVertices.add(vertex);
+                       vertexToRegion.put(vertex, allVertices);
+               }
+               return vertexToRegion;
+       }
+
+       private static Set<Set<FailoverVertex>> uniqueRegions(final 
Map<FailoverVertex, Set<FailoverVertex>> vertexToRegion) {
+               // find out all the distinct regions by reference; the map (region -> null) acts as an identity set
+               final IdentityHashMap<Set<FailoverVertex>, Object> 
distinctRegions = new IdentityHashMap<>();
+               for (Set<FailoverVertex> regionVertices : 
vertexToRegion.values()) {
+                       distinctRegions.put(regionVertices, null);
+               }
+               return distinctRegions.keySet();
+       }
+
+       private PipelinedRegionComputeUtil() {
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
index 8e06327..58d1675 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
@@ -94,66 +94,10 @@ public class RestartPipelinedRegionStrategy implements 
FailoverStrategy {
        // 
------------------------------------------------------------------------
 
        private void buildFailoverRegions() {
-               // currently we let a job with co-location constraints fail as 
one region
-               // putting co-located vertices in the same region with each 
other can be a future improvement
-               if (topology.containsCoLocationConstraints()) {
-                       buildOneRegionForAllVertices();
-                       return;
-               }
-
-               // we use the map (list -> null) to imitate an IdentityHashSet 
(which does not exist)
-               // this helps to optimize the building performance as it uses 
reference equality
-               final IdentityHashMap<FailoverVertex, HashSet<FailoverVertex>> 
vertexToRegion = new IdentityHashMap<>();
-
-               // iterate all the vertices which are topologically sorted
-               for (FailoverVertex vertex : topology.getFailoverVertices()) {
-                       HashSet<FailoverVertex> currentRegion = new 
HashSet<>(1);
-                       currentRegion.add(vertex);
-                       vertexToRegion.put(vertex, currentRegion);
-
-                       for (FailoverEdge inputEdge : vertex.getInputEdges()) {
-                               if 
(inputEdge.getResultPartitionType().isPipelined()) {
-                                       final FailoverVertex producerVertex = 
inputEdge.getSourceVertex();
-                                       final HashSet<FailoverVertex> 
producerRegion = vertexToRegion.get(producerVertex);
-
-                                       if (producerRegion == null) {
-                                               throw new 
IllegalStateException("Producer task " + producerVertex.getExecutionVertexName()
-                                                       + " failover region is 
null while calculating failover region for the consumer task "
-                                                       + 
vertex.getExecutionVertexName() + ". This should be a failover region building 
bug.");
-                                       }
-
-                                       // check if it is the same as the 
producer region, if so skip the merge
-                                       // this check can significantly reduce 
compute complexity in All-to-All PIPELINED edge case
-                                       if (currentRegion != producerRegion) {
-                                               // merge current region and 
producer region
-                                               // merge the smaller region 
into the larger one to reduce the cost
-                                               final HashSet<FailoverVertex> 
smallerSet;
-                                               final HashSet<FailoverVertex> 
largerSet;
-                                               if (currentRegion.size() < 
producerRegion.size()) {
-                                                       smallerSet = 
currentRegion;
-                                                       largerSet = 
producerRegion;
-                                               } else {
-                                                       smallerSet = 
producerRegion;
-                                                       largerSet = 
currentRegion;
-                                               }
-                                               for (FailoverVertex v : 
smallerSet) {
-                                                       vertexToRegion.put(v, 
largerSet);
-                                               }
-                                               largerSet.addAll(smallerSet);
-                                               currentRegion = largerSet;
-                                       }
-                               }
-                       }
-               }
-
-               // find out all the distinct regions
-               final IdentityHashMap<HashSet<FailoverVertex>, Object> 
distinctRegions = new IdentityHashMap<>();
-               for (HashSet<FailoverVertex> regionVertices : 
vertexToRegion.values()) {
-                       distinctRegions.put(regionVertices, null);
-               }
+               final Set<Set<FailoverVertex>> distinctRegions = 
PipelinedRegionComputeUtil.computePipelinedRegions(topology);
 
                // creating all the failover regions and register them
-               for (HashSet<FailoverVertex> regionVertices : 
distinctRegions.keySet()) {
+               for (Set<FailoverVertex> regionVertices : distinctRegions) {
                        LOG.debug("Creating a failover region with {} 
vertices.", regionVertices.size());
                        final FailoverRegion failoverRegion = new 
FailoverRegion(regionVertices);
                        regions.put(failoverRegion, null);
@@ -165,21 +109,6 @@ public class RestartPipelinedRegionStrategy implements 
FailoverStrategy {
                LOG.info("Created {} failover regions.", regions.size());
        }
 
-       private void buildOneRegionForAllVertices() {
-               LOG.warn("Cannot decompose the topology into individual 
failover regions due to use of " +
-                       "Co-Location constraints (iterations). Job will fail 
over as one holistic unit.");
-
-               final Set<FailoverVertex> allVertices = new HashSet<>();
-               for (FailoverVertex vertex : topology.getFailoverVertices()) {
-                       allVertices.add(vertex);
-               }
-
-               final FailoverRegion region = new FailoverRegion(allVertices);
-               regions.put(region, null);
-               for (FailoverVertex vertex : topology.getFailoverVertices()) {
-                       vertexToRegionMap.put(vertex.getExecutionVertexID(), 
region);
-               }
-       }
 
        // 
------------------------------------------------------------------------
        //  task failure handling

Reply via email to