This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a92f803298 Minor: Refactor memory size estimation for HashTable
(#10748)
a92f803298 is described below
commit a92f803298da35776ffc40adce161bac88601938
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Mon Jun 3 20:40:32 2024 +0200
Minor: Refactor memory size estimation for HashTable (#10748)
* refactor: extract estimate_memory_size
* refactor: cap at usize::MAX
* refactor: use estimate_memory_size
* chore: add examples
* refactor: return Result<usize>; add testcase
* fix: docs
* fix: remove unnecessary checked_div
* fix: remove additional and_then
---
datafusion/common/src/utils/memory.rs | 134 +++++++++++++++++++++
datafusion/common/src/utils/mod.rs | 1 +
.../src/aggregate/count_distinct/native.rs | 35 ++----
datafusion/physical-plan/src/joins/hash_join.rs | 25 ++--
4 files changed, 153 insertions(+), 42 deletions(-)
diff --git a/datafusion/common/src/utils/memory.rs
b/datafusion/common/src/utils/memory.rs
new file mode 100644
index 0000000000..17668cf93d
--- /dev/null
+++ b/datafusion/common/src/utils/memory.rs
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides a function to estimate the memory size of a HashTable
prior to allocation
+
+use crate::{DataFusionError, Result};
+
+/// Estimates the memory size required for a hash table prior to allocation.
+///
+/// # Parameters
+/// - `num_elements`: The number of elements expected in the hash table.
+/// - `fixed_size`: A fixed overhead size associated with the collection
+/// (e.g., HashSet or HashTable).
+/// - `T`: The type of elements stored in the hash table.
+///
+/// # Details
+/// This function calculates the estimated memory size by considering:
+/// - An overestimation of buckets to keep approximately 1/8 of them empty.
+/// - The total memory size is computed as:
+/// - The size of each entry (`T`) multiplied by the estimated number of
+/// buckets.
+/// - One byte overhead for each bucket.
+/// - The fixed size overhead of the collection.
+/// - If the estimation overflows, we return a [`DataFusionError`]
+///
+/// # Examples
+/// ---
+///
+/// ## From within a struct
+///
+/// ```rust
+/// # use datafusion_common::utils::memory::estimate_memory_size;
+/// # use datafusion_common::Result;
+///
+/// struct MyStruct<T> {
+/// values: Vec<T>,
+/// other_data: usize,
+/// }
+///
+/// impl<T> MyStruct<T> {
+/// fn size(&self) -> Result<usize> {
+/// let num_elements = self.values.len();
+/// let fixed_size = std::mem::size_of_val(self) +
+/// std::mem::size_of_val(&self.values);
+///
+/// estimate_memory_size::<T>(num_elements, fixed_size)
+/// }
+/// }
+/// ```
+/// ---
+/// ## With a simple collection
+///
+/// ```rust
+/// # use datafusion_common::utils::memory::estimate_memory_size;
+/// # use std::collections::HashMap;
+///
+/// let num_rows = 100;
+/// let fixed_size = std::mem::size_of::<HashMap<u64, u64>>();
+/// let estimated_hashtable_size =
+/// estimate_memory_size::<(u64, u64)>(num_rows,fixed_size)
+/// .expect("Size estimation failed");
+/// ```
+pub fn estimate_memory_size<T>(num_elements: usize, fixed_size: usize) ->
Result<usize> {
+ // For the majority of cases hashbrown overestimates the bucket quantity
+ // to keep ~1/8 of them empty. We take this factor into account by
+ // multiplying the number of elements with a fixed ratio of 8/7 (~1.14).
+ // This formula leads to overallocation for small tables (< 8 elements)
+ // but should be fine overall.
+ num_elements
+ .checked_mul(8)
+ .and_then(|overestimate| {
+ let estimated_buckets = (overestimate / 7).next_power_of_two();
+ // + size of entry * number of buckets
+ // + 1 byte for each bucket
+ // + fixed size of collection (HashSet/HashTable)
+ std::mem::size_of::<T>()
+ .checked_mul(estimated_buckets)?
+ .checked_add(estimated_buckets)?
+ .checked_add(fixed_size)
+ })
+ .ok_or_else(|| {
+ DataFusionError::Execution(
+ "usize overflow while estimating the number of
buckets".to_string(),
+ )
+ })
+}
+
+#[cfg(test)]
+mod tests {
+ use std::collections::HashSet;
+
+ use super::estimate_memory_size;
+
+ #[test]
+ fn test_estimate_memory() {
+ // size (bytes): 48
+ let fixed_size = std::mem::size_of::<HashSet<u32>>();
+
+ // estimated buckets: 16 = (8 * 8 / 7).next_power_of_two()
+ let num_elements = 8;
+ // size (bytes): 128 = 16 * 4 + 16 + 48
+ let estimated = estimate_memory_size::<u32>(num_elements,
fixed_size).unwrap();
+ assert_eq!(estimated, 128);
+
+ // estimated buckets: 64 = (40 * 8 / 7).next_power_of_two()
+ let num_elements = 40;
+ // size (bytes): 368 = 64 * 4 + 64 + 48
+ let estimated = estimate_memory_size::<u32>(num_elements,
fixed_size).unwrap();
+ assert_eq!(estimated, 368);
+ }
+
+ #[test]
+ fn test_estimate_memory_overflow() {
+ let num_elements = usize::MAX;
+ let fixed_size = std::mem::size_of::<HashSet<u32>>();
+ let estimated = estimate_memory_size::<u32>(num_elements, fixed_size);
+
+ assert!(estimated.is_err());
+ }
+}
diff --git a/datafusion/common/src/utils/mod.rs
b/datafusion/common/src/utils/mod.rs
index 402ec95b33..ae444c2cb2 100644
--- a/datafusion/common/src/utils/mod.rs
+++ b/datafusion/common/src/utils/mod.rs
@@ -17,6 +17,7 @@
//! This module provides the bisect function, which implements binary search.
+pub mod memory;
pub mod proxy;
use crate::error::{_internal_datafusion_err, _internal_err};
diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs
b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs
index 95d8662e0f..0e7483d4a1 100644
--- a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs
+++ b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs
@@ -33,6 +33,7 @@ use arrow_schema::DataType;
use datafusion_common::cast::{as_list_array, as_primitive_array};
use datafusion_common::utils::array_into_list_array;
+use datafusion_common::utils::memory::estimate_memory_size;
use datafusion_common::ScalarValue;
use datafusion_expr::Accumulator;
@@ -115,18 +116,11 @@ where
}
fn size(&self) -> usize {
- let estimated_buckets =
(self.values.len().checked_mul(8).unwrap_or(usize::MAX)
- / 7)
- .next_power_of_two();
-
- // Size of accumulator
- // + size of entry * number of buckets
- // + 1 byte for each bucket
- // + fixed size of HashSet
- std::mem::size_of_val(self)
- + std::mem::size_of::<T::Native>() * estimated_buckets
- + estimated_buckets
- + std::mem::size_of_val(&self.values)
+ let num_elements = self.values.len();
+ let fixed_size =
+ std::mem::size_of_val(self) + std::mem::size_of_val(&self.values);
+
+ estimate_memory_size::<T::Native>(num_elements, fixed_size).unwrap()
}
}
@@ -202,17 +196,10 @@ where
}
fn size(&self) -> usize {
- let estimated_buckets =
(self.values.len().checked_mul(8).unwrap_or(usize::MAX)
- / 7)
- .next_power_of_two();
-
- // Size of accumulator
- // + size of entry * number of buckets
- // + 1 byte for each bucket
- // + fixed size of HashSet
- std::mem::size_of_val(self)
- + std::mem::size_of::<T::Native>() * estimated_buckets
- + estimated_buckets
- + std::mem::size_of_val(&self.values)
+ let num_elements = self.values.len();
+ let fixed_size =
+ std::mem::size_of_val(self) + std::mem::size_of_val(&self.values);
+
+ estimate_memory_size::<T::Native>(num_elements, fixed_size).unwrap()
}
}
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs
b/datafusion/physical-plan/src/joins/hash_join.rs
index e669517be4..784584f03f 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -18,7 +18,6 @@
//! [`HashJoinExec`] Partitioned Hash Join Operator
use std::fmt;
-use std::mem::size_of;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
@@ -59,6 +58,7 @@ use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
use arrow_array::cast::downcast_array;
use arrow_schema::ArrowError;
+use datafusion_common::utils::memory::estimate_memory_size;
use datafusion_common::{
internal_datafusion_err, internal_err, plan_err, project_schema,
DataFusionError,
JoinSide, JoinType, Result,
@@ -875,23 +875,12 @@ async fn collect_left_input(
// Estimation of memory size, required for hashtable, prior to allocation.
// Final result can be verified using `RawTable.allocation_info()`
- //
- // For majority of cases hashbrown overestimates buckets qty to keep ~1/8
of them empty.
- // This formula leads to overallocation for small tables (< 8 elements)
but fine overall.
- let estimated_buckets = (num_rows.checked_mul(8).ok_or_else(|| {
- DataFusionError::Execution(
- "usize overflow while estimating number of hasmap
buckets".to_string(),
- )
- })? / 7)
- .next_power_of_two();
- // 16 bytes per `(u64, u64)`
- // + 1 byte for each bucket
- // + fixed size of JoinHashMap (RawTable + Vec)
- let estimated_hastable_size =
- 16 * estimated_buckets + estimated_buckets + size_of::<JoinHashMap>();
-
- reservation.try_grow(estimated_hastable_size)?;
- metrics.build_mem_used.add(estimated_hastable_size);
+ let fixed_size = std::mem::size_of::<JoinHashMap>();
+ let estimated_hashtable_size =
+ estimate_memory_size::<(u64, u64)>(num_rows, fixed_size)?;
+
+ reservation.try_grow(estimated_hashtable_size)?;
+ metrics.build_mem_used.add(estimated_hashtable_size);
let mut hashmap = JoinHashMap::with_capacity(num_rows);
let mut hashes_buffer = Vec::new();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]