This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 371c240  Update Ballista to use new physical plan formatter utility (#344)
371c240 is described below

commit 371c240821f5639c364abe3bfb95056e62e477cc
Author: Andy Grove <[email protected]>
AuthorDate: Wed May 19 09:52:23 2021 -0600

    Update Ballista to use new physical plan formatter utility (#344)
---
 ballista/rust/core/src/utils.rs              | 95 ----------------------------
 ballista/rust/executor/src/flight_service.rs |  6 +-
 ballista/rust/scheduler/src/planner.rs       |  5 +-
 3 files changed, 5 insertions(+), 101 deletions(-)

diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 55541d5..dc570f8 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -102,101 +102,6 @@ pub async fn collect_stream(
     Ok(batches)
 }
 
-pub fn format_plan(plan: &dyn ExecutionPlan, indent: usize) -> Result<String> {
-    let operator_str =
-        if let Some(exec) = plan.as_any().downcast_ref::<HashAggregateExec>() {
-            format!(
-                "HashAggregateExec: groupBy={:?}, aggrExpr={:?}",
-                exec.group_expr()
-                    .iter()
-                    .map(|e| format_expr(e.0.as_ref()))
-                    .collect::<Vec<String>>(),
-                exec.aggr_expr()
-                    .iter()
-                    .map(|e| format_agg_expr(e.as_ref()))
-                    .collect::<Result<Vec<String>>>()?
-            )
-        } else if let Some(exec) = 
plan.as_any().downcast_ref::<HashJoinExec>() {
-            format!(
-                "HashJoinExec: joinType={:?}, on={:?}",
-                exec.join_type(),
-                exec.on()
-            )
-        } else if let Some(exec) = plan.as_any().downcast_ref::<ParquetExec>() 
{
-            let mut num_files = 0;
-            for part in exec.partitions() {
-                num_files += part.filenames().len();
-            }
-            format!(
-                "ParquetExec: partitions={}, files={}",
-                exec.partitions().len(),
-                num_files
-            )
-        } else if let Some(exec) = plan.as_any().downcast_ref::<CsvExec>() {
-            format!(
-                "CsvExec: {}; partitions={}",
-                &exec.path(),
-                exec.output_partitioning().partition_count()
-            )
-        } else if let Some(exec) = plan.as_any().downcast_ref::<FilterExec>() {
-            format!("FilterExec: {}", format_expr(exec.predicate().as_ref()))
-        } else if let Some(exec) = 
plan.as_any().downcast_ref::<QueryStageExec>() {
-            format!(
-                "QueryStageExec: job={}, stage={}",
-                exec.job_id, exec.stage_id
-            )
-        } else if let Some(exec) = 
plan.as_any().downcast_ref::<UnresolvedShuffleExec>() {
-            format!("UnresolvedShuffleExec: stages={:?}", exec.query_stage_ids)
-        } else if let Some(exec) = 
plan.as_any().downcast_ref::<CoalesceBatchesExec>() {
-            format!(
-                "CoalesceBatchesExec: batchSize={}",
-                exec.target_batch_size()
-            )
-        } else if plan.as_any().downcast_ref::<MergeExec>().is_some() {
-            "MergeExec".to_string()
-        } else {
-            let str = format!("{:?}", plan);
-            String::from(&str[0..120])
-        };
-
-    let children_str = plan
-        .children()
-        .iter()
-        .map(|c| format_plan(c.as_ref(), indent + 1))
-        .collect::<Result<Vec<String>>>()?
-        .join("\n");
-
-    let indent_str = "  ".repeat(indent);
-    if plan.children().is_empty() {
-        Ok(format!("{}{}{}", indent_str, &operator_str, children_str))
-    } else {
-        Ok(format!("{}{}\n{}", indent_str, &operator_str, children_str))
-    }
-}
-
-pub fn format_agg_expr(expr: &dyn AggregateExpr) -> Result<String> {
-    Ok(format!(
-        "{} {:?}",
-        expr.field()?.name(),
-        expr.expressions()
-            .iter()
-            .map(|e| format_expr(e.as_ref()))
-            .collect::<Vec<String>>()
-    ))
-}
-
-pub fn format_expr(expr: &dyn PhysicalExpr) -> String {
-    if let Some(e) = expr.as_any().downcast_ref::<Column>() {
-        e.name().to_string()
-    } else if let Some(e) = expr.as_any().downcast_ref::<Literal>() {
-        e.to_string()
-    } else if let Some(e) = expr.as_any().downcast_ref::<BinaryExpr>() {
-        format!("{} {} {}", e.left(), e.op(), e.right())
-    } else {
-        format!("{}", expr)
-    }
-}
-
 pub fn produce_diagram(filename: &str, stages: &[Arc<QueryStageExec>]) -> 
Result<()> {
     let write_file = File::create(filename)?;
     let mut w = BufWriter::new(&write_file);
diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs
index 115e1ab..62aaf7f 100644
--- a/ballista/rust/executor/src/flight_service.rs
+++ b/ballista/rust/executor/src/flight_service.rs
@@ -26,7 +26,7 @@ use std::time::Instant;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::decode_protobuf;
 use ballista_core::serde::scheduler::{Action as BallistaAction, 
PartitionStats};
-use ballista_core::utils::{self, format_plan};
+use ballista_core::utils;
 
 use arrow::array::{ArrayRef, StringBuilder};
 use arrow::datatypes::{DataType, Field, Schema};
@@ -40,6 +40,7 @@ use arrow_flight::{
     PutResult, SchemaResult, Ticket,
 };
 use datafusion::error::DataFusionError;
+use datafusion::physical_plan::displayable;
 use futures::{Stream, StreamExt};
 use log::{info, warn};
 use std::io::{Read, Seek};
@@ -97,8 +98,7 @@ impl FlightService for BallistaFlightService {
                     partition.job_id,
                     partition.stage_id,
                     partition.partition_id,
-                    format_plan(partition.plan.as_ref(), 0)
-                        .map_err(|e| from_ballista_err(&e))?
+                    displayable(partition.plan.as_ref()).indent().to_string()
                 );
 
                 let mut tasks: Vec<JoinHandle<Result<_, BallistaError>>> = 
vec![];
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index b81d7de..2f01e73 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -235,10 +235,9 @@ mod test {
     use ballista_core::error::BallistaError;
     use ballista_core::execution_plans::UnresolvedShuffleExec;
     use ballista_core::serde::protobuf;
-    use ballista_core::utils::format_plan;
     use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
     use datafusion::physical_plan::sort::SortExec;
-    use datafusion::physical_plan::ExecutionPlan;
+    use datafusion::physical_plan::{displayable, ExecutionPlan};
     use datafusion::physical_plan::{merge::MergeExec, 
projection::ProjectionExec};
     use std::convert::TryInto;
     use std::sync::Arc;
@@ -270,7 +269,7 @@ mod test {
         let job_uuid = Uuid::new_v4();
         let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
         for stage in &stages {
-            println!("{}", format_plan(stage.as_ref(), 0)?);
+            println!("{}", displayable(stage.as_ref()).indent().to_string());
         }
 
         /* Expected result:

Reply via email to