liurenjie1024 commented on code in PR #9789:
URL: https://github.com/apache/arrow-datafusion/pull/9789#discussion_r1543943736


##########
datafusion/expr/src/logical_plan/display.rs:
##########
@@ -221,6 +229,488 @@ impl<'a, 'b> TreeNodeVisitor for GraphvizVisitor<'a, 'b> {
     }
 }
 
+/// Formats plans to display as postgresql plan json format.
+///
+/// There are already many existing visualizer for this format, for example [dalibo](https://explain.dalibo.com/).
+/// Unfortunately, there is no formal spec for this format, but it is widely used in the PostgreSQL community.
+///
+/// Here is an example of the format:
+///
+/// ```json
+/// [
+///     {
+///         "Plan": {
+///             "Node Type": "Sort",
+///             "Output": [
+///                 "question_1.id",
+///                 "question_1.title",
+///                 "question_1.text",
+///                 "question_1.file",
+///                 "question_1.type",
+///                 "question_1.source",
+///                 "question_1.exam_id"
+///             ],
+///             "Sort Key": [
+///                 "question_1.id"
+///             ],
+///             "Plans": [
+///                 {
+///                     "Node Type": "Seq Scan",
+///                     "Parent Relationship": "Left",
+///                     "Relation Name": "question",
+///                     "Schema": "public",
+///                     "Alias": "question_1",
+///                     "Output": [
+///                         "question_1.id",
+///                         "question_1.title",
+///                         "question_1.text",
+///                         "question_1.file",
+///                         "question_1.type",
+///                         "question_1.source",
+///                         "question_1.exam_id"
+///                     ],
+///                     "Filter": "(question_1.exam_id = 1)"
+///                 }
+///             ]
+///         }
+///     }
+/// ]
+/// ```
+pub struct PgJsonVisitor<'a, 'b> {
+    f: &'a mut fmt::Formatter<'b>,
+
+    /// A mapping from plan node id to the plan node json representation.
+    objects: HashMap<u32, serde_json::Value>,
+
+    next_id: u32,
+
+    /// If true, includes summarized schema information
+    with_schema: bool,
+
+    /// Holds the ids (as generated from `graphviz_builder` of all
+    /// parent nodes
+    parent_ids: Vec<u32>,
+}
+
+impl<'a, 'b> PgJsonVisitor<'a, 'b> {
+    pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
+        Self {
+            f,
+            objects: HashMap::new(),
+            next_id: 0,
+            with_schema: false,
+            parent_ids: Vec::new(),
+        }
+    }
+
+    /// Sets a flag which controls if the output schema is displayed
+    pub fn with_schema(&mut self, with_schema: bool) {
+        self.with_schema = with_schema;
+    }
+
+    /// Converts a logical plan node to a json object.
+    fn to_json_value(node: &LogicalPlan) -> serde_json::Value {
+        match node {
+            LogicalPlan::EmptyRelation(_) => {
+                json!({
+                    "Node Type": "EmptyRelation",
+                })
+            }
+            LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) => {
+                json!({
+                    "Node Type": "RecursiveQuery",
+                    "Is Distinct": is_distinct,
+                })
+            }
+            LogicalPlan::Values(Values { ref values, .. }) => {
+                let str_values = values
+                    .iter()
+                    // limit to only 5 values to avoid horrible display
+                    .take(5)
+                    .map(|row| {
+                        let item = row
+                            .iter()
+                            .map(|expr| expr.to_string())
+                            .collect::<Vec<_>>()
+                            .join(", ");
+                        format!("({item})")
+                    })
+                    .collect::<Vec<_>>()
+                    .join(", ");
+
+                let elipse = if values.len() > 5 { "..." } else { "" };
+
+                let values_str = format!("{}{}", str_values, elipse);
+                json!({
+                    "Node Type": "Values",
+                    "Values": values_str
+                })
+            }
+            LogicalPlan::TableScan(TableScan {
+                ref source,
+                ref table_name,
+                ref filters,
+                ref fetch,
+                ..
+            }) => {
+                let mut object = json!({
+                    "Node Type": "TableScan",
+                    "Relation Name": table_name.table(),
+                });
+
+                if let Some(s) = table_name.schema() {
+                    object["Schema"] = serde_json::Value::String(s.to_string());
+                }
+
+                if let Some(c) = table_name.catalog() {
+                    object["Catalog"] = serde_json::Value::String(c.to_string());
+                }
+
+                if !filters.is_empty() {
+                    let mut full_filter = vec![];
+                    let mut partial_filter = vec![];
+                    let mut unsupported_filters = vec![];
+                    let filters: Vec<&Expr> = filters.iter().collect();
+
+                    if let Ok(results) = source.supports_filters_pushdown(&filters) {
+                        filters.iter().zip(results.iter()).for_each(
+                            |(x, res)| match res {
+                                TableProviderFilterPushDown::Exact => full_filter.push(x),
+                                TableProviderFilterPushDown::Inexact => {
+                                    partial_filter.push(x)
+                                }
+                                TableProviderFilterPushDown::Unsupported => {
+                                    unsupported_filters.push(x)
+                                }
+                            },
+                        );
+                    }
+
+                    if !full_filter.is_empty() {
+                        object["Full Filters"] = serde_json::Value::String(
+                            expr_vec_fmt!(full_filter).to_string(),
+                        );
+                    };
+                    if !partial_filter.is_empty() {
+                        object["Partial Filters"] = serde_json::Value::String(
+                            expr_vec_fmt!(partial_filter).to_string(),
+                        );
+                    }
+                    if !unsupported_filters.is_empty() {
+                        object["Unsupported Filters"] = serde_json::Value::String(
+                            expr_vec_fmt!(unsupported_filters).to_string(),
+                        );
+                    }
+                }
+
+                if let Some(f) = fetch {
+                    object["Fetch"] = serde_json::Value::Number((*f).into());
+                }
+
+                object
+            }
+            LogicalPlan::Projection(Projection { ref expr, .. }) => {
+                json!({
+                    "Node Type": "Projection",
+                    "Expressions": expr.iter().map(|e| e.to_string()).collect::<Vec<_>>()
+                })
+            }
+            LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
+                json!({
+                    "Node Type": "Projection",
+                    "Operation": op.name(),
+                    "Table Name": table_name.table()
+                })
+            }
+            LogicalPlan::Copy(CopyTo {
+                input: _,
+                output_url,
+                format_options,
+                partition_by: _,
+                options,
+            }) => {
+                let op_str = options
+                    .iter()
+                    .map(|(k, v)| format!("{}={}", k, v))
+                    .collect::<Vec<_>>()
+                    .join(", ");
+                json!({
+                    "Node Type": "CopyTo",
+                    "Output URL": output_url,
+                    "Format Options": format!("{}", format_options),
+                    "Options": op_str
+                })
+            }
+            LogicalPlan::Ddl(ddl) => {
+                json!({
+                    "Node Type": "Ddl",
+                    "Operation": format!("{}", ddl.display())
+                })
+            }
+            LogicalPlan::Filter(Filter {
+                predicate: ref expr,
+                ..
+            }) => {
+                json!({
+                    "Node Type": "Filter",
+                    "Condition": format!("{}", expr)
+                })
+            }
+            LogicalPlan::Window(Window {
+                ref window_expr, ..
+            }) => {
+                json!({
+                    "Node Type": "WindowAggr",
+                    "Expressions": expr_vec_fmt!(window_expr)
+                })
+            }
+            LogicalPlan::Aggregate(Aggregate {
+                ref group_expr,
+                ref aggr_expr,
+                ..
+            }) => {
+                json!({
+                    "Node Type": "Aggregate",
+                    "Group By": expr_vec_fmt!(group_expr),
+                    "Aggregates": expr_vec_fmt!(aggr_expr)
+                })
+            }
+            LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
+                let mut object = json!({
+                    "Node Type": "Sort",
+                    "Sort Key": expr_vec_fmt!(expr),
+                });
+
+                if let Some(fetch) = fetch {
+                    object["Fetch"] = serde_json::Value::Number((*fetch).into());
+                }
+
+                object
+            }
+            LogicalPlan::Join(Join {
+                on: ref keys,
+                filter,
+                join_constraint,
+                join_type,
+                ..
+            }) => {
+                let join_expr: Vec<String> =
+                    keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
+                let filter_expr = filter
+                    .as_ref()
+                    .map(|expr| format!(" Filter: {expr}"))
+                    .unwrap_or_else(|| "".to_string());
+                json!({
+                    "Node Type": format!("{} Join", join_type),
+                    "Join Constraint": format!("{:?}", join_constraint),
+                    "Join Keys": join_expr.join(", "),
+                    "Filter": format!("{}", filter_expr)
+                })
+            }
+            LogicalPlan::CrossJoin(_) => {
+                json!({
+                    "Node Type": "Cross Join"
+                })
+            }
+            LogicalPlan::Repartition(Repartition {
+                partitioning_scheme,
+                ..
+            }) => match partitioning_scheme {
+                Partitioning::RoundRobinBatch(n) => {
+                    json!({
+                        "Node Type": "Repartition",
+                        "Partitioning Scheme": "RoundRobinBatch",
+                        "Partition Count": n
+                    })
+                }
+                Partitioning::Hash(expr, n) => {
+                    let hash_expr: Vec<String> =
+                        expr.iter().map(|e| format!("{e}")).collect();
+
+                    json!({
+                        "Node Type": "Repartition",
+                        "Partitioning Scheme": "Hash",
+                        "Partition Count": n,
+                        "Partitioning Key": hash_expr
+                    })
+                }
+                Partitioning::DistributeBy(expr) => {
+                    let dist_by_expr: Vec<String> =
+                        expr.iter().map(|e| format!("{e}")).collect();
+                    json!({
+                        "Node Type": "Repartition",
+                        "Partitioning Scheme": "DistributeBy",
+                        "Partitioning Key": dist_by_expr
+                    })
+                }
+            },
+            LogicalPlan::Limit(Limit {
+                ref skip,
+                ref fetch,
+                ..
+            }) => {
+                let mut object = serde_json::json!(
+                    {
+                        "Node Type": "Limit",
+                        "Skip": skip,
+                    }
+                );
+                if let Some(f) = fetch {
+                    object["Fetch"] = serde_json::Value::Number((*f).into());
+                };
+                object
+            }
+            LogicalPlan::Subquery(Subquery { .. }) => {
+                json!({
+                    "Node Type": "Subquery"
+                })
+            }
+            LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
+                json!({
+                    "Node Type": "Subquery",
+                    "Alias": alias.table(),
+                })
+            }
+            LogicalPlan::Statement(statement) => {
+                json!({
+                    "Node Type": "Statement",
+                    "Statement": format!("{}", statement.display())
+                })
+            }
+            LogicalPlan::Distinct(distinct) => match distinct {
+                Distinct::All(_) => {
+                    json!({
+                        "Node Type": "DistinctAll"
+                    })
+                }
+                Distinct::On(DistinctOn {
+                    on_expr,
+                    select_expr,
+                    sort_expr,
+                    ..
+                }) => {
+                    let mut object = json!({
+                        "Node Type": "DistinctOn",
+                        "On": expr_vec_fmt!(on_expr),
+                        "Select": expr_vec_fmt!(select_expr),
+                    });
+                    if let Some(sort_expr) = sort_expr {
+                        object["Sort"] = serde_json::Value::String(
+                            expr_vec_fmt!(sort_expr).to_string(),
+                        );
+                    }
+
+                    object
+                }
+            },
+            LogicalPlan::Explain { .. } => {
+                json!({
+                    "Node Type": "Explain"
+                })
+            }
+            LogicalPlan::Analyze { .. } => {
+                json!({
+                    "Node Type": "Analyze"
+                })
+            }
+            LogicalPlan::Union(_) => {
+                json!({
+                    "Node Type": "Union"
+                })
+            }
+            LogicalPlan::Extension(e) => {
+                json!({
+                    "Node Type": e.node.name(),
+                    "Detail": format!("{:?}", e.node)
+                })
+            }
+            LogicalPlan::Prepare(Prepare {
+                name, data_types, ..
+            }) => {
+                json!({
+                    "Node Type": "Prepare",
+                    "Name": name,
+                    "Data Types": format!("{:?}", data_types)
+                })
+            }
+            LogicalPlan::DescribeTable(DescribeTable { .. }) => {
+                json!({
+                    "Node Type": "DescribeTable"
+                })
+            }
+            LogicalPlan::Unnest(Unnest { column, .. }) => {
+                json!({
+                    "Node Type": "Unnest",
+                    "Column": format!("{}", column)
+                })
+            }
+        }
+    }
+}
+
+impl<'a, 'b> TreeNodeVisitor for PgJsonVisitor<'a, 'b> {
+    type Node = LogicalPlan;
+
+    fn f_down(
+        &mut self,
+        node: &LogicalPlan,
+    ) -> datafusion_common::Result<TreeNodeRecursion> {
+        let id = self.next_id;
+        self.next_id += 1;
+        let mut object = Self::to_json_value(node);
+
+        object["Plans"] = serde_json::Value::Array(vec![]);
+
+        if self.with_schema {
+            object["Output"] = serde_json::Value::Array(
+                node.schema()
+                    .fields()
+                    .iter()
+                    .map(|f| f.name().to_string())
+                    .map(serde_json::Value::String)
+                    .collect(),
+            );
+        };
+
+        self.objects.insert(id, object);
+        self.parent_ids.push(id);
+        Ok(TreeNodeRecursion::Continue)
+    }
+
+    fn f_up(
+        &mut self,
+        _node: &Self::Node,
+    ) -> datafusion_common::Result<TreeNodeRecursion> {
+        let id = self.parent_ids.pop().unwrap();
+
+        let current_node = self.objects.remove(&id).expect("Missing current node!");

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to