This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 2d023299fa feat: Add display_pg_json for LogicalPlan (#9789)
2d023299fa is described below
commit 2d023299fa2544350cb18b45181cc8aa729eda3f
Author: Renjie Liu <[email protected]>
AuthorDate: Fri Mar 29 21:38:43 2024 +0800
feat: Add display_pg_json for LogicalPlan (#9789)
* feat: Add display_pg_json for LogicalPlan
* Fix lints
* Fix comments
* Fix format
---
datafusion-cli/Cargo.lock | 1 +
datafusion/expr/Cargo.toml | 1 +
datafusion/expr/src/logical_plan/display.rs | 494 +++++++++++++++++++++++++++-
datafusion/expr/src/logical_plan/plan.rs | 82 +++++
4 files changed, 577 insertions(+), 1 deletion(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 0277d23f4d..2bbe89f24b 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1249,6 +1249,7 @@ dependencies = [
"chrono",
"datafusion-common",
"paste",
+ "serde_json",
"sqlparser",
"strum 0.26.2",
"strum_macros 0.26.2",
diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml
index 621a320230..6f6147d368 100644
--- a/datafusion/expr/Cargo.toml
+++ b/datafusion/expr/Cargo.toml
@@ -43,6 +43,7 @@ arrow-array = { workspace = true }
chrono = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
paste = "^1.0"
+serde_json = { workspace = true }
sqlparser = { workspace = true }
strum = { version = "0.26.1", features = ["derive"] }
strum_macros = "0.26.0"
diff --git a/datafusion/expr/src/logical_plan/display.rs
b/datafusion/expr/src/logical_plan/display.rs
index e0cb44626e..edc3afd55d 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -16,14 +16,22 @@
// under the License.
//! This module provides logic for displaying LogicalPlans in various styles
+use std::collections::HashMap;
use std::fmt;
-use crate::LogicalPlan;
+use crate::{
+ expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn,
DmlStatement, Expr,
+ Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection,
RecursiveQuery,
+ Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown,
TableScan,
+ Unnest, Values, Window,
+};
+use crate::dml::CopyTo;
use arrow::datatypes::Schema;
use datafusion_common::display::GraphvizBuilder;
use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::DataFusionError;
+use serde_json::json;
/// Formats plans with a single line per node. For example:
///
@@ -221,6 +229,490 @@ impl<'a, 'b> TreeNodeVisitor for GraphvizVisitor<'a, 'b> {
}
}
+/// Formats plans to display as postgresql plan json format.
+///
+/// There are already many existing visualizer for this format, for example
[dalibo](https://explain.dalibo.com/).
+/// Unfortunately, there is no formal spec for this format, but it is widely
used in the PostgreSQL community.
+///
+/// Here is an example of the format:
+///
+/// ```json
+/// [
+/// {
+/// "Plan": {
+/// "Node Type": "Sort",
+/// "Output": [
+/// "question_1.id",
+/// "question_1.title",
+/// "question_1.text",
+/// "question_1.file",
+/// "question_1.type",
+/// "question_1.source",
+/// "question_1.exam_id"
+/// ],
+/// "Sort Key": [
+/// "question_1.id"
+/// ],
+/// "Plans": [
+/// {
+/// "Node Type": "Seq Scan",
+/// "Parent Relationship": "Left",
+/// "Relation Name": "question",
+/// "Schema": "public",
+/// "Alias": "question_1",
+/// "Output": [
+/// "question_1.id",
+/// "question_1.title",
+/// "question_1.text",
+/// "question_1.file",
+/// "question_1.type",
+/// "question_1.source",
+/// "question_1.exam_id"
+/// ],
+/// "Filter": "(question_1.exam_id = 1)"
+/// }
+/// ]
+/// }
+/// }
+/// ]
+/// ```
+pub struct PgJsonVisitor<'a, 'b> {
+ f: &'a mut fmt::Formatter<'b>,
+
+ /// A mapping from plan node id to the plan node json representation.
+ objects: HashMap<u32, serde_json::Value>,
+
+ next_id: u32,
+
+ /// If true, includes summarized schema information
+ with_schema: bool,
+
+ /// Holds the ids (as generated from `graphviz_builder` of all
+ /// parent nodes
+ parent_ids: Vec<u32>,
+}
+
+impl<'a, 'b> PgJsonVisitor<'a, 'b> {
+    /// Creates a visitor that writes the JSON plan to `f`.
+    ///
+    /// Output field names are not emitted unless enabled with
+    /// [`Self::with_schema`].
+    pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
+        Self {
+            f,
+            objects: HashMap::new(),
+            next_id: 0,
+            with_schema: false,
+            parent_ids: Vec::new(),
+        }
+    }
+
+    /// Sets a flag which controls whether each node's output field names are
+    /// displayed.
+    pub fn with_schema(&mut self, with_schema: bool) {
+        self.with_schema = with_schema;
+    }
+
+    /// Converts a single logical plan node (not its children) to a JSON
+    /// object.
+    ///
+    /// Every returned value is a JSON object with at least a `"Node Type"`
+    /// key; children and output fields are attached by the visitor.
+    fn to_json_value(node: &LogicalPlan) -> serde_json::Value {
+        match node {
+            LogicalPlan::EmptyRelation(_) => {
+                json!({
+                    "Node Type": "EmptyRelation",
+                })
+            }
+            LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) => {
+                json!({
+                    "Node Type": "RecursiveQuery",
+                    "Is Distinct": is_distinct,
+                })
+            }
+            LogicalPlan::Values(Values { values, .. }) => {
+                // Only render the first few rows so large VALUES lists do not
+                // swamp the output; truncation is marked with "...".
+                const MAX_DISPLAYED_ROWS: usize = 5;
+
+                let rows = values
+                    .iter()
+                    .take(MAX_DISPLAYED_ROWS)
+                    .map(|row| {
+                        let item = row
+                            .iter()
+                            .map(|expr| expr.to_string())
+                            .collect::<Vec<_>>()
+                            .join(", ");
+                        format!("({item})")
+                    })
+                    .collect::<Vec<_>>()
+                    .join(", ");
+
+                let ellipsis = if values.len() > MAX_DISPLAYED_ROWS {
+                    "..."
+                } else {
+                    ""
+                };
+
+                json!({
+                    "Node Type": "Values",
+                    "Values": format!("{rows}{ellipsis}")
+                })
+            }
+            LogicalPlan::TableScan(TableScan {
+                source,
+                table_name,
+                filters,
+                fetch,
+                ..
+            }) => {
+                let mut object = json!({
+                    "Node Type": "TableScan",
+                    "Relation Name": table_name.table(),
+                });
+
+                if let Some(s) = table_name.schema() {
+                    object["Schema"] = serde_json::Value::String(s.to_string());
+                }
+
+                if let Some(c) = table_name.catalog() {
+                    object["Catalog"] = serde_json::Value::String(c.to_string());
+                }
+
+                if !filters.is_empty() {
+                    let filters: Vec<&Expr> = filters.iter().collect();
+
+                    match source.supports_filters_pushdown(&filters) {
+                        Ok(results) => {
+                            let mut full_filter = vec![];
+                            let mut partial_filter = vec![];
+                            let mut unsupported_filters = vec![];
+
+                            for (filter, pushdown) in filters.iter().zip(results.iter()) {
+                                match pushdown {
+                                    TableProviderFilterPushDown::Exact => {
+                                        full_filter.push(filter)
+                                    }
+                                    TableProviderFilterPushDown::Inexact => {
+                                        partial_filter.push(filter)
+                                    }
+                                    TableProviderFilterPushDown::Unsupported => {
+                                        unsupported_filters.push(filter)
+                                    }
+                                }
+                            }
+
+                            if !full_filter.is_empty() {
+                                object["Full Filters"] =
+                                    serde_json::Value::String(expr_vec_fmt!(full_filter));
+                            }
+                            if !partial_filter.is_empty() {
+                                object["Partial Filters"] =
+                                    serde_json::Value::String(expr_vec_fmt!(partial_filter));
+                            }
+                            if !unsupported_filters.is_empty() {
+                                object["Unsupported Filters"] = serde_json::Value::String(
+                                    expr_vec_fmt!(unsupported_filters),
+                                );
+                            }
+                        }
+                        Err(_) => {
+                            // The source could not classify the filters; still
+                            // list them so they do not silently vanish from the
+                            // rendered plan.
+                            object["Filters"] =
+                                serde_json::Value::String(expr_vec_fmt!(filters));
+                        }
+                    }
+                }
+
+                if let Some(f) = fetch {
+                    object["Fetch"] = serde_json::Value::Number((*f).into());
+                }
+
+                object
+            }
+            LogicalPlan::Projection(Projection { expr, .. }) => {
+                json!({
+                    "Node Type": "Projection",
+                    "Expressions": expr.iter().map(|e| e.to_string()).collect::<Vec<_>>()
+                })
+            }
+            LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
+                json!({
+                    "Node Type": "Dml",
+                    "Operation": op.name(),
+                    "Table Name": table_name.table()
+                })
+            }
+            LogicalPlan::Copy(CopyTo {
+                output_url,
+                format_options,
+                options,
+                ..
+            }) => {
+                let op_str = options
+                    .iter()
+                    .map(|(k, v)| format!("{k}={v}"))
+                    .collect::<Vec<_>>()
+                    .join(", ");
+                json!({
+                    "Node Type": "CopyTo",
+                    "Output URL": output_url,
+                    "Format Options": format_options.to_string(),
+                    "Options": op_str
+                })
+            }
+            LogicalPlan::Ddl(ddl) => {
+                json!({
+                    "Node Type": "Ddl",
+                    "Operation": ddl.display().to_string()
+                })
+            }
+            LogicalPlan::Filter(Filter { predicate, .. }) => {
+                json!({
+                    "Node Type": "Filter",
+                    "Condition": predicate.to_string()
+                })
+            }
+            LogicalPlan::Window(Window { window_expr, .. }) => {
+                json!({
+                    "Node Type": "WindowAggr",
+                    "Expressions": expr_vec_fmt!(window_expr)
+                })
+            }
+            LogicalPlan::Aggregate(Aggregate {
+                group_expr,
+                aggr_expr,
+                ..
+            }) => {
+                json!({
+                    "Node Type": "Aggregate",
+                    "Group By": expr_vec_fmt!(group_expr),
+                    "Aggregates": expr_vec_fmt!(aggr_expr)
+                })
+            }
+            LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
+                let mut object = json!({
+                    "Node Type": "Sort",
+                    "Sort Key": expr_vec_fmt!(expr),
+                });
+
+                if let Some(fetch) = fetch {
+                    object["Fetch"] = serde_json::Value::Number((*fetch).into());
+                }
+
+                object
+            }
+            LogicalPlan::Join(Join {
+                on: keys,
+                filter,
+                join_constraint,
+                join_type,
+                ..
+            }) => {
+                let join_expr: Vec<String> =
+                    keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
+                let mut object = json!({
+                    "Node Type": format!("{join_type} Join"),
+                    "Join Constraint": format!("{join_constraint:?}"),
+                    "Join Keys": join_expr.join(", "),
+                });
+
+                // Only emit the key when the join actually has a filter, and
+                // without the " Filter: " prefix used by the text display.
+                if let Some(filter) = filter {
+                    object["Filter"] = serde_json::Value::String(filter.to_string());
+                }
+
+                object
+            }
+            LogicalPlan::CrossJoin(_) => {
+                json!({
+                    "Node Type": "Cross Join"
+                })
+            }
+            LogicalPlan::Repartition(Repartition {
+                partitioning_scheme,
+                ..
+            }) => match partitioning_scheme {
+                Partitioning::RoundRobinBatch(n) => {
+                    json!({
+                        "Node Type": "Repartition",
+                        "Partitioning Scheme": "RoundRobinBatch",
+                        "Partition Count": n
+                    })
+                }
+                Partitioning::Hash(expr, n) => {
+                    let hash_expr: Vec<String> =
+                        expr.iter().map(|e| e.to_string()).collect();
+
+                    json!({
+                        "Node Type": "Repartition",
+                        "Partitioning Scheme": "Hash",
+                        "Partition Count": n,
+                        "Partitioning Key": hash_expr
+                    })
+                }
+                Partitioning::DistributeBy(expr) => {
+                    let dist_by_expr: Vec<String> =
+                        expr.iter().map(|e| e.to_string()).collect();
+
+                    json!({
+                        "Node Type": "Repartition",
+                        "Partitioning Scheme": "DistributeBy",
+                        "Partitioning Key": dist_by_expr
+                    })
+                }
+            },
+            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
+                let mut object = json!({
+                    "Node Type": "Limit",
+                    "Skip": skip,
+                });
+
+                if let Some(f) = fetch {
+                    object["Fetch"] = serde_json::Value::Number((*f).into());
+                }
+
+                object
+            }
+            LogicalPlan::Subquery(Subquery { .. }) => {
+                json!({
+                    "Node Type": "Subquery"
+                })
+            }
+            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
+                json!({
+                    "Node Type": "SubqueryAlias",
+                    "Alias": alias.table(),
+                })
+            }
+            LogicalPlan::Statement(statement) => {
+                json!({
+                    "Node Type": "Statement",
+                    "Statement": statement.display().to_string()
+                })
+            }
+            LogicalPlan::Distinct(distinct) => match distinct {
+                Distinct::All(_) => {
+                    json!({
+                        "Node Type": "DistinctAll"
+                    })
+                }
+                Distinct::On(DistinctOn {
+                    on_expr,
+                    select_expr,
+                    sort_expr,
+                    ..
+                }) => {
+                    let mut object = json!({
+                        "Node Type": "DistinctOn",
+                        "On": expr_vec_fmt!(on_expr),
+                        "Select": expr_vec_fmt!(select_expr),
+                    });
+
+                    if let Some(sort_expr) = sort_expr {
+                        object["Sort"] =
+                            serde_json::Value::String(expr_vec_fmt!(sort_expr));
+                    }
+
+                    object
+                }
+            },
+            LogicalPlan::Explain(_) => {
+                json!({
+                    "Node Type": "Explain"
+                })
+            }
+            LogicalPlan::Analyze(_) => {
+                json!({
+                    "Node Type": "Analyze"
+                })
+            }
+            LogicalPlan::Union(_) => {
+                json!({
+                    "Node Type": "Union"
+                })
+            }
+            LogicalPlan::Extension(e) => {
+                json!({
+                    "Node Type": e.node.name(),
+                    "Detail": format!("{:?}", e.node)
+                })
+            }
+            LogicalPlan::Prepare(Prepare {
+                name, data_types, ..
+            }) => {
+                json!({
+                    "Node Type": "Prepare",
+                    "Name": name,
+                    "Data Types": format!("{data_types:?}")
+                })
+            }
+            LogicalPlan::DescribeTable(DescribeTable { .. }) => {
+                json!({
+                    "Node Type": "DescribeTable"
+                })
+            }
+            LogicalPlan::Unnest(Unnest { column, .. }) => {
+                json!({
+                    "Node Type": "Unnest",
+                    "Column": column.to_string()
+                })
+            }
+        }
+    }
+}
+
+impl<'a, 'b> TreeNodeVisitor for PgJsonVisitor<'a, 'b> {
+    type Node = LogicalPlan;
+
+    /// Converts `node` to JSON, stores it under a fresh id, and makes it the
+    /// parent of every node entered before its matching `f_up`.
+    fn f_down(
+        &mut self,
+        node: &LogicalPlan,
+    ) -> datafusion_common::Result<TreeNodeRecursion> {
+        let id = self.next_id;
+        self.next_id += 1;
+        let mut object = Self::to_json_value(node);
+
+        // Children are appended to this array as they finish (see `f_up`).
+        object["Plans"] = serde_json::Value::Array(vec![]);
+
+        if self.with_schema {
+            object["Output"] = serde_json::Value::Array(
+                node.schema()
+                    .fields()
+                    .iter()
+                    .map(|f| serde_json::Value::String(f.name().to_string()))
+                    .collect(),
+            );
+        }
+
+        self.objects.insert(id, object);
+        self.parent_ids.push(id);
+        Ok(TreeNodeRecursion::Continue)
+    }
+
+    /// Finishes the current node: attaches it to its parent's `"Plans"`, or,
+    /// for the root, writes the complete JSON document to the formatter.
+    ///
+    /// # Errors
+    ///
+    /// Returns an internal error if the visitor's bookkeeping is inconsistent
+    /// (unbalanced `f_down`/`f_up` calls), and propagates serialization or
+    /// formatting failures.
+    fn f_up(
+        &mut self,
+        _node: &Self::Node,
+    ) -> datafusion_common::Result<TreeNodeRecursion> {
+        let id = self.parent_ids.pop().ok_or_else(|| {
+            DataFusionError::Internal("Missing current node id!".to_string())
+        })?;
+
+        let current_node = self.objects.remove(&id).ok_or_else(|| {
+            DataFusionError::Internal("Missing current node!".to_string())
+        })?;
+
+        if let Some(parent_id) = self.parent_ids.last() {
+            let parent_node = self.objects.get_mut(parent_id).ok_or_else(|| {
+                DataFusionError::Internal("Missing parent node!".to_string())
+            })?;
+            let plans = parent_node
+                .get_mut("Plans")
+                .and_then(|p| p.as_array_mut())
+                .ok_or_else(|| {
+                    DataFusionError::Internal("Plans should be an array".to_string())
+                })?;
+
+            plans.push(current_node);
+        } else {
+            // This is the root node, so the whole tree has been assembled.
+            let plan = json!([{ "Plan": current_node }]);
+            write!(
+                self.f,
+                "{}",
+                serde_json::to_string_pretty(&plan)
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?
+            )?;
+        }
+
+        Ok(TreeNodeRecursion::Continue)
+    }
+}
+
#[cfg(test)]
mod tests {
use arrow::datatypes::{DataType, Field};
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 05d7ac5394..9f4094d483 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -54,6 +54,7 @@ use datafusion_common::{
};
// backwards compatibility
+use crate::display::PgJsonVisitor;
pub use datafusion_common::display::{PlanType, StringifiedPlan,
ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};
@@ -1302,6 +1303,26 @@ impl LogicalPlan {
Wrapper(self)
}
+ /// Return a displayable structure that produces plan in postgresql JSON
format.
+ ///
+ /// Users can use this format to visualize the plan in existing plan
visualization tools, for example [dalibo](https://explain.dalibo.com/)
+ pub fn display_pg_json(&self) -> impl Display + '_ {
+ // Boilerplate structure to wrap LogicalPlan with something
+ // that that can be formatted
+ struct Wrapper<'a>(&'a LogicalPlan);
+ impl<'a> Display for Wrapper<'a> {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ let mut visitor = PgJsonVisitor::new(f);
+ visitor.with_schema(true);
+ match self.0.visit(&mut visitor) {
+ Ok(_) => Ok(()),
+ Err(_) => Err(fmt::Error),
+ }
+ }
+ }
+ Wrapper(self)
+ }
+
/// Return a `format`able structure that produces lines meant for
/// graphical display using the `DOT` language. This format can be
/// visualized using software from
@@ -2781,6 +2802,67 @@ digraph {
Ok(())
}
+ // Pins the exact output of `display_pg_json` for `display_plan()`.
+ // The comparison is an exact string match against serde_json's
+ // `to_string_pretty` output, so whitespace inside the raw string matters.
+ // Keys appear in alphabetical order ("Expressions", "Node Type", "Output",
+ // "Plans", ...) rather than insertion order; serde_json sorts object keys
+ // unless its `preserve_order` feature is enabled.
+ // The `IN (<subquery>)` plan appears as a "Subquery" child of the Filter,
+ // listed ahead of the Filter's input TableScan.
+ #[test]
+ fn test_display_pg_json() -> Result<()> {
+ let plan = display_plan()?;
+
+ let expected_pg_json = r#"[
+ {
+ "Plan": {
+ "Expressions": [
+ "employee_csv.id"
+ ],
+ "Node Type": "Projection",
+ "Output": [
+ "id"
+ ],
+ "Plans": [
+ {
+ "Condition": "employee_csv.state IN (<subquery>)",
+ "Node Type": "Filter",
+ "Output": [
+ "id",
+ "state"
+ ],
+ "Plans": [
+ {
+ "Node Type": "Subquery",
+ "Output": [
+ "state"
+ ],
+ "Plans": [
+ {
+ "Node Type": "TableScan",
+ "Output": [
+ "state"
+ ],
+ "Plans": [],
+ "Relation Name": "employee_csv"
+ }
+ ]
+ },
+ {
+ "Node Type": "TableScan",
+ "Output": [
+ "id",
+ "state"
+ ],
+ "Plans": [],
+ "Relation Name": "employee_csv"
+ }
+ ]
+ }
+ ]
+ }
+ }
+]"#;
+
+ let pg_json = format!("{}", plan.display_pg_json());
+
+ assert_eq!(expected_pg_json, pg_json);
+ Ok(())
+ }
+
/// Tests for the Visitor trait and walking logical plan nodes
#[derive(Debug, Default)]
struct OkVisitor {