This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a92f803298 Minor: Refactor memory size estimation for HashTable 
(#10748)
a92f803298 is described below

commit a92f803298da35776ffc40adce161bac88601938
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Mon Jun 3 20:40:32 2024 +0200

    Minor: Refactor memory size estimation for HashTable (#10748)
    
    * refactor: extract estimate_memory_size
    
    * refactor: cap at usize::MAX
    
    * refactor: use estimate_memory_size
    
    * chore: add examples
    
    * refactor: return Result<usize>; add testcase
    
    * fix: docs
    
    * fix: remove unnecessary checked_div
    
    * fix: remove additional and_then
---
 datafusion/common/src/utils/memory.rs              | 134 +++++++++++++++++++++
 datafusion/common/src/utils/mod.rs                 |   1 +
 .../src/aggregate/count_distinct/native.rs         |  35 ++----
 datafusion/physical-plan/src/joins/hash_join.rs    |  25 ++--
 4 files changed, 153 insertions(+), 42 deletions(-)

diff --git a/datafusion/common/src/utils/memory.rs 
b/datafusion/common/src/utils/memory.rs
new file mode 100644
index 0000000000..17668cf93d
--- /dev/null
+++ b/datafusion/common/src/utils/memory.rs
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides a function to estimate the memory size of a HashTable 
prior to allocation
+
+use crate::{DataFusionError, Result};
+
+/// Estimates the memory size required for a hash table prior to allocation.
+///
+/// # Parameters
+/// - `num_elements`: The number of elements expected in the hash table.
+/// - `fixed_size`: A fixed overhead size associated with the collection
+/// (e.g., HashSet or HashTable).
+/// - `T`: The type of elements stored in the hash table.
+///
+/// # Details
+/// This function calculates the estimated memory size by considering:
+/// - An overestimation of buckets to keep approximately 1/8 of them empty.
+/// - The total memory size is computed as:
+///   - The size of each entry (`T`) multiplied by the estimated number of
+///     buckets.
+///   - One byte overhead for each bucket.
+///   - The fixed size overhead of the collection.
+/// - If the estimation overflows, we return a [`DataFusionError`]
+///
+/// # Examples
+/// ---
+///
+/// ## From within a struct
+///
+/// ```rust
+/// # use datafusion_common::utils::memory::estimate_memory_size;
+/// # use datafusion_common::Result;
+///
+/// struct MyStruct<T> {
+///     values: Vec<T>,
+///     other_data: usize,
+/// }
+///
+/// impl<T> MyStruct<T> {
+///     fn size(&self) -> Result<usize> {
+///         let num_elements = self.values.len();
+///         let fixed_size = std::mem::size_of_val(self) +
+///           std::mem::size_of_val(&self.values);
+///
+///         estimate_memory_size::<T>(num_elements, fixed_size)
+///     }
+/// }
+/// ```
+/// ---
+/// ## With a simple collection
+///
+/// ```rust
+/// # use datafusion_common::utils::memory::estimate_memory_size;
+/// # use std::collections::HashMap;
+///
+/// let num_rows = 100;
+/// let fixed_size = std::mem::size_of::<HashMap<u64, u64>>();
+/// let estimated_hashtable_size =
+///   estimate_memory_size::<(u64, u64)>(num_rows,fixed_size)
+///     .expect("Size estimation failed");
+/// ```
+pub fn estimate_memory_size<T>(num_elements: usize, fixed_size: usize) -> 
Result<usize> {
+    // For the majority of cases hashbrown overestimates the bucket quantity
+    // to keep ~1/8 of them empty. We take this factor into account by
+    // multiplying the number of elements with a fixed ratio of 8/7 (~1.14).
+    // This formula leads to overallocation for small tables (< 8 elements)
+    // but should be fine overall.
+    num_elements
+        .checked_mul(8)
+        .and_then(|overestimate| {
+            let estimated_buckets = (overestimate / 7).next_power_of_two();
+            // + size of entry * number of buckets
+            // + 1 byte for each bucket
+            // + fixed size of collection (HashSet/HashTable)
+            std::mem::size_of::<T>()
+                .checked_mul(estimated_buckets)?
+                .checked_add(estimated_buckets)?
+                .checked_add(fixed_size)
+        })
+        .ok_or_else(|| {
+            DataFusionError::Execution(
+                "usize overflow while estimating the number of 
buckets".to_string(),
+            )
+        })
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashSet;
+
+    use super::estimate_memory_size;
+
+    #[test]
+    fn test_estimate_memory() {
+        // size (bytes): 48
+        let fixed_size = std::mem::size_of::<HashSet<u32>>();
+
+        // estimated buckets: 16 = (8 * 8 / 7).next_power_of_two()
+        let num_elements = 8;
+        // size (bytes): 128 = 16 * 4 + 16 + 48
+        let estimated = estimate_memory_size::<u32>(num_elements, 
fixed_size).unwrap();
+        assert_eq!(estimated, 128);
+
+        // estimated buckets: 64 = (40 * 8 / 7).next_power_of_two()
+        let num_elements = 40;
+        // size (bytes): 368 = 64 * 4 + 64 + 48
+        let estimated = estimate_memory_size::<u32>(num_elements, 
fixed_size).unwrap();
+        assert_eq!(estimated, 368);
+    }
+
+    #[test]
+    fn test_estimate_memory_overflow() {
+        let num_elements = usize::MAX;
+        let fixed_size = std::mem::size_of::<HashSet<u32>>();
+        let estimated = estimate_memory_size::<u32>(num_elements, fixed_size);
+
+        assert!(estimated.is_err());
+    }
+}
diff --git a/datafusion/common/src/utils/mod.rs 
b/datafusion/common/src/utils/mod.rs
index 402ec95b33..ae444c2cb2 100644
--- a/datafusion/common/src/utils/mod.rs
+++ b/datafusion/common/src/utils/mod.rs
@@ -17,6 +17,7 @@
 
 //! This module provides the bisect function, which implements binary search.
 
+pub mod memory;
 pub mod proxy;
 
 use crate::error::{_internal_datafusion_err, _internal_err};
diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs 
b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs
index 95d8662e0f..0e7483d4a1 100644
--- a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs
+++ b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs
@@ -33,6 +33,7 @@ use arrow_schema::DataType;
 
 use datafusion_common::cast::{as_list_array, as_primitive_array};
 use datafusion_common::utils::array_into_list_array;
+use datafusion_common::utils::memory::estimate_memory_size;
 use datafusion_common::ScalarValue;
 use datafusion_expr::Accumulator;
 
@@ -115,18 +116,11 @@ where
     }
 
     fn size(&self) -> usize {
-        let estimated_buckets = 
(self.values.len().checked_mul(8).unwrap_or(usize::MAX)
-            / 7)
-        .next_power_of_two();
-
-        // Size of accumulator
-        // + size of entry * number of buckets
-        // + 1 byte for each bucket
-        // + fixed size of HashSet
-        std::mem::size_of_val(self)
-            + std::mem::size_of::<T::Native>() * estimated_buckets
-            + estimated_buckets
-            + std::mem::size_of_val(&self.values)
+        let num_elements = self.values.len();
+        let fixed_size =
+            std::mem::size_of_val(self) + std::mem::size_of_val(&self.values);
+
+        estimate_memory_size::<T::Native>(num_elements, fixed_size).unwrap()
     }
 }
 
