martin-g commented on code in PR #1543:
URL:
https://github.com/apache/datafusion-ballista/pull/1543#discussion_r3036804713
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -91,13 +92,50 @@ struct CancelJobResponse {
pub reason: Option<String>,
}
+#[derive(Debug, serde::Serialize)]
+pub struct TaskSummary {
+ /// task id
+ pub task_id: u32,
+ /// partition id
+ pub partition_id: u32,
+ /// Scheduler schedule time
+ pub scheduled_time: u64,
+ /// Scheduler launch time
+ pub launch_time: u64,
+ /// The time the Executor start to run the task
+ pub start_exec_time: u64,
+ /// The time the Executor finish the task
+ pub end_exec_time: u64,
+ /// total execution time
+ pub exec_duration: u64,
+ /// Scheduler side finish time
+ pub finish_time: u64,
+ /// Number of input rows
+ pub input_rows: usize,
+ /// Number of output rows
+ pub output_rows: usize,
+}
+
+#[derive(Debug, serde::Serialize)]
+pub struct DurationStats {
+ pub min: u64,
+ pub p25: u64,
+ pub median: u64,
+ pub p75: u64,
+ pub max: u64,
+}
+
#[derive(Debug, serde::Serialize)]
pub struct QueryStageSummary {
pub stage_id: String,
pub stage_status: String,
pub input_rows: usize,
pub output_rows: usize,
pub elapsed_compute: String,
+ pub stage_plan: Option<String>,
Review Comment:
```suggestion
#[serde(skip_serializing_if = "Option::is_none")]
pub stage_plan: Option<String>,
```
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -376,9 +463,50 @@ pub async fn get_query_stages<
);
summary.elapsed_compute =
get_elapsed_compute_nanos(&completed_stage.stage_metrics);
+
+ summary.tasks = completed_stage
+ .task_infos
+ .iter()
+ .enumerate()
+ .map(|(partition_id, task_info)| {
+ let partition_metrics =
+
completed_stage.stage_metrics.get(partition_id);
+ let input_rows = partition_metrics
+ .map(|m| {
+ get_combined_count(
+ std::slice::from_ref(m),
+ "input_rows",
+ )
+ })
+ .unwrap_or(0);
+ let output_rows = partition_metrics
+ .map(|m| {
+ get_combined_count(
+ std::slice::from_ref(m),
+ "output_rows",
+ )
+ })
+ .unwrap_or(0);
+ let start_exec_time =
task_info.start_exec_time as u64;
+ let end_exec_time = task_info.end_exec_time as
u64;
+ Some(TaskSummary {
+ task_id: task_info.task_id as u32,
Review Comment:
Why casting to u32 ? `TaskSummary::task_id` could be `usize` too
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -376,9 +463,50 @@ pub async fn get_query_stages<
);
summary.elapsed_compute =
get_elapsed_compute_nanos(&completed_stage.stage_metrics);
+
+ summary.tasks = completed_stage
+ .task_infos
+ .iter()
+ .enumerate()
+ .map(|(partition_id, task_info)| {
+ let partition_metrics =
+
completed_stage.stage_metrics.get(partition_id);
+ let input_rows = partition_metrics
+ .map(|m| {
+ get_combined_count(
+ std::slice::from_ref(m),
+ "input_rows",
+ )
+ })
+ .unwrap_or(0);
+ let output_rows = partition_metrics
Review Comment:
Same here.
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -91,13 +92,50 @@ struct CancelJobResponse {
pub reason: Option<String>,
}
+#[derive(Debug, serde::Serialize)]
+pub struct TaskSummary {
Review Comment:
Let's add a `status` field - with values `Running` and `Completed`
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -346,9 +384,13 @@ pub async fn get_query_stages<
input_rows: 0,
output_rows: 0,
elapsed_compute: "".to_string(),
+ tasks: vec![],
+ task_duration_stats: None,
+ stage_plan: None
Review Comment:
```suggestion
stage_plan: None,
```
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -91,13 +92,50 @@ struct CancelJobResponse {
pub reason: Option<String>,
}
+#[derive(Debug, serde::Serialize)]
+pub struct TaskSummary {
+ /// task id
+ pub task_id: u32,
+ /// partition id
+ pub partition_id: u32,
+ /// Scheduler schedule time
+ pub scheduled_time: u64,
Review Comment:
Let's add the time units for all time related fields.
I guess it is millis since epoch but it would be nice to be documented.
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -364,8 +406,53 @@ pub async fn get_query_stages<
.as_ref()
.map(|m| get_elapsed_compute_nanos(m.as_slice()))
.unwrap_or_default();
+ summary.tasks = running_stage
+ .task_infos
+ .iter()
+ .enumerate()
+ .map(|(partition_id, task_info)| {
+ task_info.as_ref().map(|info| {
+ let partition_metrics = running_stage
+ .stage_metrics
+ .as_deref()
+ .and_then(|m| m.get(partition_id));
+ let input_rows = partition_metrics
+ .map(|m| {
+ get_combined_count(
+ std::slice::from_ref(m),
+ "input_rows",
+ )
+ })
+ .unwrap_or(0);
+ let output_rows = partition_metrics
Review Comment:
could we merge the extractions for `input_rows` and `output_rows` ?
currently it iterates `partition_metrics` twice.
it would be more optimal to return a tuple with both.
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -91,13 +92,50 @@ struct CancelJobResponse {
pub reason: Option<String>,
}
+#[derive(Debug, serde::Serialize)]
+pub struct TaskSummary {
+ /// task id
+ pub task_id: u32,
+ /// partition id
+ pub partition_id: u32,
+ /// Scheduler schedule time
+ pub scheduled_time: u64,
+ /// Scheduler launch time
+ pub launch_time: u64,
+ /// The time the Executor start to run the task
+ pub start_exec_time: u64,
+ /// The time the Executor finish the task
+ pub end_exec_time: u64,
+ /// total execution time
+ pub exec_duration: u64,
Review Comment:
`exec_duration` could be calculated from `end_exec_time - start_exec_time`.
No need to be a field.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]