This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 4c0d430 Add metrics for FilterExec (#960)
4c0d430 is described below
commit 4c0d4301c1210462500ee1d01bf29e462259753e
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Sep 10 11:47:10 2021 -0400
Add metrics for FilterExec (#960)
---
datafusion/src/physical_plan/filter.rs | 25 ++++++++++++++++++++++---
datafusion/tests/sql.rs | 14 +++++++++++++-
2 files changed, 35 insertions(+), 4 deletions(-)
diff --git a/datafusion/src/physical_plan/filter.rs b/datafusion/src/physical_plan/filter.rs
index 9e7fa9d..52017c6 100644
--- a/datafusion/src/physical_plan/filter.rs
+++ b/datafusion/src/physical_plan/filter.rs
@@ -26,6 +26,7 @@ use std::task::{Context, Poll};
use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
+ metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
};
use arrow::array::BooleanArray;
@@ -46,6 +47,8 @@ pub struct FilterExec {
predicate: Arc<dyn PhysicalExpr>,
/// The input plan
input: Arc<dyn ExecutionPlan>,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
}
impl FilterExec {
@@ -58,6 +61,7 @@ impl FilterExec {
DataType::Boolean => Ok(Self {
predicate,
input: input.clone(),
+ metrics: ExecutionPlanMetricsSet::new(),
}),
other => Err(DataFusionError::Plan(format!(
"Filter predicate must return boolean values, not {:?}",
@@ -115,10 +119,13 @@ impl ExecutionPlan for FilterExec {
}
async fn execute(&self, partition: usize) ->
Result<SendableRecordBatchStream> {
+ let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+
Ok(Box::pin(FilterExecStream {
schema: self.input.schema().clone(),
predicate: self.predicate.clone(),
input: self.input.execute(partition).await?,
+ baseline_metrics,
}))
}
@@ -133,6 +140,10 @@ impl ExecutionPlan for FilterExec {
}
}
}
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
}
/// The FilterExec streams wraps the input iterator and applies the predicate expression to
@@ -144,6 +155,8 @@ struct FilterExecStream {
predicate: Arc<dyn PhysicalExpr>,
/// The input partition to filter.
input: SendableRecordBatchStream,
+ /// runtime metrics recording
+ baseline_metrics: BaselineMetrics,
}
fn batch_filter(
@@ -176,10 +189,16 @@ impl Stream for FilterExecStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- self.input.poll_next_unpin(cx).map(|x| match x {
- Some(Ok(batch)) => Some(batch_filter(&batch, &self.predicate)),
+ let poll = self.input.poll_next_unpin(cx).map(|x| match x {
+ Some(Ok(batch)) => {
+ let timer = self.baseline_metrics.elapsed_compute().timer();
+ let filtered_batch = batch_filter(&batch, &self.predicate);
+ timer.done();
+ Some(filtered_batch)
+ }
other => other,
- })
+ });
+ self.baseline_metrics.record_poll(poll)
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 804ae7e..ccff292 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2247,7 +2247,14 @@ async fn explain_analyze_baseline_metrics() {
let mut ctx = ExecutionContext::with_config(config);
register_aggregate_csv_by_sql(&mut ctx).await;
// a query with as many operators as we have metrics for
- let sql = "EXPLAIN ANALYZE select count(*) from (SELECT count(*), c1 FROM
aggregate_test_100 group by c1 ORDER BY c1)";
+ let sql = "EXPLAIN ANALYZE \
+ select count(*) from \
+ (SELECT count(*), c1 \
+ FROM aggregate_test_100 \
+ WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' \
+ GROUP BY c1 \
+ ORDER BY c1)";
+ println!("running query: {}", sql);
let plan = ctx.create_logical_plan(sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
let physical_plan = ctx.create_physical_plan(&plan).unwrap();
@@ -2275,6 +2282,11 @@ async fn explain_analyze_baseline_metrics() {
"SortExec: [c1@0 ASC]",
"metrics=[output_rows=5, elapsed_compute="
);
+ assert_metrics!(
+ &formatted,
+ "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
+ "metrics=[output_rows=99, elapsed_compute="
+ );
fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
use datafusion::physical_plan::{