This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new c63cfd4  Move AggregatedMetricsSet to metrics for further reuse (#1663)
c63cfd4 is described below

commit c63cfd4f38dce3aa81a5e00a9cdcbc1d1823a050
Author: Yijie Shen <[email protected]>
AuthorDate: Tue Jan 25 04:04:09 2022 +0800

    Move AggregatedMetricsSet to metrics for further reuse (#1663)
---
 datafusion/src/physical_plan/metrics/aggregated.rs | 149 +++++++++++++++++++++
 datafusion/src/physical_plan/metrics/mod.rs        |   2 +
 datafusion/src/physical_plan/sorts/sort.rs         | 130 +-----------------
 3 files changed, 153 insertions(+), 128 deletions(-)

diff --git a/datafusion/src/physical_plan/metrics/aggregated.rs 
b/datafusion/src/physical_plan/metrics/aggregated.rs
new file mode 100644
index 0000000..cfcdcb7
--- /dev/null
+++ b/datafusion/src/physical_plan/metrics/aggregated.rs
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics common for complex operators with multiple steps.
+
+use crate::physical_plan::metrics::{
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
+};
+use std::sync::Arc;
+use std::time::Duration;
+
+#[derive(Debug, Clone)]
+/// Aggregates all metrics during a complex operation, which is composed of 
multiple steps and
+/// each stage reports its statistics separately.
+/// Take sort as an example: when the dataset is larger than
available memory, it will report
+/// multiple in-mem sort metrics and final merge-sort metrics from
`SortPreservingMergeStream`.
+/// Therefore, we need to separate the metrics into final metrics
(for output_rows accumulation),
+/// and intermediate metrics, for which we only account for
elapsed_compute time.
+pub struct AggregatedMetricsSet {
+    intermediate: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
+    final_: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
+}
+
+impl AggregatedMetricsSet {
+    /// Create a new aggregated set
+    pub(crate) fn new() -> Self {
+        Self {
+            intermediate: Arc::new(std::sync::Mutex::new(vec![])),
+            final_: Arc::new(std::sync::Mutex::new(vec![])),
+        }
+    }
+
+    /// create a new intermediate baseline
+    pub(crate) fn new_intermediate_baseline(&self, partition: usize) -> 
BaselineMetrics {
+        let ms = ExecutionPlanMetricsSet::new();
+        let result = BaselineMetrics::new(&ms, partition);
+        self.intermediate.lock().unwrap().push(ms);
+        result
+    }
+
+    /// create a new final baseline
+    pub(crate) fn new_final_baseline(&self, partition: usize) -> 
BaselineMetrics {
+        let ms = ExecutionPlanMetricsSet::new();
+        let result = BaselineMetrics::new(&ms, partition);
+        self.final_.lock().unwrap().push(ms);
+        result
+    }
+
+    fn merge_compute_time(&self, dest: &Time) {
+        let time1 = self
+            .intermediate
+            .lock()
+            .unwrap()
+            .iter()
+            .map(|es| {
+                es.clone_inner()
+                    .elapsed_compute()
+                    .map_or(0u64, |v| v as u64)
+            })
+            .sum();
+        let time2 = self
+            .final_
+            .lock()
+            .unwrap()
+            .iter()
+            .map(|es| {
+                es.clone_inner()
+                    .elapsed_compute()
+                    .map_or(0u64, |v| v as u64)
+            })
+            .sum();
+        dest.add_duration(Duration::from_nanos(time1));
+        dest.add_duration(Duration::from_nanos(time2));
+    }
+
+    fn merge_spill_count(&self, dest: &Count) {
+        let count1 = self
+            .intermediate
+            .lock()
+            .unwrap()
+            .iter()
+            .map(|es| es.clone_inner().spill_count().map_or(0, |v| v))
+            .sum();
+        let count2 = self
+            .final_
+            .lock()
+            .unwrap()
+            .iter()
+            .map(|es| es.clone_inner().spill_count().map_or(0, |v| v))
+            .sum();
+        dest.add(count1);
+        dest.add(count2);
+    }
+
+    fn merge_spilled_bytes(&self, dest: &Count) {
+        let count1 = self
+            .intermediate
+            .lock()
+            .unwrap()
+            .iter()
+            .map(|es| es.clone_inner().spilled_bytes().map_or(0, |v| v))
+            .sum();
+        let count2 = self
+            .final_
+            .lock()
+            .unwrap()
+            .iter()
+            .map(|es| es.clone_inner().spilled_bytes().map_or(0, |v| v))
+            .sum();
+        dest.add(count1);
+        dest.add(count2);
+    }
+
+    fn merge_output_count(&self, dest: &Count) {
+        let count = self
+            .final_
+            .lock()
+            .unwrap()
+            .iter()
+            .map(|es| es.clone_inner().output_rows().map_or(0, |v| v))
+            .sum();
+        dest.add(count);
+    }
+
+    /// Aggregate all metrics into one
+    pub(crate) fn aggregate_all(&self) -> MetricsSet {
+        let metrics = ExecutionPlanMetricsSet::new();
+        let baseline = BaselineMetrics::new(&metrics, 0);
+        self.merge_compute_time(baseline.elapsed_compute());
+        self.merge_spill_count(baseline.spill_count());
+        self.merge_spilled_bytes(baseline.spilled_bytes());
+        self.merge_output_count(baseline.output_rows());
+        metrics.clone_inner()
+    }
+}
diff --git a/datafusion/src/physical_plan/metrics/mod.rs 
b/datafusion/src/physical_plan/metrics/mod.rs
index 232583f..9174fa3 100644
--- a/datafusion/src/physical_plan/metrics/mod.rs
+++ b/datafusion/src/physical_plan/metrics/mod.rs
@@ -17,6 +17,7 @@
 
 //! Metrics for recording information about execution
 
+mod aggregated;
 mod baseline;
 mod builder;
 mod value;
@@ -30,6 +31,7 @@ use std::{
 use hashbrown::HashMap;
 
 // public exports
+pub use aggregated::AggregatedMetricsSet;
 pub use baseline::{BaselineMetrics, RecordOutput};
 pub use builder::MetricBuilder;
 pub use value::{Count, MetricValue, ScopedTimerGuard, Time, Timestamp};
diff --git a/datafusion/src/physical_plan/sorts/sort.rs 
b/datafusion/src/physical_plan/sorts/sort.rs
index e091464..456023f 100644
--- a/datafusion/src/physical_plan/sorts/sort.rs
+++ b/datafusion/src/physical_plan/sorts/sort.rs
@@ -26,9 +26,7 @@ use crate::execution::memory_manager::{
 use crate::execution::runtime_env::RuntimeEnv;
 use crate::physical_plan::common::{batch_byte_size, IPCWriter, 
SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::metrics::{
-    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
-};
+use crate::physical_plan::metrics::{AggregatedMetricsSet, BaselineMetrics, 
MetricsSet};
 use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
 use crate::physical_plan::sorts::SortedStream;
 use crate::physical_plan::stream::RecordBatchReceiverStream;
@@ -54,7 +52,6 @@ use std::fs::File;
 use std::io::BufReader;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::time::Duration;
 use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
 use tokio::task;
 
@@ -365,121 +362,6 @@ pub struct SortExec {
     preserve_partitioning: bool,
 }
 
-#[derive(Debug, Clone)]
-/// Aggregates all metrics during a complex operation, which is composed of 
multiple steps and
-/// each stage reports its statistics separately.
-/// Give sort as an example, when the dataset is more significant than 
available memory, it will report
-/// multiple in-mem sort metrics and final merge-sort  metrics from 
`SortPreservingMergeStream`.
-/// Therefore, We need a separation of metrics for which are final metrics 
(for output_rows accumulation),
-/// and which are intermediate metrics that we only account for 
elapsed_compute time.
-struct AggregatedMetricsSet {
-    intermediate: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
-    final_: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
-}
-
-impl AggregatedMetricsSet {
-    fn new() -> Self {
-        Self {
-            intermediate: Arc::new(std::sync::Mutex::new(vec![])),
-            final_: Arc::new(std::sync::Mutex::new(vec![])),
-        }
-    }
-
-    fn new_intermediate_baseline(&self, partition: usize) -> BaselineMetrics {
-        let ms = ExecutionPlanMetricsSet::new();
-        let result = BaselineMetrics::new(&ms, partition);
-        self.intermediate.lock().unwrap().push(ms);
-        result
-    }
-
-    fn new_final_baseline(&self, partition: usize) -> BaselineMetrics {
-        let ms = ExecutionPlanMetricsSet::new();
-        let result = BaselineMetrics::new(&ms, partition);
-        self.final_.lock().unwrap().push(ms);
-        result
-    }
-
-    /// We should accumulate all times from all steps' reports for the total 
time consumption.
-    fn merge_compute_time(&self, dest: &Time) {
-        let time1 = self
-            .intermediate
-            .lock()
-            .unwrap()
-            .iter()
-            .map(|es| {
-                es.clone_inner()
-                    .elapsed_compute()
-                    .map_or(0u64, |v| v as u64)
-            })
-            .sum();
-        let time2 = self
-            .final_
-            .lock()
-            .unwrap()
-            .iter()
-            .map(|es| {
-                es.clone_inner()
-                    .elapsed_compute()
-                    .map_or(0u64, |v| v as u64)
-            })
-            .sum();
-        dest.add_duration(Duration::from_nanos(time1));
-        dest.add_duration(Duration::from_nanos(time2));
-    }
-
-    /// We should accumulate all count from all steps' reports for the total 
spill count.
-    fn merge_spill_count(&self, dest: &Count) {
-        let count1 = self
-            .intermediate
-            .lock()
-            .unwrap()
-            .iter()
-            .map(|es| es.clone_inner().spill_count().map_or(0, |v| v))
-            .sum();
-        let count2 = self
-            .final_
-            .lock()
-            .unwrap()
-            .iter()
-            .map(|es| es.clone_inner().spill_count().map_or(0, |v| v))
-            .sum();
-        dest.add(count1);
-        dest.add(count2);
-    }
-
-    /// We should accumulate all spilled bytes from all steps' reports for the 
total spilled bytes.
-    fn merge_spilled_bytes(&self, dest: &Count) {
-        let count1 = self
-            .intermediate
-            .lock()
-            .unwrap()
-            .iter()
-            .map(|es| es.clone_inner().spilled_bytes().map_or(0, |v| v))
-            .sum();
-        let count2 = self
-            .final_
-            .lock()
-            .unwrap()
-            .iter()
-            .map(|es| es.clone_inner().spilled_bytes().map_or(0, |v| v))
-            .sum();
-        dest.add(count1);
-        dest.add(count2);
-    }
-
-    /// We should only care about output from the final stage metrics.
-    fn merge_output_count(&self, dest: &Count) {
-        let count = self
-            .final_
-            .lock()
-            .unwrap()
-            .iter()
-            .map(|es| es.clone_inner().output_rows().map_or(0, |v| v))
-            .sum();
-        dest.add(count);
-    }
-}
-
 impl SortExec {
     /// Create a new sort execution plan
     pub fn try_new(
@@ -595,15 +477,7 @@ impl ExecutionPlan for SortExec {
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
-        let metrics = ExecutionPlanMetricsSet::new();
-        let baseline = BaselineMetrics::new(&metrics, 0);
-        self.all_metrics
-            .merge_compute_time(baseline.elapsed_compute());
-        self.all_metrics.merge_spill_count(baseline.spill_count());
-        self.all_metrics
-            .merge_spilled_bytes(baseline.spilled_bytes());
-        self.all_metrics.merge_output_count(baseline.output_rows());
-        Some(metrics.clone_inner())
+        Some(self.all_metrics.aggregate_all())
     }
 
     fn fmt_as(

Reply via email to