This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 5ac161ca Add `cancel_job` REST API (#340)
5ac161ca is described below
commit 5ac161cad62b0deb3f91db3e8575b9c9231d337f
Author: Trent Feda <[email protected]>
AuthorDate: Wed Oct 12 00:37:54 2022 -0400
Add `cancel_job` REST API (#340)
---
ballista/rust/scheduler/src/api/handlers.rs | 27 ++++++++++++++++++++++
ballista/rust/scheduler/src/api/mod.rs | 6 +++++
.../rust/scheduler/src/scheduler_server/grpc.rs | 19 +++------------
ballista/rust/scheduler/src/state/mod.rs | 20 ++++++++++++++++
docs/source/user-guide/scheduler.md | 11 +++++----
5 files changed, 62 insertions(+), 21 deletions(-)
diff --git a/ballista/rust/scheduler/src/api/handlers.rs b/ballista/rust/scheduler/src/api/handlers.rs
index d5010d6a..48d770a3 100644
--- a/ballista/rust/scheduler/src/api/handlers.rs
+++ b/ballista/rust/scheduler/src/api/handlers.rs
@@ -53,6 +53,11 @@ pub struct JobResponse {
pub percent_complete: u8,
}
+#[derive(Debug, serde::Serialize)]
+struct CancelJobResponse {
+ pub cancelled: bool,
+}
+
#[derive(Debug, serde::Serialize)]
pub struct QueryStageSummary {
pub stage_id: String,
@@ -157,6 +162,28 @@ pub(crate) async fn get_jobs<T: AsLogicalPlan, U: AsExecutionPlan>(
Ok(warp::reply::json(&jobs))
}
+pub(crate) async fn cancel_job<T: AsLogicalPlan, U: AsExecutionPlan>(
+ data_server: SchedulerServer<T, U>,
+ job_id: String,
+) -> Result<impl warp::Reply, Rejection> {
+ // 404 if job doesn't exist
+ data_server
+ .state
+ .task_manager
+ .get_job_status(&job_id)
+ .await
+ .map_err(|_| warp::reject())?
+ .ok_or_else(warp::reject)?;
+
+ let cancelled = data_server
+ .state
+ .cancel_job(&job_id)
+ .await
+ .map_err(|_| warp::reject())?;
+
+ Ok(warp::reply::json(&CancelJobResponse { cancelled }))
+}
+
#[derive(Debug, serde::Serialize)]
pub struct QueryStagesResponse {
pub stages: Vec<QueryStageSummary>,
diff --git a/ballista/rust/scheduler/src/api/mod.rs b/ballista/rust/scheduler/src/api/mod.rs
index 3f00b767..7710e1a2 100644
--- a/ballista/rust/scheduler/src/api/mod.rs
+++ b/ballista/rust/scheduler/src/api/mod.rs
@@ -97,6 +97,11 @@ pub fn get_routes<T: AsLogicalPlan + Clone, U: 'static + AsExecutionPlan>(
.and(with_data_server(scheduler_server.clone()))
.and_then(|data_server| handlers::get_jobs(data_server));
+ let route_cancel_job = warp::path!("api" / "job" / String)
+ .and(warp::patch())
+ .and(with_data_server(scheduler_server.clone()))
+ .and_then(|job_id, data_server| handlers::cancel_job(data_server,
job_id));
+
let route_query_stages = warp::path!("api" / "job" / String / "stages")
.and(with_data_server(scheduler_server.clone()))
.and_then(|job_id, data_server|
handlers::get_query_stages(data_server, job_id));
@@ -119,6 +124,7 @@ pub fn get_routes<T: AsLogicalPlan + Clone, U: 'static + AsExecutionPlan>(
let routes = route_scheduler_state
.or(route_executors)
.or(route_jobs)
+ .or(route_cancel_job)
.or(route_query_stages)
.or(route_job_dot)
.or(route_query_stage_dot)
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 1fbb86a5..9393ed78 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -529,22 +529,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
request: Request<CancelJobParams>,
) -> Result<Response<CancelJobResult>, Status> {
let job_id = request.into_inner().job_id;
- info!("Received cancellation request for job {}", job_id);
-
- match self.state.task_manager.cancel_job(&job_id).await {
- Ok(tasks) => {
-
self.state.executor_manager.cancel_running_tasks(tasks).await.map_err(|e| {
- let msg = format!("Error to cancel running task when
cancel the job {} due to {:?}", job_id, e);
- error!("{}", msg);
- Status::internal(msg)
- })?;
- Ok(Response::new(CancelJobResult { cancelled: true }))
- }
- Err(e) => {
- let msg = format!("Error cancelling job {}: {:?}", job_id, e);
- error!("{}", msg);
- Ok(Response::new(CancelJobResult { cancelled: false }))
- }
+ match self.state.cancel_job(&job_id).await {
+ Ok(cancelled) => Ok(Response::new(CancelJobResult { cancelled })),
+ Err(e) => Err(Status::internal(e.to_string())),
}
}
}
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index 3319cb12..8cdd74d6 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -272,6 +272,26 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
Ok(())
}
+
+ pub(crate) async fn cancel_job(&self, job_id: &str) -> Result<bool> {
+ info!("Received cancellation request for job {}", job_id);
+
+ match self.task_manager.cancel_job(job_id).await {
+ Ok(tasks) => {
+
self.executor_manager.cancel_running_tasks(tasks).await.map_err(|e| {
+ let msg = format!("Error to cancel running tasks when
cancelling job {} due to {:?}", job_id, e);
+ error!("{}", msg);
+ BallistaError::Internal(msg)
+ })?;
+ Ok(true)
+ }
+ Err(e) => {
+ let msg = format!("Error cancelling job {}: {:?}", job_id, e);
+ error!("{}", msg);
+ Ok(false)
+ }
+ }
+ }
}
pub async fn with_lock<Out, F: Future<Output = Out>>(
diff --git a/docs/source/user-guide/scheduler.md b/docs/source/user-guide/scheduler.md
index 21d8f876..2f94ebc3 100644
--- a/docs/source/user-guide/scheduler.md
+++ b/docs/source/user-guide/scheduler.md
@@ -29,8 +29,9 @@ The scheduler provides a web user interface that allows queries to be monitored.
The scheduler also provides a REST API that allows jobs to be monitored.
-| API                   | Description                                                 |
-| --------------------- | ----------------------------------------------------------- |
-| /api/jobs             | Get a list of jobs that have been submitted to the cluster. |
-| /api/job/{job_id}     | Get a summary of a submitted job.                           |
-| /api/job/{job_id}/dot | Produce a query plan in DOT (graphviz) format.              |
+| API                   | Method | Description                                                 |
+| --------------------- | ------ | ----------------------------------------------------------- |
+| /api/jobs             | GET    | Get a list of jobs that have been submitted to the cluster. |
+| /api/job/{job_id}     | GET    | Get a summary of a submitted job.                           |
+| /api/job/{job_id}/dot | GET    | Produce a query plan in DOT (graphviz) format.              |
+| /api/job/{job_id}     | PATCH  | Cancel a currently running job.                             |