martin-g commented on code in PR #1543:
URL:
https://github.com/apache/datafusion-ballista/pull/1543#discussion_r3037289653
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -91,13 +94,72 @@ struct CancelJobResponse {
pub reason: Option<String>,
}
+#[derive(Debug, serde::Serialize)]
+pub struct TaskSummary {
+ /// task id
+ pub task_id: usize,
Review Comment:
```suggestion
pub id: usize,
```
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -376,9 +489,51 @@ pub async fn get_query_stages<
);
summary.elapsed_compute =
get_elapsed_compute_nanos(&completed_stage.stage_metrics);
+
+ summary.tasks = completed_stage
+ .task_infos
+ .iter()
+ .enumerate()
+ .map(|(partition_id, task_info)| {
+ let partition_metrics =
+
completed_stage.stage_metrics.get(partition_id);
+ let (input_rows, output_rows) =
partition_metrics
+ .map(|m| {
+ let input = get_combined_count(
+ std::slice::from_ref(m),
+ "input_rows",
+ );
+ let output = get_combined_count(
+ std::slice::from_ref(m),
+ "output_rows",
+ );
+ (input,output)
+ })
+ .unwrap_or((0,0));
+
+ let start_exec_time =
task_info.start_exec_time as u64;
+ let end_exec_time = task_info.end_exec_time as
u64;
+ let task_status =
(&task_info.task_status).into();
+ Some(TaskSummary {
+ task_id: task_info.task_id,
Review Comment:
```suggestion
id: task_info.task_id,
```
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -364,8 +431,54 @@ pub async fn get_query_stages<
.as_ref()
.map(|m| get_elapsed_compute_nanos(m.as_slice()))
.unwrap_or_default();
+ summary.tasks = running_stage
+ .task_infos
+ .iter()
+ .enumerate()
+ .map(|(partition_id, task_info)| {
+ task_info.as_ref().map(|info| {
+ let partition_metrics = running_stage
+ .stage_metrics
+ .as_deref()
+ .and_then(|m| m.get(partition_id));
+ let (input_rows, output_rows) =
partition_metrics
+ .map(|m| {
+ let input = get_combined_count(
+ std::slice::from_ref(m),
+ "input_rows",
+ );
+ let output = get_combined_count(
+ std::slice::from_ref(m),
+ "output_rows",
+ );
+ (input,output)
+ })
+ .unwrap_or((0,0));
+
+ let start_exec_time = info.start_exec_time
as u64;
+ let end_exec_time = info.end_exec_time as
u64;
+
+ let task_status: TaskStatus =
(&info.task_status).into();
+
+ TaskSummary {
+ task_id: info.task_id,
Review Comment:
```suggestion
id: info.task_id,
```
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -91,13 +94,72 @@ struct CancelJobResponse {
pub reason: Option<String>,
}
+#[derive(Debug, serde::Serialize)]
+pub struct TaskSummary {
+ /// task id
+ pub task_id: usize,
+ /// Task status
+ pub task_status: TaskStatus,
Review Comment:
```suggestion
pub status: TaskStatus,
```
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -376,9 +489,51 @@ pub async fn get_query_stages<
);
summary.elapsed_compute =
get_elapsed_compute_nanos(&completed_stage.stage_metrics);
+
+ summary.tasks = completed_stage
+ .task_infos
+ .iter()
+ .enumerate()
+ .map(|(partition_id, task_info)| {
+ let partition_metrics =
+
completed_stage.stage_metrics.get(partition_id);
+ let (input_rows, output_rows) =
partition_metrics
+ .map(|m| {
+ let input = get_combined_count(
+ std::slice::from_ref(m),
+ "input_rows",
+ );
+ let output = get_combined_count(
+ std::slice::from_ref(m),
+ "output_rows",
+ );
+ (input,output)
+ })
+ .unwrap_or((0,0));
+
+ let start_exec_time =
task_info.start_exec_time as u64;
+ let end_exec_time = task_info.end_exec_time as
u64;
+ let task_status =
(&task_info.task_status).into();
+ Some(TaskSummary {
+ task_id: task_info.task_id,
+ partition_id: partition_id as u32,
+ scheduled_time: task_info.scheduled_time
as u64,
+ launch_time: task_info.launch_time as u64,
+ start_exec_time,
+ end_exec_time,
+ exec_duration:
end_exec_time.saturating_sub(start_exec_time),
+ finish_time: task_info.finish_time as u64,
+ input_rows,
+ output_rows,
+ task_status
Review Comment:
```suggestion
status: task_status
```
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -364,8 +431,54 @@ pub async fn get_query_stages<
.as_ref()
.map(|m| get_elapsed_compute_nanos(m.as_slice()))
.unwrap_or_default();
+ summary.tasks = running_stage
+ .task_infos
+ .iter()
+ .enumerate()
+ .map(|(partition_id, task_info)| {
+ task_info.as_ref().map(|info| {
+ let partition_metrics = running_stage
+ .stage_metrics
+ .as_deref()
+ .and_then(|m| m.get(partition_id));
+ let (input_rows, output_rows) =
partition_metrics
+ .map(|m| {
+ let input = get_combined_count(
+ std::slice::from_ref(m),
+ "input_rows",
+ );
+ let output = get_combined_count(
+ std::slice::from_ref(m),
+ "output_rows",
+ );
+ (input,output)
+ })
+ .unwrap_or((0,0));
+
+ let start_exec_time = info.start_exec_time
as u64;
+ let end_exec_time = info.end_exec_time as
u64;
+
+ let task_status: TaskStatus =
(&info.task_status).into();
+
+ TaskSummary {
+ task_id: info.task_id,
+ partition_id: partition_id as u32,
+ scheduled_time: info.scheduled_time as
u64,
+ launch_time: info.launch_time as u64,
+ start_exec_time,
+ end_exec_time,
+ exec_duration:
end_exec_time.saturating_sub(start_exec_time),
+ finish_time: info.finish_time as u64,
+ input_rows,
+ output_rows,
+ task_status
Review Comment:
```suggestion
status: task_status
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]