mattcuento commented on code in PR #1361:
URL: https://github.com/apache/datafusion-ballista/pull/1361#discussion_r2749803351
##########
ballista/scheduler/src/state/task_manager.rs:
##########
@@ -516,15 +516,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
pub async fn executor_lost(&self, executor_id: &str) -> Result<Vec<RunningTaskInfo>> {
// Collect all the running task need to cancel when there are running stages rolled back.
let mut running_tasks_to_cancel: Vec<RunningTaskInfo> = vec![];
- // Collect graphs we update so we can update them in storage
- let updated_graphs: DashMap<String, ExecutionGraph> = DashMap::new();
Review Comment:
Hm, was this just doing nothing as-is? Is there an issue we should log for this, based on the comment?
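For context, a minimal sketch of the pattern being removed, assuming the map was only ever written to inside `executor_lost` (the function and variable names below are hypothetical stand-ins, not the real scheduler code):

```rust
use dashmap::DashMap;
use std::collections::HashMap;

#[derive(Clone)]
struct ExecutionGraph; // stand-in for the real scheduler type

fn executor_lost_sketch(active_graphs: &HashMap<String, ExecutionGraph>) {
    // "Collect graphs we update so we can update them in storage"
    let updated_graphs: DashMap<String, ExecutionGraph> = DashMap::new();
    for (job_id, graph) in active_graphs {
        updated_graphs.insert(job_id.clone(), graph.clone());
    }
    // Nothing afterwards reads `updated_graphs` or flushes it to storage,
    // so the collection never had an observable effect.
}
```

If that matches the original code, the removal is pure cleanup and the stale comment was the only misleading part.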
##########
ballista/scheduler/src/state/execution_graph.rs:
##########
@@ -193,109 +326,379 @@ impl ExecutionGraph {
start_time: started_at,
end_time: 0,
stages,
- output_partitions,
output_locations: vec![],
task_id_gen: 0,
failed_stage_attempts: HashMap::new(),
session_config,
})
}
- /// Returns the job ID for this execution graph.
- pub fn job_id(&self) -> &str {
- self.job_id.as_str()
+ #[cfg(test)]
+ fn next_task_id(&mut self) -> usize {
+ let new_tid = self.task_id_gen;
+ self.task_id_gen += 1;
+ new_tid
}
- /// Returns the job name for this execution graph.
- pub fn job_name(&self) -> &str {
- self.job_name.as_str()
- }
+ /// Processing stage status update after task status changing
+ fn processing_stages_update(
+ &mut self,
+ updated_stages: UpdatedStages,
+ ) -> Result<Vec<QueryStageSchedulerEvent>> {
+ let job_id = self.job_id().to_owned();
+ let mut has_resolved = false;
+ let mut job_err_msg = "".to_owned();
- /// Returns the session ID associated with this job.
- pub fn session_id(&self) -> &str {
- self.session_id.as_str()
- }
+ for stage_id in updated_stages.resolved_stages {
+ self.resolve_stage(stage_id)?;
+ has_resolved = true;
+ }
- /// Returns the current job status.
- pub fn status(&self) -> &JobStatus {
- &self.status
- }
+ for stage_id in updated_stages.successful_stages {
+ self.succeed_stage(stage_id);
+ }
- /// Returns the timestamp when this job started execution.
- pub fn start_time(&self) -> u64 {
- self.start_time
- }
+ // Fail the stage and also abort the job
+ for (stage_id, err_msg) in &updated_stages.failed_stages {
+ job_err_msg =
+ format!("Job failed due to stage {stage_id} failed: {err_msg}\n");
+ }
- /// Returns the timestamp when this job completed (0 if still running).
- pub fn end_time(&self) -> u64 {
- self.end_time
- }
+ let mut events = vec![];
+ // Only handle the rollback logic when there are no failed stages
+ if updated_stages.failed_stages.is_empty() {
+ let mut running_tasks_to_cancel = vec![];
+ for (stage_id, failure_reasons) in updated_stages.rollback_running_stages {
+ let tasks = self.rollback_running_stage(stage_id, failure_reasons)?;
+ running_tasks_to_cancel.extend(tasks);
+ }
- /// Returns the total number of stages in this execution graph.
- pub fn stage_count(&self) -> usize {
- self.stages.len()
- }
+ for stage_id in updated_stages.resubmit_successful_stages {
+ self.rerun_successful_stage(stage_id);
+ }
- /// Generates and returns the next unique task ID for this execution graph.
- pub fn next_task_id(&mut self) -> usize {
- let new_tid = self.task_id_gen;
- self.task_id_gen += 1;
- new_tid
- }
+ if !running_tasks_to_cancel.is_empty() {
+ events.push(QueryStageSchedulerEvent::CancelTasks(
+ running_tasks_to_cancel,
+ ));
+ }
+ }
- /// Exposes executions stages and stage id's
- pub fn stages(&self) -> &HashMap<usize, ExecutionStage> {
- &self.stages
+ if !updated_stages.failed_stages.is_empty() {
+ info!("Job {job_id} is failed");
+ self.fail_job(job_err_msg.clone());
+ events.push(QueryStageSchedulerEvent::JobRunningFailed {
+ job_id,
+ fail_message: job_err_msg,
+ queued_at: self.queued_at,
+ failed_at: timestamp_millis(),
+ });
+ } else if self.is_successful() {
+ // If this ExecutionGraph is successful, finish it
+ info!("Job {job_id} is success, finalizing output partitions");
+ self.succeed_job()?;
+ events.push(QueryStageSchedulerEvent::JobFinished {
+ job_id,
+ queued_at: self.queued_at,
+ completed_at: timestamp_millis(),
+ });
+ } else if has_resolved {
+ events.push(QueryStageSchedulerEvent::JobUpdated(job_id))
+ }
+ Ok(events)
}
- /// An ExecutionGraph is successful if all its stages are successful
- pub fn is_successful(&self) -> bool {
- self.stages
- .values()
- .all(|s| matches!(s, ExecutionStage::Successful(_)))
- }
+ /// Return a Vec of resolvable stage ids
+ fn update_stage_output_links(
+ &mut self,
+ stage_id: usize,
+ is_completed: bool,
+ locations: Vec<PartitionLocation>,
+ output_links: Vec<usize>,
+ ) -> Result<Vec<usize>> {
+ let mut resolved_stages = vec![];
+ let job_id = &self.job_id;
+ if output_links.is_empty() {
+ // If `output_links` is empty, then this is a final stage
+ self.output_locations.extend(locations);
+ } else {
+ for link in output_links.iter() {
+ // If this is an intermediate stage, we need to push its `PartitionLocation`s to the parent stage
+ if let Some(linked_stage) = self.stages.get_mut(link) {
+ if let ExecutionStage::UnResolved(linked_unresolved_stage) =
+ linked_stage
+ {
+ linked_unresolved_stage
+ .add_input_partitions(stage_id, locations.clone())?;
- /// Returns true if all stages in this graph have completed successfully.
- pub fn is_complete(&self) -> bool {
- self.stages
- .values()
- .all(|s| matches!(s, ExecutionStage::Successful(_)))
- }
+ // If all tasks for this stage are complete, mark the input complete in the parent stage
+ if is_completed {
+ linked_unresolved_stage.complete_input(stage_id);
+ }
- /// Revive the execution graph by converting the resolved stages to running stages
- /// If any stages are converted, return true; else false.
- pub fn revive(&mut self) -> bool {
- let running_stages = self
- .stages
- .values()
- .filter_map(|stage| {
- if let ExecutionStage::Resolved(resolved_stage) = stage {
- Some(resolved_stage.to_running())
+ // If all input partitions are ready, we can resolve any UnresolvedShuffleExec in the parent stage plan
+ if linked_unresolved_stage.resolvable() {
+ resolved_stages.push(linked_unresolved_stage.stage_id);
+ }
+ } else {
+ return Err(BallistaError::Internal(format!(
+ "Error updating job {job_id}: The stage {link} as the output link of stage {stage_id} should be unresolved"
+ )));
+ }
} else {
- None
+ return Err(BallistaError::Internal(format!(
+ "Error updating job {job_id}: Invalid output link {stage_id} for stage {link}"
+ )));
}
- })
- .collect::<Vec<_>>();
-
- if running_stages.is_empty() {
- false
- } else {
- for running_stage in running_stages {
- self.stages.insert(
- running_stage.stage_id,
- ExecutionStage::Running(running_stage),
- );
}
- true
}
+ Ok(resolved_stages)
}
- /// Update task statuses and task metrics in the graph.
- /// This will also push shuffle partitions to their respective shuffle read stages.
- pub fn update_task_status(
- &mut self,
- executor: &ExecutorMetadata,
- task_statuses: Vec<TaskStatus>,
+ fn get_running_stage_id(&mut self, black_list: &[usize]) -> Option<usize> {
+ let mut running_stage_id = self.stages.iter().find_map(|(stage_id, stage)| {
+ if black_list.contains(stage_id) {
+ None
+ } else if let ExecutionStage::Running(stage) = stage {
+ if stage.available_tasks() > 0 {
+ Some(*stage_id)
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ });
+
+ // If no available tasks found in the running stage,
+ // try to find a resolved stage and convert it to the running stage
+ if running_stage_id.is_none() {
+ if self.revive() {
+ running_stage_id = self.get_running_stage_id(black_list);
+ } else {
+ running_stage_id = None;
+ }
+ }
+
+ running_stage_id
+ }
+
+ fn reset_stages_internal(
+ &mut self,
+ executor_id: &str,
+ ) -> Result<(HashSet<usize>, Vec<RunningTaskInfo>)> {
+ let job_id = self.job_id.clone();
+ // collect the input stages that need to resubmit
+ let mut resubmit_inputs: HashSet<usize> = HashSet::new();
+
+ let mut reset_running_stage = HashSet::new();
+ let mut rollback_resolved_stages = HashSet::new();
+ let mut rollback_running_stages = HashSet::new();
+ let mut resubmit_successful_stages = HashSet::new();
+
+ let mut empty_inputs: HashMap<usize, StageOutput> = HashMap::new();
+ // check the unresolved, resolved and running stages
+ self.stages
+ .iter_mut()
+ .for_each(|(stage_id, stage)| {
+ let stage_inputs = match stage {
+ ExecutionStage::UnResolved(stage) => {
+ &mut stage.inputs
+ }
+ ExecutionStage::Resolved(stage) => {
+ &mut stage.inputs
+ }
+ ExecutionStage::Running(stage) => {
+ let reset = stage.reset_tasks(executor_id);
+ if reset > 0 {
+ warn!(
+ "Reset {reset} tasks for running job/stage {job_id}/{stage_id} on lost Executor {executor_id}"
+ );
+ reset_running_stage.insert(*stage_id);
+ }
+ &mut stage.inputs
+ }
+ _ => &mut empty_inputs
+ };
+
+ // For each stage input, check whether there are input locations match that executor
+ // and calculate the resubmit input stages if the input stages are successful.
+ let mut rollback_stage = false;
+ stage_inputs.iter_mut().for_each(|(input_stage_id, stage_output)| {
+ let mut match_found = false;
+ stage_output.partition_locations.iter_mut().for_each(
+ |(_partition, locs)| {
+ let before_len = locs.len();
+ locs.retain(|loc| loc.executor_meta.id != executor_id);
+ if locs.len() < before_len {
+ match_found = true;
+ }
+ },
+ );
+ if match_found {
+ stage_output.complete = false;
+ rollback_stage = true;
+ resubmit_inputs.insert(*input_stage_id);
+ }
+ });
+
+ if rollback_stage {
+ match stage {
+ ExecutionStage::Resolved(_) => {
+ rollback_resolved_stages.insert(*stage_id);
+ warn!(
+ "Roll back resolved job/stage {job_id}/{stage_id} and change ShuffleReaderExec back to UnresolvedShuffleExec");
+ }
+ ExecutionStage::Running(_) => {
+ rollback_running_stages.insert(*stage_id);
+ warn!(
+ "Roll back running job/stage {job_id}/{stage_id} and change ShuffleReaderExec back to UnresolvedShuffleExec");
+ }
+ _ => {}
+ }
+ }
+ });
+
+ // check and reset the successful stages
+ if !resubmit_inputs.is_empty() {
+ self.stages
+ .iter_mut()
+ .filter(|(stage_id, _stage)| resubmit_inputs.contains(stage_id))
+ .filter_map(|(_stage_id, stage)| {
+ if let ExecutionStage::Successful(success) = stage {
+ Some(success)
+ } else {
+ None
+ }
+ })
+ .for_each(|stage| {
+ let reset = stage.reset_tasks(executor_id);
+ if reset > 0 {
+ resubmit_successful_stages.insert(stage.stage_id);
+ warn!(
+ "Reset {} tasks for successful job/stage {}/{} on lost Executor {}",
+ reset, job_id, stage.stage_id, executor_id
+ )
+ }
+ });
+ }
+
+ for stage_id in rollback_resolved_stages.iter() {
+ self.rollback_resolved_stage(*stage_id)?;
+ }
+
+ let mut all_running_tasks = vec![];
+ for stage_id in rollback_running_stages.iter() {
+ let tasks = self.rollback_running_stage(
+ *stage_id,
+ HashSet::from([executor_id.to_owned()]),
+ )?;
+ all_running_tasks.extend(tasks);
+ }
+
+ for stage_id in resubmit_successful_stages.iter() {
+ self.rerun_successful_stage(*stage_id);
+ }
+
+ let mut reset_stage = HashSet::new();
+ reset_stage.extend(reset_running_stage);
+ reset_stage.extend(rollback_resolved_stages);
+ reset_stage.extend(rollback_running_stages);
+ reset_stage.extend(resubmit_successful_stages);
+ Ok((reset_stage, all_running_tasks))
+ }
+
+ /// Clear the stage failure count for this stage if the stage is finally success
+ fn clear_stage_failure(&mut self, stage_id: usize) {
+ self.failed_stage_attempts.remove(&stage_id);
+ }
+}
+
+impl ExecutionGraph for StaticExecutionGraph {
+ fn cloned(&self) -> ExecutionGraphBox {
+ Box::new(self.clone())
+ }
+
+ fn job_id(&self) -> &str {
+ self.job_id.as_str()
+ }
+
+ fn job_name(&self) -> &str {
+ self.job_name.as_str()
+ }
+
+ fn session_id(&self) -> &str {
+ self.session_id.as_str()
+ }
+
+ fn status(&self) -> &JobStatus {
+ &self.status
+ }
+
+ fn start_time(&self) -> u64 {
+ self.start_time
+ }
+
+ fn end_time(&self) -> u64 {
+ self.end_time
+ }
+
+ fn completed_stages(&self) -> usize {
+ let mut completed_stages = 0;
+ for stage in self.stages.values() {
+ if let ExecutionStage::Successful(_) = stage {
+ completed_stages += 1;
+ }
+ }
+ completed_stages
+ }
+ /// An ExecutionGraph is successful if all its stages are successful
+ fn is_successful(&self) -> bool {
+ self.stages
+ .values()
+ .all(|s| matches!(s, ExecutionStage::Successful(_)))
+ }
+
+ // pub fn is_complete(&self) -> bool {
Review Comment:
Can probably remove this
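For reference, the commented-out check duplicates `is_successful` shown earlier in this hunk; both reduce to the same predicate. A minimal sketch of that shared check, using stand-in types rather than the real ones:

```rust
use std::collections::HashMap;

// Stand-in for the real stage enum; only the variant shape matters here.
enum ExecutionStage {
    Successful(()),
    Other,
}

// Both `is_successful` and the commented-out `is_complete` boil down to:
// every stage is in the `Successful` variant.
fn all_stages_successful(stages: &HashMap<usize, ExecutionStage>) -> bool {
    stages
        .values()
        .all(|s| matches!(s, ExecutionStage::Successful(_)))
}
```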
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]