zentol commented on a change in pull request #8430: [FLINK-12068] [runtime]
Backtrack failover regions if intermediate results are unavailable
URL: https://github.com/apache/flink/pull/8430#discussion_r284175986
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
##########
@@ -162,40 +201,90 @@ private void buildOneRegionForAllVertices() {
* In this strategy, all task vertices in 'involved' regions are proposed to be restarted.
* The 'involved' regions are calculated with rules below:
* 1. The region containing the failed task is always involved
- * 2. TODO: If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
+ * 2. If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
* the region containing the partition producer task is involved
- * 3. TODO: If a region is involved, all of its consumer regions are involved
+ * 3. If a region is involved, all of its consumer regions are involved
*
* @param executionVertexId ID of the failed task
* @param cause cause of the failure
* @return set of IDs of vertices to restart
*/
@Override
public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause) {
- final FailoverRegion failedRegion = regions.get(executionVertexId);
+ LOG.info("Calculating tasks to restart to recover the failed task {}.", executionVertexId);
+
+ final FailoverRegion failedRegion = vertexToRegionMap.get(executionVertexId);
if (failedRegion == null) {
// TODO: show the task name in the log
throw new IllegalStateException("Can not find the failover region for task " + executionVertexId, cause);
}
- // TODO: if the failure cause is data consumption error, mark the corresponding data partition to be unavailable
+ // if the failure cause is data consumption error, mark the corresponding data partition to be failed,
+ // so that the failover process will try to recover it
+ if (cause instanceof DataConsumptionException) {
+ resultPartitionAvailabilityChecker.markResultPartitionFailed(
+ ((DataConsumptionException) cause).getPartitionId().getPartitionId());
+ }
- return getRegionsToRestart(failedRegion).stream().flatMap(
+ // calculate the tasks to restart based on the result of regions to restart
+ Set<FailoverRegion> regionsToRestart = getRegionsToRestart(failedRegion);
+ Set<ExecutionVertexID> tasksToRestart = regionsToRestart.stream().flatMap(
Review comment:
Can we save time here by using a simple loop?
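
For illustration, a minimal sketch of what the loop variant could look like next to the stream variant. The `Region` class and its `getAllVertexIds()` accessor are hypothetical stand-ins for `FailoverRegion` and whatever accessor it exposes, and `String` stands in for `ExecutionVertexID`; none of this is taken from the PR itself.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class RestartTaskCollection {

    /** Hypothetical stand-in for FailoverRegion: a region is just a set of vertex IDs. */
    static final class Region {
        private final Set<String> vertexIds;

        Region(String... ids) {
            this.vertexIds = new LinkedHashSet<>(Arrays.asList(ids));
        }

        // Assumed accessor name; the real class may name this differently.
        Set<String> getAllVertexIds() {
            return vertexIds;
        }
    }

    /** Stream-based collection, in the shape of the code under review. */
    static Set<String> collectWithStream(List<Region> regionsToRestart) {
        return regionsToRestart.stream()
            .flatMap(region -> region.getAllVertexIds().stream())
            .collect(Collectors.toSet());
    }

    /** Loop-based collection: no stream pipeline, one addAll per region. */
    static Set<String> collectWithLoop(List<Region> regionsToRestart) {
        Set<String> tasksToRestart = new HashSet<>();
        for (Region region : regionsToRestart) {
            tasksToRestart.addAll(region.getAllVertexIds());
        }
        return tasksToRestart;
    }

    public static void main(String[] args) {
        List<Region> regions = Arrays.asList(
            new Region("v1", "v2"),
            new Region("v3"));

        // Both variants should yield the same set of task IDs.
        System.out.println(collectWithLoop(regions).equals(collectWithStream(regions)));
    }
}
```

Both variants produce the same set; whether the loop is measurably faster would need to be checked on realistic region sizes.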