mingmwang commented on code in PR #146: URL: https://github.com/apache/arrow-ballista/pull/146#discussion_r946566294
##########
ballista/rust/executor/src/executor_server.rs:
##########
@@ -411,4 +411,31 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorGrpc
     ) -> Result<Response<StopExecutorResult>, Status> {
         todo!()
     }
+
+    async fn cancel_tasks(
+        &self,
+        request: Request<CancelTasksParams>,
+    ) -> Result<Response<CancelTasksResult>, Status> {
+        let partitions = request.into_inner().partition_id;
+        info!("Cancelling partition tasks for {:?}", partitions);
+
+        let mut cancelled = true;
+
+        for partition in partitions {
+            if let Err(e) = self
+                .executor
+                .cancel_task(
+                    partition.job_id,
+                    partition.stage_id as usize,
+                    partition.partition_id as usize,
+                )
+                .await
+            {
+                error!("Error cancelling task: {:?}", e);
+                cancelled = false;
+            }
+        }
+
+        Ok(Response::new(CancelTasksResult { cancelled }))
+    }
 }

Review Comment:
The `cancel_tasks` RPC might take a long time, because it waits for the cancellation to finish. Sometimes the operators are burning CPU and cannot reach a yield point in the short term. Maybe we should just return from the RPC immediately with a status called 'Cancelling'. Whether the tasks/futures are eventually cancelled does not matter at this point. Once the tasks/futures are actually cancelled, we can leverage the task status update RPC call to report back the final status, and the Scheduler can then update the task-related metadata and task slots. If a task/future becomes a zombie, the executor can also report the zombie task status back to the Scheduler.
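
A minimal sketch of the non-blocking approach described above, using only the Rust standard library. All names here (`TaskStatus`, `TaskHandle`, `spawn_task`, `demo`) are hypothetical and are not Ballista's API. A plain thread stands in for the task future, and an `mpsc` channel stands in for the task status update RPC to the Scheduler.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical task statuses. `Cancelling` is the immediate reply to a
/// cancel request; `Cancelled` and `Completed` are final statuses.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
    Cancelling,
    Cancelled,
    Completed,
}

/// Hypothetical handle the executor would keep for each running task.
pub struct TaskHandle {
    cancel_flag: Arc<AtomicBool>,
}

impl TaskHandle {
    /// Non-blocking cancel: set the flag and reply right away, without
    /// waiting for the task to reach a yield point.
    pub fn request_cancel(&self) -> TaskStatus {
        self.cancel_flag.store(true, Ordering::SeqCst);
        TaskStatus::Cancelling
    }
}

/// Spawns a simulated task of `units` work items. The task checks the cancel
/// flag before each item and sends its final status through `status_tx`
/// (an assumption standing in for the task status update RPC).
pub fn spawn_task(units: u32, status_tx: Sender<TaskStatus>) -> TaskHandle {
    let cancel_flag = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&cancel_flag);
    thread::spawn(move || {
        for _ in 0..units {
            if flag.load(Ordering::SeqCst) {
                // Report the final status asynchronously.
                let _ = status_tx.send(TaskStatus::Cancelled);
                return;
            }
            // Simulated unit of work between two yield points.
            thread::sleep(Duration::from_millis(5));
        }
        let _ = status_tx.send(TaskStatus::Completed);
    });
    TaskHandle { cancel_flag }
}

/// Returns the immediate reply to the cancel request and the final status
/// that the task reports later.
pub fn demo() -> (TaskStatus, TaskStatus) {
    let (status_tx, status_rx) = channel();
    let handle = spawn_task(1_000_000, status_tx);
    let reply = handle.request_cancel();
    let final_status = status_rx.recv().expect("task thread dropped the sender");
    (reply, final_status)
}
```

In this sketch the caller of `request_cancel` never blocks on the task itself; the final `Cancelled` status arrives separately, which is the point at which a scheduler could release the task slot. Detecting zombie tasks (for example with a timeout on the final status) is left out.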