This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d023299fa feat: Add display_pg_json for LogicalPlan (#9789)
2d023299fa is described below

commit 2d023299fa2544350cb18b45181cc8aa729eda3f
Author: Renjie Liu <[email protected]>
AuthorDate: Fri Mar 29 21:38:43 2024 +0800

    feat: Add display_pg_json for LogicalPlan (#9789)
    
    * feat: Add display_pg_json for LogicalPlan
    
    * Fix lints
    
    * Fix comments
    
    * Fix format
---
 datafusion-cli/Cargo.lock                   |   1 +
 datafusion/expr/Cargo.toml                  |   1 +
 datafusion/expr/src/logical_plan/display.rs | 494 +++++++++++++++++++++++++++-
 datafusion/expr/src/logical_plan/plan.rs    |  82 +++++
 4 files changed, 577 insertions(+), 1 deletion(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 0277d23f4d..2bbe89f24b 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1249,6 +1249,7 @@ dependencies = [
  "chrono",
  "datafusion-common",
  "paste",
+ "serde_json",
  "sqlparser",
  "strum 0.26.2",
  "strum_macros 0.26.2",
diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml
index 621a320230..6f6147d368 100644
--- a/datafusion/expr/Cargo.toml
+++ b/datafusion/expr/Cargo.toml
@@ -43,6 +43,7 @@ arrow-array = { workspace = true }
 chrono = { workspace = true }
 datafusion-common = { workspace = true, default-features = true }
 paste = "^1.0"
+serde_json = { workspace = true }
 sqlparser = { workspace = true }
 strum = { version = "0.26.1", features = ["derive"] }
 strum_macros = "0.26.0"
diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs
index e0cb44626e..edc3afd55d 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -16,14 +16,22 @@
 // under the License.
 //! This module provides logic for displaying LogicalPlans in various styles
 
+use std::collections::HashMap;
 use std::fmt;
 
-use crate::LogicalPlan;
+use crate::{
+    expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, 
DmlStatement, Expr,
+    Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection, 
RecursiveQuery,
+    Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, 
TableScan,
+    Unnest, Values, Window,
+};
 
+use crate::dml::CopyTo;
 use arrow::datatypes::Schema;
 use datafusion_common::display::GraphvizBuilder;
 use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
 use datafusion_common::DataFusionError;
+use serde_json::json;
 
 /// Formats plans with a single line per node. For example:
 ///
@@ -221,6 +229,490 @@ impl<'a, 'b> TreeNodeVisitor for GraphvizVisitor<'a, 'b> {
     }
 }
 
+/// Formats plans to display as postgresql plan json format.
+///
+/// There are already many existing visualizer for this format, for example 
[dalibo](https://explain.dalibo.com/).
+/// Unfortunately, there is no formal spec for this format, but it is widely 
used in the PostgreSQL community.
+///
+/// Here is an example of the format:
+///
+/// ```json
+/// [
+///     {
+///         "Plan": {
+///             "Node Type": "Sort",
+///             "Output": [
+///                 "question_1.id",
+///                 "question_1.title",
+///                 "question_1.text",
+///                 "question_1.file",
+///                 "question_1.type",
+///                 "question_1.source",
+///                 "question_1.exam_id"
+///             ],
+///             "Sort Key": [
+///                 "question_1.id"
+///             ],
+///             "Plans": [
+///                 {
+///                     "Node Type": "Seq Scan",
+///                     "Parent Relationship": "Left",
+///                     "Relation Name": "question",
+///                     "Schema": "public",
+///                     "Alias": "question_1",
+///                     "Output": [
+///                        "question_1.id",
+///                         "question_1.title",
+///                        "question_1.text",
+///                         "question_1.file",
+///                         "question_1.type",
+///                         "question_1.source",
+///                         "question_1.exam_id"
+///                     ],
+///                     "Filter": "(question_1.exam_id = 1)"
+///                 }
+///             ]
+///         }
+///     }
+/// ]
+/// ```
+pub struct PgJsonVisitor<'a, 'b> {
+    f: &'a mut fmt::Formatter<'b>,
+
+    /// A mapping from plan node id to the plan node json representation.
+    objects: HashMap<u32, serde_json::Value>,
+
+    next_id: u32,
+
+    /// If true, includes summarized schema information
+    with_schema: bool,
+
+    /// Holds the ids (as generated from `graphviz_builder` of all
+    /// parent nodes
+    parent_ids: Vec<u32>,
+}
+
+impl<'a, 'b> PgJsonVisitor<'a, 'b> {
+    /// Creates a visitor that writes the JSON plan into `f`.
+    ///
+    /// Schema information is not included unless enabled with
+    /// [`Self::with_schema`].
+    pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
+        Self {
+            f,
+            objects: HashMap::new(),
+            next_id: 0,
+            with_schema: false,
+            parent_ids: Vec::new(),
+        }
+    }
+
+    /// Sets a flag which controls if the output schema is displayed
+    pub fn with_schema(&mut self, with_schema: bool) {
+        self.with_schema = with_schema;
+    }
+
+    /// Converts a single logical plan node to a json object.
+    ///
+    /// Only the node's own properties are converted (children are attached
+    /// by the tree visitor). Every object has a `"Node Type"` key; the other
+    /// keys depend on the node variant, and optional properties such as
+    /// `"Fetch"` or a join `"Filter"` are only present when set.
+    fn to_json_value(node: &LogicalPlan) -> serde_json::Value {
+        match node {
+            LogicalPlan::EmptyRelation(_) => {
+                json!({
+                    "Node Type": "EmptyRelation",
+                })
+            }
+            LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) => {
+                json!({
+                    "Node Type": "RecursiveQuery",
+                    "Is Distinct": is_distinct,
+                })
+            }
+            LogicalPlan::Values(Values { ref values, .. }) => {
+                // Only show the first few rows to keep the output readable.
+                const MAX_DISPLAYED_ROWS: usize = 5;
+
+                let str_values = values
+                    .iter()
+                    .take(MAX_DISPLAYED_ROWS)
+                    .map(|row| {
+                        let item = row
+                            .iter()
+                            .map(|expr| expr.to_string())
+                            .collect::<Vec<_>>()
+                            .join(", ");
+                        format!("({item})")
+                    })
+                    .collect::<Vec<_>>()
+                    .join(", ");
+
+                let ellipsis = if values.len() > MAX_DISPLAYED_ROWS {
+                    "..."
+                } else {
+                    ""
+                };
+
+                json!({
+                    "Node Type": "Values",
+                    "Values": format!("{str_values}{ellipsis}")
+                })
+            }
+            LogicalPlan::TableScan(TableScan {
+                ref source,
+                ref table_name,
+                ref filters,
+                ref fetch,
+                ..
+            }) => {
+                let mut object = json!({
+                    "Node Type": "TableScan",
+                    "Relation Name": table_name.table(),
+                });
+
+                if let Some(s) = table_name.schema() {
+                    object["Schema"] = serde_json::Value::String(s.to_string());
+                }
+
+                if let Some(c) = table_name.catalog() {
+                    object["Catalog"] = serde_json::Value::String(c.to_string());
+                }
+
+                if !filters.is_empty() {
+                    let filters: Vec<&Expr> = filters.iter().collect();
+
+                    match source.supports_filters_pushdown(&filters) {
+                        Ok(results) => {
+                            let mut full_filter = vec![];
+                            let mut partial_filter = vec![];
+                            let mut unsupported_filters = vec![];
+
+                            for (x, res) in filters.iter().zip(results.iter()) {
+                                match res {
+                                    TableProviderFilterPushDown::Exact => {
+                                        full_filter.push(x)
+                                    }
+                                    TableProviderFilterPushDown::Inexact => {
+                                        partial_filter.push(x)
+                                    }
+                                    TableProviderFilterPushDown::Unsupported => {
+                                        unsupported_filters.push(x)
+                                    }
+                                }
+                            }
+
+                            if !full_filter.is_empty() {
+                                object["Full Filters"] = serde_json::Value::String(
+                                    expr_vec_fmt!(full_filter),
+                                );
+                            }
+                            if !partial_filter.is_empty() {
+                                object["Partial Filters"] = serde_json::Value::String(
+                                    expr_vec_fmt!(partial_filter),
+                                );
+                            }
+                            if !unsupported_filters.is_empty() {
+                                object["Unsupported Filters"] =
+                                    serde_json::Value::String(expr_vec_fmt!(
+                                        unsupported_filters
+                                    ));
+                            }
+                        }
+                        // The source could not classify the filters; still show
+                        // them instead of silently dropping them from the plan.
+                        Err(_) => {
+                            object["Filters"] =
+                                serde_json::Value::String(expr_vec_fmt!(filters));
+                        }
+                    }
+                }
+
+                if let Some(f) = fetch {
+                    object["Fetch"] = serde_json::Value::Number((*f).into());
+                }
+
+                object
+            }
+            LogicalPlan::Projection(Projection { ref expr, .. }) => {
+                json!({
+                    "Node Type": "Projection",
+                    "Expressions": expr.iter().map(|e| e.to_string()).collect::<Vec<_>>()
+                })
+            }
+            LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
+                json!({
+                    "Node Type": "Dml",
+                    "Operation": op.name(),
+                    "Table Name": table_name.table()
+                })
+            }
+            LogicalPlan::Copy(CopyTo {
+                input: _,
+                output_url,
+                format_options,
+                partition_by: _,
+                options,
+            }) => {
+                let op_str = options
+                    .iter()
+                    .map(|(k, v)| format!("{k}={v}"))
+                    .collect::<Vec<_>>()
+                    .join(", ");
+                json!({
+                    "Node Type": "CopyTo",
+                    "Output URL": output_url,
+                    "Format Options": format_options.to_string(),
+                    "Options": op_str
+                })
+            }
+            LogicalPlan::Ddl(ddl) => {
+                json!({
+                    "Node Type": "Ddl",
+                    "Operation": ddl.display().to_string()
+                })
+            }
+            LogicalPlan::Filter(Filter {
+                predicate: ref expr,
+                ..
+            }) => {
+                json!({
+                    "Node Type": "Filter",
+                    "Condition": expr.to_string()
+                })
+            }
+            LogicalPlan::Window(Window {
+                ref window_expr, ..
+            }) => {
+                json!({
+                    "Node Type": "WindowAggr",
+                    "Expressions": expr_vec_fmt!(window_expr)
+                })
+            }
+            LogicalPlan::Aggregate(Aggregate {
+                ref group_expr,
+                ref aggr_expr,
+                ..
+            }) => {
+                json!({
+                    "Node Type": "Aggregate",
+                    "Group By": expr_vec_fmt!(group_expr),
+                    "Aggregates": expr_vec_fmt!(aggr_expr)
+                })
+            }
+            LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
+                let mut object = json!({
+                    "Node Type": "Sort",
+                    "Sort Key": expr_vec_fmt!(expr),
+                });
+
+                if let Some(fetch) = fetch {
+                    object["Fetch"] = serde_json::Value::Number((*fetch).into());
+                }
+
+                object
+            }
+            LogicalPlan::Join(Join {
+                on: ref keys,
+                filter,
+                join_constraint,
+                join_type,
+                ..
+            }) => {
+                let join_expr: Vec<String> =
+                    keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
+
+                let mut object = json!({
+                    "Node Type": format!("{join_type} Join"),
+                    "Join Constraint": format!("{join_constraint:?}"),
+                    "Join Keys": join_expr.join(", "),
+                });
+
+                // The key already names the field, so the value is just the
+                // expression; omit the key entirely when there is no filter.
+                if let Some(filter) = filter {
+                    object["Filter"] = serde_json::Value::String(filter.to_string());
+                }
+
+                object
+            }
+            LogicalPlan::CrossJoin(_) => {
+                json!({
+                    "Node Type": "Cross Join"
+                })
+            }
+            LogicalPlan::Repartition(Repartition {
+                partitioning_scheme,
+                ..
+            }) => match partitioning_scheme {
+                Partitioning::RoundRobinBatch(n) => {
+                    json!({
+                        "Node Type": "Repartition",
+                        "Partitioning Scheme": "RoundRobinBatch",
+                        "Partition Count": n
+                    })
+                }
+                Partitioning::Hash(expr, n) => {
+                    let hash_expr: Vec<String> =
+                        expr.iter().map(|e| e.to_string()).collect();
+
+                    json!({
+                        "Node Type": "Repartition",
+                        "Partitioning Scheme": "Hash",
+                        "Partition Count": n,
+                        "Partitioning Key": hash_expr
+                    })
+                }
+                Partitioning::DistributeBy(expr) => {
+                    let dist_by_expr: Vec<String> =
+                        expr.iter().map(|e| e.to_string()).collect();
+                    json!({
+                        "Node Type": "Repartition",
+                        "Partitioning Scheme": "DistributeBy",
+                        "Partitioning Key": dist_by_expr
+                    })
+                }
+            },
+            LogicalPlan::Limit(Limit {
+                ref skip,
+                ref fetch,
+                ..
+            }) => {
+                let mut object = json!({
+                    "Node Type": "Limit",
+                    "Skip": skip,
+                });
+                if let Some(f) = fetch {
+                    object["Fetch"] = serde_json::Value::Number((*f).into());
+                }
+                object
+            }
+            LogicalPlan::Subquery(Subquery { .. }) => {
+                json!({
+                    "Node Type": "Subquery"
+                })
+            }
+            LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
+                json!({
+                    "Node Type": "Subquery",
+                    "Alias": alias.table(),
+                })
+            }
+            LogicalPlan::Statement(statement) => {
+                json!({
+                    "Node Type": "Statement",
+                    "Statement": statement.display().to_string()
+                })
+            }
+            LogicalPlan::Distinct(distinct) => match distinct {
+                Distinct::All(_) => {
+                    json!({
+                        "Node Type": "DistinctAll"
+                    })
+                }
+                Distinct::On(DistinctOn {
+                    on_expr,
+                    select_expr,
+                    sort_expr,
+                    ..
+                }) => {
+                    let mut object = json!({
+                        "Node Type": "DistinctOn",
+                        "On": expr_vec_fmt!(on_expr),
+                        "Select": expr_vec_fmt!(select_expr),
+                    });
+                    if let Some(sort_expr) = sort_expr {
+                        object["Sort"] =
+                            serde_json::Value::String(expr_vec_fmt!(sort_expr));
+                    }
+
+                    object
+                }
+            },
+            LogicalPlan::Explain { .. } => {
+                json!({
+                    "Node Type": "Explain"
+                })
+            }
+            LogicalPlan::Analyze { .. } => {
+                json!({
+                    "Node Type": "Analyze"
+                })
+            }
+            LogicalPlan::Union(_) => {
+                json!({
+                    "Node Type": "Union"
+                })
+            }
+            LogicalPlan::Extension(e) => {
+                json!({
+                    "Node Type": e.node.name(),
+                    "Detail": format!("{:?}", e.node)
+                })
+            }
+            LogicalPlan::Prepare(Prepare {
+                name, data_types, ..
+            }) => {
+                json!({
+                    "Node Type": "Prepare",
+                    "Name": name,
+                    "Data Types": format!("{data_types:?}")
+                })
+            }
+            LogicalPlan::DescribeTable(DescribeTable { .. }) => {
+                json!({
+                    "Node Type": "DescribeTable"
+                })
+            }
+            LogicalPlan::Unnest(Unnest { column, .. }) => {
+                json!({
+                    "Node Type": "Unnest",
+                    "Column": column.to_string()
+                })
+            }
+        }
+    }
+}
+
+impl<'a, 'b> TreeNodeVisitor for PgJsonVisitor<'a, 'b> {
+    type Node = LogicalPlan;
+
+    /// Records `node` as a JSON object with an empty `"Plans"` array (and an
+    /// `"Output"` field-name list when schema display is enabled), then
+    /// pushes its id onto the ancestor stack.
+    fn f_down(
+        &mut self,
+        node: &LogicalPlan,
+    ) -> datafusion_common::Result<TreeNodeRecursion> {
+        let id = self.next_id;
+        self.next_id += 1;
+        let mut object = Self::to_json_value(node);
+
+        object["Plans"] = serde_json::Value::Array(vec![]);
+
+        if self.with_schema {
+            object["Output"] = serde_json::Value::Array(
+                node.schema()
+                    .fields()
+                    .iter()
+                    .map(|f| serde_json::Value::String(f.name().to_string()))
+                    .collect(),
+            );
+        }
+
+        self.objects.insert(id, object);
+        self.parent_ids.push(id);
+        Ok(TreeNodeRecursion::Continue)
+    }
+
+    /// Pops the finished node off the ancestor stack and appends it to its
+    /// parent's `"Plans"` array. When the root node finishes, the whole plan
+    /// is written to the formatter as `[{"Plan": <root>}]`.
+    ///
+    /// Inconsistent internal bookkeeping is reported as an internal error
+    /// rather than a panic.
+    fn f_up(
+        &mut self,
+        _node: &Self::Node,
+    ) -> datafusion_common::Result<TreeNodeRecursion> {
+        let id = self.parent_ids.pop().ok_or_else(|| {
+            DataFusionError::Internal("Missing current node id!".to_string())
+        })?;
+
+        let current_node = self.objects.remove(&id).ok_or_else(|| {
+            DataFusionError::Internal("Missing current node!".to_string())
+        })?;
+
+        if let Some(parent_id) = self.parent_ids.last() {
+            let plans = self
+                .objects
+                .get_mut(parent_id)
+                .ok_or_else(|| {
+                    DataFusionError::Internal("Missing parent node!".to_string())
+                })?
+                .get_mut("Plans")
+                .and_then(|p| p.as_array_mut())
+                .ok_or_else(|| {
+                    DataFusionError::Internal("Plans should be an array".to_string())
+                })?;
+
+            plans.push(current_node);
+        } else {
+            // This is the root node: the whole tree has been assembled.
+            let plan = json!([{"Plan": current_node}]);
+            write!(
+                self.f,
+                "{}",
+                serde_json::to_string_pretty(&plan)
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?
+            )?;
+        }
+
+        Ok(TreeNodeRecursion::Continue)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use arrow::datatypes::{DataType, Field};
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 05d7ac5394..9f4094d483 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -54,6 +54,7 @@ use datafusion_common::{
 };
 
 // backwards compatibility
+use crate::display::PgJsonVisitor;
 pub use datafusion_common::display::{PlanType, StringifiedPlan, 
ToStringifiedPlan};
 pub use datafusion_common::{JoinConstraint, JoinType};
 
@@ -1302,6 +1303,26 @@ impl LogicalPlan {
         Wrapper(self)
     }
 
+    /// Return a displayable structure that produces plan in postgresql JSON 
format.
+    ///
+    /// Users can use this format to visualize the plan in existing plan 
visualization tools, for example [dalibo](https://explain.dalibo.com/)
+    pub fn display_pg_json(&self) -> impl Display + '_ {
+        // Boilerplate structure to wrap LogicalPlan with something
+        // that that can be formatted
+        struct Wrapper<'a>(&'a LogicalPlan);
+        impl<'a> Display for Wrapper<'a> {
+            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+                let mut visitor = PgJsonVisitor::new(f);
+                visitor.with_schema(true);
+                match self.0.visit(&mut visitor) {
+                    Ok(_) => Ok(()),
+                    Err(_) => Err(fmt::Error),
+                }
+            }
+        }
+        Wrapper(self)
+    }
+
     /// Return a `format`able structure that produces lines meant for
     /// graphical display using the `DOT` language. This format can be
     /// visualized using software from
