This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new a5b7f73ea Fix REST API panic on job list/detail when end_time < start_time (#1693)
a5b7f73ea is described below
commit a5b7f73ea6406d02987e1caba3fe09146d5cdb3d
Author: Abhinav Gautam <[email protected]>
AuthorDate: Tue May 12 19:43:24 2026 +0530
Fix REST API panic on job list/detail when end_time < start_time (#1693)
* Fix REST API panic when job end_time < start_time
* Set execution graph end_time when a job fails
* test: cover fail_job end_time and drop redundant comments
* chore: use timestamp_millis in succeed_job; clarify test assert
---
ballista/scheduler/src/api/handlers.rs | 30 ++++++++++++++++---
ballista/scheduler/src/state/aqe/mod.rs | 13 +++------
ballista/scheduler/src/state/execution_graph.rs | 39 ++++++++++++++++++++++---
3 files changed, 65 insertions(+), 17 deletions(-)
diff --git a/ballista/scheduler/src/api/handlers.rs b/ballista/scheduler/src/api/handlers.rs
index 87eadf43d..925fbf7ea 100644
--- a/ballista/scheduler/src/api/handlers.rs
+++ b/ballista/scheduler/src/api/handlers.rs
@@ -330,8 +330,10 @@ pub async fn get_jobs<
let jobs: Vec<JobResponse> = jobs
.iter()
.map(|job| {
- let (plain_status, job_status) =
- format_job_status(&job.status.status, job.end_time -
job.start_time);
+ let (plain_status, job_status) = format_job_status(
+ &job.status.status,
+ job_elapsed_ms(job.start_time, job.end_time),
+ );
// calculate progress based on completed stages for now, but we could use completed
// tasks in the future to make this more accurate
@@ -377,8 +379,10 @@ pub async fn get_job<
.ok_or_else(|| SchedulerErrorResponse::new(StatusCode::NOT_FOUND))?;
let stage_plan = format!("{:?}", graph);
let job = graph.as_ref();
- let (plain_status, job_status) =
- format_job_status(&job.status().status, job.end_time() -
job.start_time());
+ let (plain_status, job_status) = format_job_status(
+ &job.status().status,
+ job_elapsed_ms(job.start_time(), job.end_time()),
+ );
let num_stages = job.stage_count();
let completed_stages = job.completed_stages();
@@ -686,6 +690,14 @@ fn task_duration_percentiles(tasks: &[Option<TaskSummary>]) -> Option<Percentile
})
}
+/// Returns elapsed wall time in milliseconds for API formatting.
+///
+/// Uses saturating subtraction so inconsistent timestamps (e.g. failed jobs, or
+/// `end_time` still zero while `start_time` is set) do not panic on subtract.
+fn job_elapsed_ms(start_time: u64, end_time: u64) -> u64 {
+ end_time.saturating_sub(start_time)
+}
+
fn format_job_status(status: &Option<Status>, elapsed_ms: u64) -> (String,
String) {
match status {
Some(Status::Queued(_)) => ("Queued".to_string(),
"Queued".to_string()),
@@ -1001,4 +1013,14 @@ mod tests {
let result = get_running_stage_time(&tasks, now);
assert_eq!(result, "2.00s");
}
+
+ #[test]
+ fn test_job_elapsed_ms_normal() {
+ assert_eq!(super::job_elapsed_ms(100, 500), 400);
+ }
+
+ #[test]
+ fn test_job_elapsed_ms_end_before_start_saturates_to_zero() {
+ assert_eq!(super::job_elapsed_ms(500, 100), 0);
+ }
}
diff --git a/ballista/scheduler/src/state/aqe/mod.rs b/ballista/scheduler/src/state/aqe/mod.rs
index 470a6e9ee..6b1903dc9 100644
--- a/ballista/scheduler/src/state/aqe/mod.rs
+++ b/ballista/scheduler/src/state/aqe/mod.rs
@@ -39,7 +39,6 @@ use datafusion::prelude::SessionConfig;
use log::{debug, error, info, warn};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
use std::vec;
// TODO: the AQE planner runs DataFusion's DefaultPhysicalPlanner with a
@@ -1167,6 +1166,8 @@ impl ExecutionGraph for AdaptiveExecutionGraph {
/// fail job with error message
fn fail_job(&mut self, error: String) {
+ self.end_time = timestamp_millis();
+
self.status = JobStatus {
job_id: self.job_id.clone(),
job_name: self.job_name.clone(),
@@ -1194,10 +1195,7 @@ impl ExecutionGraph for AdaptiveExecutionGraph {
.map(|l| l.try_into())
.collect::<ballista_core::error::Result<Vec<_>>>()?;
- self.end_time = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .unwrap()
- .as_millis() as u64;
+ self.end_time = timestamp_millis();
self.status = JobStatus {
job_id: self.job_id.clone(),
@@ -1290,10 +1288,7 @@ impl ExecutionGraph for AdaptiveExecutionGraph {
let task_attempt = stage.task_failure_numbers[partition_id];
let task_info = crate::state::execution_graph::TaskInfo {
task_id,
- scheduled_time: SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .unwrap()
- .as_millis(),
+ scheduled_time: timestamp_millis() as u128,
// Those times will be updated when the task finish
launch_time: 0,
start_exec_time: 0,
diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs
index ce23a459a..8f0c30cfa 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -1348,6 +1348,8 @@ impl ExecutionGraph for StaticExecutionGraph {
/// fail job with error message
fn fail_job(&mut self, error: String) {
+ self.end_time = timestamp_millis();
+
self.status = JobStatus {
job_id: self.job_id.clone(),
job_name: self.job_name.clone(),
@@ -1375,10 +1377,7 @@ impl ExecutionGraph for StaticExecutionGraph {
.map(|l| l.try_into())
.collect::<Result<Vec<_>>>()?;
- self.end_time = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .unwrap()
- .as_millis() as u64;
+ self.end_time = timestamp_millis();
self.status = JobStatus {
job_id: self.job_id.clone(),
@@ -1772,6 +1771,38 @@ mod test {
test_union_all_plan, test_union_plan,
};
+ #[tokio::test]
+ async fn test_fail_job_sets_end_time_and_failed_metadata() -> Result<()> {
+ let mut graph = test_aggregation_plan(4).await;
+ let start = graph.start_time();
+ assert_eq!(graph.end_time(), 0);
+
+ ExecutionGraph::fail_job(&mut graph, "test failure".to_string());
+
+ assert!(
+ matches!(
+ graph.status().status.as_ref(),
+ Some(job_status::Status::Failed(f)) if f.error == "test
failure"
+ ),
+ "expected FailedJob status after fail_job"
+ );
+ assert!(
+ graph.end_time() >= start,
+ "end_time ({}) should be set and >= start_time ({})",
+ graph.end_time(),
+ start
+ );
+
+ if let Some(job_status::Status::Failed(failed)) =
&graph.status().status {
+ assert_eq!(failed.started_at, start);
+ assert_eq!(failed.ended_at, graph.end_time());
+ } else {
+ panic!("missing FailedJob");
+ }
+
+ Ok(())
+ }
+
#[tokio::test]
async fn test_drain_tasks() -> Result<()> {
let mut agg_graph = test_aggregation_plan(4).await;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]