adriangb commented on code in PR #21767:
URL: https://github.com/apache/datafusion/pull/21767#discussion_r3320811137
##########
datafusion/sql/src/statement.rs:
##########
@@ -2022,30 +2022,41 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported");
}
+ // Resolve the requested output format.
+ //
+ // Verbose mode only supports indent format, and for EXPLAIN ANALYZE
+ // only `Indent` and `PostgresJSON` are supported today — `Tree` and
+ // `Graphviz` require additional work to render with live metrics.
+ let options = self.context_provider.options();
+ let format = if verbose {
+ ExplainFormat::Indent
+ } else if let Some(format) = format {
+ ExplainFormat::from_str(&format)?
+ } else if analyze {
+ ExplainFormat::Indent
+ } else {
+ options.explain.format.clone()
+ };
+
if analyze {
- if format.is_some() {
- return plan_err!("EXPLAIN ANALYZE with FORMAT is not
supported");
+ match format {
+ ExplainFormat::Indent | ExplainFormat::PostgresJSON => {}
+ ExplainFormat::Tree | ExplainFormat::Graphviz => {
+ return plan_err!(
+ "EXPLAIN ANALYZE with FORMAT {format} is not supported"
+ );
+ }
}
Ok(LogicalPlan::Analyze(Analyze {
verbose,
+ format,
input: plan,
Review Comment:
Fixed — changed `match format` to `match &format` so the owned value is
still available for the `Analyze { format, .. }` struct literal below.
##########
datafusion/physical-plan/src/analyze.rs:
##########
@@ -29,8 +29,10 @@ use crate::metrics::{MetricCategory, MetricType};
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
use arrow::{array::StringBuilder, datatypes::SchemaRef,
record_batch::RecordBatch};
+use datafusion_common::format::ExplainFormat;
use datafusion_common::instant::Instant;
-use datafusion_common::{DataFusionError, Result, assert_eq_or_internal_err};
+use datafusion_common::tree_node::TreeNodeRecursion;
Review Comment:
Already removed in the previous commit (rebase + build-fix). The import was
a leftover from an earlier draft.
##########
datafusion/physical-plan/src/display.rs:
##########
@@ -611,6 +710,183 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> {
}
}
+/// Formats physical plans into PostgreSQL-style JSON output with live
+/// per-operator metrics.
+///
+/// This visitor mirrors the logical-plan `PgJsonVisitor` in
+/// `datafusion-expr`: during `pre_visit` it assembles a JSON object for the
+/// current node; during `post_visit` it attaches that object into its
+/// parent's `"Plans"` array (or stores it as the root).
+struct PgJsonExecutionPlanVisitor<'a> {
+ verbose: bool,
+ show_metrics: ShowMetrics,
+ show_schema: bool,
+ metric_types: &'a [MetricType],
+ metric_categories: Option<&'a [MetricCategory]>,
+ objects: HashMap<u32, serde_json::Value>,
+ parent_ids: Vec<u32>,
+ next_id: u32,
+ root: Option<serde_json::Value>,
+}
+
+impl PgJsonExecutionPlanVisitor<'_> {
+ /// Produce the one-line `DisplayAs::Default` rendering of a node.
+ fn one_line_details(plan: &dyn ExecutionPlan) -> String {
+ struct One<'b>(&'b dyn ExecutionPlan);
+ impl fmt::Display for One<'_> {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ self.0.fmt_as(DisplayFormatType::Default, f)
+ }
+ }
+ // Some operators include internal newlines; collapse them so the
+ // rendered JSON value stays on a single line.
+ format!("{}", One(plan))
+ .replace('\n', " ")
+ .trim()
+ .to_string()
+ }
+
+ /// Render the given `MetricValue` into the most natural
`serde_json::Value`
+ /// we can produce: a number for simple counts/gauges/times, a float-ms for
+ /// `ElapsedCompute`, and a string fallback for anything else.
+ fn metric_value_to_json(value: &MetricValue) -> serde_json::Value {
+ match value {
+ MetricValue::OutputRows(c) => serde_json::Value::from(c.value()),
+ MetricValue::SpillCount(c)
+ | MetricValue::OutputBatches(c)
+ | MetricValue::SpilledRows(c) =>
serde_json::Value::from(c.value()),
+ MetricValue::SpilledBytes(c) | MetricValue::OutputBytes(c) => {
+ serde_json::Value::from(c.value())
+ }
+ MetricValue::CurrentMemoryUsage(g) =>
serde_json::Value::from(g.value()),
+ MetricValue::ElapsedCompute(t) => {
+ // Emit as float milliseconds to align with PG's
+ // `"Actual Total Time"` convention. DataFusion tracks compute
+ // time (summed across partitions), not wall time — visualizers
+ // should be read with that caveat in mind.
+ let ms = (t.value() as f64) / 1_000_000.0;
+ serde_json::Value::from(ms)
+ }
+ MetricValue::Count { count, .. } =>
serde_json::Value::from(count.value()),
+ MetricValue::Gauge { gauge, .. } =>
serde_json::Value::from(gauge.value()),
+ MetricValue::Time { time, .. } => {
+ let ms = (time.value() as f64) / 1_000_000.0;
+ serde_json::Value::from(ms)
+ }
+ // Timestamps, PruningMetrics, Ratio, Custom: fall back to Display.
+ other => serde_json::Value::String(format!("{other}")),
+ }
+ }
+
+ /// Populate `"Actual Rows"`, `"Actual Total Time"`, and `"Extras"` for
+ /// the given node from its aggregated `MetricsSet`, honoring the same
+ /// filtering pipeline used by `IndentVisitor`.
+ fn attach_metrics(&self, plan: &dyn ExecutionPlan, object: &mut
serde_json::Value) {
+ if matches!(self.show_metrics, ShowMetrics::None) {
+ return;
+ }
+ let Some(metrics) = plan.metrics() else {
+ object["Metrics"] = serde_json::json!(null);
Review Comment:
Fixed — when `plan.metrics()` returns `None` we now just `return` early
without writing any field to the JSON object, rather than emitting `"Metrics":
null`.
##########
datafusion/physical-plan/src/display.rs:
##########
@@ -611,6 +710,183 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> {
}
}
+/// Formats physical plans into PostgreSQL-style JSON output with live
+/// per-operator metrics.
+///
+/// This visitor mirrors the logical-plan `PgJsonVisitor` in
+/// `datafusion-expr`: during `pre_visit` it assembles a JSON object for the
+/// current node; during `post_visit` it attaches that object into its
+/// parent's `"Plans"` array (or stores it as the root).
+struct PgJsonExecutionPlanVisitor<'a> {
+ verbose: bool,
+ show_metrics: ShowMetrics,
+ show_schema: bool,
+ metric_types: &'a [MetricType],
+ metric_categories: Option<&'a [MetricCategory]>,
+ objects: HashMap<u32, serde_json::Value>,
+ parent_ids: Vec<u32>,
+ next_id: u32,
+ root: Option<serde_json::Value>,
+}
+
+impl PgJsonExecutionPlanVisitor<'_> {
+ /// Produce the one-line `DisplayAs::Default` rendering of a node.
+ fn one_line_details(plan: &dyn ExecutionPlan) -> String {
+ struct One<'b>(&'b dyn ExecutionPlan);
+ impl fmt::Display for One<'_> {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ self.0.fmt_as(DisplayFormatType::Default, f)
+ }
+ }
+ // Some operators include internal newlines; collapse them so the
+ // rendered JSON value stays on a single line.
+ format!("{}", One(plan))
+ .replace('\n', " ")
+ .trim()
+ .to_string()
+ }
+
+ /// Render the given `MetricValue` into the most natural
`serde_json::Value`
+ /// we can produce: a number for simple counts/gauges/times, a float-ms for
+ /// `ElapsedCompute`, and a string fallback for anything else.
+ fn metric_value_to_json(value: &MetricValue) -> serde_json::Value {
+ match value {
+ MetricValue::OutputRows(c) => serde_json::Value::from(c.value()),
+ MetricValue::SpillCount(c)
+ | MetricValue::OutputBatches(c)
+ | MetricValue::SpilledRows(c) =>
serde_json::Value::from(c.value()),
+ MetricValue::SpilledBytes(c) | MetricValue::OutputBytes(c) => {
+ serde_json::Value::from(c.value())
+ }
+ MetricValue::CurrentMemoryUsage(g) =>
serde_json::Value::from(g.value()),
+ MetricValue::ElapsedCompute(t) => {
+ // Emit as float milliseconds to align with PG's
+ // `"Actual Total Time"` convention. DataFusion tracks compute
+ // time (summed across partitions), not wall time — visualizers
+ // should be read with that caveat in mind.
+ let ms = (t.value() as f64) / 1_000_000.0;
+ serde_json::Value::from(ms)
+ }
+ MetricValue::Count { count, .. } =>
serde_json::Value::from(count.value()),
+ MetricValue::Gauge { gauge, .. } =>
serde_json::Value::from(gauge.value()),
+ MetricValue::Time { time, .. } => {
+ let ms = (time.value() as f64) / 1_000_000.0;
+ serde_json::Value::from(ms)
+ }
+ // Timestamps, PruningMetrics, Ratio, Custom: fall back to Display.
+ other => serde_json::Value::String(format!("{other}")),
+ }
+ }
+
+ /// Populate `"Actual Rows"`, `"Actual Total Time"`, and `"Extras"` for
+ /// the given node from its aggregated `MetricsSet`, honoring the same
+ /// filtering pipeline used by `IndentVisitor`.
+ fn attach_metrics(&self, plan: &dyn ExecutionPlan, object: &mut
serde_json::Value) {
+ if matches!(self.show_metrics, ShowMetrics::None) {
+ return;
+ }
+ let Some(metrics) = plan.metrics() else {
+ object["Metrics"] = serde_json::json!(null);
+ return;
+ };
+
+ let metrics = match self.show_metrics {
+ ShowMetrics::None => return,
+ ShowMetrics::Aggregated => metrics
+ .filter_by_metric_types(self.metric_types)
+ .aggregate_by_name()
+ .sorted_for_display()
+ .timestamps_removed(),
+ ShowMetrics::Full =>
metrics.filter_by_metric_types(self.metric_types),
+ };
+ let metrics = if let Some(cats) = self.metric_categories {
+ metrics.filter_by_categories(cats)
+ } else {
+ metrics
+ };
+
+ // Build the Extras bucket, while extracting PG-canonical keys to the
+ // top level.
+ let mut extras = serde_json::Map::new();
+ for metric in metrics.iter() {
+ let value = metric.value();
+ match value {
+ MetricValue::OutputRows(c) => {
+ object["Actual Rows"] = serde_json::Value::from(c.value());
+ }
+ MetricValue::ElapsedCompute(t) => {
+ let ms = (t.value() as f64) / 1_000_000.0;
+ object["Actual Total Time"] = serde_json::Value::from(ms);
+ }
+ _ => {
+ extras.insert(
+ value.name().to_string(),
+ Self::metric_value_to_json(value),
+ );
+ }
+ }
+ }
+ if !extras.is_empty() {
+ object["Extras"] = serde_json::Value::Object(extras);
+ }
+ }
+}
+
+impl ExecutionPlanVisitor for PgJsonExecutionPlanVisitor<'_> {
+ type Error = fmt::Error;
+
+ fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool,
Self::Error> {
+ let id = self.next_id;
+ self.next_id += 1;
+
+ // Build fields in reading order: Node Type, Details, (schema),
+ // (metrics), Plans last — so the JSON output reads top-down like a
+ // PostgreSQL plan.
+ let mut object = serde_json::json!({
+ "Node Type": plan.name(),
+ "Details": Self::one_line_details(plan),
+ });
Review Comment:
Fixed — enabled the `preserve_order` feature on the main (non-dev)
`serde_json` dependency so key insertion order is guaranteed in production
builds, not just tests. The dev-dependency override is now redundant and was
removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]