This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 51210532 Remove active execution graph when the related job is 
successful or failed (#392)
51210532 is described below

commit 5121053261b897ba44da30fd34c560dd7e5975c5
Author: yahoNanJing <[email protected]>
AuthorDate: Thu Oct 20 06:03:26 2022 +0800

    Remove active execution graph when the related job is successful or failed 
(#392)
    
    Co-authored-by: yangzhong <[email protected]>
---
 ballista/scheduler/src/scheduler_server/mod.rs |  5 ++---
 ballista/scheduler/src/state/task_manager.rs   | 12 ++++++++++--
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/ballista/scheduler/src/scheduler_server/mod.rs 
b/ballista/scheduler/src/scheduler_server/mod.rs
index 6d517bf9..176b85c8 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -454,10 +454,9 @@ mod test {
                 let graph = scheduler
                     .state
                     .task_manager
-                    .get_active_execution_graph(job_id)
-                    .await
+                    .get_job_execution_graph(job_id)
+                    .await?
                     .unwrap();
-                let graph = graph.read().await;
                 if graph.is_successful() {
                     break;
                 }
diff --git a/ballista/scheduler/src/state/task_manager.rs 
b/ballista/scheduler/src/state/task_manager.rs
index d005e269..c73612f5 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -300,7 +300,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
         with_lock(lock, self.state.delete(Keyspace::ActiveJobs, 
job_id)).await?;
 
-        if let Some(graph) = self.get_active_execution_graph(job_id).await {
+        if let Some(graph) = self.remove_active_execution_graph(job_id).await {
             let graph = graph.read().await.clone();
             if graph.is_successful() {
                 let value = self.encode_execution_graph(graph)?;
@@ -423,7 +423,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
             ]
         };
 
-        let _res = if let Some(graph) = 
self.get_active_execution_graph(job_id).await {
+        let _res = if let Some(graph) = 
self.remove_active_execution_graph(job_id).await {
             let mut graph = graph.write().await;
             let previous_status = graph.status();
             graph.fail_job(failure_reason);
@@ -592,6 +592,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         self.active_job_cache.get(job_id).map(|value| value.clone())
     }
 
+    /// Remove the `ExecutionGraph` for the given job ID from cache
+    pub(crate) async fn remove_active_execution_graph(
+        &self,
+        job_id: &str,
+    ) -> Option<Arc<RwLock<ExecutionGraph>>> {
+        self.active_job_cache.remove(job_id).map(|value| value.1)
+    }
+
     /// Get the `ExecutionGraph` for the given job ID. This will search fist 
in the `ActiveJobs`
     /// keyspace and then, if it doesn't find anything, search the 
`CompletedJobs` keyspace.
     pub(crate) async fn get_execution_graph(

Reply via email to