zentol commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable
URL: https://github.com/apache/flink/pull/8430#discussion_r284183023
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 ##########
 @@ -162,40 +201,90 @@ private void buildOneRegionForAllVertices() {
         * In this strategy, all task vertices in 'involved' regions are 
proposed to be restarted.
         * The 'involved' regions are calculated with rules below:
         * 1. The region containing the failed task is always involved
-        * 2. TODO: If an input result partition of an involved region is not 
available, i.e. Missing or Corrupted,
+        * 2. If an input result partition of an involved region is not 
available, i.e. Missing or Corrupted,
         *    the region containing the partition producer task is involved
-        * 3. TODO: If a region is involved, all of its consumer regions are 
involved
+        * 3. If a region is involved, all of its consumer regions are involved
         *
         * @param executionVertexId ID of the failed task
         * @param cause cause of the failure
         * @return set of IDs of vertices to restart
         */
        @Override
        public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID 
executionVertexId, Throwable cause) {
-               final FailoverRegion failedRegion = 
regions.get(executionVertexId);
+               LOG.info("Calculating tasks to restart to recover the failed 
task {}.", executionVertexId);
+
+               final FailoverRegion failedRegion = 
vertexToRegionMap.get(executionVertexId);
                if (failedRegion == null) {
                        // TODO: show the task name in the log
                        throw new IllegalStateException("Can not find the 
failover region for task " + executionVertexId, cause);
                }
 
-               // TODO: if the failure cause is data consumption error, mark 
the corresponding data partition to be unavailable
+               // if the failure cause is data consumption error, mark the 
corresponding data partition to be failed,
+               // so that the failover process will try to recover it
+               if (cause instanceof DataConsumptionException) {
+                       
resultPartitionAvailabilityChecker.markResultPartitionFailed(
+                               ((DataConsumptionException) 
cause).getPartitionId().getPartitionId());
+               }
 
-               return getRegionsToRestart(failedRegion).stream().flatMap(
+               // calculate the tasks to restart based on the result of 
regions to restart
+               Set<FailoverRegion> regionsToRestart = 
getRegionsToRestart(failedRegion);
+               Set<ExecutionVertexID> tasksToRestart = 
regionsToRestart.stream().flatMap(
                        r -> 
r.getAllExecutionVertexIDs().stream()).collect(Collectors.toSet());
+
+               // the previous failed partition will be recovered. remove its 
failed state from the checker
+               if (cause instanceof DataConsumptionException) {
+                       
resultPartitionAvailabilityChecker.removeResultPartitionFromFailedState(
+                               ((DataConsumptionException) 
cause).getPartitionId().getPartitionId());
+               }
+
+               LOG.info("{} tasks should be restarted to recover the failed 
task {}. ", tasksToRestart.size(), executionVertexId);
+               return tasksToRestart;
        }
 
        /**
         * All 'involved' regions are proposed to be restarted.
         * The 'involved' regions are calculated with rules below:
         * 1. The region containing the failed task is always involved
-        * 2. TODO: If an input result partition of an involved region is not 
available, i.e. Missing or Corrupted,
+        * 2. If an input result partition of an involved region is not 
available, i.e. Missing or Corrupted,
         *    the region containing the partition producer task is involved
-        * 3. TODO: If a region is involved, all of its consumer regions are 
involved
+        * 3. If a region is involved, all of its consumer regions are involved
         */
-       private Set<FailoverRegion> getRegionsToRestart(FailoverRegion 
regionToRestart) {
-               return Collections.singleton(regionToRestart);
+       private Set<FailoverRegion> getRegionsToRestart(FailoverRegion 
failedRegion) {
+               IdentityHashMap<FailoverRegion, Object> regionsToRestart = new 
IdentityHashMap<>();
+               IdentityHashMap<FailoverRegion, Object> visitedRegions = new 
IdentityHashMap<>();
+
+               // start from the failed region to visit all involved regions
+               Queue<FailoverRegion> regionsToVisit = new ArrayDeque<>();
+               regionsToVisit.add(failedRegion);
+               while (!regionsToVisit.isEmpty()) {
+                       FailoverRegion regionToRestart = regionsToVisit.poll();
+
+                       if (!visitedRegions.containsKey(regionToRestart)) {
+                               visitedRegions.put(regionToRestart, null);
+
+                               // an involved region should be restarted
+                               regionsToRestart.put(regionToRestart, null);
 
-               // TODO: implement backtracking logic
+                               // if a needed input result partition is not 
available, its producer region is involved
+                               for (FailoverVertex vertex : 
regionToRestart.getAllExecutionVertices()) {
+                                       for (FailoverEdge inEdge : 
vertex.getInputEdges()) {
+                                               if 
(!resultPartitionAvailabilityChecker.isAvailable(inEdge.getResultPartitionID()))
 {
+                                                       FailoverRegion 
producerRegion = 
vertexToRegionMap.get(inEdge.getSourceVertex().getExecutionVertexID());
+                                                       if 
(!visitedRegions.containsKey(producerRegion)) {
+                                                               
regionsToVisit.add(producerRegion);
+                                                       }
+                                               }
+                                       }
+                               }
+
+                               // all consumer regions of an involved region 
should be involved
+                               for (FailoverRegion consumerRegion : 
regionConsumers.get(regionToRestart)) {
 
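 Review comment:
   Suggestion: replace this loop with `regionsToVisit.addAll(regionConsumers.get(regionToRestart))`. Regions that were already visited are skipped when they are polled from the queue, so no per-element filtering is needed before enqueueing them.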

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to