This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new a5b7f73ea Fix REST API panic on job list/detail when end_time < start_time (#1693)
a5b7f73ea is described below

commit a5b7f73ea6406d02987e1caba3fe09146d5cdb3d
Author: Abhinav Gautam <[email protected]>
AuthorDate: Tue May 12 19:43:24 2026 +0530

    Fix REST API panic on job list/detail when end_time < start_time (#1693)
    
    * Fix REST API panic when job end_time < start_time
    
    * Set execution graph end_time when a job fails
    
    * test: cover fail_job end_time and drop redundant comments
    
    * chore: use timestamp_millis in succeed_job; clarify test assert
---
 ballista/scheduler/src/api/handlers.rs          | 30 ++++++++++++++++---
 ballista/scheduler/src/state/aqe/mod.rs         | 13 +++------
 ballista/scheduler/src/state/execution_graph.rs | 39 ++++++++++++++++++++++---
 3 files changed, 65 insertions(+), 17 deletions(-)

diff --git a/ballista/scheduler/src/api/handlers.rs b/ballista/scheduler/src/api/handlers.rs
index 87eadf43d..925fbf7ea 100644
--- a/ballista/scheduler/src/api/handlers.rs
+++ b/ballista/scheduler/src/api/handlers.rs
@@ -330,8 +330,10 @@ pub async fn get_jobs<
     let jobs: Vec<JobResponse> = jobs
         .iter()
         .map(|job| {
-            let (plain_status, job_status) =
-                format_job_status(&job.status.status, job.end_time - job.start_time);
+            let (plain_status, job_status) = format_job_status(
+                &job.status.status,
+                job_elapsed_ms(job.start_time, job.end_time),
+            );
 
             // calculate progress based on completed stages for now, but we could use completed
             // tasks in the future to make this more accurate
@@ -377,8 +379,10 @@ pub async fn get_job<
         .ok_or_else(|| SchedulerErrorResponse::new(StatusCode::NOT_FOUND))?;
     let stage_plan = format!("{:?}", graph);
     let job = graph.as_ref();
-    let (plain_status, job_status) =
-        format_job_status(&job.status().status, job.end_time() - job.start_time());
+    let (plain_status, job_status) = format_job_status(
+        &job.status().status,
+        job_elapsed_ms(job.start_time(), job.end_time()),
+    );
 
     let num_stages = job.stage_count();
     let completed_stages = job.completed_stages();
@@ -686,6 +690,14 @@ fn task_duration_percentiles(tasks: &[Option<TaskSummary>]) -> Option<Percentile
     })
 }
 
+/// Returns elapsed wall time in milliseconds for API formatting.
+///
+/// Uses saturating subtraction so inconsistent timestamps (e.g. failed jobs, or
+/// `end_time` still zero while `start_time` is set) do not panic on subtract.
+fn job_elapsed_ms(start_time: u64, end_time: u64) -> u64 {
+    end_time.saturating_sub(start_time)
+}
+
 fn format_job_status(status: &Option<Status>, elapsed_ms: u64) -> (String, String) {
     match status {
         Some(Status::Queued(_)) => ("Queued".to_string(), "Queued".to_string()),
@@ -1001,4 +1013,14 @@ mod tests {
         let result = get_running_stage_time(&tasks, now);
         assert_eq!(result, "2.00s");
     }
+
+    #[test]
+    fn test_job_elapsed_ms_normal() {
+        assert_eq!(super::job_elapsed_ms(100, 500), 400);
+    }
+
+    #[test]
+    fn test_job_elapsed_ms_end_before_start_saturates_to_zero() {
+        assert_eq!(super::job_elapsed_ms(500, 100), 0);
+    }
 }
diff --git a/ballista/scheduler/src/state/aqe/mod.rs b/ballista/scheduler/src/state/aqe/mod.rs
index 470a6e9ee..6b1903dc9 100644
--- a/ballista/scheduler/src/state/aqe/mod.rs
+++ b/ballista/scheduler/src/state/aqe/mod.rs
@@ -39,7 +39,6 @@ use datafusion::prelude::SessionConfig;
 use log::{debug, error, info, warn};
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
 use std::vec;
 
 // TODO: the AQE planner runs DataFusion's DefaultPhysicalPlanner with a
@@ -1167,6 +1166,8 @@ impl ExecutionGraph for AdaptiveExecutionGraph {
 
     /// fail job with error message
     fn fail_job(&mut self, error: String) {
+        self.end_time = timestamp_millis();
+
         self.status = JobStatus {
             job_id: self.job_id.clone(),
             job_name: self.job_name.clone(),
@@ -1194,10 +1195,7 @@ impl ExecutionGraph for AdaptiveExecutionGraph {
             .map(|l| l.try_into())
             .collect::<ballista_core::error::Result<Vec<_>>>()?;
 
-        self.end_time = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .unwrap()
-            .as_millis() as u64;
+        self.end_time = timestamp_millis();
 
         self.status = JobStatus {
             job_id: self.job_id.clone(),
@@ -1290,10 +1288,7 @@ impl ExecutionGraph for AdaptiveExecutionGraph {
                 let task_attempt = stage.task_failure_numbers[partition_id];
                 let task_info = crate::state::execution_graph::TaskInfo {
                     task_id,
-                    scheduled_time: SystemTime::now()
-                        .duration_since(UNIX_EPOCH)
-                        .unwrap()
-                        .as_millis(),
+                    scheduled_time: timestamp_millis() as u128,
                     // Those times will be updated when the task finish
                     launch_time: 0,
                     start_exec_time: 0,
diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs
index ce23a459a..8f0c30cfa 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -1348,6 +1348,8 @@ impl ExecutionGraph for StaticExecutionGraph {
 
     /// fail job with error message
     fn fail_job(&mut self, error: String) {
+        self.end_time = timestamp_millis();
+
         self.status = JobStatus {
             job_id: self.job_id.clone(),
             job_name: self.job_name.clone(),
@@ -1375,10 +1377,7 @@ impl ExecutionGraph for StaticExecutionGraph {
             .map(|l| l.try_into())
             .collect::<Result<Vec<_>>>()?;
 
-        self.end_time = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .unwrap()
-            .as_millis() as u64;
+        self.end_time = timestamp_millis();
 
         self.status = JobStatus {
             job_id: self.job_id.clone(),
@@ -1772,6 +1771,38 @@ mod test {
         test_union_all_plan, test_union_plan,
     };
 
+    #[tokio::test]
+    async fn test_fail_job_sets_end_time_and_failed_metadata() -> Result<()> {
+        let mut graph = test_aggregation_plan(4).await;
+        let start = graph.start_time();
+        assert_eq!(graph.end_time(), 0);
+
+        ExecutionGraph::fail_job(&mut graph, "test failure".to_string());
+
+        assert!(
+            matches!(
+                graph.status().status.as_ref(),
+                Some(job_status::Status::Failed(f)) if f.error == "test failure"
+            ),
+            "expected FailedJob status after fail_job"
+        );
+        assert!(
+            graph.end_time() >= start,
+            "end_time ({}) should be set and >= start_time ({})",
+            graph.end_time(),
+            start
+        );
+
+        if let Some(job_status::Status::Failed(failed)) = &graph.status().status {
+            assert_eq!(failed.started_at, start);
+            assert_eq!(failed.ended_at, graph.end_time());
+        } else {
+            panic!("missing FailedJob");
+        }
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_drain_tasks() -> Result<()> {
         let mut agg_graph = test_aggregation_plan(4).await;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to