Copilot commented on code in PR #21767:
URL: https://github.com/apache/datafusion/pull/21767#discussion_r3320744675


##########
datafusion/sql/src/statement.rs:
##########
@@ -2022,30 +2022,41 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
             return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
         }
 
+        // Resolve the requested output format.
+        //
+        // Verbose mode only supports indent format, and for EXPLAIN ANALYZE
+        // only `Indent` and `PostgresJSON` are supported today — `Tree` and
+        // `Graphviz` require additional work to render with live metrics.
+        let options = self.context_provider.options();
+        let format = if verbose {
+            ExplainFormat::Indent
+        } else if let Some(format) = format {
+            ExplainFormat::from_str(&format)?
+        } else if analyze {
+            ExplainFormat::Indent
+        } else {
+            options.explain.format.clone()
+        };
+
         if analyze {
-            if format.is_some() {
-                return plan_err!("EXPLAIN ANALYZE with FORMAT is not supported");
+            match format {
+                ExplainFormat::Indent | ExplainFormat::PostgresJSON => {}
+                ExplainFormat::Tree | ExplainFormat::Graphviz => {
+                    return plan_err!(
+                        "EXPLAIN ANALYZE with FORMAT {format} is not supported"
+                    );
+                }
             }
             Ok(LogicalPlan::Analyze(Analyze {
                 verbose,
+                format,
                 input: plan,

Review Comment:
   `ExplainFormat` is not `Copy` (it only derives `Clone`), so `match format { 
... }` moves `format` and then `format` is used again when constructing 
`Analyze { format, ... }`. This should fail to compile with a use-after-move. 
Match on `&format` (or clone `format` for the match) so the owned value can 
still be stored in the logical `Analyze` node.



##########
datafusion/physical-plan/src/analyze.rs:
##########
@@ -29,8 +29,10 @@ use crate::metrics::{MetricCategory, MetricType};
 use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
 
 use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion_common::format::ExplainFormat;
 use datafusion_common::instant::Instant;
-use datafusion_common::{DataFusionError, Result, assert_eq_or_internal_err};
+use datafusion_common::tree_node::TreeNodeRecursion;

Review Comment:
   `TreeNodeRecursion` is imported but never used in this module (the only 
occurrence is the `use` line). With `-D warnings` this will fail the build; 
remove the unused import.
   



##########
datafusion/physical-plan/src/display.rs:
##########
@@ -611,6 +710,183 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> {
     }
 }
 
+/// Formats physical plans into PostgreSQL-style JSON output with live
+/// per-operator metrics.
+///
+/// This visitor mirrors the logical-plan `PgJsonVisitor` in
+/// `datafusion-expr`: during `pre_visit` it assembles a JSON object for the
+/// current node; during `post_visit` it attaches that object into its
+/// parent's `"Plans"` array (or stores it as the root).
+struct PgJsonExecutionPlanVisitor<'a> {
+    verbose: bool,
+    show_metrics: ShowMetrics,
+    show_schema: bool,
+    metric_types: &'a [MetricType],
+    metric_categories: Option<&'a [MetricCategory]>,
+    objects: HashMap<u32, serde_json::Value>,
+    parent_ids: Vec<u32>,
+    next_id: u32,
+    root: Option<serde_json::Value>,
+}
+
+impl PgJsonExecutionPlanVisitor<'_> {
+    /// Produce the one-line `DisplayAs::Default` rendering of a node.
+    fn one_line_details(plan: &dyn ExecutionPlan) -> String {
+        struct One<'b>(&'b dyn ExecutionPlan);
+        impl fmt::Display for One<'_> {
+            fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+                self.0.fmt_as(DisplayFormatType::Default, f)
+            }
+        }
+        // Some operators include internal newlines; collapse them so the
+        // rendered JSON value stays on a single line.
+        format!("{}", One(plan))
+            .replace('\n', " ")
+            .trim()
+            .to_string()
+    }
+
+    /// Render the given `MetricValue` into the most natural `serde_json::Value`
+    /// we can produce: a number for simple counts/gauges/times, a float-ms for
+    /// `ElapsedCompute`, and a string fallback for anything else.
+    fn metric_value_to_json(value: &MetricValue) -> serde_json::Value {
+        match value {
+            MetricValue::OutputRows(c) => serde_json::Value::from(c.value()),
+            MetricValue::SpillCount(c)
+            | MetricValue::OutputBatches(c)
+            | MetricValue::SpilledRows(c) => serde_json::Value::from(c.value()),
+            MetricValue::SpilledBytes(c) | MetricValue::OutputBytes(c) => {
+                serde_json::Value::from(c.value())
+            }
+            MetricValue::CurrentMemoryUsage(g) => serde_json::Value::from(g.value()),
+            MetricValue::ElapsedCompute(t) => {
+                // Emit as float milliseconds to align with PG's
+                // `"Actual Total Time"` convention. DataFusion tracks compute
+                // time (summed across partitions), not wall time — visualizers
+                // should be read with that caveat in mind.
+                let ms = (t.value() as f64) / 1_000_000.0;
+                serde_json::Value::from(ms)
+            }
+            MetricValue::Count { count, .. } => serde_json::Value::from(count.value()),
+            MetricValue::Gauge { gauge, .. } => serde_json::Value::from(gauge.value()),
+            MetricValue::Time { time, .. } => {
+                let ms = (time.value() as f64) / 1_000_000.0;
+                serde_json::Value::from(ms)
+            }
+            // Timestamps, PruningMetrics, Ratio, Custom: fall back to Display.
+            other => serde_json::Value::String(format!("{other}")),
+        }
+    }
+
+    /// Populate `"Actual Rows"`, `"Actual Total Time"`, and `"Extras"` for
+    /// the given node from its aggregated `MetricsSet`, honoring the same
+    /// filtering pipeline used by `IndentVisitor`.
+    fn attach_metrics(&self, plan: &dyn ExecutionPlan, object: &mut serde_json::Value) {
+        if matches!(self.show_metrics, ShowMetrics::None) {
+            return;
+        }
+        let Some(metrics) = plan.metrics() else {
+            object["Metrics"] = serde_json::json!(null);

Review Comment:
   When `show_metrics` is enabled but `plan.metrics()` returns `None`, this 
adds a non-PostgreSQL key (`"Metrics": null`) to the pgjson output. This field 
is not mentioned in the pgjson docs above and differs from how indent/graphviz 
handle missing metrics (they render an empty metrics list). Consider omitting 
metrics fields entirely in this case (or using the existing pgjson keys, e.g. 
no `Actual*`/`Extras`) to keep the output closer to the expected pgjson schema.
   



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -3423,13 +3424,17 @@ impl PartialOrd for Explain {
 pub struct Analyze {
     /// Should extra detail be included?
     pub verbose: bool,
+    /// Output syntax/format for the rendered physical plan + metrics.
+    pub format: ExplainFormat,
     /// The logical plan that is being EXPLAIN ANALYZE'd
     pub input: Arc<LogicalPlan>,
     /// The output schema of the explain (2 columns of text)
     pub schema: DFSchemaRef,
 }

Review Comment:
   This PR adds `Analyze::format`, but DataFusion's protobuf roundtrip 
currently only serializes/deserializes `AnalyzeNode { verbose, input }` (see 
`datafusion/proto/src/logical_plan/mod.rs`), so the format will be lost when 
encoding/decoding logical plans. Similarly, physical plan protobuf encoding for 
`AnalyzeExec` does not include the format and will always decode to the default 
indent format. Please update the proto models + (de)serialization to carry the 
new format so pgjson EXPLAIN ANALYZE can roundtrip correctly.



##########
datafusion/physical-plan/src/display.rs:
##########
@@ -611,6 +710,183 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> {
     }
 }
 
+/// Formats physical plans into PostgreSQL-style JSON output with live
+/// per-operator metrics.
+///
+/// This visitor mirrors the logical-plan `PgJsonVisitor` in
+/// `datafusion-expr`: during `pre_visit` it assembles a JSON object for the
+/// current node; during `post_visit` it attaches that object into its
+/// parent's `"Plans"` array (or stores it as the root).
+struct PgJsonExecutionPlanVisitor<'a> {
+    verbose: bool,
+    show_metrics: ShowMetrics,
+    show_schema: bool,
+    metric_types: &'a [MetricType],
+    metric_categories: Option<&'a [MetricCategory]>,
+    objects: HashMap<u32, serde_json::Value>,
+    parent_ids: Vec<u32>,
+    next_id: u32,
+    root: Option<serde_json::Value>,
+}
+
+impl PgJsonExecutionPlanVisitor<'_> {
+    /// Produce the one-line `DisplayAs::Default` rendering of a node.
+    fn one_line_details(plan: &dyn ExecutionPlan) -> String {
+        struct One<'b>(&'b dyn ExecutionPlan);
+        impl fmt::Display for One<'_> {
+            fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+                self.0.fmt_as(DisplayFormatType::Default, f)
+            }
+        }
+        // Some operators include internal newlines; collapse them so the
+        // rendered JSON value stays on a single line.
+        format!("{}", One(plan))
+            .replace('\n', " ")
+            .trim()
+            .to_string()
+    }
+
+    /// Render the given `MetricValue` into the most natural `serde_json::Value`
+    /// we can produce: a number for simple counts/gauges/times, a float-ms for
+    /// `ElapsedCompute`, and a string fallback for anything else.
+    fn metric_value_to_json(value: &MetricValue) -> serde_json::Value {
+        match value {
+            MetricValue::OutputRows(c) => serde_json::Value::from(c.value()),
+            MetricValue::SpillCount(c)
+            | MetricValue::OutputBatches(c)
+            | MetricValue::SpilledRows(c) => serde_json::Value::from(c.value()),
+            MetricValue::SpilledBytes(c) | MetricValue::OutputBytes(c) => {
+                serde_json::Value::from(c.value())
+            }
+            MetricValue::CurrentMemoryUsage(g) => serde_json::Value::from(g.value()),
+            MetricValue::ElapsedCompute(t) => {
+                // Emit as float milliseconds to align with PG's
+                // `"Actual Total Time"` convention. DataFusion tracks compute
+                // time (summed across partitions), not wall time — visualizers
+                // should be read with that caveat in mind.
+                let ms = (t.value() as f64) / 1_000_000.0;
+                serde_json::Value::from(ms)
+            }
+            MetricValue::Count { count, .. } => serde_json::Value::from(count.value()),
+            MetricValue::Gauge { gauge, .. } => serde_json::Value::from(gauge.value()),
+            MetricValue::Time { time, .. } => {
+                let ms = (time.value() as f64) / 1_000_000.0;
+                serde_json::Value::from(ms)
+            }
+            // Timestamps, PruningMetrics, Ratio, Custom: fall back to Display.
+            other => serde_json::Value::String(format!("{other}")),
+        }
+    }
+
+    /// Populate `"Actual Rows"`, `"Actual Total Time"`, and `"Extras"` for
+    /// the given node from its aggregated `MetricsSet`, honoring the same
+    /// filtering pipeline used by `IndentVisitor`.
+    fn attach_metrics(&self, plan: &dyn ExecutionPlan, object: &mut serde_json::Value) {
+        if matches!(self.show_metrics, ShowMetrics::None) {
+            return;
+        }
+        let Some(metrics) = plan.metrics() else {
+            object["Metrics"] = serde_json::json!(null);
+            return;
+        };
+
+        let metrics = match self.show_metrics {
+            ShowMetrics::None => return,
+            ShowMetrics::Aggregated => metrics
+                .filter_by_metric_types(self.metric_types)
+                .aggregate_by_name()
+                .sorted_for_display()
+                .timestamps_removed(),
+            ShowMetrics::Full => metrics.filter_by_metric_types(self.metric_types),
+        };
+        let metrics = if let Some(cats) = self.metric_categories {
+            metrics.filter_by_categories(cats)
+        } else {
+            metrics
+        };
+
+        // Build the Extras bucket, while extracting PG-canonical keys to the
+        // top level.
+        let mut extras = serde_json::Map::new();
+        for metric in metrics.iter() {
+            let value = metric.value();
+            match value {
+                MetricValue::OutputRows(c) => {
+                    object["Actual Rows"] = serde_json::Value::from(c.value());
+                }
+                MetricValue::ElapsedCompute(t) => {
+                    let ms = (t.value() as f64) / 1_000_000.0;
+                    object["Actual Total Time"] = serde_json::Value::from(ms);
+                }
+                _ => {
+                    extras.insert(
+                        value.name().to_string(),
+                        Self::metric_value_to_json(value),
+                    );
+                }
+            }
+        }
+        if !extras.is_empty() {
+            object["Extras"] = serde_json::Value::Object(extras);
+        }
+    }
+}
+
+impl ExecutionPlanVisitor for PgJsonExecutionPlanVisitor<'_> {
+    type Error = fmt::Error;
+
+    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
+        let id = self.next_id;
+        self.next_id += 1;
+
+        // Build fields in reading order: Node Type, Details, (schema),
+        // (metrics), Plans last — so the JSON output reads top-down like a
+        // PostgreSQL plan.
+        let mut object = serde_json::json!({
+            "Node Type": plan.name(),
+            "Details": Self::one_line_details(plan),
+        });

Review Comment:
   The comment about building JSON fields in "reading order" is misleading in 
non-test builds: without `serde_json`'s `preserve_order` feature, 
`serde_json::Map` is backed by a sorted map, so key insertion order is not 
preserved. Either enable `preserve_order` for the main `serde_json` dependency 
if order is intended to be stable/readable, or adjust the comment to avoid 
implying ordering guarantees.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to