milenkovicm commented on code in PR #1703:
URL: 
https://github.com/apache/datafusion-ballista/pull/1703#discussion_r3252574816


##########
ballista/scheduler/src/state/task_manager.rs:
##########
@@ -366,6 +366,50 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         Ok(jobs)
     }
 
+    /// Get all job ids including running, queued, and completed jobs
+    pub async fn get_all_jobs(&self) -> Result<Vec<JobOverview>> {
+        self.get_all_jobs_with_status(None).await
+    }
+
+    /// Get all jobs optionally filtered by status.
+    /// When `status` is None, returns all jobs regardless of status.
+    pub async fn get_all_jobs_with_status(
+        &self,
+        status: Option<job_status::Status>,
+    ) -> Result<Vec<JobOverview>> {
+        let job_ids = self.state.get_all_jobs().await?;
+
+        let mut jobs = vec![];
+        for job_id in &job_ids {
+            if let Some(cached) = self.get_active_execution_graph(job_id) {
+                let graph = cached.read().await;
+                jobs.push(graph.deref().into());
+            } else if let Some(graph) = 
self.state.get_execution_graph(job_id).await? {
+                jobs.push((&graph).into());
+            } else if let Some(job_status) = 
self.state.get_job_status(job_id).await? {
+                if let Some(ref filter) = status {
+                    if let Some(ref s) = job_status.status {
+                        if s != filter {
+                            continue;
+                        }
+                    } else {
+                        continue;
+                    }
+                }

Review Comment:
   do we need this filter for queued jobs, looks like status filter is not 
used, perhaps we can remove it for now



##########
ballista/scheduler/src/state/task_manager.rs:
##########
@@ -366,6 +366,50 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         Ok(jobs)
     }
 
+    /// Get all job ids including running, queued, and completed jobs

Review Comment:
   maybe we should also update `get_jobs` doc to reflect difference between 
this one 



##########
ballista/scheduler/src/state/task_manager.rs:
##########
@@ -366,6 +366,32 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         Ok(jobs)
     }
 
+    /// Get all job ids including running, queued, and completed jobs
+    pub async fn get_all_jobs(&self) -> Result<Vec<JobOverview>> {
+        let job_ids = self.state.get_all_jobs().await?;
+
+        let mut jobs = vec![];
+        for job_id in &job_ids {
+            if let Some(cached) = self.get_active_execution_graph(job_id) {
+                let graph = cached.read().await;
+                jobs.push(graph.deref().into());
+            } else if let Some(graph) = 
self.state.get_execution_graph(job_id).await? {
+                jobs.push((&graph).into());
+            } else if let Some(status) = 
self.state.get_job_status(job_id).await? {
+                jobs.push(JobOverview {
+                    job_id: status.job_id.clone(),
+                    job_name: status.job_name.clone(),
+                    status,
+                    start_time: 0,

Review Comment:
   there is no start time as its just queued, 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to