@@ -202,17 +196,10 @@ where
     }
 
     fn size(&self) -> usize {
-        let estimated_buckets = 
(self.values.len().checked_mul(8).unwrap_or(usize::MAX)
-            / 7)
-        .next_power_of_two();
-
-        // Size of accumulator
-        // + size of entry * number of buckets
-        // + 1 byte for each bucket
-        // + fixed size of HashSet
-        std::mem::size_of_val(self)
-            + std::mem::size_of::<T::Native>() * estimated_buckets
-            + estimated_buckets
-            + std::mem::size_of_val(&self.values)
+        let num_elements = self.values.len();
+        let fixed_size =
+            std::mem::size_of_val(self) + std::mem::size_of_val(&self.values);
+
+        estimate_memory_size::<T::Native>(num_elements, fixed_size).unwrap()
     }
 }
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs 
b/datafusion/physical-plan/src/joins/hash_join.rs
index e669517be4..784584f03f 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -18,7 +18,6 @@
 //! [`HashJoinExec`] Partitioned Hash Join Operator
 
 use std::fmt;
-use std::mem::size_of;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::task::Poll;
@@ -59,6 +58,7 @@ use arrow::record_batch::RecordBatch;
 use arrow::util::bit_util;
 use arrow_array::cast::downcast_array;
 use arrow_schema::ArrowError;
+use datafusion_common::utils::memory::estimate_memory_size;
 use datafusion_common::{
     internal_datafusion_err, internal_err, plan_err, project_schema, 
DataFusionError,
     JoinSide, JoinType, Result,
@@ -875,23 +875,12 @@ async fn collect_left_input(
 
     // Estimation of memory size, required for hashtable, prior to allocation.
     // Final result can be verified using `RawTable.allocation_info()`
-    //
-    // For majority of cases hashbrown overestimates buckets qty to keep ~1/8 
of them empty.
-    // This formula leads to overallocation for small tables (< 8 elements) 
but fine overall.
-    let estimated_buckets = (num_rows.checked_mul(8).ok_or_else(|| {
-        DataFusionError::Execution(
-            "usize overflow while estimating number of hasmap 
buckets".to_string(),
-        )
-    })? / 7)
-        .next_power_of_two();
-    // 16 bytes per `(u64, u64)`
-    // + 1 byte for each bucket
-    // + fixed size of JoinHashMap (RawTable + Vec)
-    let estimated_hastable_size =
-        16 * estimated_buckets + estimated_buckets + size_of::<JoinHashMap>();
-
-    reservation.try_grow(estimated_hastable_size)?;
-    metrics.build_mem_used.add(estimated_hastable_size);
+    let fixed_size = std::mem::size_of::<JoinHashMap>();
+    let estimated_hashtable_size =
+        estimate_memory_size::<(u64, u64)>(num_rows, fixed_size)?;
+
+    reservation.try_grow(estimated_hashtable_size)?;
+    metrics.build_mem_used.add(estimated_hashtable_size);
 
     let mut hashmap = JoinHashMap::with_capacity(num_rows);
     let mut hashes_buffer = Vec::new();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to