yahoNanJing commented on code in PR #261:
URL: https://github.com/apache/arrow-ballista/pull/261#discussion_r982256320
##########
ballista/rust/scheduler/src/state/execution_graph.rs:
##########
@@ -296,18 +442,105 @@ impl ExecutionGraph {
}
let output_links = running_stage.output_links.clone();
- events.append(&mut self.update_stage_output_links(
- stage_id,
- is_completed,
- locations,
- output_links,
- )?);
+ resolved_stages.extend(
+ &mut self
+ .update_stage_output_links(
+ stage_id,
+ is_final_successful,
+ locations,
+ output_links,
+ )?
+ .into_iter(),
+ );
+ } else if let ExecutionStage::UnResolved(unsolved_stage) =
stage {
+ for task_status in stage_task_statuses.into_iter() {
+ let stage_id = stage_id as usize;
+ let task_stage_attempt_num =
+ task_status.stage_attempt_num as usize;
+ let partition_id = task_status.clone().partition_id as
usize;
+ let task_identity = format!(
+ "TID {} {}/{}.{}/{}",
+ task_status.task_id,
+ job_id,
+ stage_id,
+ task_stage_attempt_num,
+ partition_id
+ );
+ let mut should_ignore = true;
+ // handle delayed failed tasks if the stage's next
attempt is still in UnResolved status.
+ if let Some(task_status::Status::Failed(failed_task)) =
Review Comment:
Here, only failed tasks will be dealt with. Once a running stage converted
into an unresolved, it will discard all of its tasks info. Then to update the
successful task info is meaningless.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]