@@ -2781,6 +2802,67 @@ digraph {
         Ok(())
     }
 
+    /// The pg-json rendering of `display_plan()` matches the expected
+    /// pretty-printed document exactly (keys are emitted in sorted order).
+    #[test]
+    fn test_display_pg_json() -> Result<()> {
+        let expected = r#"[
+  {
+    "Plan": {
+      "Expressions": [
+        "employee_csv.id"
+      ],
+      "Node Type": "Projection",
+      "Output": [
+        "id"
+      ],
+      "Plans": [
+        {
+          "Condition": "employee_csv.state IN (<subquery>)",
+          "Node Type": "Filter",
+          "Output": [
+            "id",
+            "state"
+          ],
+          "Plans": [
+            {
+              "Node Type": "Subquery",
+              "Output": [
+                "state"
+              ],
+              "Plans": [
+                {
+                  "Node Type": "TableScan",
+                  "Output": [
+                    "state"
+                  ],
+                  "Plans": [],
+                  "Relation Name": "employee_csv"
+                }
+              ]
+            },
+            {
+              "Node Type": "TableScan",
+              "Output": [
+                "id",
+                "state"
+              ],
+              "Plans": [],
+              "Relation Name": "employee_csv"
+            }
+          ]
+        }
+      ]
+    }
+  }
+]"#;
+
+        let actual = display_plan()?.display_pg_json().to_string();
+        assert_eq!(expected, actual);
+
+        Ok(())
+    }
+
     /// Tests for the Visitor trait and walking logical plan nodes
     #[derive(Debug, Default)]
     struct OkVisitor {

Reply via email to