yahoNanJing commented on code in PR #261:
URL: https://github.com/apache/arrow-ballista/pull/261#discussion_r982480317


##########
ballista/rust/scheduler/src/state/execution_graph.rs:
##########
@@ -202,88 +228,208 @@ impl ExecutionGraph {
         &mut self,
         executor: &ExecutorMetadata,
         task_statuses: Vec<TaskStatus>,
-    ) -> Result<Option<QueryStageSchedulerEvent>> {
+        max_task_failures: usize,
+        max_stage_failures: usize,
+    ) -> Result<Vec<QueryStageSchedulerEvent>> {
         let job_id = self.job_id().to_owned();
         // First of all, classify the statuses by stages
        let mut job_task_statuses: HashMap<usize, Vec<TaskStatus>> = HashMap::new();
         for task_status in task_statuses {
-            if let Some(task_id) = task_status.task_id.as_ref() {
-                if task_id.job_id != job_id {
-                    return Err(BallistaError::Internal(format!(
-                        "Error updating job {}: Invalid task status job ID {}",
-                        job_id, task_id.job_id
-                    )));
-                }
-                let stage_task_statuses = job_task_statuses
-                    .entry(task_id.stage_id as usize)
-                    .or_insert_with(Vec::new);
-                stage_task_statuses.push(task_status);
-            } else {
-                error!("There's no task id when updating status");
-            }
+            let stage_id = task_status.stage_id as usize;
+            let stage_task_statuses =
+                job_task_statuses.entry(stage_id).or_insert_with(Vec::new);
+            stage_task_statuses.push(task_status);
         }
 
         // Revive before updating due to some updates not saved
         // It will be refined later
         self.revive();
 
-        let mut events = vec![];
+        let current_running_stages: HashSet<usize> =
+            HashSet::from_iter(self.running_stages());
+
+        // Copy the failed stage attempts from self
+        let mut failed_stage_attempts: HashMap<usize, HashSet<usize>> = HashMap::new();
+        for (stage_id, attempts) in self.failed_stage_attempts.iter() {
+            failed_stage_attempts
+                .insert(*stage_id, HashSet::from_iter(attempts.iter().copied()));
+        }
+
+        let mut resolved_stages = HashSet::new();
+        let mut successful_stages = HashSet::new();
+        let mut failed_stages = HashMap::new();
+        let mut rollback_running_stages = HashMap::new();
+        let mut resubmit_successful_stages: HashMap<usize, HashSet<usize>> =
+            HashMap::new();
+        let mut reset_running_stages: HashMap<usize, HashSet<usize>> = HashMap::new();
+
         for (stage_id, stage_task_statuses) in job_task_statuses {
             if let Some(stage) = self.stages.get_mut(&stage_id) {
                 if let ExecutionStage::Running(running_stage) = stage {
                     let mut locations = vec![];
                     for task_status in stage_task_statuses.into_iter() {
-                        if let TaskStatus {
-                            task_id:
-                                Some(protobuf::PartitionId {
-                                    job_id,
-                                    stage_id,
-                                    partition_id,
-                                }),
-                            metrics: operator_metrics,
-                            status: Some(status),
-                        } = task_status
                         {
                             let stage_id = stage_id as usize;
-                            let partition_id = partition_id as usize;
+                            let task_stage_attempt_num =
+                                task_status.stage_attempt_num as usize;
+                            if task_stage_attempt_num < running_stage.stage_attempt_num {
+                                warn!("Ignore TaskStatus update with TID {} as it's from Stage {}.{} and there is a more recent stage attempt {}.{} running",
+                                    task_status.task_id, stage_id, task_stage_attempt_num, stage_id, running_stage.stage_attempt_num);
+                                continue;
+                            }
+                            let partition_id = task_status.clone().partition_id as usize;
+                            let task_identity = format!(
+                                "TID {} {}/{}.{}/{}",
+                                task_status.task_id,
+                                job_id,
+                                stage_id,
+                                task_stage_attempt_num,
+                                partition_id
+                            );
+                            let operator_metrics = task_status.metrics.clone();
 
-                            running_stage
-                                .update_task_status(partition_id, status.clone());
+                            if !running_stage
+                                .update_task_info(partition_id, task_status.clone())
+                            {
+                                continue;
+                            }
 
-                            // TODO Should be able to reschedule this task.
-                            if let task_status::Status::Failed(failed_task) = status {
-                                events.push(StageEvent::StageFailed(
-                                    stage_id,
-                                    format!(
-                                        "Task {}/{}/{} failed: {}",
-                                        job_id, stage_id, partition_id, failed_task.error
-                                    ),
-                                ));
-                                break;
-                            } else if let task_status::Status::Completed(completed_task) =
-                                status
+                            if let Some(task_status::Status::Failed(failed_task)) =
+                                task_status.status
                             {
-                                // update task metrics for completed task
+                                let failed_reason = failed_task.failed_reason;
+
+                                match failed_reason {
+                                    Some(FailedReason::FetchPartitionError(
+                                        fetch_partiton_error,
+                                    )) => {
+                                        let failed_attempts = failed_stage_attempts
+                                            .entry(stage_id)
+                                            .or_insert_with(HashSet::new);
+                                        failed_attempts.insert(task_stage_attempt_num);
+                                        if failed_attempts.len() < max_stage_failures {
+                                            let map_stage_id = fetch_partiton_error
+                                                .map_stage_id
+                                                as usize;
+                                            let map_partition_id = fetch_partiton_error
+                                                .map_partition_id
+                                                as usize;
+                                            let executor_id =
+                                                fetch_partiton_error.executor_id;
+
+                                            if !failed_stages.is_empty() {
+                                                let error_msg = format!(
+                                                        "Stages was marked failed, ignore FetchPartitionError from task {}", task_identity);
+                                                warn!("{}", error_msg);
+                                            } else {
+                                                // There are different removal strategies here.
+                                                // We can choose just remove the map_partition_id in the FetchPartitionError, when resubmit the input stage, there are less tasks
+                                                // need to rerun, but this might miss many more bad input partitions, lead to more stage level retries in following.
+                                                // Here we choose remove all the bad input partitions which match the same executor id in this single input stage.
+                                                // There are other more aggressive approaches, like considering the executor is lost and check all the running stages in this graph.
+                                                // Or count the fetch failure number on executor and mark the executor lost globally.
+                                                let removed_map_partitions =
+                                                    running_stage
+                                                        .remove_input_partitions(
+                                                            map_stage_id,
+                                                            map_partition_id,
+                                                            &executor_id,
+                                                        )?;
+
+                                                let failure_reasons =
+                                                    rollback_running_stages
+                                                        .entry(stage_id)
+                                                        .or_insert_with(HashSet::new);
+                                                failure_reasons.insert(executor_id);
+
+                                                let missing_inputs =
+                                                    resubmit_successful_stages
+                                                        .entry(map_stage_id)
+                                                        .or_insert_with(HashSet::new);
+                                                missing_inputs
+                                                    .extend(removed_map_partitions);
+                                                warn!("Need to resubmit the current running Stage {} and its map Stage {} due to FetchPartitionError from task {}",
+                                                    stage_id, map_stage_id, task_identity)
+                                            }
+                                        } else {
+                                            let error_msg = format!(
+                                                "Stage {} has failed {} times, \
+                                            most recent failure reason: {:?}",
+                                                stage_id,
+                                                max_stage_failures,
+                                                failed_task.error
+                                            );
+                                            error!("{}", error_msg);
+                                            failed_stages.insert(stage_id, error_msg);
+                                        }
+                                    }
+                                    Some(FailedReason::ExecutionError(_)) => {
+                                        failed_stages.insert(stage_id, failed_task.error);
+                                    }
+                                    Some(_) => {
+                                        if failed_task.retryable
+                                            && failed_task.count_to_failures
+                                        {
+                                            if running_stage
+                                                .task_failure_number(partition_id)
+                                                < max_task_failures
+                                            {
+                                                // TODO add new struct to track all the failed task infos
+                                                // The failure TaskInfo is ignored and set to None here
+                                                running_stage
+                                                    .reset_task_info(partition_id);
+                                            } else {
+                                                let error_msg = format!(
+                        "Task {} in Stage {} failed {} times, fail the stage, most recent failure reason: {:?}",
+                        partition_id, stage_id, max_task_failures, failed_task.error
+                    );
+                                                error!("{}", error_msg);
+                                                failed_stages.insert(stage_id, error_msg);
+                                            }
+                                        } else if failed_task.retryable {
+                                            // TODO add new struct to track all the failed task infos
+                                            // The failure TaskInfo is ignored and set to None here
+                                            running_stage.reset_task_info(partition_id);
+                                        }
+                                    }
+                                    None => {
+                                        let error_msg = format!(
+                                            "Task {} in Stage {} failed with unknown failure reasons, fail the stage",
+                                            partition_id, stage_id);
+                                        error!("{}", error_msg);
+                                        failed_stages.insert(stage_id, error_msg);
+                                    }
+                                }
+                            } else if let Some(task_status::Status::Successful(
+                                successful_task,
+                            )) = task_status.status
+                            {
+                                // update task metrics for successful task
                                 running_stage.update_task_metrics(
                                     partition_id,
                                     operator_metrics,
                                 )?;
 
                                 locations.append(&mut partition_to_location(
                                     &job_id,
+                                    partition_id,
                                     stage_id,
                                     executor,
-                                    completed_task.partitions,
+                                    successful_task.partitions,
                                 ));
                             } else {
-                                warn!("The task {}/{}/{} with status {:?} is invalid for updating", job_id, stage_id, partition_id, status);
+                                warn!(
+                                    "The task {}'s status is invalid for updating",
+                                    task_identity
+                                );
                             }
                         }
                     }
-                    let is_completed = running_stage.is_completed();
-                    if is_completed {
-                        events.push(StageEvent::StageCompleted(stage_id));
-                        // if this stage is completed, we want to combine the stage metrics to plan's metric set and print out the plan
+                    let is_final_successful = running_stage.is_successful()
+                        && !reset_running_stages.contains_key(&stage_id);
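
A minimal sketch of the per-executor removal strategy described in the inline comment above. The `StageInputs` struct and its layout here are illustrative assumptions for this message, not the PR's actual `remove_input_partitions` API:

```rust
use std::collections::HashMap;

// Hypothetical model of a running stage's shuffle inputs:
// map_stage_id -> list of (map_partition_id, executor_id) locations.
struct StageInputs {
    inputs: HashMap<usize, Vec<(usize, String)>>,
}

impl StageInputs {
    /// On a FetchPartitionError, drop every input partition of `map_stage_id`
    /// that was served by `executor_id`, not only the single failed partition.
    /// Returns the removed map partition ids so the map stage can re-run them.
    fn remove_input_partitions(&mut self, map_stage_id: usize, executor_id: &str) -> Vec<usize> {
        let mut removed = Vec::new();
        if let Some(locations) = self.inputs.get_mut(&map_stage_id) {
            locations.retain(|(partition, exec)| {
                if exec == executor_id {
                    removed.push(*partition);
                    false
                } else {
                    true
                }
            });
        }
        removed
    }
}

fn main() {
    let mut stage = StageInputs {
        inputs: HashMap::from([(
            1,
            vec![
                (0, "executor-a".to_string()),
                (1, "executor-b".to_string()),
                (2, "executor-a".to_string()),
            ],
        )]),
    };
    // A single fetch failure against executor-a invalidates both of its
    // partitions, so only those map partitions need to be re-run.
    let removed = stage.remove_input_partitions(1, "executor-a");
    assert_eq!(removed, vec![0, 2]);
}
```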

Review Comment:
   When this running stage is converted from a successful stage, there may be in-flight running tasks in the stages that depend on this stage. When we find that those tasks fail due to data-fetch failures, we also need to reset this running stage. Therefore, the reset_running_stages check here is necessary.
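
As a minimal illustration of that check (with hypothetical, simplified types rather than the scheduler's real structs), a stage is only promoted to final-successful when no reset has been recorded for it in the same update round:

```rust
use std::collections::{HashMap, HashSet};

// A running stage counts as "final successful" only when all of its tasks
// succeeded AND no downstream FetchPartitionError has scheduled it for reset
// during this status-update round.
fn is_final_successful(
    stage_id: usize,
    all_tasks_successful: bool,
    reset_running_stages: &HashMap<usize, HashSet<usize>>,
) -> bool {
    all_tasks_successful && !reset_running_stages.contains_key(&stage_id)
}

fn main() {
    let mut reset_running_stages: HashMap<usize, HashSet<usize>> = HashMap::new();

    // All tasks of stage 2 reported success in this round.
    assert!(is_final_successful(2, true, &reset_running_stages));

    // A downstream task hit a FetchPartitionError against stage 2, so some of
    // its map partitions must be re-run; it must not be marked successful yet.
    reset_running_stages
        .entry(2)
        .or_insert_with(HashSet::new)
        .insert(0);
    assert!(!is_final_successful(2, true, &reset_running_stages));
}
```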



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
