martin-g commented on code in PR #1543:
URL: 
https://github.com/apache/datafusion-ballista/pull/1543#discussion_r3037289653


##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -91,13 +94,72 @@ struct CancelJobResponse {
     pub reason: Option<String>,
 }
 
+#[derive(Debug, serde::Serialize)]
+pub struct TaskSummary {
+    /// task id
+    pub task_id: usize,

Review Comment:
   ```suggestion
       pub id: usize,
   ```



##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -376,9 +489,51 @@ pub async fn get_query_stages<
                         );
                         summary.elapsed_compute =
                             
get_elapsed_compute_nanos(&completed_stage.stage_metrics);
+
+                        summary.tasks = completed_stage
+                            .task_infos
+                            .iter()
+                            .enumerate()
+                            .map(|(partition_id, task_info)| {
+                                let partition_metrics =
+                                    
completed_stage.stage_metrics.get(partition_id);
+                                let (input_rows, output_rows) = 
partition_metrics
+                                    .map(|m| {
+                                        let input = get_combined_count(
+                                            std::slice::from_ref(m),
+                                            "input_rows",
+                                        );
+                                        let output = get_combined_count(
+                                            std::slice::from_ref(m),
+                                            "output_rows",
+                                        );
+                                        (input,output)
+                                    })
+                                    .unwrap_or((0,0));
+
+                                let start_exec_time = 
task_info.start_exec_time as u64;
+                                let end_exec_time = task_info.end_exec_time as 
u64;
+                                let task_status = 
(&task_info.task_status).into();
+                                Some(TaskSummary {
+                                    task_id: task_info.task_id,

Review Comment:
   ```suggestion
                                       id: task_info.task_id,
   ```



##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -364,8 +431,54 @@ pub async fn get_query_stages<
                             .as_ref()
                             .map(|m| get_elapsed_compute_nanos(m.as_slice()))
                             .unwrap_or_default();
+                        summary.tasks = running_stage
+                            .task_infos
+                            .iter()
+                            .enumerate()
+                            .map(|(partition_id, task_info)| {
+                                task_info.as_ref().map(|info| {
+                                    let partition_metrics = running_stage
+                                        .stage_metrics
+                                        .as_deref()
+                                        .and_then(|m| m.get(partition_id));
+                                    let (input_rows, output_rows) = 
partition_metrics
+                                    .map(|m| {
+                                        let input = get_combined_count(
+                                            std::slice::from_ref(m),
+                                            "input_rows",
+                                        );
+                                        let output = get_combined_count(
+                                            std::slice::from_ref(m),
+                                            "output_rows",
+                                        );
+                                        (input,output)
+                                    })
+                                    .unwrap_or((0,0));
+
+                                    let start_exec_time = info.start_exec_time 
as u64;
+                                    let end_exec_time = info.end_exec_time as 
u64;
+
+                                    let task_status: TaskStatus = 
(&info.task_status).into();
+
+                                    TaskSummary {
+                                        task_id: info.task_id,

Review Comment:
   ```suggestion
                                           id: info.task_id,
   ```



##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -91,13 +94,72 @@ struct CancelJobResponse {
     pub reason: Option<String>,
 }
 
+#[derive(Debug, serde::Serialize)]
+pub struct TaskSummary {
+    /// task id
+    pub task_id: usize,
+    /// Task status
+    pub task_status: TaskStatus,

Review Comment:
   ```suggestion
       pub status: TaskStatus,
   ```



##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -376,9 +489,51 @@ pub async fn get_query_stages<
                         );
                         summary.elapsed_compute =
                             
get_elapsed_compute_nanos(&completed_stage.stage_metrics);
+
+                        summary.tasks = completed_stage
+                            .task_infos
+                            .iter()
+                            .enumerate()
+                            .map(|(partition_id, task_info)| {
+                                let partition_metrics =
+                                    
completed_stage.stage_metrics.get(partition_id);
+                                let (input_rows, output_rows) = 
partition_metrics
+                                    .map(|m| {
+                                        let input = get_combined_count(
+                                            std::slice::from_ref(m),
+                                            "input_rows",
+                                        );
+                                        let output = get_combined_count(
+                                            std::slice::from_ref(m),
+                                            "output_rows",
+                                        );
+                                        (input,output)
+                                    })
+                                    .unwrap_or((0,0));
+
+                                let start_exec_time = 
task_info.start_exec_time as u64;
+                                let end_exec_time = task_info.end_exec_time as 
u64;
+                                let task_status = 
(&task_info.task_status).into();
+                                Some(TaskSummary {
+                                    task_id: task_info.task_id,
+                                    partition_id: partition_id as u32,
+                                    scheduled_time: task_info.scheduled_time 
as u64,
+                                    launch_time: task_info.launch_time as u64,
+                                    start_exec_time,
+                                    end_exec_time,
+                                    exec_duration: 
end_exec_time.saturating_sub(start_exec_time),
+                                    finish_time: task_info.finish_time as u64,
+                                    input_rows,
+                                    output_rows,
+                                    task_status

Review Comment:
   ```suggestion
                                       status: task_status
   ```



##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -364,8 +431,54 @@ pub async fn get_query_stages<
                             .as_ref()
                             .map(|m| get_elapsed_compute_nanos(m.as_slice()))
                             .unwrap_or_default();
+                        summary.tasks = running_stage
+                            .task_infos
+                            .iter()
+                            .enumerate()
+                            .map(|(partition_id, task_info)| {
+                                task_info.as_ref().map(|info| {
+                                    let partition_metrics = running_stage
+                                        .stage_metrics
+                                        .as_deref()
+                                        .and_then(|m| m.get(partition_id));
+                                    let (input_rows, output_rows) = 
partition_metrics
+                                    .map(|m| {
+                                        let input = get_combined_count(
+                                            std::slice::from_ref(m),
+                                            "input_rows",
+                                        );
+                                        let output = get_combined_count(
+                                            std::slice::from_ref(m),
+                                            "output_rows",
+                                        );
+                                        (input,output)
+                                    })
+                                    .unwrap_or((0,0));
+
+                                    let start_exec_time = info.start_exec_time 
as u64;
+                                    let end_exec_time = info.end_exec_time as 
u64;
+
+                                    let task_status: TaskStatus = 
(&info.task_status).into();
+
+                                    TaskSummary {
+                                        task_id: info.task_id,
+                                        partition_id: partition_id as u32,
+                                        scheduled_time: info.scheduled_time as 
u64,
+                                        launch_time: info.launch_time as u64,
+                                        start_exec_time,
+                                        end_exec_time,
+                                        exec_duration: 
end_exec_time.saturating_sub(start_exec_time),
+                                        finish_time: info.finish_time as u64,
+                                        input_rows,
+                                        output_rows,
+                                        task_status

Review Comment:
   ```suggestion
                                           status: task_status
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to