yahoNanJing commented on code in PR #153:
URL: https://github.com/apache/arrow-ballista/pull/153#discussion_r954433762


##########
ballista/rust/scheduler/src/state/task_manager.rs:
##########
@@ -106,123 +127,63 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         }
     }
 
-    /// Generate a new random Job ID
-    pub fn generate_job_id(&self) -> String {
-        let mut rng = thread_rng();
-        std::iter::repeat(())
-            .map(|()| rng.sample(Alphanumeric))
-            .map(char::from)
-            .take(7)
-            .collect()
-    }
-
-    /// Atomically update given task statuses in the respective job and return a tuple containing:
+    /// Update given task statuses in the respective job and return a tuple containing:
     /// 1. A list of QueryStageSchedulerEvent to publish.
     /// 2. A list of reservations that can now be offered.
-    ///
-    /// When a task is updated, there may or may not be more tasks pending for its job. If there are more
-    /// tasks pending then we want to reschedule one of those tasks on the same task slot. In that case
-    /// we will set the `job_id` on the `ExecutorReservation` so the scheduler attempts to assign tasks from
-    /// the same job. Note that when the scheduler attempts to fill the reservation, there is no guarantee
-    /// that the available task is still available.
     pub(crate) async fn update_task_statuses(
         &self,
         executor: &ExecutorMetadata,
         task_status: Vec<TaskStatus>,
     ) -> Result<(Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>)> {
-        let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
-
-        with_lock(lock, async {
-            let mut events: Vec<QueryStageSchedulerEvent> = vec![];
-            let mut reservation: Vec<ExecutorReservation> = vec![];
+        let mut job_updates: HashMap<String, Vec<TaskStatus>> = HashMap::new();
+        for status in task_status {
+            debug!("Task Update\n{:?}", status);
+            if let Some(job_id) = status.task_id.as_ref().map(|id| &id.job_id) {
+                let job_task_statuses =
+                    job_updates.entry(job_id.clone()).or_insert_with(Vec::new);
+                job_task_statuses.push(status);
+            } else {
+                warn!("Received task with no job ID");
+            }
+        }
 
-            let mut job_updates: HashMap<String, Vec<TaskStatus>> = HashMap::new();
+        let mut events: Vec<QueryStageSchedulerEvent> = vec![];
+        let mut total_num_tasks = 0;
+        for (job_id, statuses) in job_updates {
+            let num_tasks = statuses.len();
+            debug!("Updating {} tasks in job {}", num_tasks, job_id);
 
-            for status in task_status {
-                debug!("Task Update\n{:?}", status);
-                if let Some(job_id) = status.task_id.as_ref().map(|id| &id.job_id) {
-                    if let Some(statuses) = job_updates.get_mut(job_id) {
-                        statuses.push(status)
-                    } else {
-                        job_updates.insert(job_id.clone(), vec![status]);
-                    }
-                } else {
-                    warn!("Received task with no job ID");
-                }
-            }
+            total_num_tasks += num_tasks;
 
-            let mut txn_ops: Vec<(Keyspace, String, Vec<u8>)> = vec![];
-
-            for (job_id, statuses) in job_updates {
-                let num_tasks = statuses.len();
-                debug!("Updating {} tasks in job {}", num_tasks, job_id);
-
-                let mut graph = self.get_execution_graph(&job_id).await?;
-
-                graph.update_task_status(executor, statuses)?;
-
-                if graph.complete() {
-                    // If this ExecutionGraph is complete, finalize it
-                    info!(
-                        "Job {} is complete, finalizing output partitions",
-                        graph.job_id()
-                    );
-                    graph.finalize()?;
-                    events.push(QueryStageSchedulerEvent::JobFinished(job_id.clone()));
-
-                    for _ in 0..num_tasks {
-                        reservation
-                            .push(ExecutorReservation::new_free(executor.id.to_owned()));
-                    }
-                } else if let Some(job_status::Status::Failed(failure)) =
-                    graph.status().status
-                {
-                    events.push(QueryStageSchedulerEvent::JobFailed(
-                        job_id.clone(),
-                        failure.error,
-                    ));
-
-                    for _ in 0..num_tasks {
-                        reservation
-                            .push(ExecutorReservation::new_free(executor.id.to_owned()));
-                    }
-                } else {
-                    // Otherwise keep the task slots reserved for this job
-                    for _ in 0..num_tasks {
-                        reservation.push(ExecutorReservation::new_assigned(
-                            executor.id.to_owned(),
-                            job_id.clone(),
-                        ));
-                    }
-                }
+            let graph = self.get_active_execution_graph(&job_id).await;
+            let job_event = if let Some(graph) = graph {
+                let mut graph = graph.write().await;
+                graph.update_task_status(executor, statuses)?
+            } else {
+                // TODO Deal with curator changed case
+                error!("Fail to find job {} in the active cache and it may not be curated by this scheduler", job_id);
+                None

Review Comment:
   Since we haven't implemented the multi-scheduler task updating logic in the executor yet, I prefer to leave the handling logic empty here.
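   
   To illustrate what I mean by "empty", here is a minimal sketch reusing only the names already in this diff (`get_active_execution_graph`, `update_task_status`, `executor`, `statuses`): if the graph is not in the active cache, just log and produce no event for that job. This is a sketch of the intent, not a suggested change.
   
   ```rust
   // Sketch only: the same behavior as the diff above, written as a match.
   let job_event = match self.get_active_execution_graph(&job_id).await {
       Some(graph) => {
           // The job is curated by this scheduler: apply the status updates.
           let mut graph = graph.write().await;
           graph.update_task_status(executor, statuses)?
       }
       None => {
           // Intentionally no handling here until multi-scheduler task
           // updates are implemented on the executor side; just log it.
           error!(
               "Fail to find job {} in the active cache and it may not be curated by this scheduler",
               job_id
           );
           None
       }
   };
   ```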



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
