thinkharderdev commented on code in PR #146:
URL: https://github.com/apache/arrow-ballista/pull/146#discussion_r947821858


##########
ballista/rust/executor/src/executor_server.rs:
##########
@@ -411,4 +411,31 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> ExecutorGrpc
     ) -> Result<Response<StopExecutorResult>, Status> {
         todo!()
     }
+
+    async fn cancel_tasks(
+        &self,
+        request: Request<CancelTasksParams>,
+    ) -> Result<Response<CancelTasksResult>, Status> {
+        let partitions = request.into_inner().partition_id;
+        info!("Cancelling partition tasks for {:?}", partitions);
+
+        let mut cancelled = true;
+
+        for partition in partitions {
+            if let Err(e) = self
+                .executor
+                .cancel_task(
+                    partition.job_id,
+                    partition.stage_id as usize,
+                    partition.partition_id as usize,
+                )
+                .await
+            {
+                error!("Error cancelling task: {:?}", e);
+                cancelled = false;
+            }
+        }
+
+        Ok(Response::new(CancelTasksResult { cancelled }))
+    }
 }

Review Comment:
   The cancellation only flips an atomic boolean and doesn't actually wait for 
the task to stop executing, so there should never be a case where the 
cancellation takes a long time. The underlying task may take some time before 
it reaches a yield point and actually stops executing but as far as I can tell 
there is no way to track that. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to