martin-g commented on code in PR #1778:
URL:
https://github.com/apache/datafusion-ballista/pull/1778#discussion_r3309708458
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -523,11 +523,16 @@ pub async fn get_query_stages<
};
match stage {
ExecutionStage::Running(running_stage) => {
- summary.stage_plan = if render_tree {
-
Some(displayable(running_stage.plan.as_ref()).tree_render().to_string())
- } else {
-
Some(displayable(running_stage.plan.as_ref()).indent(false).to_string())
- };
+ summary.stage_plan =
running_stage.stage_metrics.as_deref().map(|m| {
+
format_stage_plan_with_metrics(running_stage.plan.as_ref(), m, render_tree)
+ }).or_else(|| {
+ // no metrics yet
+ Some(if render_tree {
+
displayable(running_stage.plan.as_ref()).tree_render().to_string()
+ } else {
+
displayable(running_stage.plan.as_ref()).indent(false).to_string()
+ })
+ });
Review Comment:
I think this could be simplified to:
```rust
summary.stage_plan =
Some(format_stage_plan_with_metrics(
running_stage.plan.as_ref(),
running_stage.stage_metrics.as_deref().unwrap_or(&[]),
render_tree,
));
```
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -682,6 +687,85 @@ fn task_duration_percentiles(tasks:
&[Option<TaskSummary>]) -> Option<Percentile
})
}
+fn format_stage_plan_with_metrics(
+ plan: &dyn ExecutionPlan,
+ stage_metrics: &[MetricsSet],
+ render_tree: bool,
+) -> String {
+ // If it is empty - fall back to render_tree
+ if stage_metrics.is_empty() {
+ return if render_tree {
+ displayable(plan).tree_render().to_string()
+ } else {
+ displayable(plan).indent(false).to_string()
+ };
+ }
+
+ let mut metric_idx = 0;
+ let result = format_node(plan, stage_metrics, &mut metric_idx, 0);
+
+ debug_assert_eq!(
+ metric_idx,
+ stage_metrics.len(),
+ "metric count mismatch: consumed {} but stage_metrics has {}",
+ metric_idx,
+ stage_metrics.len()
+ );
+
+ result
+}
+
+/// Formatting the node in DFS fashion
+/// It is constructed in the same way it is collected (using in-order
traversal)
Review Comment:
```suggestion
/// It is constructed in the same way it is collected (using pre-order
traversal)
```
Visiting the root and then its children (`root then children`) is pre-order traversal, isn't it?
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -682,6 +687,85 @@ fn task_duration_percentiles(tasks:
&[Option<TaskSummary>]) -> Option<Percentile
})
}
+fn format_stage_plan_with_metrics(
+ plan: &dyn ExecutionPlan,
+ stage_metrics: &[MetricsSet],
+ render_tree: bool,
+) -> String {
+ // If it is empty - fall back to render_tree
+ if stage_metrics.is_empty() {
+ return if render_tree {
+ displayable(plan).tree_render().to_string()
+ } else {
+ displayable(plan).indent(false).to_string()
+ };
+ }
+
+ let mut metric_idx = 0;
+ let result = format_node(plan, stage_metrics, &mut metric_idx, 0);
+
+ debug_assert_eq!(
+ metric_idx,
+ stage_metrics.len(),
+ "metric count mismatch: consumed {} but stage_metrics has {}",
+ metric_idx,
+ stage_metrics.len()
+ );
+
+ result
+}
+
+/// Formatting the node in DFS fashion
+/// It is constructed in the same way it is collected (using in-order
traversal)
+///
+/// For reference how the metrics are collected on the executor's side - see
ballista-core/utils.rs
+fn format_node(
+ plan: &dyn ExecutionPlan,
+ stage_metrics: &[MetricsSet],
+ metric_idx: &mut usize,
+ indent: usize,
+) -> String {
+ let metrics_set = if plan.metrics().is_some() {
+ let m = stage_metrics.get(*metric_idx);
+ *metric_idx += 1;
Review Comment:
Shouldn't this be incremented even when `plan.metrics().is_none()`?
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -682,6 +687,85 @@ fn task_duration_percentiles(tasks:
&[Option<TaskSummary>]) -> Option<Percentile
})
}
+fn format_stage_plan_with_metrics(
+ plan: &dyn ExecutionPlan,
+ stage_metrics: &[MetricsSet],
+ render_tree: bool,
+) -> String {
+ // If it is empty - fall back to render_tree
+ if stage_metrics.is_empty() {
+ return if render_tree {
+ displayable(plan).tree_render().to_string()
+ } else {
+ displayable(plan).indent(false).to_string()
+ };
+ }
+
+ let mut metric_idx = 0;
+ let result = format_node(plan, stage_metrics, &mut metric_idx, 0);
+
+ debug_assert_eq!(
+ metric_idx,
+ stage_metrics.len(),
+ "metric count mismatch: consumed {} but stage_metrics has {}",
+ metric_idx,
+ stage_metrics.len()
+ );
+
+ result
+}
+
+/// Formatting the node in DFS fashion
+/// It is constructed in the same way it is collected (using in-order
traversal)
+///
+/// For reference how the metrics are collected on the executor's side - see
ballista-core/utils.rs
+fn format_node(
+ plan: &dyn ExecutionPlan,
+ stage_metrics: &[MetricsSet],
+ metric_idx: &mut usize,
+ indent: usize,
+) -> String {
+ let metrics_set = if plan.metrics().is_some() {
+ let m = stage_metrics.get(*metric_idx);
+ *metric_idx += 1;
+ m
+ } else {
+ None
+ };
+
+ let metric_str = metrics_set
+ .map(|m| {
+ let aggregated = m.aggregate_by_name();
Review Comment:
```suggestion
let aggregated =
m.aggregate_by_name().sorted_for_display().timestamps_removed();
```
as is done in
https://github.com/apache/datafusion-ballista/blob/7a96c9480632c865c6e387a93dcba8457dca3bc2/ballista/scheduler/src/display.rs#L136-L139
##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -682,6 +687,85 @@ fn task_duration_percentiles(tasks:
&[Option<TaskSummary>]) -> Option<Percentile
})
}
+fn format_stage_plan_with_metrics(
+ plan: &dyn ExecutionPlan,
+ stage_metrics: &[MetricsSet],
+ render_tree: bool,
+) -> String {
+ // If it is empty - fall back to render_tree
+ if stage_metrics.is_empty() {
+ return if render_tree {
+ displayable(plan).tree_render().to_string()
+ } else {
+ displayable(plan).indent(false).to_string()
+ };
+ }
+
+ let mut metric_idx = 0;
+ let result = format_node(plan, stage_metrics, &mut metric_idx, 0);
+
+ debug_assert_eq!(
+ metric_idx,
+ stage_metrics.len(),
+ "metric count mismatch: consumed {} but stage_metrics has {}",
+ metric_idx,
+ stage_metrics.len()
+ );
+
+ result
+}
+
+/// Formatting the node in DFS fashion
+/// It is constructed in the same way it is collected (using in-order
traversal)
+///
+/// For reference how the metrics are collected on the executor's side - see
ballista-core/utils.rs
+fn format_node(
+ plan: &dyn ExecutionPlan,
+ stage_metrics: &[MetricsSet],
+ metric_idx: &mut usize,
+ indent: usize,
+) -> String {
+ let metrics_set = if plan.metrics().is_some() {
+ let m = stage_metrics.get(*metric_idx);
+ *metric_idx += 1;
+ m
+ } else {
+ None
+ };
+
+ let metric_str = metrics_set
+ .map(|m| {
+ let aggregated = m.aggregate_by_name();
+ format!(", metrics=[{}]", aggregated)
+ })
+ .unwrap_or_else(|| ", metrics=[]".to_string());
+
+ // Single-node display without children
+ let node_line = {
+ struct SingleNode<'a>(&'a dyn ExecutionPlan);
+ impl std::fmt::Display for SingleNode<'_> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result
{
+ self.0.fmt_as(DisplayFormatType::Default, f)
+ }
+ }
+ SingleNode(plan).to_string()
+ };
+
+ let prefix = " ".repeat(indent);
+ let mut result = format!("{}{}{}\n", prefix, node_line, metric_str);
Review Comment:
This could be optimized by creating a single mutable `String` in the top-level render call, passing it down the recursion, and appending to it with `write!()` instead of building each line with `format!()`. This avoids many intermediate `String` allocations.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]