tisonkun commented on code in PR #45:
URL: https://github.com/apache/datasketches-rust/pull/45#discussion_r2653234830


##########
datasketches/src/theta/hash_table.rs:
##########
@@ -0,0 +1,698 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::hash::Hash;
+
+use crate::hash::MurmurHash3X64128;
+
+/// Maximum theta value (signed max for compatibility with Java)
+pub const MAX_THETA: u64 = i64::MAX as u64;
+
+/// Minimum log2 of K
+pub const MIN_LG_K: u8 = 5;
+
+/// Maximum log2 of K
+pub const MAX_LG_K: u8 = 26;
+
+/// Default log2 of K
+pub const DEFAULT_LG_K: u8 = 12;
+
+/// Hash table resize factor
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum ResizeFactor {
+    /// Resize by factor of 2
+    X2 = 2,
+    /// Resize by factor of 4
+    X4 = 4,
+    /// Resize by factor of 8
+    #[default]
+    X8 = 8,
+}
+
+impl ResizeFactor {
+    pub fn lg_value(&self) -> u8 {
+        match self {
+            ResizeFactor::X2 => 1,
+            ResizeFactor::X4 => 2,
+            ResizeFactor::X8 => 3,
+        }
+    }
+}
+
+/// Resize threshold (0.5 = 50% load factor)
+const RESIZE_THRESHOLD: f64 = 0.5;
+
+/// Rebuild threshold (15/16 = 93.75% load factor)
+const REBUILD_THRESHOLD: f64 = 15.0 / 16.0;
+
+/// Stride hash bits (7 bits for stride calculation)
+const STRIDE_HASH_BITS: u8 = 7;
+
+/// Stride mask
+const STRIDE_MASK: u64 = (1 << STRIDE_HASH_BITS) - 1;
+
+/// Specific hash table for theta sketch
+///
+/// It maintain a array capacity max to [2 ** (lg_max_size)]:
+/// - Before it reach the max capacity, it will extend the array based on 
resize_factor.
+/// - After it reach the capacity bigger than 2 ** (lg_nom_size), every time 
the number of entries
+///   exceeds the threshold, it will rebuild the table: only keep the min 2 ** 
lg_nom_size entries
+///   and update the theta to the k-th smallest entry.
+#[derive(Debug)]
+pub(crate) struct ThetaHashTable {
+    lg_cur_size: u8,
+    lg_nom_size: u8,
+    lg_max_size: u8,
+    resize_factor: ResizeFactor,
+    sampling_probability: f32,
+    hash_seed: u64,
+
+    theta: u64,
+
+    entries: Vec<u64>,
+    num_entries: usize,
+}
+
+impl ThetaHashTable {
+    /// Create a new hash table
+    pub fn new(
+        lg_nom_size: u8,
+        resize_factor: ResizeFactor,
+        sampling_probability: f32,
+        hash_seed: u64,
+    ) -> Self {
+        let lg_max_size = lg_nom_size + 1;
+        let lg_cur_size = starting_sub_multiple(lg_max_size, MIN_LG_K, 
resize_factor.lg_value());
+        let size = if lg_cur_size > 0 { 1 << lg_cur_size } else { 0 };
+        let entries = vec![0u64; size];
+
+        Self {
+            lg_cur_size,
+            lg_nom_size,
+            lg_max_size,
+            resize_factor,
+            sampling_probability,
+            theta: 
starting_theta_from_sampling_probability(sampling_probability),
+            hash_seed,
+            entries,
+            num_entries: 0,
+        }
+    }
+
+    /// Hash and screen a value
+    ///
+    /// Returns the hash value if it passes the theta threshold, otherwise 0.
+    pub fn hash_and_screen<T: Hash>(&mut self, value: T) -> u64 {
+        let mut hasher = MurmurHash3X64128::with_seed(self.hash_seed);
+        value.hash(&mut hasher);
+        let (h1, _) = hasher.finish128();
+        let hash = h1 >> 1; // To make it compatible with Java version
+        if hash >= self.theta {
+            return 0; // hash == 0 is reserved for empty slots
+        }
+        hash
+    }
+
+    /// Find an entry in the hash table.
+    ///
+    /// Returns the index of the entry if found, otherwise None. The entry may 
has been inserted or
+    /// empty.
+    fn find_in_curr_entries(&self, key: u64) -> Option<usize> {
+        Self::find_in_entries(&self.entries, key, self.lg_cur_size)
+    }
+
+    /// Find index in a given entries.
+    ///
+    /// Returns the index of the entry if found, otherwise None. The entry may 
has been inserted or
+    /// empty.
+    fn find_in_entries(entries: &[u64], key: u64, lg_size: u8) -> 
Option<usize> {
+        if entries.is_empty() {
+            return None;
+        }
+
+        let size = entries.len();
+        let mask = size - 1;
+        let stride = Self::get_stride(key, lg_size);
+        let mut index = (key as usize) & mask;
+        let loop_index = index;
+
+        loop {
+            let probe = entries[index];
+            if probe == 0 || probe == key {
+                return Some(index);
+            }
+            index = (index + stride) & mask;
+            if index == loop_index {
+                return None;
+            }
+        }
+    }
+
+    /// Insert a hash value into the table
+    ///
+    /// Returns true if the value was inserted (new), false otherwise.
+    pub fn try_insert(&mut self, hash: u64) -> bool {
+        if hash == 0 {
+            return false;
+        }
+
+        let Some(index) = self.find_in_curr_entries(hash) else {
+            unreachable!(
+                "Resize or rebuild should be called to make sure it always can 
find the entry."
+            );
+        };
+
+        // Already exists
+        if self.entries[index] == hash {
+            return false;
+        }
+
+        assert!(self.entries[index] == 0, "Entry should be empty");
+        self.entries[index] = hash;
+        self.num_entries += 1;
+
+        // Check if we need to resize or rebuild
+        let capacity = self.get_capacity();
+        if self.num_entries > capacity {
+            if self.lg_cur_size <= self.lg_nom_size {
+                self.resize();
+            } else {
+                self.rebuild();
+            }
+        }
+        true
+    }
+
+    /// Get capacity threshold
+    fn get_capacity(&self) -> usize {
+        let fraction = if self.lg_cur_size <= self.lg_nom_size {
+            RESIZE_THRESHOLD
+        } else {
+            REBUILD_THRESHOLD
+        };
+        (fraction * self.entries.len() as f64) as usize
+    }
+
+    /// Resize the hash table
+    fn resize(&mut self) {
+        let new_lg_size = std::cmp::min(
+            self.lg_cur_size + self.resize_factor.lg_value(),
+            self.lg_max_size,
+        );
+        let new_size = 1 << new_lg_size;
+
+        // Get new entries and rehash all entries
+        let mut new_entries = vec![0u64; new_size];
+        for &entry in &self.entries {
+            if entry != 0 {
+                let new_index = Self::find_in_entries(&new_entries, entry, 
new_lg_size);
+                if let Some(idx) = new_index {
+                    new_entries[idx] = entry;
+                } else {
+                    unreachable!(
+                        "find_in_entries should always return Some if the 
entry is not empty."
+                    );
+                }
+            }
+        }
+
+        self.entries = new_entries;
+        self.lg_cur_size = new_lg_size;
+    }
+
+    /// Rebuild the hash table:
+    /// The number of entries will be reduced to the nominal size k.
+    fn rebuild(&mut self) {
+        // Select the k-th smallest entry as new theta and keep the lesser 
entries.
+        self.entries.retain(|&e| e != 0);
+        let k = 1u64 << self.lg_nom_size;
+        let (lesser, kth, _) = self.entries.select_nth_unstable(k as usize);
+        self.theta = *kth;
+
+        // Rebuild the table with the lesser entries.
+        let size = 1 << self.lg_cur_size;
+        let mut new_entries = vec![0u64; size];
+        let mut num_inserted = 0;
+        for entry in lesser {
+            if let Some(idx) = Self::find_in_entries(&new_entries, *entry, 
self.lg_cur_size) {
+                new_entries[idx] = *entry;
+                num_inserted += 1;
+            } else {
+                unreachable!(
+                    "find_in_entries should always return Some if the entry is 
not empty."
+                );
+            }
+        }
+
+        assert!(
+            num_inserted == k as usize,
+            "Number of inserted entries should be equal to k."
+        );

Review Comment:
   ```suggestion
           assert_eq!(
               num_inserted, k as usize,
               "Number of inserted entries should be equal to k."
           );
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to