thinkharderdev commented on code in PR #557:
URL: https://github.com/apache/arrow-ballista/pull/557#discussion_r1045716605


##########
ballista/scheduler/src/state/mod.rs:
##########
@@ -236,46 +236,53 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
                     }
                 }
 
+                let mut join_handles = vec![];
                 for (executor_id, tasks) in executor_stage_assignments.into_iter() {
                     let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
                     // Total number of tasks to be launched for one executor
                     let n_tasks: usize =
                         tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
 
-                    match self
-                        .executor_manager
-                        .get_executor_metadata(&executor_id)
-                        .await
-                    {
-                        Ok(executor) => {
-                            if let Err(e) = self
-                                .task_manager
-                                .launch_multi_task(
-                                    &executor,
-                                    tasks,
-                                    &self.executor_manager,
-                                )
-                                .await
-                            {
-                                error!("Failed to launch new task: {:?}", e);
-                                for _i in 0..n_tasks {
-                                    unassigned_reservations.push(
-                                        ExecutorReservation::new_free(
-                                            executor_id.clone(),
-                                        ),
-                                    );
+                    let task_manager = self.task_manager.clone();
+                    let executor_manager = self.executor_manager.clone();
+                    let join_handle = tokio::spawn(async move {
+                        let success = match executor_manager
+                            .get_executor_metadata(&executor_id)
+                            .await
+                        {
+                            Ok(executor) => {
+                                if let Err(e) = task_manager
+                                    .launch_multi_task(
+                                        &executor,
+                                        tasks,
+                                        &executor_manager,
+                                    )
+                                    .await
+                                {
+                                    error!("Failed to launch new task: {:?}", e);
+                                    false
+                                } else {
+                                    true
                                 }
                             }
-                        }
-                        Err(e) => {
-                            error!("Failed to launch new task, could not get executor metadata: {:?}", e);
-                            for _i in 0..n_tasks {
-                                unassigned_reservations.push(
-                                    ExecutorReservation::new_free(executor_id.clone()),
-                                );
+                            Err(e) => {
+                                error!("Failed to launch new task, could not get executor metadata: {:?}", e);
+                                false
                             }
+                        };
+                        if success {
+                            vec![]
+                        } else {
+                            vec![
+                                ExecutorReservation::new_free(executor_id.clone());
+                                n_tasks
+                            ]
                         }
-                    }
+                    });
+                    join_handles.push(join_handle);
+                }
+                for join_handle in join_handles {
+                    unassigned_reservations.append(&mut join_handle.await?);

Review Comment:
   Not sure I understand, wouldn't this still process the work sequentially? I would think we would want to do a `futures::future::join_all` on the collection of futures to actually execute them in parallel.
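   
   Roughly something like this (just a sketch, reusing the `join_handles` and `unassigned_reservations` from the diff above, and assuming each spawned task returns the `Vec<ExecutorReservation>` shown there):
   
   ```rust
   // Await all the spawned join handles together rather than one at a time,
   // then gather whatever reservations could not be assigned.
   let results = futures::future::join_all(join_handles).await;
   for result in results {
       // Each JoinHandle resolves to Result<Vec<ExecutorReservation>, JoinError>,
       // so `?` propagates a failed task the same way `join_handle.await?` does now.
       unassigned_reservations.append(&mut result?);
   }
   ```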



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
