This is an automated email from the ASF dual-hosted git repository. yangjiang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 48c4c2d9 [minor] remove useless braces (#739)
48c4c2d9 is described below
commit 48c4c2d92b846c629d7ded44b3a26f334f27a673
Author: Yang Jiang <[email protected]>
AuthorDate: Tue Apr 11 18:01:17 2023 +0800
[minor] remove useless braces (#739)
---
ballista/scheduler/src/state/execution_graph.rs | 280 ++++++++++++------------
1 file changed, 135 insertions(+), 145 deletions(-)
diff --git a/ballista/scheduler/src/state/execution_graph.rs
b/ballista/scheduler/src/state/execution_graph.rs
index 4f98e6b9..339649b4 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -311,161 +311,152 @@ impl ExecutionGraph {
if let ExecutionStage::Running(running_stage) = stage {
let mut locations = vec![];
for task_status in stage_task_statuses.into_iter() {
- {
- let task_stage_attempt_num =
- task_status.stage_attempt_num as usize;
- if task_stage_attempt_num <
running_stage.stage_attempt_num {
- warn!("Ignore TaskStatus update with TID {} as
it's from Stage {}.{} and there is a more recent stage attempt {}.{} running",
+ let task_stage_attempt_num =
+ task_status.stage_attempt_num as usize;
+ if task_stage_attempt_num <
running_stage.stage_attempt_num {
+ warn!("Ignore TaskStatus update with TID {} as
it's from Stage {}.{} and there is a more recent stage attempt {}.{} running",
task_status.task_id, stage_id,
task_stage_attempt_num, stage_id, running_stage.stage_attempt_num);
- continue;
- }
- let partition_id =
task_status.clone().partition_id as usize;
- let task_identity = format!(
- "TID {} {}/{}.{}/{}",
- task_status.task_id,
- job_id,
- stage_id,
- task_stage_attempt_num,
- partition_id
- );
- let operator_metrics = task_status.metrics.clone();
+ continue;
+ }
+ let partition_id = task_status.clone().partition_id as
usize;
+ let task_identity = format!(
+ "TID {} {}/{}.{}/{}",
+ task_status.task_id,
+ job_id,
+ stage_id,
+ task_stage_attempt_num,
+ partition_id
+ );
+ let operator_metrics = task_status.metrics.clone();
- if !running_stage
- .update_task_info(partition_id,
task_status.clone())
- {
- continue;
- }
+ if !running_stage
+ .update_task_info(partition_id,
task_status.clone())
+ {
+ continue;
+ }
- if let
Some(task_status::Status::Failed(failed_task)) =
- task_status.status
- {
- let failed_reason = failed_task.failed_reason;
+ if let Some(task_status::Status::Failed(failed_task)) =
+ task_status.status
+ {
+ let failed_reason = failed_task.failed_reason;
+
+ match failed_reason {
+ Some(FailedReason::FetchPartitionError(
+ fetch_partiton_error,
+ )) => {
+ let failed_attempts = failed_stage_attempts
+ .entry(stage_id)
+ .or_insert_with(HashSet::new);
+
failed_attempts.insert(task_stage_attempt_num);
+ if failed_attempts.len() <
max_stage_failures {
+ let map_stage_id =
+ fetch_partiton_error.map_stage_id
as usize;
+ let map_partition_id =
fetch_partiton_error
+ .map_partition_id
+ as usize;
+ let executor_id =
+ fetch_partiton_error.executor_id;
- match failed_reason {
- Some(FailedReason::FetchPartitionError(
- fetch_partiton_error,
- )) => {
- let failed_attempts =
failed_stage_attempts
- .entry(stage_id)
- .or_insert_with(HashSet::new);
-
failed_attempts.insert(task_stage_attempt_num);
- if failed_attempts.len() <
max_stage_failures {
- let map_stage_id =
fetch_partiton_error
- .map_stage_id
- as usize;
- let map_partition_id =
fetch_partiton_error
- .map_partition_id
- as usize;
- let executor_id =
-
fetch_partiton_error.executor_id;
-
- if !failed_stages.is_empty() {
- let error_msg = format!(
- "Stages was marked
failed, ignore FetchPartitionError from task {task_identity}");
- warn!("{}", error_msg);
- } else {
- // There are different removal
strategies here.
- // We can choose just remove
the map_partition_id in the FetchPartitionError, when resubmit the input stage,
there are less tasks
- // need to rerun, but this
might miss many more bad input partitions, lead to more stage level retries in
following.
- // Here we choose remove all
the bad input partitions which match the same executor id in this single input
stage.
- // There are other more
aggressive approaches, like considering the executor is lost and check all the
running stages in this graph.
- // Or count the fetch failure
number on executor and mark the executor lost globally.
- let removed_map_partitions =
- running_stage
-
.remove_input_partitions(
- map_stage_id,
- map_partition_id,
- &executor_id,
- )?;
-
- let failure_reasons =
- rollback_running_stages
- .entry(stage_id)
-
.or_insert_with(HashSet::new);
-
failure_reasons.insert(executor_id);
-
- let missing_inputs =
- resubmit_successful_stages
- .entry(map_stage_id)
-
.or_insert_with(HashSet::new);
- missing_inputs
-
.extend(removed_map_partitions);
- warn!("Need to resubmit the
current running Stage {} and its map Stage {} due to FetchPartitionError from
task {}",
- stage_id, map_stage_id,
task_identity)
- }
- } else {
+ if !failed_stages.is_empty() {
let error_msg = format!(
- "Stage {} has failed {} times,
\
- most recent failure reason: {:?}",
- stage_id,
- max_stage_failures,
- failed_task.error
- );
- error!("{}", error_msg);
- failed_stages.insert(stage_id,
error_msg);
+ "Stages was marked failed,
ignore FetchPartitionError from task {task_identity}");
+ warn!("{}", error_msg);
+ } else {
+ // There are different removal
strategies here.
+ // We can choose just remove the
map_partition_id in the FetchPartitionError, when resubmit the input stage,
there are less tasks
+ // need to rerun, but this might
miss many more bad input partitions, lead to more stage level retries in
following.
+ // Here we choose remove all the
bad input partitions which match the same executor id in this single input
stage.
+ // There are other more aggressive
approaches, like considering the executor is lost and check all the running
stages in this graph.
+ // Or count the fetch failure
number on executor and mark the executor lost globally.
+ let removed_map_partitions =
running_stage
+ .remove_input_partitions(
+ map_stage_id,
+ map_partition_id,
+ &executor_id,
+ )?;
+
+ let failure_reasons =
rollback_running_stages
+ .entry(stage_id)
+ .or_insert_with(HashSet::new);
+
failure_reasons.insert(executor_id);
+
+ let missing_inputs =
+ resubmit_successful_stages
+ .entry(map_stage_id)
+
.or_insert_with(HashSet::new);
+
missing_inputs.extend(removed_map_partitions);
+ warn!("Need to resubmit the
current running Stage {} and its map Stage {} due to FetchPartitionError from
task {}",
+ stage_id, map_stage_id,
task_identity)
}
+ } else {
+ let error_msg = format!(
+ "Stage {} has failed {} times, \
+ most recent failure reason: {:?}",
+ stage_id,
+ max_stage_failures,
+ failed_task.error
+ );
+ error!("{}", error_msg);
+ failed_stages.insert(stage_id,
error_msg);
}
- Some(FailedReason::ExecutionError(_)) => {
- failed_stages.insert(stage_id,
failed_task.error);
- }
- Some(_) => {
- if failed_task.retryable
- && failed_task.count_to_failures
+ }
+ Some(FailedReason::ExecutionError(_)) => {
+ failed_stages.insert(stage_id,
failed_task.error);
+ }
+ Some(_) => {
+ if failed_task.retryable
+ && failed_task.count_to_failures
+ {
+ if
running_stage.task_failure_number(partition_id)
+ < max_task_failures
{
- if running_stage
-
.task_failure_number(partition_id)
- < max_task_failures
- {
- // TODO add new struct to
track all the failed task infos
- // The failure TaskInfo is
ignored and set to None here
- running_stage
-
.reset_task_info(partition_id);
- } else {
- let error_msg = format!(
- "Task {} in Stage {} failed {} times, fail the stage,
most recent failure reason: {:?}",
- partition_id, stage_id, max_task_failures,
failed_task.error
- );
- error!("{}", error_msg);
- failed_stages.insert(stage_id,
error_msg);
- }
- } else if failed_task.retryable {
// TODO add new struct to track
all the failed task infos
// The failure TaskInfo is ignored
and set to None here
running_stage.reset_task_info(partition_id);
+ } else {
+ let error_msg = format!(
+ "Task {} in Stage {} failed {}
times, fail the stage, most recent failure reason: {:?}",
+ partition_id, stage_id,
max_task_failures, failed_task.error
+ );
+ error!("{}", error_msg);
+ failed_stages.insert(stage_id,
error_msg);
}
- }
- None => {
- let error_msg = format!(
- "Task {partition_id} in Stage
{stage_id} failed with unknown failure reasons, fail the stage");
- error!("{}", error_msg);
- failed_stages.insert(stage_id,
error_msg);
+ } else if failed_task.retryable {
+ // TODO add new struct to track all
the failed task infos
+ // The failure TaskInfo is ignored and
set to None here
+
running_stage.reset_task_info(partition_id);
}
}
- } else if let Some(task_status::Status::Successful(
- successful_task,
- )) = task_status.status
- {
- // update task metrics for successfu task
- running_stage.update_task_metrics(
- partition_id,
- operator_metrics,
- )?;
-
- locations.append(&mut partition_to_location(
- &job_id,
- partition_id,
- stage_id,
- executor,
- successful_task.partitions,
- ));
- } else {
- warn!(
- "The task {}'s status is invalid for
updating",
- task_identity
- );
+ None => {
+ let error_msg = format!(
+ "Task {partition_id} in Stage
{stage_id} failed with unknown failure reasons, fail the stage");
+ error!("{}", error_msg);
+ failed_stages.insert(stage_id, error_msg);
+ }
}
+ } else if let Some(task_status::Status::Successful(
+ successful_task,
+ )) = task_status.status
+ {
+ // update task metrics for successfu task
+ running_stage
+ .update_task_metrics(partition_id,
operator_metrics)?;
+
+ locations.append(&mut partition_to_location(
+ &job_id,
+ partition_id,
+ stage_id,
+ executor,
+ successful_task.partitions,
+ ));
+ } else {
+ warn!(
+ "The task {}'s status is invalid for updating",
+ task_identity
+ );
}
}
+
let is_final_successful = running_stage.is_successful()
&& !reset_running_stages.contains_key(&stage_id);
if is_final_successful {
@@ -899,9 +890,9 @@ impl ExecutionGraph {
let task_info = TaskInfo {
task_id,
scheduled_time: SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .unwrap()
- .as_millis(),
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_millis(),
// Those times will be updated when the task finish
launch_time: 0,
start_exec_time: 0,
@@ -1040,15 +1031,14 @@ impl ExecutionGraph {
warn!(
"Roll back resolved job/stage {}/{} and change
ShuffleReaderExec back to UnresolvedShuffleExec",
job_id, stage_id);
-
- },
+ }
ExecutionStage::Running(_) => {
rollback_running_stages.insert(*stage_id);
warn!(
"Roll back running job/stage {}/{} and change
ShuffleReaderExec back to UnresolvedShuffleExec",
job_id, stage_id);
- },
- _ => {},
+ }
+ _ => {}
}
}
});
