GJL commented on a change in pull request #8804: [FLINK-12883][WIP][runtime] Add elaborated partition release logic
URL: https://github.com/apache/flink/pull/8804#discussion_r296176302
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 ##########
 @@ -94,66 +94,10 @@ public RestartPipelinedRegionStrategy(
 	// ------------------------------------------------------------------------
 
 	private void buildFailoverRegions() {
-		// currently we let a job with co-location constraints fail as one region
-		// putting co-located vertices in the same region with each other can be a future improvement
-		if (topology.containsCoLocationConstraints()) {
-			buildOneRegionForAllVertices();
-			return;
-		}
-
-		// we use the map (list -> null) to imitate an IdentityHashSet (which does not exist)
-		// this helps to optimize the building performance as it uses reference equality
-		final IdentityHashMap<FailoverVertex, HashSet<FailoverVertex>> vertexToRegion = new IdentityHashMap<>();
-
-		// iterate all the vertices which are topologically sorted
-		for (FailoverVertex vertex : topology.getFailoverVertices()) {
-			HashSet<FailoverVertex> currentRegion = new HashSet<>(1);
-			currentRegion.add(vertex);
-			vertexToRegion.put(vertex, currentRegion);
-
-			for (FailoverEdge inputEdge : vertex.getInputEdges()) {
-				if (inputEdge.getResultPartitionType().isPipelined()) {
-					final FailoverVertex producerVertex = inputEdge.getSourceVertex();
-					final HashSet<FailoverVertex> producerRegion = vertexToRegion.get(producerVertex);
-
-					if (producerRegion == null) {
-						throw new IllegalStateException("Producer task " + producerVertex.getExecutionVertexName()
-							+ " failover region is null while calculating failover region for the consumer task "
-							+ vertex.getExecutionVertexName() + ". This should be a failover region building bug.");
-					}
-
-					// check if it is the same as the producer region, if so skip the merge
-					// this check can significantly reduce compute complexity in All-to-All PIPELINED edge case
-					if (currentRegion != producerRegion) {
-						// merge current region and producer region
-						// merge the smaller region into the larger one to reduce the cost
-						final HashSet<FailoverVertex> smallerSet;
-						final HashSet<FailoverVertex> largerSet;
-						if (currentRegion.size() < producerRegion.size()) {
-							smallerSet = currentRegion;
-							largerSet = producerRegion;
-						} else {
-							smallerSet = producerRegion;
-							largerSet = currentRegion;
-						}
-						for (FailoverVertex v : smallerSet) {
-							vertexToRegion.put(v, largerSet);
-						}
-						largerSet.addAll(smallerSet);
-						currentRegion = largerSet;
-					}
-				}
-			}
-		}
-
-		// find out all the distinct regions
-		final IdentityHashMap<HashSet<FailoverVertex>, Object> distinctRegions = new IdentityHashMap<>();
-		for (HashSet<FailoverVertex> regionVertices : vertexToRegion.values()) {
-			distinctRegions.put(regionVertices, null);
-		}
+		final Set<Set<FailoverVertex>> distinctRegions = PipelinedRegionComputeUtil.computePipelinedRegions(topology);
 
 Review comment:
   That's not so obvious to me. `RestartPipelinedRegionStrategy` still needs `FailoverVertex` at the end of the day, and mapping from `PipelinedRegion` to `Set<FailoverVertex>` is tedious.
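
   For illustration, a rough sketch of such a mapping (assumptions: `getVertexIds()` on `PipelinedRegion`, the `ExecutionVertexID` key type, and the id-to-vertex lookup map are made up here, not the actual API in this PR):

```java
// Hypothetical helper, using plain java.util collections: convert PipelinedRegions
// back into the Set<FailoverVertex> representation the strategy works with.
private static Set<Set<FailoverVertex>> toVertexSets(
		final Set<? extends PipelinedRegion> regions,
		final Map<ExecutionVertexID, FailoverVertex> idToVertex) {

	// deduplicate by reference, mirroring the IdentityHashMap trick in the old code
	final Set<Set<FailoverVertex>> result = Collections.newSetFromMap(new IdentityHashMap<>());
	for (PipelinedRegion region : regions) {
		final Set<FailoverVertex> vertices = new HashSet<>();
		for (ExecutionVertexID vertexId : region.getVertexIds()) { // assumed accessor
			vertices.add(idToVertex.get(vertexId));
		}
		result.add(vertices);
	}
	return result;
}
```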
   
   It's also not clear to me whether taking `FailoverTopology` as the input of `PipelinedRegionComputeUtil#computePipelinedRegions()` is the right abstraction.
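
   For the sake of discussion, a sketch of one possible alternative (purely illustrative, not what the PR implements): reuse the region-merging logic from the removed code above, but take only the topologically sorted vertices instead of the full `FailoverTopology`:

```java
// Illustrative variant only: same merge idea as the removed buildFailoverRegions()
// body, without depending on FailoverTopology (co-location handling omitted).
public static Set<Set<FailoverVertex>> computePipelinedRegions(
		final Iterable<? extends FailoverVertex> topologicallySortedVertices) {

	final Map<FailoverVertex, Set<FailoverVertex>> vertexToRegion = new IdentityHashMap<>();

	for (FailoverVertex vertex : topologicallySortedVertices) {
		Set<FailoverVertex> currentRegion = new HashSet<>(1);
		currentRegion.add(vertex);
		vertexToRegion.put(vertex, currentRegion);

		for (FailoverEdge inputEdge : vertex.getInputEdges()) {
			if (inputEdge.getResultPartitionType().isPipelined()) {
				final Set<FailoverVertex> producerRegion = vertexToRegion.get(inputEdge.getSourceVertex());
				if (producerRegion == null) {
					throw new IllegalStateException(
						"Unknown region for producer of " + vertex.getExecutionVertexName());
				}
				if (producerRegion != currentRegion) {
					// merge the smaller region into the larger one, as in the removed code
					final Set<FailoverVertex> smaller =
						currentRegion.size() < producerRegion.size() ? currentRegion : producerRegion;
					final Set<FailoverVertex> larger =
						smaller == currentRegion ? producerRegion : currentRegion;
					for (FailoverVertex v : smaller) {
						vertexToRegion.put(v, larger);
					}
					larger.addAll(smaller);
					currentRegion = larger;
				}
			}
		}
	}

	// merged vertices share the same region instance, so deduplicate by reference
	final Set<Set<FailoverVertex>> distinctRegions = Collections.newSetFromMap(new IdentityHashMap<>());
	distinctRegions.addAll(vertexToRegion.values());
	return distinctRegions;
}
```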
   
   Lastly, I think that this comment competes with [that one](https://github.com/apache/flink/pull/8804/files#r295769220).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
