This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 371c240 Update Ballista to use new physical plan formatter utility (#344)
371c240 is described below
commit 371c240821f5639c364abe3bfb95056e62e477cc
Author: Andy Grove <[email protected]>
AuthorDate: Wed May 19 09:52:23 2021 -0600
Update Ballista to use new physical plan formatter utility (#344)
---
ballista/rust/core/src/utils.rs | 95 ----------------------------
ballista/rust/executor/src/flight_service.rs | 6 +-
ballista/rust/scheduler/src/planner.rs | 5 +-
3 files changed, 5 insertions(+), 101 deletions(-)
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 55541d5..dc570f8 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -102,101 +102,6 @@ pub async fn collect_stream(
Ok(batches)
}
-pub fn format_plan(plan: &dyn ExecutionPlan, indent: usize) -> Result<String> {
- let operator_str =
- if let Some(exec) = plan.as_any().downcast_ref::<HashAggregateExec>() {
- format!(
- "HashAggregateExec: groupBy={:?}, aggrExpr={:?}",
- exec.group_expr()
- .iter()
- .map(|e| format_expr(e.0.as_ref()))
- .collect::<Vec<String>>(),
- exec.aggr_expr()
- .iter()
- .map(|e| format_agg_expr(e.as_ref()))
- .collect::<Result<Vec<String>>>()?
- )
- } else if let Some(exec) =
plan.as_any().downcast_ref::<HashJoinExec>() {
- format!(
- "HashJoinExec: joinType={:?}, on={:?}",
- exec.join_type(),
- exec.on()
- )
- } else if let Some(exec) = plan.as_any().downcast_ref::<ParquetExec>()
{
- let mut num_files = 0;
- for part in exec.partitions() {
- num_files += part.filenames().len();
- }
- format!(
- "ParquetExec: partitions={}, files={}",
- exec.partitions().len(),
- num_files
- )
- } else if let Some(exec) = plan.as_any().downcast_ref::<CsvExec>() {
- format!(
- "CsvExec: {}; partitions={}",
- &exec.path(),
- exec.output_partitioning().partition_count()
- )
- } else if let Some(exec) = plan.as_any().downcast_ref::<FilterExec>() {
- format!("FilterExec: {}", format_expr(exec.predicate().as_ref()))
- } else if let Some(exec) =
plan.as_any().downcast_ref::<QueryStageExec>() {
- format!(
- "QueryStageExec: job={}, stage={}",
- exec.job_id, exec.stage_id
- )
- } else if let Some(exec) =
plan.as_any().downcast_ref::<UnresolvedShuffleExec>() {
- format!("UnresolvedShuffleExec: stages={:?}", exec.query_stage_ids)
- } else if let Some(exec) =
plan.as_any().downcast_ref::<CoalesceBatchesExec>() {
- format!(
- "CoalesceBatchesExec: batchSize={}",
- exec.target_batch_size()
- )
- } else if plan.as_any().downcast_ref::<MergeExec>().is_some() {
- "MergeExec".to_string()
- } else {
- let str = format!("{:?}", plan);
- String::from(&str[0..120])
- };
-
- let children_str = plan
- .children()
- .iter()
- .map(|c| format_plan(c.as_ref(), indent + 1))
- .collect::<Result<Vec<String>>>()?
- .join("\n");
-
- let indent_str = " ".repeat(indent);
- if plan.children().is_empty() {
- Ok(format!("{}{}{}", indent_str, &operator_str, children_str))
- } else {
- Ok(format!("{}{}\n{}", indent_str, &operator_str, children_str))
- }
-}
-
-pub fn format_agg_expr(expr: &dyn AggregateExpr) -> Result<String> {
- Ok(format!(
- "{} {:?}",
- expr.field()?.name(),
- expr.expressions()
- .iter()
- .map(|e| format_expr(e.as_ref()))
- .collect::<Vec<String>>()
- ))
-}
-
-pub fn format_expr(expr: &dyn PhysicalExpr) -> String {
- if let Some(e) = expr.as_any().downcast_ref::<Column>() {
- e.name().to_string()
- } else if let Some(e) = expr.as_any().downcast_ref::<Literal>() {
- e.to_string()
- } else if let Some(e) = expr.as_any().downcast_ref::<BinaryExpr>() {
- format!("{} {} {}", e.left(), e.op(), e.right())
- } else {
- format!("{}", expr)
- }
-}
-
pub fn produce_diagram(filename: &str, stages: &[Arc<QueryStageExec>]) ->
Result<()> {
let write_file = File::create(filename)?;
let mut w = BufWriter::new(&write_file);
diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs
index 115e1ab..62aaf7f 100644
--- a/ballista/rust/executor/src/flight_service.rs
+++ b/ballista/rust/executor/src/flight_service.rs
@@ -26,7 +26,7 @@ use std::time::Instant;
use ballista_core::error::BallistaError;
use ballista_core::serde::decode_protobuf;
use ballista_core::serde::scheduler::{Action as BallistaAction,
PartitionStats};
-use ballista_core::utils::{self, format_plan};
+use ballista_core::utils;
use arrow::array::{ArrayRef, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};
@@ -40,6 +40,7 @@ use arrow_flight::{
PutResult, SchemaResult, Ticket,
};
use datafusion::error::DataFusionError;
+use datafusion::physical_plan::displayable;
use futures::{Stream, StreamExt};
use log::{info, warn};
use std::io::{Read, Seek};
@@ -97,8 +98,7 @@ impl FlightService for BallistaFlightService {
partition.job_id,
partition.stage_id,
partition.partition_id,
- format_plan(partition.plan.as_ref(), 0)
- .map_err(|e| from_ballista_err(&e))?
+ displayable(partition.plan.as_ref()).indent().to_string()
);
let mut tasks: Vec<JoinHandle<Result<_, BallistaError>>> =
vec![];
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index b81d7de..2f01e73 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -235,10 +235,9 @@ mod test {
use ballista_core::error::BallistaError;
use ballista_core::execution_plans::UnresolvedShuffleExec;
use ballista_core::serde::protobuf;
- use ballista_core::utils::format_plan;
use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
use datafusion::physical_plan::sort::SortExec;
- use datafusion::physical_plan::ExecutionPlan;
+ use datafusion::physical_plan::{displayable, ExecutionPlan};
use datafusion::physical_plan::{merge::MergeExec,
projection::ProjectionExec};
use std::convert::TryInto;
use std::sync::Arc;
@@ -270,7 +269,7 @@ mod test {
let job_uuid = Uuid::new_v4();
let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
for stage in &stages {
- println!("{}", format_plan(stage.as_ref(), 0)?);
+ println!("{}", displayable(stage.as_ref()).indent().to_string());
}
/* Expected result: