This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 78def883fc Don't optimize AnalyzeExec (#6379) (try 2) (#6494)
78def883fc is described below

commit 78def883fc05d0c48c1983b9423836d4a017fc53
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed May 31 15:32:09 2023 -0400

    Don't optimize AnalyzeExec (#6379) (try 2) (#6494)
    
    * Don't optimize AnalyzeExec (#6379)
    
    * Rewrite explain analyze to handle arbitrary inputs
    
    * Make it clear some inputs are ignored
    
    * Fix builds fail with error: symbol init___rust_ctor___ctor is already 
defined #6495
    
    * Simplify AnalyzeExec
    
    ---------
    
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
 .../src/physical_optimizer/pipeline_checker.rs     |   2 +-
 datafusion/core/src/physical_plan/analyze.rs       | 187 +++++++++++----------
 datafusion/core/src/physical_plan/planner.rs       |  12 +-
 datafusion/core/tests/sql/explain_analyze.rs       |  20 ++-
 4 files changed, 125 insertions(+), 96 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs 
b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
index 1421e25be7..b12c4ef93f 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -387,7 +387,7 @@ mod sql_tests {
         };
         let test2 = UnaryTestCase {
             source_type: SourceType::Unbounded,
-            expect_fail: true,
+            expect_fail: false,
         };
         let case = QueryCase {
             sql: "EXPLAIN ANALYZE SELECT * FROM test".to_string(),
diff --git a/datafusion/core/src/physical_plan/analyze.rs 
b/datafusion/core/src/physical_plan/analyze.rs
index 08b5bb34ed..84d74c512b 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -29,9 +29,11 @@ use crate::{
 };
 use arrow::{array::StringBuilder, datatypes::SchemaRef, 
record_batch::RecordBatch};
 use futures::StreamExt;
+use tokio::task::JoinSet;
 
 use super::expressions::PhysicalSortExpr;
-use super::{stream::RecordBatchReceiverStream, Distribution, 
SendableRecordBatchStream};
+use super::stream::RecordBatchStreamAdapter;
+use super::{Distribution, SendableRecordBatchStream};
 use crate::execution::context::TaskContext;
 
 /// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
@@ -71,23 +73,18 @@ impl ExecutionPlan for AnalyzeExec {
         vec![self.input.clone()]
     }
 
-    /// Specifies we want the input as a single stream
+    /// AnalyzeExec is handled specially so this value is ignored
     fn required_input_distribution(&self) -> Vec<Distribution> {
-        vec![Distribution::SinglePartition]
+        vec![]
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
     /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
-    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
-        if children[0] {
-            Err(DataFusionError::Plan(
-                "Analyze Error: Analysis is not supported for unbounded inputs"
-                    .to_string(),
-            ))
-        } else {
-            Ok(false)
-        }
+    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+        Err(DataFusionError::Internal(
+            "Optimization not supported for ANALYZE".to_string(),
+        ))
     }
 
     /// Get the output partitioning of this plan
@@ -121,96 +118,62 @@ impl ExecutionPlan for AnalyzeExec {
             )));
         }
 
