This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 0516616f1 feat: Expose Logical and Physical plan details in the REST API (#1498)
0516616f1 is described below

commit 0516616f1034e5aab6655aa7651bf9fd5aae25cd
Author: Marko Milenković <[email protected]>
AuthorDate: Wed Mar 11 21:40:28 2026 +0000

    feat: Expose Logical and Physical plan details in the REST API (#1498)
---
 .gitignore                                         |   1 +
 ballista/scheduler/src/api/handlers.rs             | 118 +++++++++++++++------
 ballista/scheduler/src/api/mod.rs                  |   6 +-
 ballista/scheduler/src/state/aqe/mod.rs            |  16 +++
 ballista/scheduler/src/state/execution_graph.rs    |  22 ++++
 .../scheduler/src/state/execution_graph_dot.rs     |   4 +
 ballista/scheduler/src/state/mod.rs                |   7 ++
 ballista/scheduler/src/state/task_manager.rs       |   6 ++
 ballista/scheduler/src/test_utils.rs               |  12 +++
 9 files changed, 160 insertions(+), 32 deletions(-)

diff --git a/.gitignore b/.gitignore
index 50983b431..1e47926b2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -108,3 +108,4 @@ logs/
 
 # Claude Code guidance file (local only)
 CLAUDE.md
+.claude/
\ No newline at end of file
diff --git a/ballista/scheduler/src/api/handlers.rs 
b/ballista/scheduler/src/api/handlers.rs
index de1546c59..a7ade706f 100644
--- a/ballista/scheduler/src/api/handlers.rs
+++ b/ballista/scheduler/src/api/handlers.rs
@@ -56,6 +56,12 @@ pub struct JobResponse {
     pub num_stages: usize,
     pub completed_stages: usize,
     pub percent_complete: u8,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub logical_plan: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub physical_plan: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub stage_plan: Option<String>,
 }
 
 #[derive(Debug, serde::Serialize)]
@@ -129,36 +135,8 @@ pub async fn get_jobs<
     let jobs: Vec<JobResponse> = jobs
         .iter()
         .map(|job| {
-            let status = &job.status;
-            let (plain_status, job_status) = match &status.status {
-                Some(Status::Queued(_)) => ("Queued".to_string(), 
"Queued".to_string()),
-                Some(Status::Running(_)) => ("Running".to_string(), 
"Running".to_string()),
-                Some(Status::Failed(error)) => ("Failed".to_string(), 
format!("Failed: {}", error.error)),
-                Some(Status::Successful(completed)) => {
-                    let num_rows = completed
-                        .partition_location
-                        .iter()
-                        .map(|p| {
-                            p.partition_stats.as_ref().map(|s| 
s.num_rows).unwrap_or(0)
-                        })
-                        .sum::<i64>();
-                    let num_rows_term = if num_rows == 1 { "row" } else { 
"rows" };
-                    let num_partitions = completed.partition_location.len();
-                    let num_partitions_term = if num_partitions == 1 {
-                        "partition"
-                    } else {
-                        "partitions"
-                    };
-                    ("Completed".to_string(),
-                    format!(
-                        "Completed. Produced {} {} containing {} {}. Elapsed 
time: {} ms.",
-                        num_partitions, num_partitions_term, num_rows, 
num_rows_term,
-                        job.end_time - job.start_time
-                    )
-                    )
-                }
-                _ => ("Invalid".to_string(), "Invalid State".to_string()),
-            };
+            let (plain_status, job_status) =
+                format_job_status(&job.status.status, job.end_time - 
job.start_time);
 
             // calculate progress based on completed stages for now, but we 
could use completed
             // tasks in the future to make this more accurate
@@ -172,6 +150,9 @@ pub async fn get_jobs<
                 num_stages: job.num_stages,
                 completed_stages: job.completed_stages,
                 percent_complete,
+                logical_plan: None,
+                physical_plan: None,
+                stage_plan: None,
             }
         })
         .collect();
@@ -179,6 +160,47 @@ pub async fn get_jobs<
     Ok(Json(jobs))
 }
 
