houqp commented on a change in pull request #1337:
URL: https://github.com/apache/arrow-datafusion/pull/1337#discussion_r753711896



##########
File path: ballista/rust/scheduler/src/state/mod.rs
##########
@@ -279,8 +291,20 @@ impl SchedulerState {
     pub async fn assign_next_schedulable_task(
         &self,
         executor_id: &str,
+        job_ids: Vec<String>,
     ) -> Result<Option<(TaskStatus, Arc<dyn ExecutionPlan>)>> {
-        let tasks = self.get_all_tasks().await?;
+        // Will assign the task with same jobId to same executor to improve 
locality.
+        let tasks;
+        let mut tasks_by_job_id: HashMap<String, TaskStatus> = 
Default::default();
+        for job_id in job_ids {
+            tasks_by_job_id.extend(self.get_tasks_with_job_id(&job_id).await?);
+        }
+        tasks_by_job_id.retain(|_, v| v.status.is_none());

Review comment:
       i think this filter could have been done earlier in 
`get_tasks_with_job_id` so we won't need to go through the hashmap twice.

##########
File path: ballista/rust/scheduler/src/state/mod.rs
##########
@@ -242,6 +242,18 @@ impl SchedulerState {
             .collect()
     }
 
+    pub async fn get_tasks_with_job_id(

Review comment:
       does this need to be a public function? looks like it's only used in 
internal code

##########
File path: ballista/rust/scheduler/src/lib.rs
##########
@@ -210,11 +211,12 @@ impl SchedulerGrpc for SchedulerServer {
                         error!("{}", msg);
                         tonic::Status::internal(msg)
                     })?;
+                
job_ids.push(task_status.partition_id.as_ref().unwrap().job_id.clone());

Review comment:
       do we know if the job ids are guaranteed to be unique here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to