This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 78def883fc Don't optimize AnalyzeExec (#6379) (try 2) (#6494)
78def883fc is described below
commit 78def883fc05d0c48c1983b9423836d4a017fc53
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed May 31 15:32:09 2023 -0400
Don't optimize AnalyzeExec (#6379) (try 2) (#6494)
* Don't optimize AnalyzeExec (#6379)
* Rewrite explain analyze to handle arbitrary inputs
* Make it clear some inputs are ignored
* Fix builds fail with error: symbol init___rust_ctor___ctor is already defined #6495
* Simplify AnalyzeExec
---------
Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
.../src/physical_optimizer/pipeline_checker.rs | 2 +-
datafusion/core/src/physical_plan/analyze.rs | 187 +++++++++++----------
datafusion/core/src/physical_plan/planner.rs | 12 +-
datafusion/core/tests/sql/explain_analyze.rs | 20 ++-
4 files changed, 125 insertions(+), 96 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
index 1421e25be7..b12c4ef93f 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -387,7 +387,7 @@ mod sql_tests {
};
let test2 = UnaryTestCase {
source_type: SourceType::Unbounded,
- expect_fail: true,
+ expect_fail: false,
};
let case = QueryCase {
sql: "EXPLAIN ANALYZE SELECT * FROM test".to_string(),
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index 08b5bb34ed..84d74c512b 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -29,9 +29,11 @@ use crate::{
};
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use futures::StreamExt;
+use tokio::task::JoinSet;
use super::expressions::PhysicalSortExpr;
-use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream};
+use super::stream::RecordBatchStreamAdapter;
+use super::{Distribution, SendableRecordBatchStream};
use crate::execution::context::TaskContext;
/// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
@@ -71,23 +73,18 @@ impl ExecutionPlan for AnalyzeExec {
vec![self.input.clone()]
}
- /// Specifies we want the input as a single stream
+ /// AnalyzeExec is handled specially so this value is ignored
fn required_input_distribution(&self) -> Vec<Distribution> {
- vec![Distribution::SinglePartition]
+ vec![]
}
/// Specifies whether this plan generates an infinite stream of records.
/// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
- fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
- if children[0] {
- Err(DataFusionError::Plan(
- "Analyze Error: Analysis is not supported for unbounded inputs"
- .to_string(),
- ))
- } else {
- Ok(false)
- }
+ fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+ Err(DataFusionError::Internal(
+ "Optimization not supported for ANALYZE".to_string(),
+ ))
}
/// Get the output partitioning of this plan
@@ -121,96 +118,62 @@ impl ExecutionPlan for AnalyzeExec {
)));
}
- // should be ensured by `SinglePartition` above
- let input_partitions = self.input.output_partitioning().partition_count();
- if input_partitions != 1 {
- return Err(DataFusionError::Internal(format!(
- "AnalyzeExec invalid number of input partitions. Expected 1,
got {input_partitions}"
- )));
+ // Gather futures that will run each input partition in
+ // parallel (on a separate tokio task) using a JoinSet to
+ // cancel outstanding futures on drop
+ let mut set = JoinSet::new();
+ let num_input_partitions = self.input.output_partitioning().partition_count();
+
+ for input_partition in 0..num_input_partitions {
+ let input_stream = self.input.execute(input_partition, context.clone());
+
+ set.spawn(async move {
+ let mut total_rows = 0;
+ let mut input_stream = input_stream?;
+ while let Some(batch) = input_stream.next().await {
+ let batch = batch?;
+ total_rows += batch.num_rows();
+ }
+ Ok(total_rows) as Result<usize>
+ });
}
- let (tx, rx) = tokio::sync::mpsc::channel(input_partitions);
-
+ let start = Instant::now();
let captured_input = self.input.clone();
- let mut input_stream = captured_input.execute(0, context)?;
let captured_schema = self.schema.clone();
let verbose = self.verbose;
- // Task reads batches the input and when complete produce a
- // RecordBatch with a report that is written to `tx` when done
- let join_handle = tokio::task::spawn(async move {
- let start = Instant::now();
+ // Future that gathers the results from all tasks in the JoinSet,
+ // computes the overall row count, and produces the final
+ // record batch
+ let output = async move {
let mut total_rows = 0;
-
- // Note the code below ignores errors sending on tx. An
- // error sending means the plan is being torn down and
- // nothing is left that will handle the error (aka no one
- // will hear us scream)
- while let Some(b) = input_stream.next().await {
- match b {
- Ok(batch) => {
- total_rows += batch.num_rows();
- }
- b @ Err(_) => {
- // try and pass on errors from input
- if tx.send(b).await.is_err() {
- // receiver hung up, stop executing (no
- // one will look at any further results we
- // send)
- return;
- }
+ while let Some(res) = set.join_next().await {
+ // translate join errors (aka task panics) into ExecutionErrors
+ match res {
+ Ok(row_count) => total_rows += row_count?,
+ Err(e) => {
+ return Err(DataFusionError::Execution(format!(
+ "Join error in AnalyzeExec: {e}"
+ )))
}
}
}
- let end = Instant::now();
-
- let mut type_builder = StringBuilder::with_capacity(1, 1024);
- let mut plan_builder = StringBuilder::with_capacity(1, 1024);
-
- // TODO use some sort of enum rather than strings?
- type_builder.append_value("Plan with Metrics");
-
- let annotated_plan =
- DisplayableExecutionPlan::with_metrics(captured_input.as_ref())
- .indent()
- .to_string();
- plan_builder.append_value(annotated_plan);
-
- // Verbose output
- // TODO make this more sophisticated
- if verbose {
- type_builder.append_value("Plan with Full Metrics");
-
- let annotated_plan =
- DisplayableExecutionPlan::with_full_metrics(captured_input.as_ref())
- .indent()
- .to_string();
- plan_builder.append_value(annotated_plan);
-
- type_builder.append_value("Output Rows");
- plan_builder.append_value(total_rows.to_string());
-
- type_builder.append_value("Duration");
- plan_builder.append_value(format!("{:?}", end - start));
- }
- let maybe_batch = RecordBatch::try_new(
+ let duration = Instant::now() - start;
+ create_output_batch(
+ verbose,
+ total_rows,
+ duration,
+ captured_input,
captured_schema,
- vec![
- Arc::new(type_builder.finish()),
- Arc::new(plan_builder.finish()),
- ],
)
- .map_err(Into::into);
- // again ignore error
- tx.send(maybe_batch).await.ok();
- });
-
- Ok(RecordBatchReceiverStream::create(
- &self.schema,
- rx,
- join_handle,
- ))
+ };
+
+ Ok(Box::pin(RecordBatchStreamAdapter::new(
+ self.schema.clone(),
+ futures::stream::once(output),
+ )))
}
fn fmt_as(
@@ -231,6 +194,52 @@ impl ExecutionPlan for AnalyzeExec {
}
}
+/// Creates the output of AnalyzeExec as a RecordBatch
+fn create_output_batch(
+ verbose: bool,
+ total_rows: usize,
+ duration: std::time::Duration,
+ input: Arc<dyn ExecutionPlan>,
+ schema: SchemaRef,
+) -> Result<RecordBatch> {
+ let mut type_builder = StringBuilder::with_capacity(1, 1024);
+ let mut plan_builder = StringBuilder::with_capacity(1, 1024);
+
+ // TODO use some sort of enum rather than strings?
+ type_builder.append_value("Plan with Metrics");
+
+ let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
+ .indent()
+ .to_string();
+ plan_builder.append_value(annotated_plan);
+
+ // Verbose output
+ // TODO make this more sophisticated
+ if verbose {
+ type_builder.append_value("Plan with Full Metrics");
+
+ let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref())
+ .indent()
+ .to_string();
+ plan_builder.append_value(annotated_plan);
+
+ type_builder.append_value("Output Rows");
+ plan_builder.append_value(total_rows.to_string());
+
+ type_builder.append_value("Duration");
+ plan_builder.append_value(format!("{:?}", duration));
+ }
+
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(type_builder.finish()),
+ Arc::new(plan_builder.finish()),
+ ],
+ )
+ .map_err(DataFusionError::from)
+}
+
#[cfg(test)]
mod tests {
use arrow::datatypes::{DataType, Field, Schema};
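For readers skimming the diff above: the new execute() drains every input partition on its own tokio task via a JoinSet (whose drop cancels any task still running), then exposes the single summary batch as a one-element stream. Below is a minimal self-contained sketch of that pattern, with plain integer counts standing in for RecordBatch streams; it illustrates the JoinSet / stream::once idiom and is not DataFusion code.

use futures::StreamExt;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    // One task per "input partition"; dropping the JoinSet cancels any
    // task that has not finished (the cancel-on-drop property the
    // commit relies on).
    let mut set = JoinSet::new();
    for partition in 0..4usize {
        set.spawn(async move {
            // Stand-in for draining a SendableRecordBatchStream and
            // summing batch.num_rows() per batch.
            Ok::<usize, String>(partition * 100)
        });
    }

    // Gather the per-partition counts. join_next() returns Err on a
    // task panic, which the real operator maps to an execution error.
    let mut total_rows = 0;
    while let Some(res) = set.join_next().await {
        match res {
            Ok(row_count) => total_rows += row_count.expect("task failed"),
            Err(e) => panic!("join error: {e}"),
        }
    }

    // The real operator builds one summary RecordBatch at this point and
    // wraps it via futures::stream::once + RecordBatchStreamAdapter.
    let mut output = Box::pin(futures::stream::once(async move { total_rows }));
    assert_eq!(output.next().await, Some(600));
}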
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 3532a6949a..35b209c7c5 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1192,11 +1192,9 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: Distinct should be replaced
to Aggregate".to_string(),
))
}
- LogicalPlan::Analyze(a) => {
- let input = self.create_initial_plan(&a.input, session_state).await?;
- let schema = SchemaRef::new((*a.schema).clone().into());
- Ok(Arc::new(AnalyzeExec::new(a.verbose, input, schema)))
- }
+ LogicalPlan::Analyze(_) => Err(DataFusionError::Internal(
+ "Unsupported logical plan: Analyze must be root of the
plan".to_string(),
+ )),
LogicalPlan::Extension(e) => {
let physical_inputs = self.create_initial_plan_multi(e.node.inputs(), session_state).await?;
@@ -1851,6 +1849,10 @@ impl DefaultPhysicalPlanner {
stringified_plans,
e.verbose,
))))
+ } else if let LogicalPlan::Analyze(a) = logical_plan {
+ let input = self.create_physical_plan(&a.input, session_state).await?;
+ let schema = SchemaRef::new((*a.schema).clone().into());
+ Ok(Some(Arc::new(AnalyzeExec::new(a.verbose, input, schema))))
} else {
Ok(None)
}
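The planner change above splits Analyze handling in two: create_initial_plan now rejects LogicalPlan::Analyze outright, and only the top-level entry point wraps the plan in AnalyzeExec. A hedged sketch of that shape, with toy types standing in for DataFusion's planner (Plan, plan_root, and plan_node are illustrative names, not the real API):

enum Plan {
    Analyze(Box<Plan>),
    Scan,
}

// Root-only variants are intercepted here, before recursion starts...
fn plan_root(plan: &Plan) -> Result<String, String> {
    match plan {
        Plan::Analyze(input) => Ok(format!("AnalyzeExec({})", plan_node(input)?)),
        other => plan_node(other),
    }
}

// ...so the recursive planner can reject them unconditionally.
fn plan_node(plan: &Plan) -> Result<String, String> {
    match plan {
        Plan::Analyze(_) => Err("Analyze must be root of the plan".to_string()),
        Plan::Scan => Ok("ScanExec".to_string()),
    }
}

fn main() {
    let plan = Plan::Analyze(Box::new(Plan::Scan));
    assert_eq!(plan_root(&plan).unwrap(), "AnalyzeExec(ScanExec)");
    assert!(plan_node(&plan).is_err());
}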
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 1ab933022d..971dea8128 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -687,13 +687,31 @@ async fn csv_explain_analyze() {
// Only test basic plumbing and try to avoid having to change too
// many things. explain_analyze_baseline_metrics covers the values
// in greater depth
- let needle = "CoalescePartitionsExec, metrics=[output_rows=5,
elapsed_compute=";
+ let needle = "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1],
aggr=[COUNT(UInt8(1))], metrics=[output_rows=5";
assert_contains!(&formatted, needle);
let verbose_needle = "Output Rows";
assert_not_contains!(formatted, verbose_needle);
}
+#[tokio::test]
+#[cfg_attr(tarpaulin, ignore)]
+async fn csv_explain_analyze_order_by() {
+ let ctx = SessionContext::new();
+ register_aggregate_csv_by_sql(&ctx).await;
+ let sql = "EXPLAIN ANALYZE SELECT c1 FROM aggregate_test_100 order by c1";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let formatted = arrow::util::pretty::pretty_format_batches(&actual)
+ .unwrap()
+ .to_string();
+
+ // Ensure that the ordering is not optimized away from the plan
+ // https://github.com/apache/arrow-datafusion/issues/6379
+ let needle =
+ "SortExec: expr=[c1@0 ASC NULLS LAST], metrics=[output_rows=100,
elapsed_compute";
+ assert_contains!(&formatted, needle);
+}
+
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn parquet_explain_analyze() {
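End-to-end, the behavior the new test pins down can be reproduced with a few lines of DataFusion. A hedged usage sketch (the CSV path, table name, and column name are placeholders):

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;

    // With this commit, the SortExec under AnalyzeExec is no longer
    // optimized away, so its metrics appear in the formatted output.
    let df = ctx.sql("EXPLAIN ANALYZE SELECT c1 FROM t ORDER BY c1").await?;
    df.show().await?;
    Ok(())
}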