mingmwang commented on code in PR #124:
URL: https://github.com/apache/arrow-ballista/pull/124#discussion_r945396976


##########
ballista/rust/scheduler/src/state/execution_graph.rs:
##########
@@ -513,6 +574,40 @@ impl ExecutionGraph {
                     } else if let 
task_status::Status::Completed(completed_task) =
                         task_status
                     {
+                        // update task metrics for completed task
+                        stage.update_task_metrics(partition, 
operator_metrics)?;
+
+                        // if this stage is completed, we want to combine the 
stage metrics to plan's metric set and print out the plan
+                        if stage_complete && 
stage.stage_metrics.as_ref().is_some() {
+                            let mut plan_metrics = 
collect_plan_metrics(&stage_plan);
+                            let stage_metrics = stage
+                                .stage_metrics
+                                .as_ref()
+                                .expect("stage metrics should not be None.");
+                            if plan_metrics.len() != stage_metrics.len() {
+                                return 
Err(BallistaError::Internal(format!("Error combine stage metrics to plan for 
stage {},  plan metrics array size {} does not equal \
+                to the stage metrics array size {}", stage_id, 
plan_metrics.len(), stage_metrics.len())));
+                            }
+                            
plan_metrics.iter_mut().zip(stage_metrics).for_each(
+                                |(plan_metric, stage_metric)| {
+                                    stage_metric
+                                        .iter()
+                                        .for_each(|s| 
plan_metric.push(s.clone()));
+                                },
+                            );
+
+                            // TODO the plan_metrics update above is a 
snapshot clone from the plan metrics.
+                            // TODO Need to modify DataFusion to return 
metricset reference, not clone.
+
+                            println!(
+                                "=== [{}/{}/{}] Stage finished, physical plan 
with metrics ===\n{}\n",
+                                job_id,
+                                stage_id,
+                                partition,
+                                
DisplayableExecutionPlan::with_full_metrics(stage_plan.as_ref()).indent()
+                            );

Review Comment:
   I will change it to use log! here. But I realize that the println! is used 
in many other places like ExecutorMetricsCollector, 
execution_graph.resolve_shuffles() etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to