This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d8a5cf  Add baseline execution stats to `WindowAggExec` and 
`UnionExec`, and fixup `CoalescePartitionsExec` (#1018)
7d8a5cf is described below

commit 7d8a5cf9972d601c4bb7de533e6ab7ffcd85e0ad
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Sep 18 08:02:04 2021 -0400

    Add baseline execution stats to `WindowAggExec` and `UnionExec`, and fixup 
`CoalescePartitionsExec` (#1018)
    
    * Record compute time for CoalescePartitionsExec
    
    * Instrument WindowAggExec, UnionExec
    
    Fixup tests and implementation
    
    * Use consistent elapsed_compute time
    
    * Fixup test
---
 .../src/physical_plan/coalesce_partitions.rs       | 10 +++-
 datafusion/src/physical_plan/union.rs              | 65 ++++++++++++++++++++--
 .../src/physical_plan/windows/window_agg_exec.rs   | 51 ++++++++++++++---
 datafusion/tests/sql.rs                            | 59 ++++++++++++++------
 4 files changed, 151 insertions(+), 34 deletions(-)

diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs 
b/datafusion/src/physical_plan/coalesce_partitions.rs
index 0d2cc89..329edcb 100644
--- a/datafusion/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/src/physical_plan/coalesce_partitions.rs
@@ -97,8 +97,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
     }
 
     async fn execute(&self, partition: usize) -> 
Result<SendableRecordBatchStream> {
-        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
-
         // CoalescePartitionsExec produces a single partition
         if 0 != partition {
             return Err(DataFusionError::Internal(format!(
@@ -113,10 +111,16 @@ impl ExecutionPlan for CoalescePartitionsExec {
                 "CoalescePartitionsExec requires at least one input 
partition".to_owned(),
             )),
             1 => {
-                // bypass any threading if there is a single partition
+                // bypass any threading / metrics if there is a single 
partition
                 self.input.execute(0).await
             }
             _ => {
+                let baseline_metrics = BaselineMetrics::new(&self.metrics, 
partition);
+                // record the (very) minimal work done so that
+                // elapsed_compute is not reported as 0
+                let elapsed_compute = 
baseline_metrics.elapsed_compute().clone();
+                let _timer = elapsed_compute.timer();
+
                 // use a stream that allows each sender to put in at
                 // least one result in an attempt to maximize
                 // parallelism.
diff --git a/datafusion/src/physical_plan/union.rs 
b/datafusion/src/physical_plan/union.rs
index f30cd57..a2f5952 100644
--- a/datafusion/src/physical_plan/union.rs
+++ b/datafusion/src/physical_plan/union.rs
@@ -23,13 +23,18 @@
 
 use std::{any::Any, sync::Arc};
 
-use arrow::datatypes::SchemaRef;
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use futures::StreamExt;
 
 use super::{
-    ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning,
+    metrics::{ExecutionPlanMetricsSet, MetricsSet},
+    ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, 
RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
-use crate::{error::Result, physical_plan::expressions};
+use crate::{
+    error::Result,
+    physical_plan::{expressions, metrics::BaselineMetrics},
+};
 use async_trait::async_trait;
 
 /// UNION ALL execution plan
@@ -37,12 +42,17 @@ use async_trait::async_trait;
 pub struct UnionExec {
     /// Input execution plan
     inputs: Vec<Arc<dyn ExecutionPlan>>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl UnionExec {
     /// Create a new UnionExec
     pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self {
-        UnionExec { inputs }
+        UnionExec {
+            inputs,
+            metrics: ExecutionPlanMetricsSet::new(),
+        }
     }
 }
 
@@ -82,11 +92,18 @@ impl ExecutionPlan for UnionExec {
     }
 
     async fn execute(&self, mut partition: usize) -> 
Result<SendableRecordBatchStream> {
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+        // record the tiny amount of work done in this function so
+        // elapsed_compute is reported as non zero
+        let elapsed_compute = baseline_metrics.elapsed_compute().clone();
+        let _timer = elapsed_compute.timer(); // record on drop
+
         // find partition to execute
         for input in self.inputs.iter() {
             // Calculate whether partition belongs to the current partition
             if partition < input.output_partitioning().partition_count() {
-                return input.execute(partition).await;
+                let stream = input.execute(partition).await?;
+                return Ok(Box::pin(ObservedStream::new(stream, 
baseline_metrics)));
             } else {
                 partition -= input.output_partitioning().partition_count();
             }
@@ -110,6 +127,10 @@ impl ExecutionPlan for UnionExec {
         }
     }
 
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
     fn statistics(&self) -> Statistics {
         self.inputs
             .iter()
@@ -119,6 +140,40 @@ impl ExecutionPlan for UnionExec {
     }
 }
 
+/// Stream wrapper that records `BaselineMetrics` for a particular
+/// partition
+struct ObservedStream {
+    inner: SendableRecordBatchStream,
+    baseline_metrics: BaselineMetrics,
+}
+
+impl ObservedStream {
+    fn new(inner: SendableRecordBatchStream, baseline_metrics: 
BaselineMetrics) -> Self {
+        Self {
+            inner,
+            baseline_metrics,
+        }
+    }
+}
+
+impl RecordBatchStream for ObservedStream {
+    fn schema(&self) -> arrow::datatypes::SchemaRef {
+        self.inner.schema()
+    }
+}
+
+impl futures::Stream for ObservedStream {
+    type Item = arrow::error::Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let poll = self.inner.poll_next_unpin(cx);
+        self.baseline_metrics.record_poll(poll)
+    }
+}
+
 fn col_stats_union(
     mut left: ColumnStatistics,
     right: ColumnStatistics,
diff --git a/datafusion/src/physical_plan/windows/window_agg_exec.rs 
b/datafusion/src/physical_plan/windows/window_agg_exec.rs
index 0524adc..03307be 100644
--- a/datafusion/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/src/physical_plan/windows/window_agg_exec.rs
@@ -18,6 +18,9 @@
 //! Stream and channel implementations for window function expressions.
 
 use crate::error::{DataFusionError, Result};
+use crate::physical_plan::metrics::{
+    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
+};
 use crate::physical_plan::{
     common, ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan,
     Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, 
WindowExpr,
@@ -30,7 +33,7 @@ use arrow::{
 };
 use async_trait::async_trait;
 use futures::stream::Stream;
-use futures::Future;
+use futures::FutureExt;
 use pin_project_lite::pin_project;
 use std::any::Any;
 use std::pin::Pin;
@@ -48,6 +51,8 @@ pub struct WindowAggExec {
     schema: SchemaRef,
     /// Schema before the window
     input_schema: SchemaRef,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl WindowAggExec {
@@ -59,11 +64,12 @@ impl WindowAggExec {
     ) -> Result<Self> {
         let schema = create_schema(&input_schema, &window_expr)?;
         let schema = Arc::new(schema);
-        Ok(WindowAggExec {
+        Ok(Self {
             input,
             window_expr,
             schema,
             input_schema,
+            metrics: ExecutionPlanMetricsSet::new(),
         })
     }
 
@@ -140,6 +146,7 @@ impl ExecutionPlan for WindowAggExec {
             self.schema.clone(),
             self.window_expr.clone(),
             input,
+            BaselineMetrics::new(&self.metrics, partition),
         ));
         Ok(stream)
     }
@@ -163,6 +170,10 @@ impl ExecutionPlan for WindowAggExec {
         Ok(())
     }
 
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
     fn statistics(&self) -> Statistics {
         let input_stat = self.input.statistics();
         let win_cols = self.window_expr.len();
@@ -214,6 +225,7 @@ pin_project! {
       #[pin]
       output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
       finished: bool,
+      baseline_metrics: BaselineMetrics,
   }
 }
 
@@ -223,19 +235,24 @@ impl WindowAggStream {
         schema: SchemaRef,
         window_expr: Vec<Arc<dyn WindowExpr>>,
         input: SendableRecordBatchStream,
+        baseline_metrics: BaselineMetrics,
     ) -> Self {
         let (tx, rx) = futures::channel::oneshot::channel();
         let schema_clone = schema.clone();
+        let elapsed_compute = baseline_metrics.elapsed_compute().clone();
         tokio::spawn(async move {
             let schema = schema_clone.clone();
-            let result = WindowAggStream::process(input, window_expr, 
schema).await;
+            let result =
+                WindowAggStream::process(input, window_expr, schema, 
elapsed_compute)
+                    .await;
             tx.send(result)
         });
 
         Self {
+            schema,
             output: rx,
             finished: false,
-            schema,
+            baseline_metrics,
         }
     }
 
@@ -243,11 +260,16 @@ impl WindowAggStream {
         input: SendableRecordBatchStream,
         window_expr: Vec<Arc<dyn WindowExpr>>,
         schema: SchemaRef,
+        elapsed_compute: crate::physical_plan::metrics::Time,
     ) -> ArrowResult<RecordBatch> {
         let input_schema = input.schema();
         let batches = common::collect(input)
             .await
             .map_err(DataFusionError::into_arrow_external_error)?;
+
+        // record compute time on drop
+        let _timer = elapsed_compute.timer();
+
         let batch = common::combine_batches(&batches, input_schema.clone())?;
         if let Some(batch) = batch {
             // calculate window cols
@@ -267,18 +289,31 @@ impl WindowAggStream {
 impl Stream for WindowAggStream {
     type Item = ArrowResult<RecordBatch>;
 
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let poll = self.poll_next_inner(cx);
+        self.baseline_metrics.record_poll(poll)
+    }
+}
+
+impl WindowAggStream {
+    #[inline]
+    fn poll_next_inner(
+        self: &mut Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<ArrowResult<RecordBatch>>> {
         if self.finished {
             return Poll::Ready(None);
         }
 
         // is the output ready?
-        let this = self.project();
-        let output_poll = this.output.poll(cx);
+        let output_poll = self.output.poll_unpin(cx);
 
         match output_poll {
             Poll::Ready(result) => {
-                *this.finished = true;
+                self.finished = true;
                 // check for error in receiving channel and unwrap actual 
result
                 let result = match result {
                     Err(e) => 
Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 8916339..1cc903b 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2302,9 +2302,9 @@ async fn csv_explain_analyze() {
     let formatted = normalize_for_explain(&formatted);
 
     // Only test basic plumbing and try to avoid having to change too
-    // many things
-    let needle =
-        "CoalescePartitionsExec, metrics=[output_rows=5, elapsed_compute=NOT 
RECORDED";
+    // many things. explain_analyze_baseline_metrics covers the values
+    // in greater depth
+    let needle = "CoalescePartitionsExec, metrics=[output_rows=5, 
elapsed_compute=";
     assert_contains!(&formatted, needle);
 
     let verbose_needle = "Output Rows";
@@ -2352,13 +2352,17 @@ async fn explain_analyze_baseline_metrics() {
     register_aggregate_csv_by_sql(&mut ctx).await;
     // a query with as many operators as we have metrics for
     let sql = "EXPLAIN ANALYZE \
-               select count(*) from \
-               (SELECT count(*), c1 \
-               FROM aggregate_test_100 \
-               WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' \
-               GROUP BY c1 \
-               ORDER BY c1) \
-               LIMIT 1";
+               SELECT count(*) as cnt FROM \
+                 (SELECT count(*), c1 \
+                  FROM aggregate_test_100 \
+                  WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' \
+                  GROUP BY c1 \
+                  ORDER BY c1 ) \
+                 UNION ALL \
+               SELECT 1 as cnt \
+                 UNION ALL \
+               SELECT lead(c1, 1) OVER () as cnt FROM (select 1 as c1) \
+               LIMIT 3";
     println!("running query: {}", sql);
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
@@ -2370,11 +2374,6 @@ async fn explain_analyze_baseline_metrics() {
 
     assert_metrics!(
         &formatted,
-        "CoalescePartitionsExec",
-        "metrics=[output_rows=5, elapsed_compute=NOT RECORDED"
-    );
-    assert_metrics!(
-        &formatted,
         "HashAggregateExec: mode=Partial, gby=[]",
         "metrics=[output_rows=3, elapsed_compute="
     );
@@ -2385,7 +2384,7 @@ async fn explain_analyze_baseline_metrics() {
     );
     assert_metrics!(
         &formatted,
-        "SortExec: [c1@0 ASC]",
+        "SortExec: [c1@1 ASC]",
         "metrics=[output_rows=5, elapsed_compute="
     );
     assert_metrics!(
@@ -2395,11 +2394,16 @@ async fn explain_analyze_baseline_metrics() {
     );
     assert_metrics!(
         &formatted,
-        "GlobalLimitExec: limit=1, ",
+        "GlobalLimitExec: limit=3, ",
         "metrics=[output_rows=1, elapsed_compute="
     );
     assert_metrics!(
         &formatted,
+        "LocalLimitExec: limit=3",
+        "metrics=[output_rows=3, elapsed_compute="
+    );
+    assert_metrics!(
+        &formatted,
         "ProjectionExec: expr=[COUNT(UInt8(1))",
         "metrics=[output_rows=1, elapsed_compute="
     );
@@ -2408,6 +2412,21 @@ async fn explain_analyze_baseline_metrics() {
         "CoalesceBatchesExec: target_batch_size=4096",
         "metrics=[output_rows=5, elapsed_compute"
     );
+    assert_metrics!(
+        &formatted,
+        "CoalescePartitionsExec",
+        "metrics=[output_rows=5, elapsed_compute="
+    );
+    assert_metrics!(
+        &formatted,
+        "UnionExec",
+        "metrics=[output_rows=3, elapsed_compute="
+    );
+    assert_metrics!(
+        &formatted,
+        "WindowAggExec",
+        "metrics=[output_rows=1, elapsed_compute="
+    );
 
     fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
         use datafusion::physical_plan;
@@ -2416,9 +2435,13 @@ async fn explain_analyze_baseline_metrics() {
             || 
plan.as_any().downcast_ref::<physical_plan::hash_aggregate::HashAggregateExec>().is_some()
             // CoalescePartitionsExec doesn't do any work so is not included
             || 
plan.as_any().downcast_ref::<physical_plan::filter::FilterExec>().is_some()
+            || 
plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
+            || 
plan.as_any().downcast_ref::<physical_plan::limit::LocalLimitExec>().is_some()
             || 
plan.as_any().downcast_ref::<physical_plan::projection::ProjectionExec>().is_some()
             || 
plan.as_any().downcast_ref::<physical_plan::coalesce_batches::CoalesceBatchesExec>().is_some()
-            || 
plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
+            || 
plan.as_any().downcast_ref::<physical_plan::coalesce_partitions::CoalescePartitionsExec>().is_some()
+            || 
plan.as_any().downcast_ref::<physical_plan::union::UnionExec>().is_some()
+            || 
plan.as_any().downcast_ref::<physical_plan::windows::WindowAggExec>().is_some()
     }
 
     // Validate that the recorded elapsed compute time was more than

Reply via email to