zhuzhurk commented on a change in pull request #8430: [FLINK-12068] [runtime] 
Backtrack failover regions if intermediate results are unavailable
URL: https://github.com/apache/flink/pull/8430#discussion_r284585195
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 ##########
 @@ -162,40 +176,96 @@ private void buildOneRegionForAllVertices() {
         * In this strategy, all task vertices in 'involved' regions are 
proposed to be restarted.
         * The 'involved' regions are calculated with rules below:
         * 1. The region containing the failed task is always involved
-        * 2. TODO: If an input result partition of an involved region is not 
available, i.e. Missing or Corrupted,
+        * 2. If an input result partition of an involved region is not 
available, i.e. Missing or Corrupted,
         *    the region containing the partition producer task is involved
-        * 3. TODO: If a region is involved, all of its consumer regions are 
involved
+        * 3. If a region is involved, all of its consumer regions are involved
         *
         * @param executionVertexId ID of the failed task
         * @param cause cause of the failure
         * @return set of IDs of vertices to restart
         */
        @Override
        public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID 
executionVertexId, Throwable cause) {
-               final FailoverRegion failedRegion = 
regions.get(executionVertexId);
+               LOG.info("Calculating tasks to restart to recover the failed 
task {}.", executionVertexId);
+
+               final FailoverRegion failedRegion = 
vertexToRegionMap.get(executionVertexId);
                if (failedRegion == null) {
                        // TODO: show the task name in the log
                        throw new IllegalStateException("Can not find the 
failover region for task " + executionVertexId, cause);
                }
 
-               // TODO: if the failure cause is data consumption error, mark 
the corresponding data partition to be unavailable
+               // if the failure cause is data consumption error, mark the 
corresponding data partition to be failed,
+               // so that the failover process will try to recover it
+               Optional<DataConsumptionException> dataConsumptionException = 
ExceptionUtils.findThrowable(
+                       cause, DataConsumptionException.class);
+               if (dataConsumptionException.isPresent()) {
+                       
resultPartitionAvailabilityChecker.markResultPartitionFailed(
+                               
dataConsumptionException.get().getPartitionId().getPartitionId());
+               }
 
-               return getRegionsToRestart(failedRegion).stream().flatMap(
+               // calculate the tasks to restart based on the result of 
regions to restart
+               Set<FailoverRegion> regionsToRestart = 
getRegionsToRestart(failedRegion);
+               Set<ExecutionVertexID> tasksToRestart = 
regionsToRestart.stream().flatMap(
                        r -> 
r.getAllExecutionVertexIDs().stream()).collect(Collectors.toSet());
+
+               // the previous failed partition will be recovered. remove its 
failed state from the checker
+               if (dataConsumptionException.isPresent()) {
+                       
resultPartitionAvailabilityChecker.removeResultPartitionFromFailedState(
+                               
dataConsumptionException.get().getPartitionId().getPartitionId());
+               }
+
+               LOG.info("{} tasks should be restarted to recover the failed 
task {}. ", tasksToRestart.size(), executionVertexId);
+               return tasksToRestart;
        }
 
        /**
         * All 'involved' regions are proposed to be restarted.
         * The 'involved' regions are calculated with rules below:
         * 1. The region containing the failed task is always involved
-        * 2. TODO: If an input result partition of an involved region is not 
available, i.e. Missing or Corrupted,
+        * 2. If an input result partition of an involved region is not 
available, i.e. Missing or Corrupted,
         *    the region containing the partition producer task is involved
-        * 3. TODO: If a region is involved, all of its consumer regions are 
involved
+        * 3. If a region is involved, all of its consumer regions are involved
         */
-       private Set<FailoverRegion> getRegionsToRestart(FailoverRegion 
regionToRestart) {
-               return Collections.singleton(regionToRestart);
+       private Set<FailoverRegion> getRegionsToRestart(FailoverRegion 
failedRegion) {
+               IdentityHashMap<FailoverRegion, Object> regionsToRestart = new 
IdentityHashMap<>();
+               IdentityHashMap<FailoverRegion, Object> visitedRegions = new 
IdentityHashMap<>();
+
+               // start from the failed region to visit all involved regions
+               Queue<FailoverRegion> regionsToVisit = new ArrayDeque<>();
+               visitedRegions.put(failedRegion, null);
+               regionsToVisit.add(failedRegion);
+               while (!regionsToVisit.isEmpty()) {
+                       FailoverRegion regionToRestart = regionsToVisit.poll();
+
+                       // an involved region should be restarted
+                       regionsToRestart.put(regionToRestart, null);
 
-               // TODO: implement backtracking logic
+                       // if a needed input result partition is not available, 
its producer region is involved
+                       for (FailoverVertex vertex : 
regionToRestart.getAllExecutionVertices()) {
+                               for (FailoverEdge inEdge : 
vertex.getInputEdges()) {
 
 Review comment:
   This is similar to pre-calculating the input result partitions and consumer
   regions of each region.
   Even with boundary vertices, we would still need to calculate the input result
   partitions and consumer regions by iterating over the edges of the boundary
   vertices.
   
   Pre-calculation saves time in two ways:
    * fewer loop iterations (edges to iterate over), by identifying the outer
      boundaries up front, which matters especially for large regions
    * lower cost per iteration, by doing lookups such as
      vertexToRegionMap.get(...) ahead of time

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to