martin-g commented on code in PR #1703:
URL: https://github.com/apache/datafusion-ballista/pull/1703#discussion_r3241468764


##########
ballista/scheduler/src/state/task_manager.rs:
##########
@@ -366,6 +366,32 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         Ok(jobs)
     }
 
+    /// Get all job ids including running, queued, and completed jobs
+    pub async fn get_all_jobs(&self) -> Result<Vec<JobOverview>> {
+        let job_ids = self.state.get_all_jobs().await?;
+
+        let mut jobs = vec![];
+        for job_id in &job_ids {
+            if let Some(cached) = self.get_active_execution_graph(job_id) {
+                let graph = cached.read().await;
+                jobs.push(graph.deref().into());
+            } else if let Some(graph) = self.state.get_execution_graph(job_id).await? {
+                jobs.push((&graph).into());
+            } else if let Some(status) = self.state.get_job_status(job_id).await? {
+                jobs.push(JobOverview {
+                    job_id: status.job_id.clone(),
+                    job_name: status.job_name.clone(),
+                    status,
+                    start_time: 0,
+                    end_time: 0,
+                    num_stages: 0,
+                    completed_stages: 0,
+                });
+            }

Review Comment:
   What happens if the `job_id` is not found by any of the three lookups?
   Should a warning be logged?
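A minimal sketch of the fall-through case raised above, assuming synchronous stand-ins for the three lookups. `Lookups`, `Source`, `resolve`, and `collect` are hypothetical names for illustration and are not part of Ballista; `eprintln!` stands in for whatever logging macro the scheduler actually uses.

```rust
use std::collections::HashSet;

/// Where a job id was resolved (hypothetical; mirrors the three branches in the diff).
#[derive(Debug, PartialEq)]
enum Source {
    ActiveCache,
    PersistedGraph,
    StatusOnly,
}

/// Hypothetical stand-in for the three lookups performed in `get_all_jobs`.
struct Lookups {
    active: HashSet<String>,
    persisted: HashSet<String>,
    status: HashSet<String>,
}

impl Lookups {
    /// Tries the lookups in the same order as the diff; `None` means all three missed.
    fn resolve(&self, job_id: &str) -> Option<Source> {
        if self.active.contains(job_id) {
            Some(Source::ActiveCache)
        } else if self.persisted.contains(job_id) {
            Some(Source::PersistedGraph)
        } else if self.status.contains(job_id) {
            Some(Source::StatusOnly)
        } else {
            None
        }
    }
}

/// Returns the resolved jobs plus the ids that no lookup could resolve.
fn collect(lookups: &Lookups, job_ids: &[&str]) -> (Vec<Source>, Vec<String>) {
    let mut found = Vec::new();
    let mut missing = Vec::new();
    for &job_id in job_ids {
        match lookups.resolve(job_id) {
            Some(source) => found.push(source),
            None => {
                // Assumption: a warning is enough; the job is skipped, not treated as an error.
                eprintln!("WARN: job {job_id} was listed but not found in any lookup");
                missing.push(job_id.to_string());
            }
        }
    }
    (found, missing)
}
```

Returning the missing ids alongside the results keeps the silent-skip behaviour of the current loop while making the gap observable to callers and tests.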



##########
ballista/scheduler/src/state/task_manager.rs:
##########
@@ -366,6 +366,32 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         Ok(jobs)
     }
 
+    /// Get all job ids including running, queued, and completed jobs
+    pub async fn get_all_jobs(&self) -> Result<Vec<JobOverview>> {
+        let job_ids = self.state.get_all_jobs().await?;
+
+        let mut jobs = vec![];
+        for job_id in &job_ids {
+            if let Some(cached) = self.get_active_execution_graph(job_id) {
+                let graph = cached.read().await;
+                jobs.push(graph.deref().into());
+            } else if let Some(graph) = self.state.get_execution_graph(job_id).await? {
+                jobs.push((&graph).into());
+            } else if let Some(status) = self.state.get_job_status(job_id).await? {
+                jobs.push(JobOverview {
+                    job_id: status.job_id.clone(),
+                    job_name: status.job_name.clone(),
+                    status,
+                    start_time: 0,

Review Comment:
   Instead of hardcoding `0` here, we should inspect `status` and extract the 
actual value when it is available.
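One way this could look, as a sketch only. `JobState` below is a hypothetical, simplified stand-in; the field names `started_at` and `ended_at` are assumptions about what the real job status carries and should be checked against the actual Ballista type before use.

```rust
/// Hypothetical, simplified job state; the real status type may differ.
#[derive(Debug, Clone)]
enum JobState {
    Queued,
    Running { started_at: u64 },
    Successful { started_at: u64, ended_at: u64 },
    Failed { started_at: u64, ended_at: u64 },
}

/// Derives `(start_time, end_time)` from the state, using `0` only when
/// the state genuinely carries no such timestamp.
fn times_from_state(state: &JobState) -> (u64, u64) {
    match state {
        // A queued job has neither started nor ended.
        JobState::Queued => (0, 0),
        // A running job has started but not ended.
        JobState::Running { started_at } => (*started_at, 0),
        // Terminal states carry both timestamps.
        JobState::Successful { started_at, ended_at }
        | JobState::Failed { started_at, ended_at } => (*started_at, *ended_at),
    }
}
```

With a helper like this, the fallback branch could fill `start_time` and `end_time` from the returned pair instead of zeros.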



##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -322,7 +322,7 @@ pub async fn get_jobs<
 ) -> Result<impl IntoResponse, SchedulerErrorResponse> {
     let state = &data_server.state;
 
-    let jobs = state.task_manager.get_jobs().await.map_err(|e| {

Review Comment:
   `get_jobs()` is no longer used. It could be marked as deprecated and later 
either removed or at least renamed to `get_completed_jobs()`.
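A sketch of the deprecation path using Rust's built-in `#[deprecated]` attribute. The `TaskManager` here is a hypothetical, synchronous stand-in with string job ids, and the bodies are illustrative; only the method names come from this review.

```rust
/// Hypothetical stand-in for the scheduler's task manager.
struct TaskManager {
    active: Vec<String>,
    completed: Vec<String>,
}

impl TaskManager {
    /// Old entry point, kept as a thin wrapper so existing callers still compile
    /// but get a compiler warning pointing at the replacement.
    #[deprecated(note = "use `get_all_jobs` or `get_completed_jobs` instead")]
    fn get_jobs(&self) -> Vec<String> {
        self.get_completed_jobs()
    }

    /// Renamed method that states what is actually returned.
    fn get_completed_jobs(&self) -> Vec<String> {
        self.completed.clone()
    }

    /// New entry point covering active and completed jobs.
    fn get_all_jobs(&self) -> Vec<String> {
        let mut all = self.active.clone();
        all.extend(self.completed.iter().cloned());
        all
    }
}
```

Keeping the old name as a wrapper for one release gives downstream users a warning rather than a hard break.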



##########
ballista/scheduler/src/state/task_manager.rs:
##########
@@ -366,6 +366,32 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         Ok(jobs)
     }
 
+    /// Get all job ids including running, queued, and completed jobs

Review Comment:
   ```suggestion
       /// Get all job ids including running, queued, and completed/failed jobs
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

