thinkharderdev commented on code in PR #124:
URL: https://github.com/apache/arrow-ballista/pull/124#discussion_r943232086
##########
ballista/rust/scheduler/src/state/execution_graph.rs:
##########
@@ -153,6 +157,7 @@ impl ExecutionStage {
task_statuses: vec![None; num_tasks],
output_link,
resolved,
+ stage_metrics: None,
Review Comment:
I'm not sure about storing the raw metrics in the `ExecutionGraph`. It could be a lot of data (e.g. a `ParquetExec` scanning many files can produce a very large set of metrics), and we read and write the graph to the state store frequently.
I think it might be worth introducing a `MetricStore` trait so that how metrics are stored (or whether they are stored at all) is pluggable.
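A minimal sketch of what such a trait could look like. It assumes a simplified `Metric` type in place of DataFusion's `MetricsSet`, and `MetricStore`, `NoopMetricStore`, `InMemoryMetricStore` and their methods are hypothetical names for illustration only. The in-memory variant reuses the per-operator combine step from this PR, including the length check.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical simplified metric; real code would use DataFusion's metric types.
#[derive(Clone, Debug, PartialEq)]
pub struct Metric {
    pub name: String,
    pub value: u64,
}

/// One entry per operator in the stage plan, in plan traversal order.
pub type StageMetrics = Vec<Vec<Metric>>;

/// Hypothetical pluggable metric storage, kept out of the `ExecutionGraph`
/// so the graph stays small when it is persisted to the state store.
pub trait MetricStore: Send + Sync {
    /// Merge one completed task's per-operator metrics into its stage.
    fn record_task_metrics(&self, job_id: &str, stage_id: usize, task_metrics: StageMetrics) -> Result<(), String>;
    /// Accumulated per-operator metrics for a stage, if any were kept.
    fn stage_metrics(&self, job_id: &str, stage_id: usize) -> Option<StageMetrics>;
}

/// The "non-storage" option: every metric is discarded.
pub struct NoopMetricStore;

impl MetricStore for NoopMetricStore {
    fn record_task_metrics(&self, _job_id: &str, _stage_id: usize, _task_metrics: StageMetrics) -> Result<(), String> {
        Ok(())
    }
    fn stage_metrics(&self, _job_id: &str, _stage_id: usize) -> Option<StageMetrics> {
        None
    }
}

/// Keeps metrics in scheduler memory only; nothing goes to the state store.
#[derive(Default)]
pub struct InMemoryMetricStore {
    stages: Mutex<HashMap<(String, usize), StageMetrics>>,
}

impl MetricStore for InMemoryMetricStore {
    fn record_task_metrics(&self, job_id: &str, stage_id: usize, task_metrics: StageMetrics) -> Result<(), String> {
        let mut stages = self.stages.lock().unwrap();
        let entry = stages
            .entry((job_id.to_string(), stage_id))
            .or_insert_with(|| vec![Vec::new(); task_metrics.len()]);
        // Every task in a stage runs the same plan, so operator counts must match.
        if entry.len() != task_metrics.len() {
            return Err(format!(
                "stage {} expects metrics for {} operators, got {}",
                stage_id,
                entry.len(),
                task_metrics.len()
            ));
        }
        // Append this task's metrics to the matching operator's list.
        for (operator, metrics) in entry.iter_mut().zip(task_metrics) {
            operator.extend(metrics);
        }
        Ok(())
    }

    fn stage_metrics(&self, job_id: &str, stage_id: usize) -> Option<StageMetrics> {
        self.stages
            .lock()
            .unwrap()
            .get(&(job_id.to_string(), stage_id))
            .cloned()
    }
}
```

With something along these lines, persisting the `ExecutionGraph` would not grow with metric volume, and a backend that does persist metrics could be added later behind the same trait.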
##########
ballista/rust/scheduler/src/state/execution_graph.rs:
##########
@@ -513,6 +574,40 @@ impl ExecutionGraph {
} else if let
task_status::Status::Completed(completed_task) =
task_status
{
+ // update task metrics for completed task
+ stage.update_task_metrics(partition, operator_metrics)?;
+
+ // if this stage is completed, we want to combine the stage metrics to plan's metric set and print out the plan
+ if stage_complete && stage.stage_metrics.as_ref().is_some() {
+ let mut plan_metrics = collect_plan_metrics(&stage_plan);
+ let stage_metrics = stage
+ .stage_metrics
+ .as_ref()
+ .expect("stage metrics should not be None.");
+ if plan_metrics.len() != stage_metrics.len() {
+ return Err(BallistaError::Internal(format!("Error combine stage metrics to plan for stage {}, plan metrics array size {} does not equal \
+ to the stage metrics array size {}", stage_id, plan_metrics.len(), stage_metrics.len())));
+ }
+
+ plan_metrics.iter_mut().zip(stage_metrics).for_each(
+ |(plan_metric, stage_metric)| {
+ stage_metric
+ .iter()
+ .for_each(|s| plan_metric.push(s.clone()));
+ },
+ );
+
+ // TODO the plan_metrics update above is a snapshot clone from the plan metrics.
+ // TODO Need to modify DataFusion to return metricset reference, not clone.
+
+ println!(
+ "=== [{}/{}/{}] Stage finished, physical plan with metrics ===\n{}\n",
+ job_id,
+ stage_id,
+ partition,
+ DisplayableExecutionPlan::with_full_metrics(stage_plan.as_ref()).indent()
+ );
Review Comment:
I'm not sure we should be printing to stdout here. If we want to output this, it should go through the logger so that it is controlled by the log level (i.e. people can turn it off if it's too noisy).
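Assuming the scheduler logs through the `log` crate, this would typically be a `debug!` call guarded by `log_enabled!(Level::Debug)`, so the plan is only rendered when it will actually be emitted. The std-only sketch below mimics that pattern with a hypothetical `Logger` stand-in. `Logger`, `render_plan_with_metrics` and `log_stage_finished` are illustrative names, not Ballista APIs.

```rust
/// Log levels from least to most verbose, mirroring the ordering of
/// `log::Level` (a "higher" level is more verbose).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum Level {
    Error,
    Warn,
    Info,
    Debug,
    Trace,
}

/// Hypothetical stand-in for a real logger: it keeps records at or below
/// `max_level` instead of printing unconditionally to stdout.
pub struct Logger {
    pub max_level: Level,
    pub records: Vec<(Level, String)>,
}

impl Logger {
    pub fn enabled(&self, level: Level) -> bool {
        level <= self.max_level
    }

    pub fn log(&mut self, level: Level, message: String) {
        if self.enabled(level) {
            self.records.push((level, message));
        }
    }
}

/// Hypothetical placeholder for the potentially expensive
/// `DisplayableExecutionPlan::with_full_metrics(..).indent()` rendering.
fn render_plan_with_metrics(plan: &str) -> String {
    format!("{}, metrics=[...]", plan)
}

/// Emits the stage summary at debug level. Returns `true` only if the plan
/// was rendered, i.e. when debug logging is enabled.
pub fn log_stage_finished(logger: &mut Logger, job_id: &str, stage_id: usize, partition: usize, plan: &str) -> bool {
    // Check the level first so the plan is not rendered just to be dropped.
    if !logger.enabled(Level::Debug) {
        return false;
    }
    let rendered = render_plan_with_metrics(plan);
    logger.log(
        Level::Debug,
        format!(
            "=== [{}/{}/{}] Stage finished, physical plan with metrics ===\n{}\n",
            job_id, stage_id, partition, rendered
        ),
    );
    true
}
```

At the default `Info` level nothing is rendered or emitted. Users who want the plan dump can raise the level to `Debug`.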