zentol commented on a change in pull request #8804: [FLINK-12883][WIP][runtime]
Add elaborated partition release logic
URL: https://github.com/apache/flink/pull/8804#discussion_r295756573
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
##########
@@ -94,66 +94,10 @@ public RestartPipelinedRegionStrategy(
//
------------------------------------------------------------------------
private void buildFailoverRegions() {
- // currently we let a job with co-location constraints fail as
one region
- // putting co-located vertices in the same region with each
other can be a future improvement
- if (topology.containsCoLocationConstraints()) {
- buildOneRegionForAllVertices();
- return;
- }
-
- // we use the map (list -> null) to imitate an IdentityHashSet
(which does not exist)
- // this helps to optimize the building performance as it uses
reference equality
- final IdentityHashMap<FailoverVertex, HashSet<FailoverVertex>>
vertexToRegion = new IdentityHashMap<>();
-
- // iterate all the vertices which are topologically sorted
- for (FailoverVertex vertex : topology.getFailoverVertices()) {
- HashSet<FailoverVertex> currentRegion = new
HashSet<>(1);
- currentRegion.add(vertex);
- vertexToRegion.put(vertex, currentRegion);
-
- for (FailoverEdge inputEdge : vertex.getInputEdges()) {
- if
(inputEdge.getResultPartitionType().isPipelined()) {
- final FailoverVertex producerVertex =
inputEdge.getSourceVertex();
- final HashSet<FailoverVertex>
producerRegion = vertexToRegion.get(producerVertex);
-
- if (producerRegion == null) {
- throw new
IllegalStateException("Producer task " + producerVertex.getExecutionVertexName()
- + " failover region is
null while calculating failover region for the consumer task "
- +
vertex.getExecutionVertexName() + ". This should be a failover region building
bug.");
- }
-
- // check if it is the same as the
producer region, if so skip the merge
- // this check can significantly reduce
compute complexity in All-to-All PIPELINED edge case
- if (currentRegion != producerRegion) {
- // merge current region and
producer region
- // merge the smaller region
into the larger one to reduce the cost
- final HashSet<FailoverVertex>
smallerSet;
- final HashSet<FailoverVertex>
largerSet;
- if (currentRegion.size() <
producerRegion.size()) {
- smallerSet =
currentRegion;
- largerSet =
producerRegion;
- } else {
- smallerSet =
producerRegion;
- largerSet =
currentRegion;
- }
- for (FailoverVertex v :
smallerSet) {
- vertexToRegion.put(v,
largerSet);
- }
- largerSet.addAll(smallerSet);
- currentRegion = largerSet;
- }
- }
- }
- }
-
- // find out all the distinct regions
- final IdentityHashMap<HashSet<FailoverVertex>, Object>
distinctRegions = new IdentityHashMap<>();
- for (HashSet<FailoverVertex> regionVertices :
vertexToRegion.values()) {
- distinctRegions.put(regionVertices, null);
- }
+ final Set<Set<FailoverVertex>> distinctRegions =
PipelinedRegionComputeUtil.computePipelinedRegions(topology);
Review comment:
we should modify the constructor to accept the pipelined regions instead;
similarly to the `RegionPartitionReleaseStrategy `
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services