alamb commented on code in PR #9789:
URL: https://github.com/apache/arrow-datafusion/pull/9789#discussion_r1543435747
##########
datafusion/expr/Cargo.toml:
##########
@@ -43,6 +43,7 @@ arrow-array = { workspace = true }
chrono = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
paste = "^1.0"
+serde_json = { workspace = true }
Review Comment:
Would you be willing to make this dependency optional? I realize there are a
bunch of things in the ecosystem that require json_serde (so most projects
likely have this dependency already) but it would be nice to keep the
dependency stack down if possible
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1302,6 +1303,26 @@ impl LogicalPlan {
Wrapper(self)
}
+ /// Return a `format`able structure that produces plan in postgresql JSON
format.
Review Comment:
Likewise here it would be nice to add a link to the spec
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -2781,6 +2802,70 @@ digraph {
Ok(())
}
+ #[test]
+ fn test_display_pg_json() -> Result<()> {
+ let plan = display_plan()?;
+
+ let expected_graphviz = r#"[
+ {
+ "Plan": {
+ "Expressions": [
+ "employee_csv.id"
+ ],
+ "Node Type": "Projection",
+ "Output": [
+ "id"
+ ],
+ "Plans": [
+ {
+ "Condition": "employee_csv.state IN (<subquery>)",
+ "Node Type": "Filter",
+ "Output": [
+ "id",
+ "state"
+ ],
+ "Plans": [
+ {
+ "Node Type": "Subquery",
+ "Output": [
+ "state"
+ ],
+ "Plans": [
+ {
+ "Node Type": "TableScan",
+ "Output": [
+ "state"
+ ],
+ "Plans": [],
+ "Relation Name": "employee_csv"
+ }
+ ]
+ },
+ {
+ "Node Type": "TableScan",
+ "Output": [
+ "id",
+ "state"
+ ],
+ "Plans": [],
+ "Relation Name": "employee_csv"
+ }
+ ]
+ }
+ ]
+ }
+ }
+]"#;
+ println!("{}", plan.display_pg_json());
Review Comment:
is the println still needed?
##########
datafusion/expr/Cargo.toml:
##########
@@ -43,6 +43,7 @@ arrow-array = { workspace = true }
chrono = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
paste = "^1.0"
+serde_json = { workspace = true }
Review Comment:
Maybe we could simply make the JSON directly with string manipulation...
Maybe it doesn't matter
##########
datafusion/expr/src/logical_plan/display.rs:
##########
@@ -221,6 +229,488 @@ impl<'a, 'b> TreeNodeVisitor for GraphvizVisitor<'a, 'b> {
}
}
+/// Formats plans to display as postgresql plan json format.
Review Comment:
I suggest we add a link here to the spec of the postgresql plan format
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -2781,6 +2802,70 @@ digraph {
Ok(())
}
+ #[test]
+ fn test_display_pg_json() -> Result<()> {
+ let plan = display_plan()?;
+
+ let expected_graphviz = r#"[
+ {
+ "Plan": {
+ "Expressions": [
+ "employee_csv.id"
+ ],
+ "Node Type": "Projection",
+ "Output": [
+ "id"
+ ],
+ "Plans": [
+ {
+ "Condition": "employee_csv.state IN (<subquery>)",
+ "Node Type": "Filter",
+ "Output": [
+ "id",
+ "state"
+ ],
+ "Plans": [
+ {
+ "Node Type": "Subquery",
+ "Output": [
+ "state"
+ ],
+ "Plans": [
+ {
+ "Node Type": "TableScan",
+ "Output": [
+ "state"
+ ],
+ "Plans": [],
+ "Relation Name": "employee_csv"
+ }
+ ]
+ },
+ {
+ "Node Type": "TableScan",
+ "Output": [
+ "id",
+ "state"
+ ],
+ "Plans": [],
+ "Relation Name": "employee_csv"
+ }
+ ]
+ }
+ ]
+ }
+ }
+]"#;
+ println!("{}", plan.display_pg_json());
+
+ // just test for a few key lines in the output rather than the
+ // whole thing to make test mainteance easier.
+ let graphviz = format!("{}", plan.display_pg_json());
Review Comment:
The variable names and comments seem out of date
##########
datafusion/expr/src/logical_plan/display.rs:
##########
@@ -221,6 +229,488 @@ impl<'a, 'b> TreeNodeVisitor for GraphvizVisitor<'a, 'b> {
}
}
+/// Formats plans to display as postgresql plan json format.
+///
+/// There are already many existing visualizer for this format, for example
[dalibo](https://explain.dalibo.com/).
+/// Unfortunately, there is no formal spec for this format, but it is widely
used in the PostgreSQL community.
+///
+/// Here is an example of the format:
+///
+/// ```json
+/// [
+/// {
+/// "Plan": {
+/// "Node Type": "Sort",
+/// "Output": [
+/// "question_1.id",
+/// "question_1.title",
+/// "question_1.text",
+/// "question_1.file",
+/// "question_1.type",
+/// "question_1.source",
+/// "question_1.exam_id"
+/// ],
+/// "Sort Key": [
+/// "question_1.id"
+/// ],
+/// "Plans": [
+/// {
+/// "Node Type": "Seq Scan",
+/// "Parent Relationship": "Left",
+/// "Relation Name": "question",
+/// "Schema": "public",
+/// "Alias": "question_1",
+/// "Output": [
+/// "question_1.id",
+/// "question_1.title",
+/// "question_1.text",
+/// "question_1.file",
+/// "question_1.type",
+/// "question_1.source",
+/// "question_1.exam_id"
+/// ],
+/// "Filter": "(question_1.exam_id = 1)"
+/// }
+/// ]
+/// }
+/// }
+/// ]
+/// ```
+pub struct PgJsonVisitor<'a, 'b> {
+ f: &'a mut fmt::Formatter<'b>,
+
+ /// A mapping from plan node id to the plan node json representation.
+ objects: HashMap<u32, serde_json::Value>,
+
+ next_id: u32,
+
+ /// If true, includes summarized schema information
+ with_schema: bool,
+
+ /// Holds the ids (as generated from `graphviz_builder` of all
+ /// parent nodes
+ parent_ids: Vec<u32>,
+}
+
+impl<'a, 'b> PgJsonVisitor<'a, 'b> {
+ pub fn new(f: &'a mut fmt::Formatter<'b>) -> Self {
+ Self {
+ f,
+ objects: HashMap::new(),
+ next_id: 0,
+ with_schema: false,
+ parent_ids: Vec::new(),
+ }
+ }
+
+ /// Sets a flag which controls if the output schema is displayed
+ pub fn with_schema(&mut self, with_schema: bool) {
+ self.with_schema = with_schema;
+ }
+
+ /// Converts a logical plan node to a json object.
+ fn to_json_value(node: &LogicalPlan) -> serde_json::Value {
+ match node {
+ LogicalPlan::EmptyRelation(_) => {
+ json!({
+ "Node Type": "EmptyRelation",
+ })
+ }
+ LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) =>
{
+ json!({
+ "Node Type": "RecursiveQuery",
+ "Is Distinct": is_distinct,
+ })
+ }
+ LogicalPlan::Values(Values { ref values, .. }) => {
+ let str_values = values
+ .iter()
+ // limit to only 5 values to avoid horrible display
+ .take(5)
+ .map(|row| {
+ let item = row
+ .iter()
+ .map(|expr| expr.to_string())
+ .collect::<Vec<_>>()
+ .join(", ");
+ format!("({item})")
+ })
+ .collect::<Vec<_>>()
+ .join(", ");
+
+ let elipse = if values.len() > 5 { "..." } else { "" };
+
+ let values_str = format!("{}{}", str_values, elipse);
+ json!({
+ "Node Type": "Values",
+ "Values": values_str
+ })
+ }
+ LogicalPlan::TableScan(TableScan {
+ ref source,
+ ref table_name,
+ ref filters,
+ ref fetch,
+ ..
+ }) => {
+ let mut object = json!({
+ "Node Type": "TableScan",
+ "Relation Name": table_name.table(),
+ });
+
+ if let Some(s) = table_name.schema() {
+ object["Schema"] =
serde_json::Value::String(s.to_string());
+ }
+
+ if let Some(c) = table_name.catalog() {
+ object["Catalog"] =
serde_json::Value::String(c.to_string());
+ }
+
+ if !filters.is_empty() {
+ let mut full_filter = vec![];
+ let mut partial_filter = vec![];
+ let mut unsupported_filters = vec![];
+ let filters: Vec<&Expr> = filters.iter().collect();
+
+ if let Ok(results) =
source.supports_filters_pushdown(&filters) {
+ filters.iter().zip(results.iter()).for_each(
+ |(x, res)| match res {
+ TableProviderFilterPushDown::Exact =>
full_filter.push(x),
+ TableProviderFilterPushDown::Inexact => {
+ partial_filter.push(x)
+ }
+ TableProviderFilterPushDown::Unsupported => {
+ unsupported_filters.push(x)
+ }
+ },
+ );
+ }
+
+ if !full_filter.is_empty() {
+ object["Full Filters"] = serde_json::Value::String(
+ expr_vec_fmt!(full_filter).to_string(),
+ );
+ };
+ if !partial_filter.is_empty() {
+ object["Partial Filters"] = serde_json::Value::String(
+ expr_vec_fmt!(partial_filter).to_string(),
+ );
+ }
+ if !unsupported_filters.is_empty() {
+ object["Unsupported Filters"] =
serde_json::Value::String(
+ expr_vec_fmt!(unsupported_filters).to_string(),
+ );
+ }
+ }
+
+ if let Some(f) = fetch {
+ object["Fetch"] = serde_json::Value::Number((*f).into());
+ }
+
+ object
+ }
+ LogicalPlan::Projection(Projection { ref expr, .. }) => {
+ json!({
+ "Node Type": "Projection",
+ "Expressions": expr.iter().map(|e|
e.to_string()).collect::<Vec<_>>()
+ })
+ }
+ LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
+ json!({
+ "Node Type": "Projection",
+ "Operation": op.name(),
+ "Table Name": table_name.table()
+ })
+ }
+ LogicalPlan::Copy(CopyTo {
+ input: _,
+ output_url,
+ format_options,
+ partition_by: _,
+ options,
+ }) => {
+ let op_str = options
+ .iter()
+ .map(|(k, v)| format!("{}={}", k, v))
+ .collect::<Vec<_>>()
+ .join(", ");
+ json!({
+ "Node Type": "CopyTo",
+ "Output URL": output_url,
+ "Format Options": format!("{}", format_options),
+ "Options": op_str
+ })
+ }
+ LogicalPlan::Ddl(ddl) => {
+ json!({
+ "Node Type": "Ddl",
+ "Operation": format!("{}", ddl.display())
+ })
+ }
+ LogicalPlan::Filter(Filter {
+ predicate: ref expr,
+ ..
+ }) => {
+ json!({
+ "Node Type": "Filter",
+ "Condition": format!("{}", expr)
+ })
+ }
+ LogicalPlan::Window(Window {
+ ref window_expr, ..
+ }) => {
+ json!({
+ "Node Type": "WindowAggr",
+ "Expressions": expr_vec_fmt!(window_expr)
+ })
+ }
+ LogicalPlan::Aggregate(Aggregate {
+ ref group_expr,
+ ref aggr_expr,
+ ..
+ }) => {
+ json!({
+ "Node Type": "Aggregate",
+ "Group By": expr_vec_fmt!(group_expr),
+ "Aggregates": expr_vec_fmt!(aggr_expr)
+ })
+ }
+ LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
+ let mut object = json!({
+ "Node Type": "Sort",
+ "Sort Key": expr_vec_fmt!(expr),
+ });
+
+ if let Some(fetch) = fetch {
+ object["Fetch"] =
serde_json::Value::Number((*fetch).into());
+ }
+
+ object
+ }
+ LogicalPlan::Join(Join {
+ on: ref keys,
+ filter,
+ join_constraint,
+ join_type,
+ ..
+ }) => {
+ let join_expr: Vec<String> =
+ keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
+ let filter_expr = filter
+ .as_ref()
+ .map(|expr| format!(" Filter: {expr}"))
+ .unwrap_or_else(|| "".to_string());
+ json!({
+ "Node Type": format!("{} Join", join_type),
+ "Join Constraint": format!("{:?}", join_constraint),
+ "Join Keys": join_expr.join(", "),
+ "Filter": format!("{}", filter_expr)
+ })
+ }
+ LogicalPlan::CrossJoin(_) => {
+ json!({
+ "Node Type": "Cross Join"
+ })
+ }
+ LogicalPlan::Repartition(Repartition {
+ partitioning_scheme,
+ ..
+ }) => match partitioning_scheme {
+ Partitioning::RoundRobinBatch(n) => {
+ json!({
+ "Node Type": "Repartition",
+ "Partitioning Scheme": "RoundRobinBatch",
+ "Partition Count": n
+ })
+ }
+ Partitioning::Hash(expr, n) => {
+ let hash_expr: Vec<String> =
+ expr.iter().map(|e| format!("{e}")).collect();
+
+ json!({
+ "Node Type": "Repartition",
+ "Partitioning Scheme": "Hash",
+ "Partition Count": n,
+ "Partitioning Key": hash_expr
+ })
+ }
+ Partitioning::DistributeBy(expr) => {
+ let dist_by_expr: Vec<String> =
+ expr.iter().map(|e| format!("{e}")).collect();
+ json!({
+ "Node Type": "Repartition",
+ "Partitioning Scheme": "DistributeBy",
+ "Partitioning Key": dist_by_expr
+ })
+ }
+ },
+ LogicalPlan::Limit(Limit {
+ ref skip,
+ ref fetch,
+ ..
+ }) => {
+ let mut object = serde_json::json!(
+ {
+ "Node Type": "Limit",
+ "Skip": skip,
+ }
+ );
+ if let Some(f) = fetch {
+ object["Fetch"] = serde_json::Value::Number((*f).into());
+ };
+ object
+ }
+ LogicalPlan::Subquery(Subquery { .. }) => {
+ json!({
+ "Node Type": "Subquery"
+ })
+ }
+ LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
+ json!({
+ "Node Type": "Subquery",
+ "Alias": alias.table(),
+ })
+ }
+ LogicalPlan::Statement(statement) => {
+ json!({
+ "Node Type": "Statement",
+ "Statement": format!("{}", statement.display())
+ })
+ }
+ LogicalPlan::Distinct(distinct) => match distinct {
+ Distinct::All(_) => {
+ json!({
+ "Node Type": "DistinctAll"
+ })
+ }
+ Distinct::On(DistinctOn {
+ on_expr,
+ select_expr,
+ sort_expr,
+ ..
+ }) => {
+ let mut object = json!({
+ "Node Type": "DistinctOn",
+ "On": expr_vec_fmt!(on_expr),
+ "Select": expr_vec_fmt!(select_expr),
+ });
+ if let Some(sort_expr) = sort_expr {
+ object["Sort"] = serde_json::Value::String(
+ expr_vec_fmt!(sort_expr).to_string(),
+ );
+ }
+
+ object
+ }
+ },
+ LogicalPlan::Explain { .. } => {
+ json!({
+ "Node Type": "Explain"
+ })
+ }
+ LogicalPlan::Analyze { .. } => {
+ json!({
+ "Node Type": "Analyze"
+ })
+ }
+ LogicalPlan::Union(_) => {
+ json!({
+ "Node Type": "Union"
+ })
+ }
+ LogicalPlan::Extension(e) => {
+ json!({
+ "Node Type": e.node.name(),
+ "Detail": format!("{:?}", e.node)
+ })
+ }
+ LogicalPlan::Prepare(Prepare {
+ name, data_types, ..
+ }) => {
+ json!({
+ "Node Type": "Prepare",
+ "Name": name,
+ "Data Types": format!("{:?}", data_types)
+ })
+ }
+ LogicalPlan::DescribeTable(DescribeTable { .. }) => {
+ json!({
+ "Node Type": "DescribeTable"
+ })
+ }
+ LogicalPlan::Unnest(Unnest { column, .. }) => {
+ json!({
+ "Node Type": "Unnest",
+ "Column": format!("{}", column)
+ })
+ }
+ }
+ }
+}
+
+impl<'a, 'b> TreeNodeVisitor for PgJsonVisitor<'a, 'b> {
+ type Node = LogicalPlan;
+
+ fn f_down(
+ &mut self,
+ node: &LogicalPlan,
+ ) -> datafusion_common::Result<TreeNodeRecursion> {
+ let id = self.next_id;
+ self.next_id += 1;
+ let mut object = Self::to_json_value(node);
+
+ object["Plans"] = serde_json::Value::Array(vec![]);
+
+ if self.with_schema {
+ object["Output"] = serde_json::Value::Array(
+ node.schema()
+ .fields()
+ .iter()
+ .map(|f| f.name().to_string())
+ .map(serde_json::Value::String)
+ .collect(),
+ );
+ };
+
+ self.objects.insert(id, object);
+ self.parent_ids.push(id);
+ Ok(TreeNodeRecursion::Continue)
+ }
+
+ fn f_up(
+ &mut self,
+ _node: &Self::Node,
+ ) -> datafusion_common::Result<TreeNodeRecursion> {
+ let id = self.parent_ids.pop().unwrap();
+
+ let current_node = self.objects.remove(&id).expect("Missing current
node!");
Review Comment:
I suggest we make this an internal error to avoid panic's
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]