This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 7d8a5cf Add baseline execution stats to `WindowAggExec` and
`UnionExec`, and fixup `CoalescePartitionsExec` (#1018)
7d8a5cf is described below
commit 7d8a5cf9972d601c4bb7de533e6ab7ffcd85e0ad
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Sep 18 08:02:04 2021 -0400
Add baseline execution stats to `WindowAggExec` and `UnionExec`, and fixup
`CoalescePartitionsExec` (#1018)
* Record compute time for CoalescePartitionsExec
* Instrument WindowAggExec, UnionExec
Fixup tests and implementation
* Use consistent elapsed_compute time
* Fixup test
---
.../src/physical_plan/coalesce_partitions.rs | 10 +++-
datafusion/src/physical_plan/union.rs | 65 ++++++++++++++++++++--
.../src/physical_plan/windows/window_agg_exec.rs | 51 ++++++++++++++---
datafusion/tests/sql.rs | 59 ++++++++++++++------
4 files changed, 151 insertions(+), 34 deletions(-)
diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs
b/datafusion/src/physical_plan/coalesce_partitions.rs
index 0d2cc89..329edcb 100644
--- a/datafusion/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/src/physical_plan/coalesce_partitions.rs
@@ -97,8 +97,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
}
async fn execute(&self, partition: usize) ->
Result<SendableRecordBatchStream> {
- let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
-
// CoalescePartitionsExec produces a single partition
if 0 != partition {
return Err(DataFusionError::Internal(format!(
@@ -113,10 +111,16 @@ impl ExecutionPlan for CoalescePartitionsExec {
"CoalescePartitionsExec requires at least one input
partition".to_owned(),
)),
1 => {
- // bypass any threading if there is a single partition
+ // bypass any threading / metrics if there is a single
partition
self.input.execute(0).await
}
_ => {
+ let baseline_metrics = BaselineMetrics::new(&self.metrics,
partition);
+ // record the (very) minimal work done so that
+ // elapsed_compute is not reported as 0
+ let elapsed_compute =
baseline_metrics.elapsed_compute().clone();
+ let _timer = elapsed_compute.timer();
+
// use a stream that allows each sender to put in at
// least one result in an attempt to maximize
// parallelism.
diff --git a/datafusion/src/physical_plan/union.rs
b/datafusion/src/physical_plan/union.rs
index f30cd57..a2f5952 100644
--- a/datafusion/src/physical_plan/union.rs
+++ b/datafusion/src/physical_plan/union.rs
@@ -23,13 +23,18 @@
use std::{any::Any, sync::Arc};
-use arrow::datatypes::SchemaRef;
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use futures::StreamExt;
use super::{
- ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning,
+ metrics::{ExecutionPlanMetricsSet, MetricsSet},
+ ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning,
RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
-use crate::{error::Result, physical_plan::expressions};
+use crate::{
+ error::Result,
+ physical_plan::{expressions, metrics::BaselineMetrics},
+};
use async_trait::async_trait;
/// UNION ALL execution plan
@@ -37,12 +42,17 @@ use async_trait::async_trait;
pub struct UnionExec {
/// Input execution plan
inputs: Vec<Arc<dyn ExecutionPlan>>,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
}
impl UnionExec {
/// Create a new UnionExec
pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self {
- UnionExec { inputs }
+ UnionExec {
+ inputs,
+ metrics: ExecutionPlanMetricsSet::new(),
+ }
}
}
@@ -82,11 +92,18 @@ impl ExecutionPlan for UnionExec {
}
async fn execute(&self, mut partition: usize) ->
Result<SendableRecordBatchStream> {
+ let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+ // record the tiny amount of work done in this function so
+ // elapsed_compute is reported as non zero
+ let elapsed_compute = baseline_metrics.elapsed_compute().clone();
+ let _timer = elapsed_compute.timer(); // record on drop
+
// find partition to execute
for input in self.inputs.iter() {
// Calculate whether partition belongs to the current partition
if partition < input.output_partitioning().partition_count() {
- return input.execute(partition).await;
+ let stream = input.execute(partition).await?;
+ return Ok(Box::pin(ObservedStream::new(stream,
baseline_metrics)));
} else {
partition -= input.output_partitioning().partition_count();
}
@@ -110,6 +127,10 @@ impl ExecutionPlan for UnionExec {
}
}
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+
fn statistics(&self) -> Statistics {
self.inputs
.iter()
@@ -119,6 +140,40 @@ impl ExecutionPlan for UnionExec {
}
}
+/// Stream wrapper that records `BaselineMetrics` for a particular
+/// partition
+struct ObservedStream {
+ inner: SendableRecordBatchStream,
+ baseline_metrics: BaselineMetrics,
+}
+
+impl ObservedStream {
+ fn new(inner: SendableRecordBatchStream, baseline_metrics:
BaselineMetrics) -> Self {
+ Self {
+ inner,
+ baseline_metrics,
+ }
+ }
+}
+
+impl RecordBatchStream for ObservedStream {
+ fn schema(&self) -> arrow::datatypes::SchemaRef {
+ self.inner.schema()
+ }
+}
+
+impl futures::Stream for ObservedStream {
+ type Item = arrow::error::Result<RecordBatch>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ let poll = self.inner.poll_next_unpin(cx);
+ self.baseline_metrics.record_poll(poll)
+ }
+}
+
fn col_stats_union(
mut left: ColumnStatistics,
right: ColumnStatistics,
diff --git a/datafusion/src/physical_plan/windows/window_agg_exec.rs
b/datafusion/src/physical_plan/windows/window_agg_exec.rs
index 0524adc..03307be 100644
--- a/datafusion/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/src/physical_plan/windows/window_agg_exec.rs
@@ -18,6 +18,9 @@
//! Stream and channel implementations for window function expressions.
use crate::error::{DataFusionError, Result};
+use crate::physical_plan::metrics::{
+ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
+};
use crate::physical_plan::{
common, ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan,
Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics,
WindowExpr,
@@ -30,7 +33,7 @@ use arrow::{
};
use async_trait::async_trait;
use futures::stream::Stream;
-use futures::Future;
+use futures::FutureExt;
use pin_project_lite::pin_project;
use std::any::Any;
use std::pin::Pin;
@@ -48,6 +51,8 @@ pub struct WindowAggExec {
schema: SchemaRef,
/// Schema before the window
input_schema: SchemaRef,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
}
impl WindowAggExec {
@@ -59,11 +64,12 @@ impl WindowAggExec {
) -> Result<Self> {
let schema = create_schema(&input_schema, &window_expr)?;
let schema = Arc::new(schema);
- Ok(WindowAggExec {
+ Ok(Self {
input,
window_expr,
schema,
input_schema,
+ metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -140,6 +146,7 @@ impl ExecutionPlan for WindowAggExec {
self.schema.clone(),
self.window_expr.clone(),
input,
+ BaselineMetrics::new(&self.metrics, partition),
));
Ok(stream)
}
@@ -163,6 +170,10 @@ impl ExecutionPlan for WindowAggExec {
Ok(())
}
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+
fn statistics(&self) -> Statistics {
let input_stat = self.input.statistics();
let win_cols = self.window_expr.len();
@@ -214,6 +225,7 @@ pin_project! {
#[pin]
output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
finished: bool,
+ baseline_metrics: BaselineMetrics,
}
}
@@ -223,19 +235,24 @@ impl WindowAggStream {
schema: SchemaRef,
window_expr: Vec<Arc<dyn WindowExpr>>,
input: SendableRecordBatchStream,
+ baseline_metrics: BaselineMetrics,
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();
let schema_clone = schema.clone();
+ let elapsed_compute = baseline_metrics.elapsed_compute().clone();
tokio::spawn(async move {
let schema = schema_clone.clone();
- let result = WindowAggStream::process(input, window_expr,
schema).await;
+ let result =
+ WindowAggStream::process(input, window_expr, schema,
elapsed_compute)
+ .await;
tx.send(result)
});
Self {
+ schema,
output: rx,
finished: false,
- schema,
+ baseline_metrics,
}
}
@@ -243,11 +260,16 @@ impl WindowAggStream {
input: SendableRecordBatchStream,
window_expr: Vec<Arc<dyn WindowExpr>>,
schema: SchemaRef,
+ elapsed_compute: crate::physical_plan::metrics::Time,
) -> ArrowResult<RecordBatch> {
let input_schema = input.schema();
let batches = common::collect(input)
.await
.map_err(DataFusionError::into_arrow_external_error)?;
+
+ // record compute time on drop
+ let _timer = elapsed_compute.timer();
+
let batch = common::combine_batches(&batches, input_schema.clone())?;
if let Some(batch) = batch {
// calculate window cols
@@ -267,18 +289,31 @@ impl WindowAggStream {
impl Stream for WindowAggStream {
type Item = ArrowResult<RecordBatch>;
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let poll = self.poll_next_inner(cx);
+ self.baseline_metrics.record_poll(poll)
+ }
+}
+
+impl WindowAggStream {
+ #[inline]
+ fn poll_next_inner(
+ self: &mut Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<ArrowResult<RecordBatch>>> {
if self.finished {
return Poll::Ready(None);
}
// is the output ready?
- let this = self.project();
- let output_poll = this.output.poll(cx);
+ let output_poll = self.output.poll_unpin(cx);
match output_poll {
Poll::Ready(result) => {
- *this.finished = true;
+ self.finished = true;
// check for error in receiving channel and unwrap actual
result
let result = match result {
Err(e) =>
Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 8916339..1cc903b 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2302,9 +2302,9 @@ async fn csv_explain_analyze() {
let formatted = normalize_for_explain(&formatted);
// Only test basic plumbing and try to avoid having to change too
- // many things
- let needle =
- "CoalescePartitionsExec, metrics=[output_rows=5, elapsed_compute=NOT
RECORDED";
+ // many things. explain_analyze_baseline_metrics covers the values
+ // in greater depth
+ let needle = "CoalescePartitionsExec, metrics=[output_rows=5,
elapsed_compute=";
assert_contains!(&formatted, needle);
let verbose_needle = "Output Rows";
@@ -2352,13 +2352,17 @@ async fn explain_analyze_baseline_metrics() {
register_aggregate_csv_by_sql(&mut ctx).await;
// a query with as many operators as we have metrics for
let sql = "EXPLAIN ANALYZE \
- select count(*) from \
- (SELECT count(*), c1 \
- FROM aggregate_test_100 \
- WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' \
- GROUP BY c1 \
- ORDER BY c1) \
- LIMIT 1";
+ SELECT count(*) as cnt FROM \
+ (SELECT count(*), c1 \
+ FROM aggregate_test_100 \
+ WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' \
+ GROUP BY c1 \
+ ORDER BY c1 ) \
+ UNION ALL \
+ SELECT 1 as cnt \
+ UNION ALL \
+ SELECT lead(c1, 1) OVER () as cnt FROM (select 1 as c1) \
+ LIMIT 3";
println!("running query: {}", sql);
let plan = ctx.create_logical_plan(sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
@@ -2370,11 +2374,6 @@ async fn explain_analyze_baseline_metrics() {
assert_metrics!(
&formatted,
- "CoalescePartitionsExec",
- "metrics=[output_rows=5, elapsed_compute=NOT RECORDED"
- );
- assert_metrics!(
- &formatted,
"HashAggregateExec: mode=Partial, gby=[]",
"metrics=[output_rows=3, elapsed_compute="
);
@@ -2385,7 +2384,7 @@ async fn explain_analyze_baseline_metrics() {
);
assert_metrics!(
&formatted,
- "SortExec: [c1@0 ASC]",
+ "SortExec: [c1@1 ASC]",
"metrics=[output_rows=5, elapsed_compute="
);
assert_metrics!(
@@ -2395,11 +2394,16 @@ async fn explain_analyze_baseline_metrics() {
);
assert_metrics!(
&formatted,
- "GlobalLimitExec: limit=1, ",
+ "GlobalLimitExec: limit=3, ",
"metrics=[output_rows=1, elapsed_compute="
);
assert_metrics!(
&formatted,
+ "LocalLimitExec: limit=3",
+ "metrics=[output_rows=3, elapsed_compute="
+ );
+ assert_metrics!(
+ &formatted,
"ProjectionExec: expr=[COUNT(UInt8(1))",
"metrics=[output_rows=1, elapsed_compute="
);
@@ -2408,6 +2412,21 @@ async fn explain_analyze_baseline_metrics() {
"CoalesceBatchesExec: target_batch_size=4096",
"metrics=[output_rows=5, elapsed_compute"
);
+ assert_metrics!(
+ &formatted,
+ "CoalescePartitionsExec",
+ "metrics=[output_rows=5, elapsed_compute="
+ );
+ assert_metrics!(
+ &formatted,
+ "UnionExec",
+ "metrics=[output_rows=3, elapsed_compute="
+ );
+ assert_metrics!(
+ &formatted,
+ "WindowAggExec",
+ "metrics=[output_rows=1, elapsed_compute="
+ );
fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
use datafusion::physical_plan;
@@ -2416,9 +2435,13 @@ async fn explain_analyze_baseline_metrics() {
||
plan.as_any().downcast_ref::<physical_plan::hash_aggregate::HashAggregateExec>().is_some()
// CoalescePartitionsExec doesn't do any work so is not included
||
plan.as_any().downcast_ref::<physical_plan::filter::FilterExec>().is_some()
+ ||
plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
+ ||
plan.as_any().downcast_ref::<physical_plan::limit::LocalLimitExec>().is_some()
||
plan.as_any().downcast_ref::<physical_plan::projection::ProjectionExec>().is_some()
||
plan.as_any().downcast_ref::<physical_plan::coalesce_batches::CoalesceBatchesExec>().is_some()
- ||
plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
+ ||
plan.as_any().downcast_ref::<physical_plan::coalesce_partitions::CoalescePartitionsExec>().is_some()
+ ||
plan.as_any().downcast_ref::<physical_plan::union::UnionExec>().is_some()
+ ||
plan.as_any().downcast_ref::<physical_plan::windows::WindowAggExec>().is_some()
}
// Validate that the recorded elapsed compute time was more than