+pub async fn get_job<
+    T: AsLogicalPlan + Clone + Send + Sync + 'static,
+    U: AsExecutionPlan + Send + Sync + 'static,
+>(
+    State(data_server): State<Arc<SchedulerServer<T, U>>>,
+    Path(job_id): Path<String>,
+) -> Result<impl IntoResponse, StatusCode> {
+    let graph = data_server
+        .state
+        .task_manager
+        .get_job_execution_graph(&job_id)
+        .await
+        .map_err(|err| {
+            tracing::error!("Error occurred while getting the execution graph 
for job '{job_id}': {err:?}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .ok_or(StatusCode::NOT_FOUND)?;
+    let stage_plan = format!("{:?}", graph);
+    let job = graph.as_ref();
+    let (plain_status, job_status) =
+        format_job_status(&job.status().status, job.end_time() - 
job.start_time());
+
+    let num_stages = job.stage_count();
+    let completed_stages = job.completed_stages();
+    let percent_complete =
+        ((completed_stages as f32 / num_stages as f32) * 100_f32) as u8;
+
+    Ok(Json(JobResponse {
+        job_id: job.job_id().to_string(),
+        job_name: job.job_name().to_string(),
+        job_status,
+        status: plain_status,
+        num_stages,
+        completed_stages,
+        percent_complete,
+        logical_plan: job.logical_plan().map(str::to_owned),
+        physical_plan: job.physical_plan().map(str::to_owned),
+        stage_plan: Some(stage_plan),
+    }))
+}
+
 pub async fn cancel_job<
     T: AsLogicalPlan + Clone + Send + Sync + 'static,
     U: AsExecutionPlan + Send + Sync + 'static,
@@ -314,6 +336,42 @@ pub async fn get_query_stages<
     }
 }
 
+fn format_job_status(status: &Option<Status>, elapsed_ms: u64) -> (String, 
String) {
+    match status {
+        Some(Status::Queued(_)) => ("Queued".to_string(), 
"Queued".to_string()),
+        Some(Status::Running(_)) => ("Running".to_string(), 
"Running".to_string()),
+        Some(Status::Failed(error)) => {
+            ("Failed".to_string(), format!("Failed: {}", error.error))
+        }
+        Some(Status::Successful(completed)) => {
+            let num_rows = completed
+                .partition_location
+                .iter()
+                .map(|p| p.partition_stats.as_ref().map(|s| 
s.num_rows).unwrap_or(0))
+                .sum::<i64>();
+            let num_rows_term = if num_rows == 1 { "row" } else { "rows" };
+            let num_partitions = completed.partition_location.len();
+            let num_partitions_term = if num_partitions == 1 {
+                "partition"
+            } else {
+                "partitions"
+            };
+            (
+                "Completed".to_string(),
+                format!(
+                    "Completed. Produced {} {} containing {} {}. Elapsed time: 
{} ms.",
+                    num_partitions,
+                    num_partitions_term,
+                    num_rows,
+                    num_rows_term,
+                    elapsed_ms
+                ),
+            )
+        }
+        _ => ("Invalid".to_string(), "Invalid State".to_string()),
+    }
+}
+
 fn get_elapsed_compute_nanos(metrics: &[MetricsSet]) -> String {
     let nanos: usize = metrics
         .iter()
diff --git a/ballista/scheduler/src/api/mod.rs 
b/ballista/scheduler/src/api/mod.rs
index 733a60f8c..2662e3eea 100644
--- a/ballista/scheduler/src/api/mod.rs
+++ b/ballista/scheduler/src/api/mod.rs
@@ -13,7 +13,6 @@
 mod handlers;
 
 use crate::scheduler_server::SchedulerServer;
-use axum::routing::patch;
 use axum::{Router, routing::get};
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::AsExecutionPlan;
@@ -30,7 +29,10 @@ pub fn get_routes<
         .route("/api/state", get(handlers::get_scheduler_state::<T, U>))
         .route("/api/executors", get(handlers::get_executors::<T, U>))
         .route("/api/jobs", get(handlers::get_jobs::<T, U>))
-        .route("/api/job/{job_id}", patch(handlers::cancel_job::<T, U>))
+        .route(
+            "/api/job/{job_id}",
+            get(handlers::get_job::<T, U>).patch(handlers::cancel_job::<T, U>),
+        )
         .route(
             "/api/job/{job_id}/stages",
             get(handlers::get_query_stages::<T, U>),
diff --git a/ballista/scheduler/src/state/aqe/mod.rs 
b/ballista/scheduler/src/state/aqe/mod.rs
index 718902bc3..32725097a 100644
--- a/ballista/scheduler/src/state/aqe/mod.rs
+++ b/ballista/scheduler/src/state/aqe/mod.rs
@@ -109,6 +109,10 @@ pub(crate) struct AdaptiveExecutionGraph {
     failed_stage_attempts: HashMap<usize, HashSet<usize>>,
     /// Session config for this job
     session_config: Arc<SessionConfig>,
+    /// Logical plan as a human-readable string, captured at submission time.
+    logical_plan: Option<String>,
+    /// Physical plan as a human-readable string, captured at submission time.
+    physical_plan: Option<String>,
 }
 
 impl AdaptiveExecutionGraph {
@@ -125,6 +129,8 @@ impl AdaptiveExecutionGraph {
         plan: Arc<dyn ExecutionPlan>,
         queued_at: u64,
         session_config: Arc<SessionConfig>,
+        logical_plan: Option<String>,
+        physical_plan: Option<String>,
     ) -> ballista_core::error::Result<Self> {
         let mut planner =
             AdaptivePlanner::try_new(&session_config, plan, 
job_name.to_owned())?;
@@ -178,6 +184,8 @@ impl AdaptiveExecutionGraph {
             task_id_gen: 0,
             failed_stage_attempts: HashMap::new(),
             session_config,
+            logical_plan,
+            physical_plan,
         })
     }
 }
@@ -503,6 +511,14 @@ impl ExecutionGraph for AdaptiveExecutionGraph {
         &self.status
     }
 
+    fn logical_plan(&self) -> Option<&str> {
+        self.logical_plan.as_deref()
+    }
+
+    fn physical_plan(&self) -> Option<&str> {
+        self.physical_plan.as_deref()
+    }
+
     fn start_time(&self) -> u64 {
         self.start_time
     }
diff --git a/ballista/scheduler/src/state/execution_graph.rs 
b/ballista/scheduler/src/state/execution_graph.rs
index 27a325479..c443918ef 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -112,6 +112,12 @@ pub trait ExecutionGraph: Debug {
     /// Returns the current job status.
     fn status(&self) -> &JobStatus;
 
+    /// Returns the logical plan as a string, if captured at submission time.
+    fn logical_plan(&self) -> Option<&str>;
+
+    /// Returns the physical plan as a string, if captured at submission time.
+    fn physical_plan(&self) -> Option<&str>;
+
     /// Returns the timestamp when this job started execution.
     fn start_time(&self) -> u64;
 
@@ -263,6 +269,10 @@ pub struct StaticExecutionGraph {
     failed_stage_attempts: HashMap<usize, HashSet<usize>>,
     /// Session config for this job
     session_config: Arc<SessionConfig>,
+    /// Logical plan as a human-readable string, captured at submission time.
+    logical_plan: Option<String>,
+    /// Physical plan as a human-readable string, captured at submission time.
+    physical_plan: Option<String>,
 }
 
 /// Information about a currently running task.
@@ -298,6 +308,8 @@ impl StaticExecutionGraph {
         queued_at: u64,
         session_config: Arc<SessionConfig>,
         planner: &mut dyn DistributedPlanner,
+        logical_plan: Option<String>,
+        physical_plan: Option<String>,
     ) -> Result<Self> {
         let shuffle_stages =
             planner.plan_query_stages(job_id, plan, session_config.options())?;
@@ -330,6 +342,8 @@ impl StaticExecutionGraph {
             task_id_gen: 0,
             failed_stage_attempts: HashMap::new(),
             session_config,
+            logical_plan,
+            physical_plan,
         })
     }
 
@@ -635,6 +649,14 @@ impl ExecutionGraph for StaticExecutionGraph {
         &self.status
     }
 
+    fn logical_plan(&self) -> Option<&str> {
+        self.logical_plan.as_deref()
+    }
+
+    fn physical_plan(&self) -> Option<&str> {
+        self.physical_plan.as_deref()
+    }
+
     fn start_time(&self) -> u64 {
         self.start_time
     }
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs 
b/ballista/scheduler/src/state/execution_graph_dot.rs
index e08fd663f..28f29711a 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -612,6 +612,8 @@ filter_expr="]
             0,
             Arc::new(SessionConfig::new_with_ballista()),
             &mut planner,
+            None,
+            None,
         )
     }
 
