This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ac161ca Add `cancel_job` REST API (#340)
5ac161ca is described below

commit 5ac161cad62b0deb3f91db3e8575b9c9231d337f
Author: Trent Feda <[email protected]>
AuthorDate: Wed Oct 12 00:37:54 2022 -0400

    Add `cancel_job` REST API (#340)
---
 ballista/rust/scheduler/src/api/handlers.rs        | 27 ++++++++++++++++++++++
 ballista/rust/scheduler/src/api/mod.rs             |  6 +++++
 .../rust/scheduler/src/scheduler_server/grpc.rs    | 19 +++------------
 ballista/rust/scheduler/src/state/mod.rs           | 20 ++++++++++++++++
 docs/source/user-guide/scheduler.md                | 11 +++++----
 5 files changed, 62 insertions(+), 21 deletions(-)

diff --git a/ballista/rust/scheduler/src/api/handlers.rs b/ballista/rust/scheduler/src/api/handlers.rs
index d5010d6a..48d770a3 100644
--- a/ballista/rust/scheduler/src/api/handlers.rs
+++ b/ballista/rust/scheduler/src/api/handlers.rs
@@ -53,6 +53,11 @@ pub struct JobResponse {
     pub percent_complete: u8,
 }
 
+/// JSON body returned by the cancel-job REST endpoint, e.g. `{"cancelled": true}`.
+#[derive(Debug, serde::Serialize)]
+pub struct CancelJobResponse {
+    /// Whether the scheduler reported the job as cancelled.
+    pub cancelled: bool,
+}
+
 #[derive(Debug, serde::Serialize)]
 pub struct QueryStageSummary {
     pub stage_id: String,
@@ -157,6 +162,28 @@ pub(crate) async fn get_jobs<T: AsLogicalPlan, U: AsExecutionPlan>(
     Ok(warp::reply::json(&jobs))
 }
 
+/// Cancel the job identified by `job_id` (routed as `PATCH /api/job/{job_id}`).
+///
+/// Replies with a JSON [`CancelJobResponse`]:
+/// * `200 OK` carrying the `cancelled` flag returned by `SchedulerState::cancel_job`;
+/// * a `404 Not Found` rejection when the task manager has no status for the job;
+/// * `500 Internal Server Error` with `{"cancelled": false}` when looking up or
+///   cancelling the job fails, so server-side failures are not reported to
+///   clients as a missing job.
+pub(crate) async fn cancel_job<T: AsLogicalPlan, U: AsExecutionPlan>(
+    data_server: SchedulerServer<T, U>,
+    job_id: String,
+) -> Result<impl warp::Reply, Rejection> {
+    use warp::http::StatusCode;
+
+    // Every branch must produce the same concrete reply type for `impl Reply`.
+    let respond = |cancelled: bool, status: StatusCode| {
+        warp::reply::with_status(
+            warp::reply::json(&CancelJobResponse { cancelled }),
+            status,
+        )
+    };
+
+    // 404 only if the job genuinely doesn't exist; a failed lookup is a
+    // server-side error and must not masquerade as "not found".
+    match data_server.state.task_manager.get_job_status(&job_id).await {
+        Ok(Some(_)) => {}
+        Ok(None) => return Err(warp::reject()),
+        Err(_) => return Ok(respond(false, StatusCode::INTERNAL_SERVER_ERROR)),
+    }
+
+    // `SchedulerState::cancel_job` yields `Ok(false)` when the task manager fails
+    // to cancel the job, and `Err` when cancelling its running tasks fails.
+    match data_server.state.cancel_job(&job_id).await {
+        Ok(cancelled) => Ok(respond(cancelled, StatusCode::OK)),
+        Err(_) => Ok(respond(false, StatusCode::INTERNAL_SERVER_ERROR)),
+    }
+}
+
 #[derive(Debug, serde::Serialize)]
 pub struct QueryStagesResponse {
     pub stages: Vec<QueryStageSummary>,
diff --git a/ballista/rust/scheduler/src/api/mod.rs b/ballista/rust/scheduler/src/api/mod.rs
index 3f00b767..7710e1a2 100644
--- a/ballista/rust/scheduler/src/api/mod.rs
+++ b/ballista/rust/scheduler/src/api/mod.rs
@@ -97,6 +97,11 @@ pub fn get_routes<T: AsLogicalPlan + Clone, U: 'static + AsExecutionPlan>(
         .and(with_data_server(scheduler_server.clone()))
         .and_then(|data_server| handlers::get_jobs(data_server));
 
+    let route_cancel_job = warp::path!("api" / "job" / String)
+        .and(warp::patch())
+        .and(with_data_server(scheduler_server.clone()))
+        .and_then(|job_id, data_server| handlers::cancel_job(data_server, 
job_id));
+
     let route_query_stages = warp::path!("api" / "job" / String / "stages")
         .and(with_data_server(scheduler_server.clone()))
         .and_then(|job_id, data_server| 
handlers::get_query_stages(data_server, job_id));
@@ -119,6 +124,7 @@ pub fn get_routes<T: AsLogicalPlan + Clone, U: 'static + AsExecutionPlan>(
     let routes = route_scheduler_state
         .or(route_executors)
         .or(route_jobs)
+        .or(route_cancel_job)
         .or(route_query_stages)
         .or(route_job_dot)
         .or(route_query_stage_dot)
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 1fbb86a5..9393ed78 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -529,22 +529,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
         request: Request<CancelJobParams>,
     ) -> Result<Response<CancelJobResult>, Status> {
         let job_id = request.into_inner().job_id;
-        info!("Received cancellation request for job {}", job_id);
-
-        match self.state.task_manager.cancel_job(&job_id).await {
-            Ok(tasks) => {
-                
self.state.executor_manager.cancel_running_tasks(tasks).await.map_err(|e| {
-                        let msg = format!("Error to cancel running task when 
cancel the job {} due to {:?}", job_id, e);
-                        error!("{}", msg);
-                        Status::internal(msg)
-                })?;
-                Ok(Response::new(CancelJobResult { cancelled: true }))
-            }
-            Err(e) => {
-                let msg = format!("Error cancelling job {}: {:?}", job_id, e);
-                error!("{}", msg);
-                Ok(Response::new(CancelJobResult { cancelled: false }))
-            }
+        match self.state.cancel_job(&job_id).await {
+            Ok(cancelled) => Ok(Response::new(CancelJobResult { cancelled })),
+            Err(e) => Err(Status::internal(e.to_string())),
         }
     }
 }
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index 3319cb12..8cdd74d6 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -272,6 +272,26 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
 
         Ok(())
     }
+
+    /// Cancel the job `job_id` together with any of its tasks that are still running.
+    ///
+    /// Returns `Ok(true)` once the task manager has cancelled the job and the
+    /// executor manager has been asked to cancel the running tasks it returned.
+    /// A task-manager failure is logged and reported as `Ok(false)`; a failure
+    /// to cancel the running tasks is logged and returned as
+    /// `BallistaError::Internal`.
+    pub(crate) async fn cancel_job(&self, job_id: &str) -> Result<bool> {
+        info!("Received cancellation request for job {}", job_id);
+
+        let running_tasks = match self.task_manager.cancel_job(job_id).await {
+            Ok(running_tasks) => running_tasks,
+            Err(err) => {
+                let msg = format!("Error cancelling job {}: {:?}", job_id, err);
+                error!("{}", msg);
+                return Ok(false);
+            }
+        };
+
+        self.executor_manager
+            .cancel_running_tasks(running_tasks)
+            .await
+            .map_err(|err| {
+                let msg = format!(
+                    "Error to cancel running tasks when cancelling job {} due to {:?}",
+                    job_id, err
+                );
+                error!("{}", msg);
+                BallistaError::Internal(msg)
+            })?;
+
+        Ok(true)
+    }
 }
 
 pub async fn with_lock<Out, F: Future<Output = Out>>(
diff --git a/docs/source/user-guide/scheduler.md b/docs/source/user-guide/scheduler.md
index 21d8f876..2f94ebc3 100644
--- a/docs/source/user-guide/scheduler.md
+++ b/docs/source/user-guide/scheduler.md
@@ -29,8 +29,9 @@ The scheduler provides a web user interface that allows queries to be monitored.
 
 The scheduler also provides a REST API that allows jobs to be monitored.
 
-| API                   | Description                                                 |
-| --------------------- | ----------------------------------------------------------- |
-| /api/jobs             | Get a list of jobs that have been submitted to the cluster. |
-| /api/job/{job_id}     | Get a summary of a submitted job.                           |
-| /api/job/{job_id}/dot | Produce a query plan in DOT (graphviz) format.              |
+| API                   | Method | Description                                                 |
+| --------------------- | ------ | ----------------------------------------------------------- |
+| /api/jobs             | GET    | Get a list of jobs that have been submitted to the cluster. |
+| /api/job/{job_id}     | GET    | Get a summary of a submitted job.                           |
+| /api/job/{job_id}/dot | GET    | Produce a query plan in DOT (graphviz) format.              |
+| /api/job/{job_id}     | PATCH  | Cancel a currently running job.                             |

Reply via email to