This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bce9a4c43fb6ff61762885cb0fbc984a7338ce19
Author: Gary Yao <[email protected]>
AuthorDate: Thu Jun 20 17:20:00 2019 +0200

    [hotfix][runtime] Use Set instead of IdentityHashMap where possible
    
    Replace instances where we use an IdentityHashMap as a set with
    Collections.newSetFromMap() in RestartPipelinedRegionStrategy and
    PipelinedRegionComputeUtil.
---
 .../failover/flip1/PipelinedRegionComputeUtil.java | 11 ++++------
 .../flip1/RestartPipelinedRegionStrategy.java      | 25 +++++++++++-----------
 2 files changed, 17 insertions(+), 19 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
index 291c894..b21b52f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
@@ -22,6 +22,7 @@ package 
org.apache.flink.runtime.executiongraph.failover.flip1;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Map;
@@ -41,8 +42,6 @@ public final class PipelinedRegionComputeUtil {
                        return 
uniqueRegions(buildOneRegionForAllVertices(topology));
                }
 
-               // we use the map (list -> null) to imitate an IdentityHashSet 
(which does not exist)
-               // this helps to optimize the building performance as it uses 
reference equality
                final Map<FailoverVertex, Set<FailoverVertex>> vertexToRegion = 
new IdentityHashMap<>();
 
                // iterate all the vertices which are topologically sorted
@@ -105,11 +104,9 @@ public final class PipelinedRegionComputeUtil {
 
        private static Set<Set<FailoverVertex>> uniqueRegions(final 
Map<FailoverVertex, Set<FailoverVertex>> vertexToRegion) {
                // find out all the distinct regions
-               final IdentityHashMap<Set<FailoverVertex>, Object> 
distinctRegions = new IdentityHashMap<>();
-               for (Set<FailoverVertex> regionVertices : 
vertexToRegion.values()) {
-                       distinctRegions.put(regionVertices, null);
-               }
-               return distinctRegions.keySet();
+               final Set<Set<FailoverVertex>> distinctRegions = 
Collections.newSetFromMap(new IdentityHashMap<>());
+               distinctRegions.addAll(vertexToRegion.values());
+               return distinctRegions;
        }
 
        private PipelinedRegionComputeUtil() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
index 58d1675..a87764b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
@@ -50,7 +51,7 @@ public class RestartPipelinedRegionStrategy implements 
FailoverStrategy {
        private final FailoverTopology topology;
 
        /** All failover regions. */
-       private final IdentityHashMap<FailoverRegion, Object> regions;
+       private final Set<FailoverRegion> regions;
 
        /** Maps execution vertex id to failover region. */
        private final Map<ExecutionVertexID, FailoverRegion> vertexToRegionMap;
@@ -80,7 +81,7 @@ public class RestartPipelinedRegionStrategy implements 
FailoverStrategy {
                ResultPartitionAvailabilityChecker 
resultPartitionAvailabilityChecker) {
 
                this.topology = checkNotNull(topology);
-               this.regions = new IdentityHashMap<>();
+               this.regions = Collections.newSetFromMap(new 
IdentityHashMap<>());
                this.vertexToRegionMap = new HashMap<>();
                this.resultPartitionAvailabilityChecker = new 
RegionFailoverResultPartitionAvailabilityChecker(
                        resultPartitionAvailabilityChecker);
@@ -100,7 +101,7 @@ public class RestartPipelinedRegionStrategy implements 
FailoverStrategy {
                for (Set<FailoverVertex> regionVertices : distinctRegions) {
                        LOG.debug("Creating a failover region with {} 
vertices.", regionVertices.size());
                        final FailoverRegion failoverRegion = new 
FailoverRegion(regionVertices);
-                       regions.put(failoverRegion, null);
+                       regions.add(failoverRegion);
                        for (FailoverVertex vertex : regionVertices) {
                                
vertexToRegionMap.put(vertex.getExecutionVertexID(), failoverRegion);
                        }
@@ -171,26 +172,26 @@ public class RestartPipelinedRegionStrategy implements 
FailoverStrategy {
         * 3. If a region is involved, all of its consumer regions are involved
         */
        private Set<FailoverRegion> getRegionsToRestart(FailoverRegion 
failedRegion) {
-               IdentityHashMap<FailoverRegion, Object> regionsToRestart = new 
IdentityHashMap<>();
-               IdentityHashMap<FailoverRegion, Object> visitedRegions = new 
IdentityHashMap<>();
+               Set<FailoverRegion> regionsToRestart = 
Collections.newSetFromMap(new IdentityHashMap<>());
+               Set<FailoverRegion> visitedRegions = 
Collections.newSetFromMap(new IdentityHashMap<>());
 
                // start from the failed region to visit all involved regions
                Queue<FailoverRegion> regionsToVisit = new ArrayDeque<>();
-               visitedRegions.put(failedRegion, null);
+               visitedRegions.add(failedRegion);
                regionsToVisit.add(failedRegion);
                while (!regionsToVisit.isEmpty()) {
                        FailoverRegion regionToRestart = regionsToVisit.poll();
 
                        // an involved region should be restarted
-                       regionsToRestart.put(regionToRestart, null);
+                       regionsToRestart.add(regionToRestart);
 
                        // if a needed input result partition is not available, 
its producer region is involved
                        for (FailoverVertex vertex : 
regionToRestart.getAllExecutionVertices()) {
                                for (FailoverEdge inEdge : 
vertex.getInputEdges()) {
                                        if 
(!resultPartitionAvailabilityChecker.isAvailable(inEdge.getResultPartitionID()))
 {
                                                FailoverRegion producerRegion = 
vertexToRegionMap.get(inEdge.getSourceVertex().getExecutionVertexID());
-                                               if 
(!visitedRegions.containsKey(producerRegion)) {
-                                                       
visitedRegions.put(producerRegion, null);
+                                               if 
(!visitedRegions.contains(producerRegion)) {
+                                                       
visitedRegions.add(producerRegion);
                                                        
regionsToVisit.add(producerRegion);
                                                }
                                        }
@@ -201,15 +202,15 @@ public class RestartPipelinedRegionStrategy implements 
FailoverStrategy {
                        for (FailoverVertex vertex : 
regionToRestart.getAllExecutionVertices()) {
                                for (FailoverEdge outEdge : 
vertex.getOutputEdges()) {
                                        FailoverRegion consumerRegion = 
vertexToRegionMap.get(outEdge.getTargetVertex().getExecutionVertexID());
-                                       if 
(!visitedRegions.containsKey(consumerRegion)) {
-                                               
visitedRegions.put(consumerRegion, null);
+                                       if 
(!visitedRegions.contains(consumerRegion)) {
+                                               
visitedRegions.add(consumerRegion);
                                                
regionsToVisit.add(consumerRegion);
                                        }
                                }
                        }
                }
 
-               return regionsToRestart.keySet();
+               return regionsToRestart;
        }
 
        // 
------------------------------------------------------------------------

Reply via email to