This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new c63cfd4 Move AggregatedMetricsSet to metrics for further reuse (#1663)
c63cfd4 is described below
commit c63cfd4f38dce3aa81a5e00a9cdcbc1d1823a050
Author: Yijie Shen <[email protected]>
AuthorDate: Tue Jan 25 04:04:09 2022 +0800
Move AggregatedMetricsSet to metrics for further reuse (#1663)
---
datafusion/src/physical_plan/metrics/aggregated.rs | 149 +++++++++++++++++++++
datafusion/src/physical_plan/metrics/mod.rs | 2 +
datafusion/src/physical_plan/sorts/sort.rs | 130 +-----------------
3 files changed, 153 insertions(+), 128 deletions(-)
diff --git a/datafusion/src/physical_plan/metrics/aggregated.rs b/datafusion/src/physical_plan/metrics/aggregated.rs
new file mode 100644
index 0000000..cfcdcb7
--- /dev/null
+++ b/datafusion/src/physical_plan/metrics/aggregated.rs
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics common for complex operators with multiple steps.
+
+use crate::physical_plan::metrics::{
+ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
+};
+use std::sync::Arc;
+use std::time::Duration;
+
+#[derive(Debug, Clone)]
+/// Aggregates all metrics during a complex operation, which is composed of multiple steps and
+/// each stage reports its statistics separately.
+/// Take sort as an example: when the dataset is larger than available memory, it will report
+/// multiple in-memory sort metrics and the final merge-sort metrics from `SortPreservingMergeStream`.
+/// Therefore, we need to separate the final metrics (the only source of output_rows)
+/// from the intermediate metrics, which contribute elapsed_compute, spill_count and spilled_bytes.
+pub struct AggregatedMetricsSet {
+ intermediate: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>, // excluded from output_rows
+ final_: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>, // the stages whose output_rows are reported
+}
+
+impl AggregatedMetricsSet {
+ /// Create an empty aggregated set with no registered stages.
+ pub(crate) fn new() -> Self {
+ Self {
+ intermediate: Arc::new(std::sync::Mutex::new(vec![])),
+ final_: Arc::new(std::sync::Mutex::new(vec![])),
+ }
+ }
+
+ /// Register a new intermediate stage and return its baseline metrics for `partition`.
+ pub(crate) fn new_intermediate_baseline(&self, partition: usize) -> BaselineMetrics {
+ let ms = ExecutionPlanMetricsSet::new();
+ let result = BaselineMetrics::new(&ms, partition);
+ self.intermediate.lock().unwrap().push(ms);
+ result
+ }
+
+ /// Register a new final stage and return its baseline metrics for `partition`.
+ pub(crate) fn new_final_baseline(&self, partition: usize) -> BaselineMetrics {
+ let ms = ExecutionPlanMetricsSet::new();
+ let result = BaselineMetrics::new(&ms, partition);
+ self.final_.lock().unwrap().push(ms);
+ result
+ }
+
+ fn merge_compute_time(&self, dest: &Time) { // add every stage's elapsed_compute to `dest`
+ let time1 = self
+ .intermediate
+ .lock()
+ .unwrap()
+ .iter()
+ .map(|es| {
+ es.clone_inner()
+ .elapsed_compute()
+ .map_or(0u64, |v| v as u64)
+ })
+ .sum();
+ let time2 = self
+ .final_
+ .lock()
+ .unwrap()
+ .iter()
+ .map(|es| {
+ es.clone_inner()
+ .elapsed_compute()
+ .map_or(0u64, |v| v as u64)
+ })
+ .sum();
+ dest.add_duration(Duration::from_nanos(time1));
+ dest.add_duration(Duration::from_nanos(time2));
+ }
+
+ fn merge_spill_count(&self, dest: &Count) { // add every stage's spill_count to `dest`
+ let count1 = self
+ .intermediate
+ .lock()
+ .unwrap()
+ .iter()
+ .map(|es| es.clone_inner().spill_count().map_or(0, |v| v))
+ .sum();
+ let count2 = self
+ .final_
+ .lock()
+ .unwrap()
+ .iter()
+ .map(|es| es.clone_inner().spill_count().map_or(0, |v| v))
+ .sum();
+ dest.add(count1);
+ dest.add(count2);
+ }
+
+ fn merge_spilled_bytes(&self, dest: &Count) { // add every stage's spilled_bytes to `dest`
+ let count1 = self
+ .intermediate
+ .lock()
+ .unwrap()
+ .iter()
+ .map(|es| es.clone_inner().spilled_bytes().map_or(0, |v| v))
+ .sum();
+ let count2 = self
+ .final_
+ .lock()
+ .unwrap()
+ .iter()
+ .map(|es| es.clone_inner().spilled_bytes().map_or(0, |v| v))
+ .sum();
+ dest.add(count1);
+ dest.add(count2);
+ }
+
+ fn merge_output_count(&self, dest: &Count) { // output_rows is taken from final stages only
+ let count = self
+ .final_
+ .lock()
+ .unwrap()
+ .iter()
+ .map(|es| es.clone_inner().output_rows().map_or(0, |v| v))
+ .sum();
+ dest.add(count);
+ }
+
+ /// Fold every stage's metrics into a single `MetricsSet`, recorded as partition 0.
+ pub(crate) fn aggregate_all(&self) -> MetricsSet {
+ let metrics = ExecutionPlanMetricsSet::new();
+ let baseline = BaselineMetrics::new(&metrics, 0);
+ self.merge_compute_time(baseline.elapsed_compute());
+ self.merge_spill_count(baseline.spill_count());
+ self.merge_spilled_bytes(baseline.spilled_bytes());
+ self.merge_output_count(baseline.output_rows());
+ metrics.clone_inner()
+ }
+}
diff --git a/datafusion/src/physical_plan/metrics/mod.rs b/datafusion/src/physical_plan/metrics/mod.rs
index 232583f..9174fa3 100644
--- a/datafusion/src/physical_plan/metrics/mod.rs
+++ b/datafusion/src/physical_plan/metrics/mod.rs
@@ -17,6 +17,7 @@
//! Metrics for recording information about execution
+mod aggregated;
mod baseline;
mod builder;
mod value;
@@ -30,6 +31,7 @@ use std::{
use hashbrown::HashMap;
// public exports
+pub use aggregated::AggregatedMetricsSet;
pub use baseline::{BaselineMetrics, RecordOutput};
pub use builder::MetricBuilder;
pub use value::{Count, MetricValue, ScopedTimerGuard, Time, Timestamp};
diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs
index e091464..456023f 100644
--- a/datafusion/src/physical_plan/sorts/sort.rs
+++ b/datafusion/src/physical_plan/sorts/sort.rs
@@ -26,9 +26,7 @@ use crate::execution::memory_manager::{
use crate::execution::runtime_env::RuntimeEnv;
use crate::physical_plan::common::{batch_byte_size, IPCWriter,
SizedRecordBatchStream};
use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::metrics::{
- BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
-};
+use crate::physical_plan::metrics::{AggregatedMetricsSet, BaselineMetrics,
MetricsSet};
use
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
use crate::physical_plan::sorts::SortedStream;
use crate::physical_plan::stream::RecordBatchReceiverStream;
@@ -54,7 +52,6 @@ use std::fs::File;
use std::io::BufReader;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
-use std::time::Duration;
use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
use tokio::task;
@@ -365,121 +362,6 @@ pub struct SortExec {
preserve_partitioning: bool,
}
-#[derive(Debug, Clone)]
-/// Aggregates all metrics during a complex operation, which is composed of multiple steps and
-/// each stage reports its statistics separately.
-/// Give sort as an example, when the dataset is more significant than available memory, it will report
-/// multiple in-mem sort metrics and final merge-sort metrics from `SortPreservingMergeStream`.
-/// Therefore, We need a separation of metrics for which are final metrics (for output_rows accumulation),
-/// and which are intermediate metrics that we only account for elapsed_compute time.
-struct AggregatedMetricsSet {
- intermediate: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
- final_: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
-}
-
-impl AggregatedMetricsSet {
- fn new() -> Self {
- Self {
- intermediate: Arc::new(std::sync::Mutex::new(vec![])),
- final_: Arc::new(std::sync::Mutex::new(vec![])),
- }
- }
-
- fn new_intermediate_baseline(&self, partition: usize) -> BaselineMetrics {
- let ms = ExecutionPlanMetricsSet::new();
- let result = BaselineMetrics::new(&ms, partition);
- self.intermediate.lock().unwrap().push(ms);
- result
- }
-
- fn new_final_baseline(&self, partition: usize) -> BaselineMetrics {
- let ms = ExecutionPlanMetricsSet::new();
- let result = BaselineMetrics::new(&ms, partition);
- self.final_.lock().unwrap().push(ms);
- result
- }
-
- /// We should accumulate all times from all steps' reports for the total time consumption.
- fn merge_compute_time(&self, dest: &Time) {
- let time1 = self
- .intermediate
- .lock()
- .unwrap()
- .iter()
- .map(|es| {
- es.clone_inner()
- .elapsed_compute()
- .map_or(0u64, |v| v as u64)
- })
- .sum();
- let time2 = self
- .final_
- .lock()
- .unwrap()
- .iter()
- .map(|es| {
- es.clone_inner()
- .elapsed_compute()
- .map_or(0u64, |v| v as u64)
- })
- .sum();
- dest.add_duration(Duration::from_nanos(time1));
- dest.add_duration(Duration::from_nanos(time2));
- }
-
- /// We should accumulate all count from all steps' reports for the total spill count.
- fn merge_spill_count(&self, dest: &Count) {
- let count1 = self
- .intermediate
- .lock()
- .unwrap()
- .iter()
- .map(|es| es.clone_inner().spill_count().map_or(0, |v| v))
- .sum();
- let count2 = self
- .final_
- .lock()
- .unwrap()
- .iter()
- .map(|es| es.clone_inner().spill_count().map_or(0, |v| v))
- .sum();
- dest.add(count1);
- dest.add(count2);
- }
-
- /// We should accumulate all spilled bytes from all steps' reports for the total spilled bytes.
- fn merge_spilled_bytes(&self, dest: &Count) {
- let count1 = self
- .intermediate
- .lock()
- .unwrap()
- .iter()
- .map(|es| es.clone_inner().spilled_bytes().map_or(0, |v| v))
- .sum();
- let count2 = self
- .final_
- .lock()
- .unwrap()
- .iter()
- .map(|es| es.clone_inner().spilled_bytes().map_or(0, |v| v))
- .sum();
- dest.add(count1);
- dest.add(count2);
- }
-
- /// We should only care about output from the final stage metrics.
- fn merge_output_count(&self, dest: &Count) {
- let count = self
- .final_
- .lock()
- .unwrap()
- .iter()
- .map(|es| es.clone_inner().output_rows().map_or(0, |v| v))
- .sum();
- dest.add(count);
- }
-}
-
-
impl SortExec {
/// Create a new sort execution plan
pub fn try_new(
@@ -595,15 +477,7 @@ impl ExecutionPlan for SortExec {
}
fn metrics(&self) -> Option<MetricsSet> {
- let metrics = ExecutionPlanMetricsSet::new();
- let baseline = BaselineMetrics::new(&metrics, 0);
- self.all_metrics
- .merge_compute_time(baseline.elapsed_compute());
- self.all_metrics.merge_spill_count(baseline.spill_count());
- self.all_metrics
- .merge_spilled_bytes(baseline.spilled_bytes());
- self.all_metrics.merge_output_count(baseline.output_rows());
- Some(metrics.clone_inner())
+ Some(self.all_metrics.aggregate_all()) // same merge as the removed lines, now shared via AggregatedMetricsSet
}
fn fmt_as(