@@ -648,6 +650,8 @@ filter_expr="]
             0,
             Arc::new(SessionConfig::new_with_ballista()),
             &mut planner,
+            None,
+            None,
         )
     }
 }
diff --git a/ballista/scheduler/src/state/mod.rs 
b/ballista/scheduler/src/state/mod.rs
index 82fa60c97..663494336 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -443,11 +443,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
             None
         };
 
+        let logical_plan_str = plan.display_indent().to_string();
+
         let plan = session_ctx.state().create_physical_plan(plan).await?;
         debug!(
             "Physical plan: {}",
             DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
         );
+        let physical_plan_str = DisplayableExecutionPlan::new(plan.as_ref())
+            .indent(false)
+            .to_string();
 
         let plan = plan.transform_down(&|node: Arc<dyn ExecutionPlan>| {
             if node.output_partitioning().partition_count() == 0 {
@@ -490,6 +495,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
                 queued_at,
                 session_config,
                 subscriber,
+                Some(logical_plan_str),
+                Some(physical_plan_str),
             )
             .await?;
 
diff --git a/ballista/scheduler/src/state/task_manager.rs 
b/ballista/scheduler/src/state/task_manager.rs
index b55806407..7a10de451 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -278,6 +278,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
         queued_at: u64,
         session_config: Arc<SessionConfig>,
         subscriber: Option<JobStatusSubscriber>,
+        logical_plan: Option<String>,
+        physical_plan: Option<String>,
     ) -> Result<()> {
         let mut planner = DefaultDistributedPlanner::new();
 
@@ -294,6 +296,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
                 plan,
                 queued_at,
                 session_config,
+                logical_plan,
+                physical_plan,
             )?) as ExecutionGraphBox
         } else {
             debug!("Using static query planner for job planning");
@@ -306,6 +310,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> TaskManager<T, U>
                 queued_at,
                 session_config,
                 &mut planner,
+                logical_plan,
+                physical_plan,
             )?) as ExecutionGraphBox
         };
 
