alamb commented on a change in pull request #1691: URL: https://github.com/apache/arrow-datafusion/pull/1691#discussion_r794735689
########## File path: datafusion/src/execution/memory_manager.rs ########## @@ -267,41 +267,37 @@ impl MemoryManager { ); Arc::new(Self { - requesters: Arc::new(Mutex::new(HashMap::new())), - trackers: Arc::new(Mutex::new(HashMap::new())), + requesters: Arc::new(Mutex::new(HashSet::new())), pool_size, requesters_total: Arc::new(Mutex::new(0)), + trackers_total: Arc::new(Mutex::new(0)), cv: Condvar::new(), }) } } } fn get_tracker_total(&self) -> usize { - let trackers = self.trackers.lock().unwrap(); - if trackers.len() > 0 { - trackers.values().fold(0usize, |acc, y| match y.upgrade() { - None => acc, - Some(t) => acc + t.mem_used(), - }) - } else { - 0 - } + *self.trackers_total.lock().unwrap() Review comment: well that sure looks nicer 👍 ########## File path: datafusion/src/execution/memory_manager.rs ########## @@ -245,10 +245,10 @@ The memory management architecture is the following: /// Manage memory usage during physical plan execution #[derive(Debug)] pub struct MemoryManager { - requesters: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>, - trackers: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>, + requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>, pool_size: usize, requesters_total: Arc<Mutex<usize>>, Review comment: Maybe as a follow on PR this can be changed to be an `AtomicUsize` and avoid the mutex and I think the fetch and update code will be nicer. I think that would be a nice to have - not required. ########## File path: datafusion/src/physical_plan/metrics/composite.rs ########## @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metrics common for complex operators with multiple steps. + +use crate::execution::runtime_env::RuntimeEnv; +use crate::physical_plan::metrics::tracker::MemTrackingMetrics; +use crate::physical_plan::metrics::{ + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricValue, MetricsSet, Time, + Timestamp, +}; +use crate::physical_plan::Metric; +use chrono::{TimeZone, Utc}; +use std::sync::Arc; +use std::time::Duration; + +#[derive(Debug, Clone)] +/// Collects all metrics during a complex operation, which is composed of multiple steps and +/// each stage reports its statistics separately. +/// Give sort as an example, when the dataset is more significant than available memory, it will report +/// multiple in-mem sort metrics and final merge-sort metrics from `SortPreservingMergeStream`. +/// Therefore, We need a separation of metrics for which are final metrics (for output_rows accumulation), +/// and which are intermediate metrics that we only account for elapsed_compute time. 
+pub struct CompositeMetricsSet { Review comment: this is a nicer name and a good description for `Aggregated` metrics ########## File path: datafusion/src/execution/memory_manager.rs ########## @@ -528,23 +520,29 @@ mod tests { .with_memory_manager(MemoryManagerConfig::try_new_limit(100, 1.0).unwrap()); let runtime = Arc::new(RuntimeEnv::new(config).unwrap()); - let tracker1 = Arc::new(DummyTracker::new(0, runtime.clone(), 5)); - runtime.register_consumer(&(tracker1.clone() as Arc<dyn MemoryConsumer>)); + DummyTracker::new(0, runtime.clone(), 5); assert_eq!(runtime.memory_manager.get_tracker_total(), 5); - let tracker2 = Arc::new(DummyTracker::new(0, runtime.clone(), 10)); - runtime.register_consumer(&(tracker2.clone() as Arc<dyn MemoryConsumer>)); + let tracker1 = DummyTracker::new(0, runtime.clone(), 10); assert_eq!(runtime.memory_manager.get_tracker_total(), 15); - let tracker3 = Arc::new(DummyTracker::new(0, runtime.clone(), 15)); - runtime.register_consumer(&(tracker3.clone() as Arc<dyn MemoryConsumer>)); + DummyTracker::new(0, runtime.clone(), 15); assert_eq!(runtime.memory_manager.get_tracker_total(), 30); - runtime.drop_consumer(tracker2.id()); + runtime.drop_consumer(tracker1.id(), tracker1.mem_used); + assert_eq!(runtime.memory_manager.get_tracker_total(), 20); + + // MemTrackingMetrics as an easy way to track memory + let ms = ExecutionPlanMetricsSet::new(); + let tracking_metric = MemTrackingMetrics::new_with_rt(&ms, 0, runtime.clone()); + tracking_metric.init_mem_used(15); Review comment: 👌 --very nice ########## File path: datafusion/src/physical_plan/metrics/tracker.rs ########## @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metrics with memory usage tracking capability + +use crate::execution::runtime_env::RuntimeEnv; +use crate::execution::MemoryConsumerId; +use crate::physical_plan::metrics::{ + BaselineMetrics, Count, ExecutionPlanMetricsSet, Time, +}; +use std::sync::Arc; +use std::task::Poll; + +use arrow::{error::ArrowError, record_batch::RecordBatch}; + +/// Simplified version of tracking memory consumer, +/// see also: [`Tracking`](crate::execution::memory_manager::ConsumerType::Tracking) +/// +/// You could use this to replace [BaselineMetrics], report the memory, Review comment: I think it would make sense to combine `MemTrackingMetrics` and `BaselineMetrics` -- for example, put the `runtime` and `id` fields into `BaselineMetrics`. The rationale is that we would eventually like all `ExecutionPlan` operations to report their memory usage (as well as row count and time spent), so using the existing `BaselineMetrics` would make that easy. ########## File path: datafusion/src/physical_plan/sorts/sort.rs ########## @@ -139,41 +143,43 @@ impl ExternalSorter { let stream = read_spill_as_stream(spill, self.schema.clone())?; streams.push(SortedStream::new(stream, 0)); } - let baseline_metrics = self.metrics.new_final_baseline(partition); + let tracking_metrics = self + .metrics_set + .new_final_tracking(partition, self.runtime.clone()); Ok(Box::pin(SortPreservingMergeStream::new_from_streams(
streams, self.schema.clone(), &self.expr, - baseline_metrics, - partition, + tracking_metrics, self.runtime.clone(), ))) } else if in_mem_batches.len() > 0 { - let baseline_metrics = self.metrics.new_final_baseline(partition); + let tracking_metrics = self + .metrics_set + .new_final_tracking(partition, self.runtime.clone()); let result = in_mem_partial_sort( &mut *in_mem_batches, self.schema.clone(), &self.expr, - baseline_metrics, + tracking_metrics, ); - self.inner_metrics.mem_used().set(0); - // TODO: the result size is not tracked + self.metrics.mem_used().set(0); Review comment: ```suggestion // Report to the memory manager we are no longer using memory self.metrics.mem_used().set(0); ``` ########## File path: datafusion/src/physical_plan/metrics/tracker.rs ########## @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Metrics with memory usage tracking capability + +use crate::execution::runtime_env::RuntimeEnv; +use crate::execution::MemoryConsumerId; +use crate::physical_plan::metrics::{ + BaselineMetrics, Count, ExecutionPlanMetricsSet, Time, +}; +use std::sync::Arc; +use std::task::Poll; + +use arrow::{error::ArrowError, record_batch::RecordBatch}; + +/// Simplified version of tracking memory consumer, +/// see also: [`Tracking`](crate::execution::memory_manager::ConsumerType::Tracking) +/// +/// You could use this to replace [BaselineMetrics], report the memory, +/// and get the memory usage bookkeeping in the memory manager easily. +#[derive(Debug)] +pub struct MemTrackingMetrics { + id: MemoryConsumerId, + runtime: Option<Arc<RuntimeEnv>>, + metrics: BaselineMetrics, +} + +/// Delegates most of the metrics functionalities to the inner BaselineMetrics, +/// intercept memory metrics functionalities and do memory manager bookkeeping. +impl MemTrackingMetrics { + /// Create metrics similar to [BaselineMetrics] + pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { + let id = MemoryConsumerId::new(partition); + Self { + id, + runtime: None, + metrics: BaselineMetrics::new(metrics, partition), + } + } + + /// Create memory tracking metrics with reference to runtime + pub fn new_with_rt( + metrics: &ExecutionPlanMetricsSet, + partition: usize, + runtime: Arc<RuntimeEnv>, + ) -> Self { + let id = MemoryConsumerId::new(partition); + Self { + id, + runtime: Some(runtime), + metrics: BaselineMetrics::new(metrics, partition), + } + } + + /// return the metric for cpu time spend in this operator + pub fn elapsed_compute(&self) -> &Time { + self.metrics.elapsed_compute() + } + + /// return the size for current memory usage + pub fn mem_used(&self) -> usize { + self.metrics.mem_used().value() + } + + /// setup initial memory usage and register it with memory manager + pub fn init_mem_used(&self, size: usize) { + self.metrics.mem_used().set(size); + if let Some(rt) = 
self.runtime.as_ref() { + rt.memory_manager.grow_tracker_usage(size); + } + } + + /// return the metric for the total number of output rows produced + pub fn output_rows(&self) -> &Count { + self.metrics.output_rows() + } + + /// Records the fact that this operator's execution is complete + /// (recording the `end_time` metric). + /// + /// Note care should be taken to call `done()` manually if + /// `MemTrackingMetrics` is not `drop`ped immediately upon operator + /// completion, as async streams may not be dropped immediately + /// depending on the consumer. + pub fn done(&self) { + self.metrics.done() + } + + /// Record that some number of rows have been produced as output + /// + /// See the [`RecordOutput`] for conveniently recording record + /// batch output for other thing + pub fn record_output(&self, num_rows: usize) { + self.metrics.record_output(num_rows) + } + + /// Process a poll result of a stream producing output for an + /// operator, recording the output rows and stream done time and + /// returning the same poll result + pub fn record_poll( + &self, + poll: Poll<Option<Result<RecordBatch, ArrowError>>>, + ) -> Poll<Option<Result<RecordBatch, ArrowError>>> { + self.metrics.record_poll(poll) + } +} + +impl Drop for MemTrackingMetrics { + fn drop(&mut self) { + self.metrics.try_done(); + if self.mem_used() != 0 { + if let Some(rt) = self.runtime.as_ref() { + rt.drop_consumer(&self.id, self.mem_used()); Review comment: 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org