GJL commented on a change in pull request #8804: [FLINK-12883][WIP][runtime]
Add elaborated partition release logic
URL: https://github.com/apache/flink/pull/8804#discussion_r296176302
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
##########
@@ -94,66 +94,10 @@ public RestartPipelinedRegionStrategy(
//
------------------------------------------------------------------------
private void buildFailoverRegions() {
- // currently we let a job with co-location constraints fail as
one region
- // putting co-located vertices in the same region with each
other can be a future improvement
- if (topology.containsCoLocationConstraints()) {
- buildOneRegionForAllVertices();
- return;
- }
-
- // we use the map (list -> null) to imitate an IdentityHashSet
(which does not exist)
- // this helps to optimize the building performance as it uses
reference equality
- final IdentityHashMap<FailoverVertex, HashSet<FailoverVertex>>
vertexToRegion = new IdentityHashMap<>();
-
- // iterate all the vertices which are topologically sorted
- for (FailoverVertex vertex : topology.getFailoverVertices()) {
- HashSet<FailoverVertex> currentRegion = new
HashSet<>(1);
- currentRegion.add(vertex);
- vertexToRegion.put(vertex, currentRegion);
-
- for (FailoverEdge inputEdge : vertex.getInputEdges()) {
- if
(inputEdge.getResultPartitionType().isPipelined()) {
- final FailoverVertex producerVertex =
inputEdge.getSourceVertex();
- final HashSet<FailoverVertex>
producerRegion = vertexToRegion.get(producerVertex);
-
- if (producerRegion == null) {
- throw new
IllegalStateException("Producer task " + producerVertex.getExecutionVertexName()
- + " failover region is
null while calculating failover region for the consumer task "
- +
vertex.getExecutionVertexName() + ". This should be a failover region building
bug.");
- }
-
- // check if it is the same as the
producer region, if so skip the merge
- // this check can significantly reduce
compute complexity in All-to-All PIPELINED edge case
- if (currentRegion != producerRegion) {
- // merge current region and
producer region
- // merge the smaller region
into the larger one to reduce the cost
- final HashSet<FailoverVertex>
smallerSet;
- final HashSet<FailoverVertex>
largerSet;
- if (currentRegion.size() <
producerRegion.size()) {
- smallerSet =
currentRegion;
- largerSet =
producerRegion;
- } else {
- smallerSet =
producerRegion;
- largerSet =
currentRegion;
- }
- for (FailoverVertex v :
smallerSet) {
- vertexToRegion.put(v,
largerSet);
- }
- largerSet.addAll(smallerSet);
- currentRegion = largerSet;
- }
- }
- }
- }
-
- // find out all the distinct regions
- final IdentityHashMap<HashSet<FailoverVertex>, Object>
distinctRegions = new IdentityHashMap<>();
- for (HashSet<FailoverVertex> regionVertices :
vertexToRegion.values()) {
- distinctRegions.put(regionVertices, null);
- }
+ final Set<Set<FailoverVertex>> distinctRegions =
PipelinedRegionComputeUtil.computePipelinedRegions(topology);
Review comment:
That's not so obvious for me. `RestartPipelinedRegionStrategy` still needs
`FailoverVertex` at the end of the day. Mapping from `PipelinedRegion` to
`Set<FailoverVertex>` is tedious.
To me It's also not clear whether it is the right abstraction that
`PipelinedRegionComputeUtil#computePipelinedRegions()` receives
`FailoverTopology` as input.
Lastly, I think that this comment competes with [that
one](https://github.com/apache/flink/pull/8804/files#r295769220).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services