-        // should be ensured by `SinglePartition`  above
-        let input_partitions = 
self.input.output_partitioning().partition_count();
-        if input_partitions != 1 {
-            return Err(DataFusionError::Internal(format!(
-                "AnalyzeExec invalid number of input partitions. Expected 1, 
got {input_partitions}"
-            )));
+        // Gather futures that will run each input partition in
+        // parallel (on a separate tokio task) using a JoinSet to
+        // cancel outstanding futures on drop
+        let mut set = JoinSet::new();
+        let num_input_partitions = 
self.input.output_partitioning().partition_count();
+
+        for input_partition in 0..num_input_partitions {
+            let input_stream = self.input.execute(input_partition, 
context.clone());
+
+            set.spawn(async move {
+                let mut total_rows = 0;
+                let mut input_stream = input_stream?;
+                while let Some(batch) = input_stream.next().await {
+                    let batch = batch?;
+                    total_rows += batch.num_rows();
+                }
+                Ok(total_rows) as Result<usize>
+            });
         }
 
-        let (tx, rx) = tokio::sync::mpsc::channel(input_partitions);
-
+        let start = Instant::now();
         let captured_input = self.input.clone();
-        let mut input_stream = captured_input.execute(0, context)?;
         let captured_schema = self.schema.clone();
         let verbose = self.verbose;
 
-        // Task reads batches the input and when complete produce a
-        // RecordBatch with a report that is written to `tx` when done
-        let join_handle = tokio::task::spawn(async move {
-            let start = Instant::now();
+        // future that gathers the results from all the tasks in the
+        // JoinSet that computes the overall row count and final
+        // record batch
+        let output = async move {
             let mut total_rows = 0;
-
-            // Note the code below ignores errors sending on tx. An
-            // error sending means the plan is being torn down and
-            // nothing is left that will handle the error (aka no one
-            // will hear us scream)
-            while let Some(b) = input_stream.next().await {
-                match b {
-                    Ok(batch) => {
-                        total_rows += batch.num_rows();
-                    }
-                    b @ Err(_) => {
-                        // try and pass on errors from input
-                        if tx.send(b).await.is_err() {
-                            // receiver hung up, stop executing (no
-                            // one will look at any further results we
-                            // send)
-                            return;
-                        }
+            while let Some(res) = set.join_next().await {
+                // translate join errors (aka task panics) into 
ExecutionErrors
+                match res {
+                    Ok(row_count) => total_rows += row_count?,
+                    Err(e) => {
+                        return Err(DataFusionError::Execution(format!(
+                            "Join error in AnalyzeExec: {e}"
+                        )))
                     }
                 }
             }
-            let end = Instant::now();
-
-            let mut type_builder = StringBuilder::with_capacity(1, 1024);
-            let mut plan_builder = StringBuilder::with_capacity(1, 1024);
-
-            // TODO use some sort of enum rather than strings?
-            type_builder.append_value("Plan with Metrics");
-
-            let annotated_plan =
-                DisplayableExecutionPlan::with_metrics(captured_input.as_ref())
-                    .indent()
-                    .to_string();
-            plan_builder.append_value(annotated_plan);
-
-            // Verbose output
-            // TODO make this more sophisticated
-            if verbose {
-                type_builder.append_value("Plan with Full Metrics");
-
-                let annotated_plan =
-                    
DisplayableExecutionPlan::with_full_metrics(captured_input.as_ref())
-                        .indent()
-                        .to_string();
-                plan_builder.append_value(annotated_plan);
-
-                type_builder.append_value("Output Rows");
-                plan_builder.append_value(total_rows.to_string());
-
-                type_builder.append_value("Duration");
-                plan_builder.append_value(format!("{:?}", end - start));
-            }
 
-            let maybe_batch = RecordBatch::try_new(
+            let duration = Instant::now() - start;
+            create_output_batch(
+                verbose,
+                total_rows,
+                duration,
+                captured_input,
                 captured_schema,
-                vec![
-                    Arc::new(type_builder.finish()),
-                    Arc::new(plan_builder.finish()),
-                ],
             )
-            .map_err(Into::into);
-            // again ignore error
-            tx.send(maybe_batch).await.ok();
-        });
-
-        Ok(RecordBatchReceiverStream::create(
-            &self.schema,
-            rx,
-            join_handle,
-        ))
+        };
+
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.schema.clone(),
+            futures::stream::once(output),
+        )))
     }
 
     fn fmt_as(
@@ -231,6 +194,52 @@ impl ExecutionPlan for AnalyzeExec {
     }
 }
 
+/// Creates the output of AnalyzeExec as a RecordBatch
+fn create_output_batch(
+    verbose: bool,
+    total_rows: usize,
+    duration: std::time::Duration,
+    input: Arc<dyn ExecutionPlan>,
+    schema: SchemaRef,
+) -> Result<RecordBatch> {
+    let mut type_builder = StringBuilder::with_capacity(1, 1024);
+    let mut plan_builder = StringBuilder::with_capacity(1, 1024);
+
+    // TODO use some sort of enum rather than strings?
+    type_builder.append_value("Plan with Metrics");
+
+    let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
+        .indent()
+        .to_string();
+    plan_builder.append_value(annotated_plan);
+
+    // Verbose output
+    // TODO make this more sophisticated
+    if verbose {
+        type_builder.append_value("Plan with Full Metrics");
+
+        let annotated_plan = 
DisplayableExecutionPlan::with_full_metrics(input.as_ref())
+            .indent()
+            .to_string();
+        plan_builder.append_value(annotated_plan);
+
+        type_builder.append_value("Output Rows");
+        plan_builder.append_value(total_rows.to_string());
+
+        type_builder.append_value("Duration");
+        plan_builder.append_value(format!("{:?}", duration));
+    }
+
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(type_builder.finish()),
+            Arc::new(plan_builder.finish()),
+        ],
+    )
+    .map_err(DataFusionError::from)
+}
+
 #[cfg(test)]
 mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
