This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bce9a4c43fb6ff61762885cb0fbc984a7338ce19 Author: Gary Yao <[email protected]> AuthorDate: Thu Jun 20 17:20:00 2019 +0200 [hotfix][runtime] Use Set instead of IdentityHashMap where possible Replace instances where we use an IdentityHashMap as a set with Collections.newSetFromMap() in RestartPipelinedRegionStrategy and PipelineRegionComputeUtil. --- .../failover/flip1/PipelinedRegionComputeUtil.java | 11 ++++------ .../flip1/RestartPipelinedRegionStrategy.java | 25 +++++++++++----------- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java index 291c894..b21b52f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java @@ -22,6 +22,7 @@ package org.apache.flink.runtime.executiongraph.failover.flip1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.HashSet; import java.util.IdentityHashMap; import java.util.Map; @@ -41,8 +42,6 @@ public final class PipelinedRegionComputeUtil { return uniqueRegions(buildOneRegionForAllVertices(topology)); } - // we use the map (list -> null) to imitate an IdentityHashSet (which does not exist) - // this helps to optimize the building performance as it uses reference equality final Map<FailoverVertex, Set<FailoverVertex>> vertexToRegion = new IdentityHashMap<>(); // iterate all the vertices which are topologically sorted @@ -105,11 +104,9 @@ public final class PipelinedRegionComputeUtil { private static Set<Set<FailoverVertex>> uniqueRegions(final Map<FailoverVertex, Set<FailoverVertex>> vertexToRegion) { // find out all the distinct regions - final IdentityHashMap<Set<FailoverVertex>, Object> distinctRegions = new IdentityHashMap<>(); - for (Set<FailoverVertex> regionVertices : vertexToRegion.values()) { - distinctRegions.put(regionVertices, null); - } - return distinctRegions.keySet(); + final Set<Set<FailoverVertex>> distinctRegions = Collections.newSetFromMap(new IdentityHashMap<>()); + distinctRegions.addAll(vertexToRegion.values()); + return distinctRegions; } private PipelinedRegionComputeUtil() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java index 58d1675..a87764b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayDeque; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; @@ -50,7 +51,7 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy { private final FailoverTopology topology; /** All failover regions. */ - private final IdentityHashMap<FailoverRegion, Object> regions; + private final Set<FailoverRegion> regions; /** Maps execution vertex id to failover region. */ private final Map<ExecutionVertexID, FailoverRegion> vertexToRegionMap; @@ -80,7 +81,7 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy { ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) { this.topology = checkNotNull(topology); - this.regions = new IdentityHashMap<>(); + this.regions = Collections.newSetFromMap(new IdentityHashMap<>()); this.vertexToRegionMap = new HashMap<>(); this.resultPartitionAvailabilityChecker = new RegionFailoverResultPartitionAvailabilityChecker( resultPartitionAvailabilityChecker); @@ -100,7 +101,7 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy { for (Set<FailoverVertex> regionVertices : distinctRegions) { LOG.debug("Creating a failover region with {} vertices.", regionVertices.size()); final FailoverRegion failoverRegion = new FailoverRegion(regionVertices); - regions.put(failoverRegion, null); + regions.add(failoverRegion); for (FailoverVertex vertex : regionVertices) { vertexToRegionMap.put(vertex.getExecutionVertexID(), failoverRegion); } @@ -171,26 +172,26 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy { * 3. If a region is involved, all of its consumer regions are involved */ private Set<FailoverRegion> getRegionsToRestart(FailoverRegion failedRegion) { - IdentityHashMap<FailoverRegion, Object> regionsToRestart = new IdentityHashMap<>(); - IdentityHashMap<FailoverRegion, Object> visitedRegions = new IdentityHashMap<>(); + Set<FailoverRegion> regionsToRestart = Collections.newSetFromMap(new IdentityHashMap<>()); + Set<FailoverRegion> visitedRegions = Collections.newSetFromMap(new IdentityHashMap<>()); // start from the failed region to visit all involved regions Queue<FailoverRegion> regionsToVisit = new ArrayDeque<>(); - visitedRegions.put(failedRegion, null); + visitedRegions.add(failedRegion); regionsToVisit.add(failedRegion); while (!regionsToVisit.isEmpty()) { FailoverRegion regionToRestart = regionsToVisit.poll(); // an involved region should be restarted - regionsToRestart.put(regionToRestart, null); + regionsToRestart.add(regionToRestart); // if a needed input result partition is not available, its producer region is involved for (FailoverVertex vertex : regionToRestart.getAllExecutionVertices()) { for (FailoverEdge inEdge : vertex.getInputEdges()) { if (!resultPartitionAvailabilityChecker.isAvailable(inEdge.getResultPartitionID())) { FailoverRegion producerRegion = vertexToRegionMap.get(inEdge.getSourceVertex().getExecutionVertexID()); - if (!visitedRegions.containsKey(producerRegion)) { - visitedRegions.put(producerRegion, null); + if (!visitedRegions.contains(producerRegion)) { + visitedRegions.add(producerRegion); regionsToVisit.add(producerRegion); } } @@ -201,15 +202,15 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy { for (FailoverVertex vertex : regionToRestart.getAllExecutionVertices()) { for (FailoverEdge outEdge : vertex.getOutputEdges()) { FailoverRegion consumerRegion = vertexToRegionMap.get(outEdge.getTargetVertex().getExecutionVertexID()); - if (!visitedRegions.containsKey(consumerRegion)) { - visitedRegions.put(consumerRegion, null); + if (!visitedRegions.contains(consumerRegion)) { + visitedRegions.add(consumerRegion); regionsToVisit.add(consumerRegion); } } } } - return regionsToRestart.keySet(); + return regionsToRestart; } // ------------------------------------------------------------------------
