This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a4ef46  ARROW-12432: [Rust] [DataFusion] Add metrics to SortExec
9a4ef46 is described below

commit 9a4ef4696b8b9d46e203f164345ee9c19cbac46c
Author: Andy Grove <[email protected]>
AuthorDate: Sun Apr 18 07:43:49 2021 -0600

    ARROW-12432: [Rust] [DataFusion] Add metrics to SortExec
    
    Add `outputRows` and `sortTime` metrics to SortExec.
    
    Example output from Ballista:
    
    ```
    SortExec { input: ProjectionExec { expr: [(Column { name: "l_shipmode" }, "l_shipmode"), (Column { name: "SUM(CASE WHEN
      Metrics: sortTime=44444, outputRows=2
    ```
    
    Closes #10078 from andygrove/sortexec-metrics
    
    Authored-by: Andy Grove <[email protected]>
    Signed-off-by: Andy Grove <[email protected]>
---
 .../datafusion/src/physical_plan/hash_aggregate.rs |  7 +--
 rust/datafusion/src/physical_plan/mod.rs           | 14 ++++-
 rust/datafusion/src/physical_plan/sort.rs          | 71 +++++++++++++++++++---
 3 files changed, 77 insertions(+), 15 deletions(-)

diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs
index b78e8bc..2342650 100644
--- a/rust/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs
@@ -28,7 +28,7 @@ use futures::{
 };
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{Accumulator, AggregateExpr, MetricType, SQLMetric};
+use crate::physical_plan::{Accumulator, AggregateExpr, SQLMetric};
 use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning, PhysicalExpr};
 
 use arrow::{
@@ -144,10 +144,7 @@ impl HashAggregateExec {
 
         let schema = Arc::new(schema);
 
-        let output_rows = Arc::new(Mutex::new(SQLMetric::new(
-            "outputRows",
-            MetricType::Counter,
-        )));
+        let output_rows = SQLMetric::counter("outputRows");
 
         Ok(HashAggregateExec {
             mode,
diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs
index 054d585..5036dcb 100644
--- a/rust/datafusion/src/physical_plan/mod.rs
+++ b/rust/datafusion/src/physical_plan/mod.rs
@@ -18,7 +18,7 @@
 //! Traits for physical query plan, supporting parallel execution for partitioned relations.
 
 use std::fmt::{Debug, Display};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::{any::Any, pin::Pin};
 
 use crate::execution::context::ExecutionContextState;
@@ -52,6 +52,8 @@ pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync
 pub enum MetricType {
     /// Simple counter
     Counter,
+    /// Wall clock time in nanoseconds
+    TimeNanos,
 }
 
 /// SQL metric such as counter (number of input or output rows) or timing information about
@@ -67,6 +69,16 @@ pub struct SQLMetric {
 }
 
 impl SQLMetric {
+    /// Create a new metric for tracking a counter
+    pub fn counter(name: &str) -> Arc<Mutex<SQLMetric>> {
+        Arc::new(Mutex::new(SQLMetric::new(name, MetricType::Counter)))
+    }
+
+    /// Create a new metric for tracking time in nanoseconds
+    pub fn time_nanos(name: &str) -> Arc<Mutex<SQLMetric>> {
+        Arc::new(Mutex::new(SQLMetric::new(name, MetricType::TimeNanos)))
+    }
+
     /// Create a new SQLMetric
     pub fn new(name: &str, metric_type: MetricType) -> Self {
         Self {
diff --git a/rust/datafusion/src/physical_plan/sort.rs b/rust/datafusion/src/physical_plan/sort.rs
index 994168c..26855b3 100644
--- a/rust/datafusion/src/physical_plan/sort.rs
+++ b/rust/datafusion/src/physical_plan/sort.rs
@@ -19,11 +19,14 @@
 
 use std::any::Any;
 use std::pin::Pin;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
+use std::time::Instant;
 
+use async_trait::async_trait;
 use futures::stream::Stream;
 use futures::Future;
+use hashbrown::HashMap;
 
 use pin_project_lite::pin_project;
 
@@ -37,9 +40,9 @@ use arrow::{array::ArrayRef, error::ArrowError};
 use super::{RecordBatchStream, SendableRecordBatchStream};
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::{common, Distribution, ExecutionPlan, Partitioning};
-
-use async_trait::async_trait;
+use crate::physical_plan::{
+    common, Distribution, ExecutionPlan, Partitioning, SQLMetric,
+};
 
 /// Sort execution plan
 #[derive(Debug)]
@@ -48,6 +51,10 @@ pub struct SortExec {
     input: Arc<dyn ExecutionPlan>,
     /// Sort expressions
     expr: Vec<PhysicalSortExpr>,
+    /// Output rows
+    output_rows: Arc<Mutex<SQLMetric>>,
+    /// Time to sort batches
+    sort_time_nanos: Arc<Mutex<SQLMetric>>,
 }
 
 impl SortExec {
@@ -56,7 +63,12 @@ impl SortExec {
         expr: Vec<PhysicalSortExpr>,
         input: Arc<dyn ExecutionPlan>,
     ) -> Result<Self> {
-        Ok(Self { expr, input })
+        Ok(Self {
+            expr,
+            input,
+            output_rows: SQLMetric::counter("outputRows"),
+            sort_time_nanos: SQLMetric::time_nanos("sortTime"),
+        })
     }
 
     /// Input schema
@@ -125,7 +137,25 @@ impl ExecutionPlan for SortExec {
         }
         let input = self.input.execute(0).await?;
 
-        Ok(Box::pin(SortStream::new(input, self.expr.clone())))
+        Ok(Box::pin(SortStream::new(
+            input,
+            self.expr.clone(),
+            self.output_rows.clone(),
+            self.sort_time_nanos.clone(),
+        )))
+    }
+
+    fn metrics(&self) -> HashMap<String, SQLMetric> {
+        let mut metrics = HashMap::new();
+        metrics.insert(
+            "outputRows".to_owned(),
+            self.output_rows.lock().unwrap().clone(),
+        );
+        metrics.insert(
+            "sortTime".to_owned(),
+            self.sort_time_nanos.lock().unwrap().clone(),
+        );
+        metrics
     }
 }
 
@@ -194,11 +224,17 @@ pin_project! {
         output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
         finished: bool,
         schema: SchemaRef,
+        output_rows: Arc<Mutex<SQLMetric>>,
     }
 }
 
 impl SortStream {
-    fn new(input: SendableRecordBatchStream, expr: Vec<PhysicalSortExpr>) -> Self {
+    fn new(
+        input: SendableRecordBatchStream,
+        expr: Vec<PhysicalSortExpr>,
+        output_rows: Arc<Mutex<SQLMetric>>,
+        sort_time: Arc<Mutex<SQLMetric>>,
+    ) -> Self {
         let (tx, rx) = futures::channel::oneshot::channel();
 
         let schema = input.schema();
@@ -207,7 +243,13 @@ impl SortStream {
             let sorted_batch = common::collect(input)
                 .await
                 .map_err(DataFusionError::into_arrow_external_error)
-                .and_then(move |batches| sort_batches(&batches, &schema, &expr));
+                .and_then(move |batches| {
+                    let now = Instant::now();
+                    let result = sort_batches(&batches, &schema, &expr);
+                    let mut sort_time = sort_time.lock().unwrap();
+                    sort_time.add(now.elapsed().as_nanos() as usize);
+                    result
+                });
 
             tx.send(sorted_batch)
         });
@@ -216,6 +258,7 @@ impl SortStream {
             output: rx,
             finished: false,
             schema,
+            output_rows,
         }
     }
 }
@@ -224,6 +267,8 @@ impl Stream for SortStream {
     type Item = ArrowResult<RecordBatch>;
 
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let output_rows = self.output_rows.clone();
+
         if self.finished {
             return Poll::Ready(None);
         }
@@ -241,6 +286,12 @@ impl Stream for SortStream {
                     Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
                     Ok(result) => result.transpose(),
                 };
+
+                if let Some(Ok(batch)) = &result {
+                    let mut output_rows = output_rows.lock().unwrap();
+                    output_rows.add(batch.num_rows());
+                }
+
                 Poll::Ready(result)
             }
             Poll::Pending => Poll::Pending,
@@ -379,7 +430,9 @@ mod tests {
         assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
         assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
 
-        let result: Vec<RecordBatch> = collect(sort_exec).await?;
+        let result: Vec<RecordBatch> = collect(sort_exec.clone()).await?;
+        assert!(sort_exec.metrics().get("sortTime").unwrap().value > 0);
+        assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value, 8);
         assert_eq!(result.len(), 1);
 
         let columns = result[0].columns();

Reply via email to