diff --git a/datafusion/core/src/physical_plan/planner.rs 
b/datafusion/core/src/physical_plan/planner.rs
index 3532a6949a..35b209c7c5 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1192,11 +1192,9 @@ impl DefaultPhysicalPlanner {
                         "Unsupported logical plan: Distinct should be replaced 
to Aggregate".to_string(),
                     ))
                 }
-                LogicalPlan::Analyze(a) => {
-                    let input = self.create_initial_plan(&a.input, 
session_state).await?;
-                    let schema = SchemaRef::new((*a.schema).clone().into());
-                    Ok(Arc::new(AnalyzeExec::new(a.verbose, input, schema)))
-                }
+                LogicalPlan::Analyze(_) => Err(DataFusionError::Internal(
+                    "Unsupported logical plan: Analyze must be root of the 
plan".to_string(),
+                )),
                 LogicalPlan::Extension(e) => {
                     let physical_inputs = 
self.create_initial_plan_multi(e.node.inputs(), session_state).await?;
 
@@ -1851,6 +1849,10 @@ impl DefaultPhysicalPlanner {
                 stringified_plans,
                 e.verbose,
             ))))
+        } else if let LogicalPlan::Analyze(a) = logical_plan {
+            let input = self.create_physical_plan(&a.input, 
session_state).await?;
+            let schema = SchemaRef::new((*a.schema).clone().into());
+            Ok(Some(Arc::new(AnalyzeExec::new(a.verbose, input, schema))))
         } else {
             Ok(None)
         }
diff --git a/datafusion/core/tests/sql/explain_analyze.rs 
b/datafusion/core/tests/sql/explain_analyze.rs
index 1ab933022d..971dea8128 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -687,13 +687,31 @@ async fn csv_explain_analyze() {
     // Only test basic plumbing and try to avoid having to change too
     // many things. explain_analyze_baseline_metrics covers the values
     // in greater depth
-    let needle = "CoalescePartitionsExec, metrics=[output_rows=5, 
elapsed_compute=";
+    let needle = "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[COUNT(UInt8(1))], metrics=[output_rows=5";
     assert_contains!(&formatted, needle);
 
     let verbose_needle = "Output Rows";
     assert_not_contains!(formatted, verbose_needle);
 }
 
+#[tokio::test]
+#[cfg_attr(tarpaulin, ignore)]
+async fn csv_explain_analyze_order_by() {
+    let ctx = SessionContext::new();
+    register_aggregate_csv_by_sql(&ctx).await;
+    let sql = "EXPLAIN ANALYZE SELECT c1 FROM aggregate_test_100 order by c1";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let formatted = arrow::util::pretty::pretty_format_batches(&actual)
+        .unwrap()
+        .to_string();
+
+    // Ensure that the ordering is not optimized away from the plan
+    // https://github.com/apache/arrow-datafusion/issues/6379
+    let needle =
+        "SortExec: expr=[c1@0 ASC NULLS LAST], metrics=[output_rows=100, 
elapsed_compute";
+    assert_contains!(&formatted, needle);
+}
+
 #[tokio::test]
 #[cfg_attr(tarpaulin, ignore)]
 async fn parquet_explain_analyze() {

Reply via email to