This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a07f1d  Add baseline metrics to `SortPreservingMergeExec` (#948)
9a07f1d is described below

commit 9a07f1d8b3c8916c3741613330313f13c6545c24
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Aug 27 15:34:39 2021 -0400

    Add baseline metrics to `SortPreservingMergeExec` (#948)
    
    * Add runtime metrics for SortPreservingMergeStream
    
    * improve test
---
 .../src/physical_plan/sort_preserving_merge.rs     | 95 +++++++++++++++++++++-
 1 file changed, 93 insertions(+), 2 deletions(-)

diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs 
b/datafusion/src/physical_plan/sort_preserving_merge.rs
index b4bcc29..1bcdd63 100644
--- a/datafusion/src/physical_plan/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sort_preserving_merge.rs
@@ -17,6 +17,7 @@
 
 //! Defines the sort preserving merge plan
 
+use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use std::any::Any;
 use std::cmp::Ordering;
 use std::collections::VecDeque;
@@ -59,6 +60,8 @@ pub struct SortPreservingMergeExec {
     expr: Vec<PhysicalSortExpr>,
     /// The target size of yielded batches
     target_batch_size: usize,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl SortPreservingMergeExec {
@@ -72,6 +75,7 @@ impl SortPreservingMergeExec {
             input,
             expr,
             target_batch_size,
+            metrics: ExecutionPlanMetricsSet::new(),
         }
     }
 
@@ -134,6 +138,8 @@ impl ExecutionPlan for SortPreservingMergeExec {
             )));
         }
 
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+
         let input_partitions = 
self.input.output_partitioning().partition_count();
         match input_partitions {
             0 => Err(DataFusionError::Internal(
@@ -141,7 +147,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
                     .to_owned(),
             )),
             1 => {
-                // bypass if there is only one partition to merge
+                // bypass if there is only one partition to merge (no metrics 
in this case either)
                 self.input.execute(0).await
             }
             _ => {
@@ -159,6 +165,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
                     self.schema(),
                     &self.expr,
                     self.target_batch_size,
+                    baseline_metrics,
                 )))
             }
         }
@@ -176,6 +183,10 @@ impl ExecutionPlan for SortPreservingMergeExec {
             }
         }
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
 }
 
 /// A `SortKeyCursor` is created from a `RecordBatch`, and a set of
@@ -338,6 +349,8 @@ struct SortPreservingMergeStream {
     sort_options: Vec<SortOptions>,
     /// The desired RecordBatch size to yield
     target_batch_size: usize,
+    /// used to record execution metrics
+    baseline_metrics: BaselineMetrics,
     /// If the stream has encountered an error
     aborted: bool,
 
@@ -351,6 +364,7 @@ impl SortPreservingMergeStream {
         schema: SchemaRef,
         expressions: &[PhysicalSortExpr],
         target_batch_size: usize,
+        baseline_metrics: BaselineMetrics,
     ) -> Self {
         let cursors = (0..streams.len())
             .into_iter()
@@ -364,6 +378,7 @@ impl SortPreservingMergeStream {
             column_expressions: expressions.iter().map(|x| 
x.expr.clone()).collect(),
             sort_options: expressions.iter().map(|x| x.options).collect(),
             target_batch_size,
+            baseline_metrics,
             aborted: false,
             in_progress: vec![],
             next_batch_index: 0,
@@ -390,7 +405,7 @@ impl SortPreservingMergeStream {
             return Poll::Ready(Ok(()));
         }
 
-        // Fetch a new record and create a cursor from it
+        // Fetch a new input record and create a cursor from it
         match futures::ready!(stream.poll_next_unpin(cx)) {
             None => return Poll::Ready(Ok(())),
             Some(Err(e)) => {
@@ -539,6 +554,17 @@ impl Stream for SortPreservingMergeStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
+        let poll = self.poll_next_inner(cx);
+        self.baseline_metrics.record_poll(poll)
+    }
+}
+
+impl SortPreservingMergeStream {
+    #[inline]
+    fn poll_next_inner(
+        self: &mut Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<ArrowResult<RecordBatch>>> {
         if self.aborted {
             return Poll::Ready(None);
         }
@@ -556,6 +582,11 @@ impl Stream for SortPreservingMergeStream {
         }
 
         loop {
+            // NB timer records time taken on drop, so there are no
+            // calls to `timer.done()` below.
+            let elapsed_compute = 
self.baseline_metrics.elapsed_compute().clone();
+            let _timer = elapsed_compute.timer();
+
             let stream_idx = match self.next_stream_idx() {
                 Ok(Some(idx)) => idx,
                 Ok(None) if self.in_progress.is_empty() => return 
Poll::Ready(None),
@@ -607,6 +638,7 @@ impl RecordBatchStream for SortPreservingMergeStream {
 
 #[cfg(test)]
 mod tests {
+    use crate::physical_plan::metrics::MetricValue;
     use std::iter::FromIterator;
 
     use crate::arrow::array::{Int32Array, StringArray, 
TimestampNanosecondArray};
@@ -1149,11 +1181,15 @@ mod tests {
             streams.push(receiver);
         }
 
+        let metrics = ExecutionPlanMetricsSet::new();
+        let baseline_metrics = BaselineMetrics::new(&metrics, 0);
+
         let merge_stream = SortPreservingMergeStream::new(
             streams,
             batches.schema(),
             sort.as_slice(),
             1024,
+            baseline_metrics,
         );
 
         let mut merged = 
common::collect(Box::pin(merge_stream)).await.unwrap();
@@ -1172,4 +1208,59 @@ mod tests {
 
         assert_eq!(basic, partition);
     }
+
+    #[tokio::test]
+    async fn test_merge_metrics() {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"), 
Some("c")]));
+        let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();
+
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![10, 20]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("b"), 
Some("d")]));
+        let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();
+
+        let schema = b1.schema();
+        let sort = vec![PhysicalSortExpr {
+            expr: col("b", &schema).unwrap(),
+            options: Default::default(),
+        }];
+        let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, 
None).unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, 
Arc::new(exec), 1024));
+
+        let collected = collect(merge.clone()).await.unwrap();
+        let expected = vec![
+            "+----+---+",
+            "| a  | b |",
+            "+----+---+",
+            "| 1  | a |",
+            "| 10 | b |",
+            "| 2  | c |",
+            "| 20 | d |",
+            "+----+---+",
+        ];
+        assert_batches_eq!(expected, collected.as_slice());
+
+        // Now, validate metrics
+        let metrics = merge.metrics().unwrap();
+
+        assert_eq!(metrics.output_rows().unwrap(), 4);
+        assert!(metrics.elapsed_compute().unwrap() > 0);
+
+        let mut saw_start = false;
+        let mut saw_end = false;
+        metrics.iter().for_each(|m| match m.value() {
+            MetricValue::StartTimestamp(ts) => {
+                saw_start = true;
+                assert!(ts.value().unwrap().timestamp_nanos() > 0);
+            }
+            MetricValue::EndTimestamp(ts) => {
+                saw_end = true;
+                assert!(ts.value().unwrap().timestamp_nanos() > 0);
+            }
+            _ => {}
+        });
+
+        assert!(saw_start);
+        assert!(saw_end);
+    }
 }

Reply via email to