yahoNanJing commented on code in PR #261:
URL: https://github.com/apache/arrow-ballista/pull/261#discussion_r982480317
##########
ballista/rust/scheduler/src/state/execution_graph.rs:
##########
@@ -202,88 +228,208 @@ impl ExecutionGraph {
&mut self,
executor: &ExecutorMetadata,
task_statuses: Vec<TaskStatus>,
- ) -> Result<Option<QueryStageSchedulerEvent>> {
+ max_task_failures: usize,
+ max_stage_failures: usize,
+ ) -> Result<Vec<QueryStageSchedulerEvent>> {
let job_id = self.job_id().to_owned();
// First of all, classify the statuses by stages
let mut job_task_statuses: HashMap<usize, Vec<TaskStatus>> =
HashMap::new();
for task_status in task_statuses {
- if let Some(task_id) = task_status.task_id.as_ref() {
- if task_id.job_id != job_id {
- return Err(BallistaError::Internal(format!(
- "Error updating job {}: Invalid task status job ID {}",
- job_id, task_id.job_id
- )));
- }
- let stage_task_statuses = job_task_statuses
- .entry(task_id.stage_id as usize)
- .or_insert_with(Vec::new);
- stage_task_statuses.push(task_status);
- } else {
- error!("There's no task id when updating status");
- }
+ let stage_id = task_status.stage_id as usize;
+ let stage_task_statuses =
+ job_task_statuses.entry(stage_id).or_insert_with(Vec::new);
+ stage_task_statuses.push(task_status);
}
// Revive before updating due to some updates not saved
// It will be refined later
self.revive();
- let mut events = vec![];
+ let current_running_stages: HashSet<usize> =
+ HashSet::from_iter(self.running_stages());
+
+ // Copy the failed stage attempts from self
+        let mut failed_stage_attempts: HashMap<usize, HashSet<usize>> =
+            HashMap::new();
+        for (stage_id, attempts) in self.failed_stage_attempts.iter() {
+            failed_stage_attempts
+                .insert(*stage_id, HashSet::from_iter(attempts.iter().copied()));
+        }
+
+        let mut resolved_stages = HashSet::new();
+        let mut successful_stages = HashSet::new();
+        let mut failed_stages = HashMap::new();
+        let mut rollback_running_stages = HashMap::new();
+        let mut resubmit_successful_stages: HashMap<usize, HashSet<usize>> =
+            HashMap::new();
+        let mut reset_running_stages: HashMap<usize, HashSet<usize>> =
+            HashMap::new();
+
for (stage_id, stage_task_statuses) in job_task_statuses {
if let Some(stage) = self.stages.get_mut(&stage_id) {
if let ExecutionStage::Running(running_stage) = stage {
let mut locations = vec![];
for task_status in stage_task_statuses.into_iter() {
- if let TaskStatus {
- task_id:
- Some(protobuf::PartitionId {
- job_id,
- stage_id,
- partition_id,
- }),
- metrics: operator_metrics,
- status: Some(status),
- } = task_status
{
let stage_id = stage_id as usize;
- let partition_id = partition_id as usize;
+ let task_stage_attempt_num =
+ task_status.stage_attempt_num as usize;
+                            if task_stage_attempt_num < running_stage.stage_attempt_num {
+                                warn!("Ignore TaskStatus update with TID {} as it's from Stage {}.{} and there is a more recent stage attempt {}.{} running",
+                                    task_status.task_id, stage_id, task_stage_attempt_num, stage_id, running_stage.stage_attempt_num);
+                                continue;
+                            }
+                            let partition_id = task_status.clone().partition_id as usize;
+                            let task_identity = format!(
+                                "TID {} {}/{}.{}/{}",
+                                task_status.task_id,
+                                job_id,
+                                stage_id,
+                                task_stage_attempt_num,
+                                partition_id
+                            );
+                            let operator_metrics = task_status.metrics.clone();
-                            running_stage
-                                .update_task_status(partition_id, status.clone());
+                            if !running_stage
+                                .update_task_info(partition_id, task_status.clone())
+                            {
+                                continue;
+                            }
-                            // TODO Should be able to reschedule this task.
-                            if let task_status::Status::Failed(failed_task) = status {
-                                events.push(StageEvent::StageFailed(
-                                    stage_id,
-                                    format!(
-                                        "Task {}/{}/{} failed: {}",
-                                        job_id, stage_id, partition_id, failed_task.error
-                                    ),
-                                ));
-                                break;
-                            } else if let task_status::Status::Completed(completed_task) =
-                                status
+                            if let Some(task_status::Status::Failed(failed_task)) =
+                                task_status.status
                             {
-                                // update task metrics for completed task
+                                let failed_reason = failed_task.failed_reason;
+
+                                match failed_reason {
+                                    Some(FailedReason::FetchPartitionError(
+                                        fetch_partiton_error,
+                                    )) => {
+                                        let failed_attempts = failed_stage_attempts
+                                            .entry(stage_id)
+                                            .or_insert_with(HashSet::new);
+                                        failed_attempts.insert(task_stage_attempt_num);
+                                        if failed_attempts.len() < max_stage_failures {
+                                            let map_stage_id = fetch_partiton_error
+                                                .map_stage_id
+                                                as usize;
+                                            let map_partition_id = fetch_partiton_error
+                                                .map_partition_id
+                                                as usize;
+                                            let executor_id =
+                                                fetch_partiton_error.executor_id;
+
+                                            if !failed_stages.is_empty() {
+                                                let error_msg = format!(
+                                                    "Stages was marked failed, ignore FetchPartitionError from task {}", task_identity);
+                                                warn!("{}", error_msg);
+                                            } else {
+                                                // There are different removal strategies here.
+                                                // We can choose just remove the map_partition_id in the FetchPartitionError, when resubmit the input stage, there are less tasks need to rerun, but this might miss many more bad input partitions, lead to more stage level retries in following.
+                                                // Here we choose remove all the bad input partitions which match the same executor id in this single input stage.
+                                                // There are other more aggressive approaches, like considering the executor is lost and check all the running stages in this graph.
+                                                // Or count the fetch failure number on executor and mark the executor lost globally.
+                                                let removed_map_partitions =
+                                                    running_stage
+                                                        .remove_input_partitions(
+                                                            map_stage_id,
+                                                            map_partition_id,
+                                                            &executor_id,
+                                                        )?;
+
+                                                let failure_reasons =
+                                                    rollback_running_stages
+                                                        .entry(stage_id)
+                                                        .or_insert_with(HashSet::new);
+                                                failure_reasons.insert(executor_id);
+
+                                                let missing_inputs =
+                                                    resubmit_successful_stages
+                                                        .entry(map_stage_id)
+                                                        .or_insert_with(HashSet::new);
+                                                missing_inputs
+                                                    .extend(removed_map_partitions);
+                                                warn!("Need to resubmit the current running Stage {} and its map Stage {} due to FetchPartitionError from task {}",
+                                                    stage_id, map_stage_id, task_identity)
+                                            }
+                                        } else {
+                                            let error_msg = format!(
+                                                "Stage {} has failed {} times, \
+                                                most recent failure reason: {:?}",
+                                                stage_id,
+                                                max_stage_failures,
+                                                failed_task.error
+                                            );
+                                            error!("{}", error_msg);
+                                            failed_stages.insert(stage_id, error_msg);
+                                        }
+                                    }
+                                    Some(FailedReason::ExecutionError(_)) => {
+                                        failed_stages.insert(stage_id, failed_task.error);
+                                    }
+                                    Some(_) => {
+                                        if failed_task.retryable
+                                            && failed_task.count_to_failures
+                                        {
+                                            if running_stage
+                                                .task_failure_number(partition_id)
+                                                < max_task_failures
+                                            {
+                                                // TODO add new struct to track all the failed task infos
+                                                // The failure TaskInfo is ignored and set to None here
+                                                running_stage
+                                                    .reset_task_info(partition_id);
+                                            } else {
+                                                let error_msg = format!(
+                                                    "Task {} in Stage {} failed {} times, fail the stage, most recent failure reason: {:?}",
+                                                    partition_id, stage_id, max_task_failures, failed_task.error
+                                                );
+                                                error!("{}", error_msg);
+                                                failed_stages.insert(stage_id, error_msg);
+                                            }
+                                        } else if failed_task.retryable {
+                                            // TODO add new struct to track all the failed task infos
+                                            // The failure TaskInfo is ignored and set to None here
+                                            running_stage.reset_task_info(partition_id);
+                                        }
+                                    }
+                                    None => {
+                                        let error_msg = format!(
+                                            "Task {} in Stage {} failed with unknown failure reasons, fail the stage",
+                                            partition_id, stage_id);
+                                        error!("{}", error_msg);
+                                        failed_stages.insert(stage_id, error_msg);
+                                    }
+                                }
+                            } else if let Some(task_status::Status::Successful(
+                                successful_task,
+                            )) = task_status.status
+                            {
+                                // update task metrics for successful task
                                 running_stage.update_task_metrics(
                                     partition_id,
                                     operator_metrics,
                                 )?;
                                 locations.append(&mut partition_to_location(
                                     &job_id,
+                                    partition_id,
                                     stage_id,
                                     executor,
-                                    completed_task.partitions,
+                                    successful_task.partitions,
                                 ));
                             } else {
-                                warn!("The task {}/{}/{} with status {:?} is invalid for updating", job_id, stage_id, partition_id, status);
+                                warn!(
+                                    "The task {}'s status is invalid for updating",
+                                    task_identity
+                                );
                             }
                         }
                     }
-                    let is_completed = running_stage.is_completed();
-                    if is_completed {
-                        events.push(StageEvent::StageCompleted(stage_id));
-                        // if this stage is completed, we want to combine the stage metrics to plan's metric set and print out the plan
+                    let is_final_successful = running_stage.is_successful()
+                        && !reset_running_stages.contains_key(&stage_id);
Review Comment:
When this running stage was converted back from a successful stage, there may still be in-flight running tasks in the stages that depend on it. When we find that those tasks fail due to fetch failures, we also need to reset this running stage. Therefore, the `reset_running_stages` check here is necessary.
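
To make that interaction concrete, here is a minimal sketch of the finalization rule, not the real `ExecutionGraph` API: `StageState`, `finalize_stages`, and the task counts are simplified, hypothetical stand-ins. The point it illustrates is that a stage whose tasks all look successful after a batch of updates is only finalized when no fetch failure in the same batch has scheduled it for a reset.

```rust
use std::collections::{HashMap, HashSet};

// Simplified stand-in for a running stage; not the real Ballista type.
struct StageState {
    successful_tasks: usize,
    total_tasks: usize,
}

impl StageState {
    fn is_successful(&self) -> bool {
        self.successful_tasks == self.total_tasks
    }
}

// Decide which stages can be marked successful after one batch of task
// status updates. A stage whose map output was reported missing by a
// downstream FetchPartitionError ends up in `reset_running_stages` and
// must not be finalized, even if all of its tasks currently look successful.
fn finalize_stages(
    stages: &HashMap<usize, StageState>,
    reset_running_stages: &HashMap<usize, HashSet<usize>>,
) -> Vec<usize> {
    let mut finalized = Vec::new();
    for (stage_id, stage) in stages {
        if stage.is_successful() && !reset_running_stages.contains_key(stage_id) {
            finalized.push(*stage_id);
        }
    }
    finalized
}

fn main() {
    let mut stages = HashMap::new();
    // Stage 1 was converted back to running and all of its tasks report
    // success, but a downstream task hit a FetchPartitionError on one of
    // its map partitions in the same update batch.
    stages.insert(1, StageState { successful_tasks: 4, total_tasks: 4 });
    stages.insert(3, StageState { successful_tasks: 2, total_tasks: 2 });

    let mut reset_running_stages: HashMap<usize, HashSet<usize>> = HashMap::new();
    reset_running_stages
        .entry(1)
        .or_insert_with(HashSet::new)
        .insert(0); // partition 0 of stage 1 must be recomputed

    // Only stage 3 is finalized; stage 1 stays running so the missing
    // partition can be re-executed.
    assert_eq!(finalize_stages(&stages, &reset_running_stages), vec![3]);
}
```

In the real method this guard corresponds to the `is_final_successful` computation shown in the diff above.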
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]