diff --git a/ballista/scheduler/src/test_utils.rs 
b/ballista/scheduler/src/test_utils.rs
index 1d4f3633f..80837a0fa 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -920,6 +920,8 @@ pub async fn test_aggregation_plan_with_job_id(
         0,
         Arc::new(SessionConfig::new_with_ballista()),
         &mut planner,
+        None,
+        None,
     )
     .unwrap()
 }
@@ -968,6 +970,8 @@ pub async fn test_two_aggregations_plan(partition: usize) 
-> StaticExecutionGrap
         0,
         Arc::new(SessionConfig::new_with_ballista()),
         &mut planner,
+        None,
+        None,
     )
     .unwrap()
 }
@@ -1008,6 +1012,8 @@ pub async fn test_coalesce_plan(partition: usize) -> 
StaticExecutionGraph {
         0,
         Arc::new(SessionConfig::new_with_ballista()),
         &mut planner,
+        None,
+        None,
     )
     .unwrap()
 }
@@ -1068,6 +1074,8 @@ pub async fn test_join_plan(partition: usize) -> 
StaticExecutionGraph {
         0,
         Arc::new(SessionConfig::new_with_ballista()),
         &mut planner,
+        None,
+        None,
     )
     .unwrap();
 
@@ -1110,6 +1118,8 @@ pub async fn test_union_all_plan(partition: usize) -> 
StaticExecutionGraph {
         0,
         Arc::new(SessionConfig::new_with_ballista()),
         &mut planner,
+        None,
+        None,
     )
     .unwrap();
 
@@ -1152,6 +1162,8 @@ pub async fn test_union_plan(partition: usize) -> 
StaticExecutionGraph {
         0,
         Arc::new(SessionConfig::new_with_ballista()),
         &mut planner,
+        None,
+        None,
     )
     .unwrap();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to