This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 9a4ef46 ARROW-12432: [Rust] [DataFusion] Add metrics to SortExec
9a4ef46 is described below
commit 9a4ef4696b8b9d46e203f164345ee9c19cbac46c
Author: Andy Grove <[email protected]>
AuthorDate: Sun Apr 18 07:43:49 2021 -0600
ARROW-12432: [Rust] [DataFusion] Add metrics to SortExec
Add `outputRows` (a row counter) and `sortTime` (wall-clock sort time in nanoseconds) metrics to SortExec.
Example output from Ballista:
```
SortExec { input: ProjectionExec { expr: [(Column { name: "l_shipmode" },
"l_shipmode"), (Column { name: "SUM(CASE WHEN
Metrics: sortTime=44444, outputRows=2
```
Closes #10078 from andygrove/sortexec-metrics
Authored-by: Andy Grove <[email protected]>
Signed-off-by: Andy Grove <[email protected]>
---
.../datafusion/src/physical_plan/hash_aggregate.rs | 7 +--
rust/datafusion/src/physical_plan/mod.rs | 14 ++++-
rust/datafusion/src/physical_plan/sort.rs | 71 +++++++++++++++++++---
3 files changed, 77 insertions(+), 15 deletions(-)
diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs
index b78e8bc..2342650 100644
--- a/rust/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs
@@ -28,7 +28,7 @@ use futures::{
};
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{Accumulator, AggregateExpr, MetricType, SQLMetric};
+use crate::physical_plan::{Accumulator, AggregateExpr, SQLMetric};
use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning,
PhysicalExpr};
use arrow::{
@@ -144,10 +144,7 @@ impl HashAggregateExec {
let schema = Arc::new(schema);
- let output_rows = Arc::new(Mutex::new(SQLMetric::new(
- "outputRows",
- MetricType::Counter,
- )));
+ let output_rows = SQLMetric::counter("outputRows");
Ok(HashAggregateExec {
mode,
diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs
index 054d585..5036dcb 100644
--- a/rust/datafusion/src/physical_plan/mod.rs
+++ b/rust/datafusion/src/physical_plan/mod.rs
@@ -18,7 +18,7 @@
//! Traits for physical query plan, supporting parallel execution for
partitioned relations.
use std::fmt::{Debug, Display};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::{any::Any, pin::Pin};
use crate::execution::context::ExecutionContextState;
@@ -52,6 +52,8 @@ pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync
pub enum MetricType {
/// Simple counter
Counter,
+ /// Wall clock time in nanoseconds
+ TimeNanos,
}
/// SQL metric such as counter (number of input or output rows) or timing
information about
@@ -67,6 +69,16 @@ pub struct SQLMetric {
}
impl SQLMetric {
+ /// Create a new metric for tracking a counter
+ pub fn counter(name: &str) -> Arc<Mutex<SQLMetric>> {
+ Arc::new(Mutex::new(SQLMetric::new(name, MetricType::Counter)))
+ }
+
+ /// Create a new metric for tracking time in nanoseconds
+ pub fn time_nanos(name: &str) -> Arc<Mutex<SQLMetric>> {
+ Arc::new(Mutex::new(SQLMetric::new(name, MetricType::TimeNanos)))
+ }
+
/// Create a new SQLMetric
pub fn new(name: &str, metric_type: MetricType) -> Self {
Self {
diff --git a/rust/datafusion/src/physical_plan/sort.rs b/rust/datafusion/src/physical_plan/sort.rs
index 994168c..26855b3 100644
--- a/rust/datafusion/src/physical_plan/sort.rs
+++ b/rust/datafusion/src/physical_plan/sort.rs
@@ -19,11 +19,14 @@
use std::any::Any;
use std::pin::Pin;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
+use std::time::Instant;
+use async_trait::async_trait;
use futures::stream::Stream;
use futures::Future;
+use hashbrown::HashMap;
use pin_project_lite::pin_project;
@@ -37,9 +40,9 @@ use arrow::{array::ArrayRef, error::ArrowError};
use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::{common, Distribution, ExecutionPlan, Partitioning};
-
-use async_trait::async_trait;
+use crate::physical_plan::{
+ common, Distribution, ExecutionPlan, Partitioning, SQLMetric,
+};
/// Sort execution plan
#[derive(Debug)]
@@ -48,6 +51,10 @@ pub struct SortExec {
input: Arc<dyn ExecutionPlan>,
/// Sort expressions
expr: Vec<PhysicalSortExpr>,
+ /// Output rows
+ output_rows: Arc<Mutex<SQLMetric>>,
+ /// Time to sort batches
+ sort_time_nanos: Arc<Mutex<SQLMetric>>,
}
impl SortExec {
@@ -56,7 +63,12 @@ impl SortExec {
expr: Vec<PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
- Ok(Self { expr, input })
+ Ok(Self {
+ expr,
+ input,
+ output_rows: SQLMetric::counter("outputRows"),
+ sort_time_nanos: SQLMetric::time_nanos("sortTime"),
+ })
}
/// Input schema
@@ -125,7 +137,25 @@ impl ExecutionPlan for SortExec {
}
let input = self.input.execute(0).await?;
- Ok(Box::pin(SortStream::new(input, self.expr.clone())))
+ Ok(Box::pin(SortStream::new(
+ input,
+ self.expr.clone(),
+ self.output_rows.clone(),
+ self.sort_time_nanos.clone(),
+ )))
+ }
+
+ fn metrics(&self) -> HashMap<String, SQLMetric> {
+ let mut metrics = HashMap::new();
+ metrics.insert(
+ "outputRows".to_owned(),
+ self.output_rows.lock().unwrap().clone(),
+ );
+ metrics.insert(
+ "sortTime".to_owned(),
+ self.sort_time_nanos.lock().unwrap().clone(),
+ );
+ metrics
}
}
@@ -194,11 +224,17 @@ pin_project! {
output:
futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
finished: bool,
schema: SchemaRef,
+ output_rows: Arc<Mutex<SQLMetric>>,
}
}
impl SortStream {
- fn new(input: SendableRecordBatchStream, expr: Vec<PhysicalSortExpr>) -> Self {
+ fn new(
+ input: SendableRecordBatchStream,
+ expr: Vec<PhysicalSortExpr>,
+ output_rows: Arc<Mutex<SQLMetric>>,
+ sort_time: Arc<Mutex<SQLMetric>>,
+ ) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();
let schema = input.schema();
@@ -207,7 +243,13 @@ impl SortStream {
let sorted_batch = common::collect(input)
.await
.map_err(DataFusionError::into_arrow_external_error)
- .and_then(move |batches| sort_batches(&batches, &schema, &expr));
+ .and_then(move |batches| {
+ let now = Instant::now();
+ let result = sort_batches(&batches, &schema, &expr);
+ let mut sort_time = sort_time.lock().unwrap();
+ sort_time.add(now.elapsed().as_nanos() as usize);
+ result
+ });
tx.send(sorted_batch)
});
@@ -216,6 +258,7 @@ impl SortStream {
output: rx,
finished: false,
schema,
+ output_rows,
}
}
}
@@ -224,6 +267,8 @@ impl Stream for SortStream {
type Item = ArrowResult<RecordBatch>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
+ let output_rows = self.output_rows.clone();
+
if self.finished {
return Poll::Ready(None);
}
@@ -241,6 +286,12 @@ impl Stream for SortStream {
Err(e) =>
Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
Ok(result) => result.transpose(),
};
+
+ if let Some(Ok(batch)) = &result {
+ let mut output_rows = output_rows.lock().unwrap();
+ output_rows.add(batch.num_rows());
+ }
+
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
@@ -379,7 +430,9 @@ mod tests {
assert_eq!(DataType::Float32,
*sort_exec.schema().field(0).data_type());
assert_eq!(DataType::Float64,
*sort_exec.schema().field(1).data_type());
- let result: Vec<RecordBatch> = collect(sort_exec).await?;
+ let result: Vec<RecordBatch> = collect(sort_exec.clone()).await?;
+ assert!(sort_exec.metrics().get("sortTime").unwrap().value > 0);
+ assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value, 8);
assert_eq!(result.len(), 1);
let columns = result[0].columns();