This is an automated email from the ASF dual-hosted git repository.

leerho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datasketches-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a8543e  feat: add theta sketch (part 1) (#45)
0a8543e is described below

commit 0a8543e304adb7bd741be0c494967b1469bd1861
Author: ZENOTME <[email protected]>
AuthorDate: Wed Dec 31 00:15:02 2025 +0800

    feat: add theta sketch (part 1) (#45)
    
    * feat: add theta sketch (part 1)
    
    - add hash table for theta sketch
    - add ThetaSketch
    
    * refine some function name and comment
    
    * refine: use unified default seed DEFAULT_UPDATE_SEED
    
    * refine some style
---
 datasketches/src/lib.rs                   |   1 +
 datasketches/src/theta/hash_table.rs      | 698 ++++++++++++++++++++++++++++++
 datasketches/src/{lib.rs => theta/mod.rs} |  31 +-
 datasketches/src/theta/sketch.rs          | 209 +++++++++
 datasketches/tests/theta_sketch_test.rs   | 125 ++++++
 examples/Cargo.toml                       |   4 +
 examples/src/theta_sketch.rs              |  49 +++
 7 files changed, 1100 insertions(+), 17 deletions(-)

diff --git a/datasketches/src/lib.rs b/datasketches/src/lib.rs
index c7c2a14..fc4679e 100644
--- a/datasketches/src/lib.rs
+++ b/datasketches/src/lib.rs
@@ -34,5 +34,6 @@ pub mod countmin;
 pub mod error;
 pub mod hll;
 pub mod tdigest;
+pub mod theta;
 
 mod hash;
diff --git a/datasketches/src/theta/hash_table.rs b/datasketches/src/theta/hash_table.rs
new file mode 100644
index 0000000..634f232
--- /dev/null
+++ b/datasketches/src/theta/hash_table.rs
@@ -0,0 +1,698 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::hash::Hash;
+
+use crate::hash::MurmurHash3X64128;
+
/// Largest possible theta, `2^63 - 1`.
///
/// Theta is kept within the signed 64-bit range for compatibility with the Java
/// implementation, which stores it in a signed `long`.
pub const MAX_THETA: u64 = u64::MAX >> 1;

/// Smallest supported log2 of the nominal size k.
pub const MIN_LG_K: u8 = 5;

/// Largest supported log2 of the nominal size k.
pub const MAX_LG_K: u8 = 26;

/// log2 of the nominal size k used when none is configured.
pub const DEFAULT_LG_K: u8 = 12;
+
/// Growth factor applied to the hash table each time it is resized.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ResizeFactor {
    /// Grow the table by a factor of 2.
    X2 = 2,
    /// Grow the table by a factor of 4.
    X4 = 4,
    /// Grow the table by a factor of 8 (the default).
    #[default]
    X8 = 8,
}

impl ResizeFactor {
    /// Returns log2 of the growth factor: 1 for `X2`, 2 for `X4`, 3 for `X8`.
    pub fn lg_value(&self) -> u8 {
        // Every discriminant is a power of two, so its trailing-zero count is its log2.
        (*self as u8).trailing_zeros() as u8
    }
}
+
/// Load factor above which a table that is still growing gets resized (50%).
const RESIZE_THRESHOLD: f64 = 0.5;

/// Load factor above which a full-size table gets rebuilt (15/16 = 93.75%).
const REBUILD_THRESHOLD: f64 = 15.0 / 16.0;

/// Number of hash bits, taken just above the slot-index bits, that determine the probe stride.
const STRIDE_HASH_BITS: u8 = 7;

/// Mask selecting the low `STRIDE_HASH_BITS` bits.
const STRIDE_MASK: u64 = !(u64::MAX << STRIDE_HASH_BITS);
+
+/// Specific hash table for theta sketch
+///
+/// It maintains an array capacity max to 2^lg_max_size:
+/// * Before it reaches the max capacity, it will extend the array based on 
resize_factor.
+/// * After it reaches the capacity bigger than 2^lg_nom_size, every time the 
number of entries
+///   exceeds the threshold, it will rebuild the table: only keep the min 
2^lg_nom_size entries and
+///   update the theta to the k-th smallest entry.
+#[derive(Debug)]
+pub(crate) struct ThetaHashTable {
+    lg_cur_size: u8,
+    lg_nom_size: u8,
+    lg_max_size: u8,
+    resize_factor: ResizeFactor,
+    sampling_probability: f32,
+    hash_seed: u64,
+
+    theta: u64,
+
+    entries: Vec<u64>,
+    num_entries: usize,
+}
+
+impl ThetaHashTable {
+    /// Create a new hash table
+    pub fn new(
+        lg_nom_size: u8,
+        resize_factor: ResizeFactor,
+        sampling_probability: f32,
+        hash_seed: u64,
+    ) -> Self {
+        let lg_max_size = lg_nom_size + 1;
+        let lg_cur_size = starting_sub_multiple(lg_max_size, MIN_LG_K, 
resize_factor.lg_value());
+        let size = if lg_cur_size > 0 { 1 << lg_cur_size } else { 0 };
+        let entries = vec![0u64; size];
+
+        Self {
+            lg_cur_size,
+            lg_nom_size,
+            lg_max_size,
+            resize_factor,
+            sampling_probability,
+            theta: 
starting_theta_from_sampling_probability(sampling_probability),
+            hash_seed,
+            entries,
+            num_entries: 0,
+        }
+    }
+
+    /// Hash and screen a value
+    ///
+    /// Returns the hash value if it passes the theta threshold, otherwise 0.
+    pub fn hash_and_screen<T: Hash>(&mut self, value: T) -> u64 {
+        let mut hasher = MurmurHash3X64128::with_seed(self.hash_seed);
+        value.hash(&mut hasher);
+        let (h1, _) = hasher.finish128();
+        let hash = h1 >> 1; // To make it compatible with Java version
+        if hash >= self.theta {
+            return 0; // hash == 0 is reserved for empty slots
+        }
+        hash
+    }
+
+    /// Find an entry in the hash table.
+    ///
+    /// Returns the index of the entry if found, otherwise None. The entry may 
have been inserted or
+    /// empty.
+    fn find_in_curr_entries(&self, key: u64) -> Option<usize> {
+        Self::find_in_entries(&self.entries, key, self.lg_cur_size)
+    }
+
+    /// Find index in a given entries.
+    ///
+    /// Returns the index of the entry if found, otherwise None. The entry may 
have been inserted or
+    /// empty.
+    fn find_in_entries(entries: &[u64], key: u64, lg_size: u8) -> 
Option<usize> {
+        if entries.is_empty() {
+            return None;
+        }
+
+        let size = entries.len();
+        let mask = size - 1;
+        let stride = Self::get_stride(key, lg_size);
+        let mut index = (key as usize) & mask;
+        let loop_index = index;
+
+        loop {
+            let probe = entries[index];
+            if probe == 0 || probe == key {
+                return Some(index);
+            }
+            index = (index + stride) & mask;
+            if index == loop_index {
+                return None;
+            }
+        }
+    }
+
+    /// Insert a hash value into the table
+    ///
+    /// Returns true if the value was inserted (new), false otherwise.
+    pub fn try_insert(&mut self, hash: u64) -> bool {
+        if hash == 0 {
+            return false;
+        }
+
+        let Some(index) = self.find_in_curr_entries(hash) else {
+            unreachable!(
+                "Resize or rebuild should be called to make sure it always can 
find the entry."
+            );
+        };
+
+        // Already exists
+        if self.entries[index] == hash {
+            return false;
+        }
+
+        assert_eq!(self.entries[index], 0, "Entry should be empty");
+        self.entries[index] = hash;
+        self.num_entries += 1;
+
+        // Check if we need to resize or rebuild
+        let capacity = self.get_capacity();
+        if self.num_entries > capacity {
+            if self.lg_cur_size <= self.lg_nom_size {
+                self.resize();
+            } else {
+                self.rebuild();
+            }
+        }
+        true
+    }
+
+    /// Get capacity threshold
+    fn get_capacity(&self) -> usize {
+        let fraction = if self.lg_cur_size <= self.lg_nom_size {
+            RESIZE_THRESHOLD
+        } else {
+            REBUILD_THRESHOLD
+        };
+        (fraction * self.entries.len() as f64) as usize
+    }
+
+    /// Resize the hash table
+    fn resize(&mut self) {
+        let new_lg_size = std::cmp::min(
+            self.lg_cur_size + self.resize_factor.lg_value(),
+            self.lg_max_size,
+        );
+        let new_size = 1 << new_lg_size;
+
+        // Get new entries and rehash all entries
+        let mut new_entries = vec![0u64; new_size];
+        for &entry in &self.entries {
+            if entry != 0 {
+                let new_index = Self::find_in_entries(&new_entries, entry, 
new_lg_size);
+                if let Some(idx) = new_index {
+                    new_entries[idx] = entry;
+                } else {
+                    unreachable!(
+                        "find_in_entries should always return Some if the 
entry is not empty."
+                    );
+                }
+            }
+        }
+
+        self.entries = new_entries;
+        self.lg_cur_size = new_lg_size;
+    }
+
+    /// Rebuild the hash table:
+    /// The number of entries will be reduced to the nominal size k.
+    fn rebuild(&mut self) {
+        // Select the k-th smallest entry as new theta and keep the lesser 
entries.
+        self.entries.retain(|&e| e != 0);
+        let k = 1u64 << self.lg_nom_size;
+        let (lesser, kth, _) = self.entries.select_nth_unstable(k as usize);
+        self.theta = *kth;
+
+        // Rebuild the table with the lesser entries.
+        let size = 1 << self.lg_cur_size;
+        let mut new_entries = vec![0u64; size];
+        let mut num_inserted = 0;
+        for entry in lesser {
+            if let Some(idx) = Self::find_in_entries(&new_entries, *entry, 
self.lg_cur_size) {
+                new_entries[idx] = *entry;
+                num_inserted += 1;
+            } else {
+                unreachable!(
+                    "find_in_entries should always return Some if the entry is 
not empty."
+                );
+            }
+        }
+
+        assert_eq!(
+            num_inserted, k as usize,
+            "Number of inserted entries should be equal to k."
+        );
+        self.num_entries = num_inserted;
+        self.entries = new_entries;
+    }
+
+    /// Trim the table to nominal size k
+    pub fn trim(&mut self) {
+        if self.num_entries > (1 << self.lg_nom_size) {
+            self.rebuild();
+        }
+    }
+
+    /// Reset the table to empty state
+    pub fn reset(&mut self) {
+        let init_theta = 
starting_theta_from_sampling_probability(self.sampling_probability);
+        let init_lg_cur = starting_sub_multiple(
+            self.lg_nom_size + 1,
+            MIN_LG_K,
+            self.resize_factor.lg_value(),
+        );
+
+        // clear entries
+        if self.entries.len() != 1 << init_lg_cur {
+            self.entries.resize(1 << init_lg_cur, 0);
+        }
+        self.entries.fill(0);
+        self.num_entries = 0;
+        self.theta = init_theta;
+        self.lg_cur_size = init_lg_cur;
+    }
+
+    /// Get number of entries
+    pub fn num_entries(&self) -> usize {
+        self.num_entries
+    }
+
+    /// Get theta
+    pub fn theta(&self) -> u64 {
+        self.theta
+    }
+
+    /// Check if empty
+    pub fn is_empty(&self) -> bool {
+        self.num_entries == 0
+    }
+
+    /// Get iterator over entries
+    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
+        self.entries.iter().copied().filter(|&e| e != 0)
+    }
+
+    /// Get log2 of nominal size
+    pub fn lg_nom_size(&self) -> u8 {
+        self.lg_nom_size
+    }
+
+    /// Get stride for hash table probing
+    fn get_stride(key: u64, lg_size: u8) -> usize {
+        (2 * ((key >> (lg_size)) & STRIDE_MASK) + 1) as usize
+    }
+}
+
/// Computes the initial log2 size of the hash table.
///
/// When `lg_target > lg_min` and `lg_resize_factor > 0`, the result `lg_init` satisfies
/// `lg_init >= lg_min` and `lg_target = lg_init + n * lg_resize_factor` for some integer
/// `n >= 0`, so growing by the resize factor lands exactly on `lg_target`.
///
/// Edge cases:
/// * `lg_target <= lg_min` returns `lg_min`.
/// * `lg_resize_factor == 0` (no growth) returns `lg_target`.
fn starting_sub_multiple(lg_target: u8, lg_min: u8, lg_resize_factor: u8) -> u8 {
    if lg_target <= lg_min {
        lg_min
    } else if lg_resize_factor == 0 {
        lg_target
    } else {
        (lg_target - lg_min) % lg_resize_factor + lg_min
    }
}
+
+/// Compute initial theta for hash table based on sampling probability.
+fn starting_theta_from_sampling_probability(sampling_probability: f32) -> u64 {
+    if sampling_probability < 1.0 {
+        (MAX_THETA as f64 * sampling_probability as f64) as u64
+    } else {
+        MAX_THETA
+    }
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hash::DEFAULT_UPDATE_SEED;

    /// Builds an unsampled table (p = 1.0) that uses the default update seed.
    fn new_table(lg_nom_size: u8, resize_factor: ResizeFactor) -> ThetaHashTable {
        ThetaHashTable::new(lg_nom_size, resize_factor, 1.0, DEFAULT_UPDATE_SEED)
    }

    /// Screens and inserts `value_{i}` for every `i` in `range`; returns the hashes that
    /// `try_insert` reported as newly inserted.
    fn insert_values(table: &mut ThetaHashTable, range: std::ops::Range<u32>) -> Vec<u64> {
        let mut inserted = Vec::new();
        for i in range {
            let hash = table.hash_and_screen(format!("value_{}", i));
            if hash != 0 && table.try_insert(hash) {
                inserted.push(hash);
            }
        }
        inserted
    }

    /// Screens `value_{next}`, advances `next`, and inserts the hash if it passed the screen.
    /// Returns the screened hash (0 when rejected).
    fn feed_next(table: &mut ThetaHashTable, next: &mut u32) -> u64 {
        let hash = table.hash_and_screen(format!("value_{}", *next));
        *next += 1;
        if hash != 0 {
            table.try_insert(hash);
        }
        hash
    }

    #[test]
    fn test_new_hash_table() {
        let table = new_table(8, ResizeFactor::X8);
        let expected_lg = starting_sub_multiple(8 + 1, MIN_LG_K, ResizeFactor::X8.lg_value());

        assert_eq!(table.lg_cur_size, expected_lg);
        assert_eq!(table.theta, starting_theta_from_sampling_probability(1.0));
        assert_eq!(table.num_entries(), 0);
        assert!(table.is_empty());
        assert_eq!(table.iter().count(), 0);
    }

    #[test]
    fn test_hash_and_screen() {
        let mut table = new_table(8, ResizeFactor::X8);

        // Theta starts at MAX_THETA, so distinct values yield distinct non-zero hashes.
        let first = table.hash_and_screen("test1");
        let second = table.hash_and_screen("test2");
        assert_ne!(first, 0);
        assert_ne!(second, 0);
        assert_ne!(first, second);

        // A theta of zero rejects every hash.
        table.theta = 0;
        assert_eq!(table.hash_and_screen("test3"), 0);
    }

    #[test]
    fn test_try_insert() {
        let mut table = new_table(5, ResizeFactor::X8);
        let hash = table.hash_and_screen("test_value");
        assert_ne!(hash, 0);

        assert!(table.try_insert(hash));
        assert_eq!(table.num_entries(), 1);
        assert!(!table.is_empty());

        // A duplicate and the reserved empty marker are both rejected without side effects.
        for rejected in [hash, 0] {
            assert!(!table.try_insert(rejected));
            assert_eq!(table.num_entries(), 1);
        }
    }

    #[test]
    fn test_insert_multiple_values() {
        let mut table = new_table(8, ResizeFactor::X8);
        let inserted = insert_values(&mut table, 0..10);

        assert_eq!(table.num_entries(), inserted.len());
        assert!(!table.is_empty());
        assert_eq!(table.iter().count(), inserted.len());
    }

    #[test]
    fn test_resize() {
        // With lg_nom_size = 8 both factors start at 32 slots; 20 values exceed the 50%
        // threshold (16) exactly once, growing the table by the resize factor.
        for (resize_factor, expected_len) in [(ResizeFactor::X2, 64), (ResizeFactor::X4, 128)] {
            let mut table = new_table(8, resize_factor);
            assert_eq!(table.entries.len(), 32);

            let inserted = insert_values(&mut table, 0..20);

            assert!(table.num_entries() > 0);
            assert_eq!(table.num_entries(), inserted.len());
            assert_eq!(table.entries.len(), expected_len);
        }
    }

    #[test]
    fn test_rebuild() {
        let mut table = new_table(5, ResizeFactor::X8);
        assert_eq!(table.lg_cur_size, 6);
        assert_eq!(table.entries.len(), 64);
        assert_eq!(table.theta, MAX_THETA);

        // The table already has its maximum size, so overflowing it forces a rebuild.
        insert_values(&mut table, 0..100);
        let theta_after_first = table.theta();
        assert!(
            theta_after_first < MAX_THETA,
            "Theta should be reduced after rebuild"
        );

        // Further values force further rebuilds and push theta lower still.
        insert_values(&mut table, 100..200);
        assert_eq!(table.lg_cur_size, 6);
        assert!(table.entries.len() >= 64);
        assert!(table.theta < theta_after_first);
    }

    #[test]
    fn test_trim() {
        let mut table = new_table(5, ResizeFactor::X8);
        insert_values(&mut table, 0..100);
        assert!(table.num_entries() > 32);

        table.trim();
        assert!(table.num_entries() <= 32);
        assert!(table.theta() < MAX_THETA);
    }

    #[test]
    fn test_trim_when_not_needed() {
        let mut table = new_table(8, ResizeFactor::X8);
        insert_values(&mut table, 0..10);
        let (entries_before, theta_before) = (table.num_entries(), table.theta());

        // Fewer than k entries: trimming must not change anything.
        table.trim();
        assert_eq!(table.num_entries(), entries_before);
        assert_eq!(table.theta(), theta_before);
    }

    #[test]
    fn test_reset() {
        let mut table = new_table(8, ResizeFactor::X8);
        let (init_theta, init_lg_cur, init_len) =
            (table.theta(), table.lg_cur_size, table.entries.len());

        insert_values(&mut table, 0..10);
        assert!(!table.is_empty());
        assert!(table.num_entries() > 0);

        table.reset();
        assert!(table.is_empty());
        assert_eq!(table.num_entries(), 0);
        assert_eq!(table.theta(), init_theta);
        assert_eq!(table.lg_cur_size, init_lg_cur);
        assert_eq!(table.entries.len(), init_len);
        assert_eq!(table.iter().count(), 0);
    }

    #[test]
    fn test_table_with_sampling() {
        let half_theta = (MAX_THETA as f64 * 0.5) as u64;
        let mut table = ThetaHashTable::new(8, ResizeFactor::X8, 0.5, DEFAULT_UPDATE_SEED);
        assert_eq!(table.theta(), half_theta);

        insert_values(&mut table, 0..10);
        table.reset();

        // Reset restores the sampling-derived theta rather than MAX_THETA.
        assert_eq!(table.theta(), half_theta);
        assert!(table.is_empty());
    }

    #[test]
    fn test_iterator() {
        let mut table = new_table(8, ResizeFactor::X8);
        let inserted = insert_values(&mut table, 0..10);

        let seen: Vec<u64> = table.iter().collect();
        assert_eq!(seen.len(), table.num_entries());
        assert_eq!(seen.len(), inserted.len());
        assert!(inserted.iter().all(|hash| seen.contains(hash)));
        // Empty-slot markers never leak out of the iterator.
        assert!(!seen.contains(&0));
    }

    #[test]
    fn test_empty_table_operations() {
        let mut table = new_table(8, ResizeFactor::X8);
        assert!(table.is_empty());
        assert_eq!(table.num_entries(), 0);
        assert_eq!(table.iter().count(), 0);

        // Neither trim nor reset may panic on an empty table.
        table.trim();
        assert!(table.is_empty());
        table.reset();
        assert!(table.is_empty());
    }

    #[test]
    fn test_rebuild_preserves_entries_less_than_kth() {
        let mut table = new_table(5, ResizeFactor::X8);
        let k = 1usize << 5; // k = 32

        let mut next = 0u32;
        let mut inserted_hashes = Vec::new();

        // Phase 1: fill the table up to k entries.
        while table.num_entries() < k {
            let hash = feed_next(&mut table, &mut next);
            if hash != 0 {
                inserted_hashes.push(hash);
            }
        }

        // Phase 2: fill up to the rebuild threshold without crossing it.
        let rebuild_threshold = table.get_capacity();
        while table.num_entries() < rebuild_threshold {
            let hash = feed_next(&mut table, &mut next);
            if hash != 0 {
                inserted_hashes.push(hash);
            }
        }

        // Phase 3: one more accepted hash crosses the threshold and triggers the rebuild.
        loop {
            let hash = feed_next(&mut table, &mut next);
            if hash != 0 {
                inserted_hashes.push(hash);
                break;
            }
        }

        // Every survivor must lie below the (k+1)-th smallest hash, which becomes theta.
        inserted_hashes.sort_unstable();
        let kth = inserted_hashes[k];
        assert!(table.iter().all(|e| e < kth));
        assert_eq!(table.theta(), kth);
    }
}
diff --git a/datasketches/src/lib.rs b/datasketches/src/theta/mod.rs
similarity index 53%
copy from datasketches/src/lib.rs
copy to datasketches/src/theta/mod.rs
index c7c2a14..e34d30e 100644
--- a/datasketches/src/lib.rs
+++ b/datasketches/src/theta/mod.rs
@@ -15,24 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! # Apache® DataSketches™ Core Rust Library Component
+//! Theta sketch implementation for cardinality estimation.
 //!
-//! The Sketching Core Library provides a range of stochastic streaming 
algorithms and closely
-//! related Rust technologies that are particularly useful when integrating 
this technology into
-//! systems that must deal with massive data.
+//! Theta sketch is a generalization of the Kth Minimum Value (KMV) sketch 
that uses
+//! a hash table to store retained entries and a theta parameter (sampling 
threshold)
+//! to control memory usage. When the hash table reaches capacity, theta is 
reduced
+//! to maintain the nominal size k.
 //!
-//! This library is divided into modules that constitute distinct groups of 
functionality.
-
-#![cfg_attr(docsrs, feature(doc_cfg))]
-#![deny(missing_docs)]
-
-// See https://github.com/apache/datasketches-rust/issues/28 for more 
information.
-#[cfg(target_endian = "big")]
-compile_error!("datasketches does not support big-endian targets");
+//! # Overview
+//!
+//! Theta sketches provide approximate distinct count (cardinality) estimation 
with
+//! configurable accuracy and memory usage. The implementation supports:
+//!
+//! - **ThetaSketch**: Mutable sketch for building from input data
 
-pub mod countmin;
-pub mod error;
-pub mod hll;
-pub mod tdigest;
+mod hash_table;
+mod sketch;
 
-mod hash;
+pub use self::sketch::ThetaSketch;
diff --git a/datasketches/src/theta/sketch.rs b/datasketches/src/theta/sketch.rs
new file mode 100644
index 0000000..5f67a2e
--- /dev/null
+++ b/datasketches/src/theta/sketch.rs
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
//! Theta sketch implementation
//!
//! This module provides [`ThetaSketch`], a mutable sketch for cardinality estimation,
//! together with its builder [`ThetaSketchBuilder`].
+
+use std::hash::Hash;
+
+use crate::hash::DEFAULT_UPDATE_SEED;
+use crate::theta::hash_table::DEFAULT_LG_K;
+use crate::theta::hash_table::MAX_LG_K;
+use crate::theta::hash_table::MAX_THETA;
+use crate::theta::hash_table::MIN_LG_K;
+use crate::theta::hash_table::ResizeFactor;
+use crate::theta::hash_table::ThetaHashTable;
+
+/// Mutable theta sketch for building from input data
+#[derive(Debug)]
+pub struct ThetaSketch {
+    table: ThetaHashTable,
+}
+
+impl ThetaSketch {
+    /// Create a new builder for ThetaSketch
+    pub fn builder() -> ThetaSketchBuilder {
+        ThetaSketchBuilder::default()
+    }
+
+    /// Update the sketch with a hashable value
+    pub fn update<T: Hash>(&mut self, value: T) {
+        let hash = self.table.hash_and_screen(value);
+        if hash != 0 {
+            self.table.try_insert(hash);
+        }
+    }
+
+    /// Update the sketch with a f64 value
+    pub fn update_f64(&mut self, value: f64) {
+        // Canonicalize double for compatibility with Java
+        let canonical = canonical_double(value);
+        self.update(canonical);
+    }
+
+    /// Update the sketch with a f32 value
+    pub fn update_f32(&mut self, value: f32) {
+        self.update_f64(value as f64);
+    }
+
+    /// Return cardinality estimate
+    pub fn estimate(&self) -> f64 {
+        if self.is_empty() {
+            return 0.0;
+        }
+        let num_retained = self.table.num_entries() as f64;
+        let theta = self.table.theta() as f64 / MAX_THETA as f64;
+        num_retained / theta
+    }
+
+    /// Return theta as a fraction (0.0 to 1.0)
+    pub fn theta(&self) -> f64 {
+        self.table.theta() as f64 / MAX_THETA as f64
+    }
+
+    /// Return theta as u64
+    pub fn theta64(&self) -> u64 {
+        self.table.theta()
+    }
+
+    /// Check if sketch is empty
+    pub fn is_empty(&self) -> bool {
+        self.table.is_empty()
+    }
+
+    /// Check if sketch is in estimation mode
+    pub fn is_estimation_mode(&self) -> bool {
+        self.table.theta() < MAX_THETA
+    }
+
+    /// Return number of retained entries
+    pub fn num_retained(&self) -> usize {
+        self.table.num_entries()
+    }
+
+    /// Return lg_k
+    pub fn lg_k(&self) -> u8 {
+        self.table.lg_nom_size()
+    }
+
+    /// Trim the sketch to nominal size k
+    pub fn trim(&mut self) {
+        self.table.trim();
+    }
+
+    /// Reset the sketch to empty state
+    pub fn reset(&mut self) {
+        self.table.reset();
+    }
+
+    /// Return iterator over hash values
+    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
+        self.table.iter()
+    }
+}
+
+/// Builder for ThetaSketch
+#[derive(Debug)]
+pub struct ThetaSketchBuilder {
+    lg_k: u8,
+    resize_factor: ResizeFactor,
+    sampling_probability: f32,
+    seed: u64,
+}
+
+impl Default for ThetaSketchBuilder {
+    fn default() -> Self {
+        Self {
+            lg_k: DEFAULT_LG_K,
+            resize_factor: ResizeFactor::default(),
+            sampling_probability: 1.0,
+            seed: DEFAULT_UPDATE_SEED,
+        }
+    }
+}
+
+impl ThetaSketchBuilder {
+    /// Set lg_k (log2 of nominal size k)
+    ///
+    /// # Panics
+    ///
+    /// If lg_k is not in range [5, 26]
+    pub fn lg_k(mut self, lg_k: u8) -> Self {
+        assert!(
+            (MIN_LG_K..=MAX_LG_K).contains(&lg_k),
+            "lg_k must be in [{}, {}], got {}",
+            MIN_LG_K,
+            MAX_LG_K,
+            lg_k
+        );
+        self.lg_k = lg_k;
+        self
+    }
+
+    /// Set resize factor
+    pub fn resize_factor(mut self, rf: ResizeFactor) -> Self {
+        self.resize_factor = rf;
+        self
+    }
+
+    /// Set sampling probability p
+    ///
+    /// # Panics
+    ///
+    /// If p is not in range [0.0, 1.0]
+    pub fn sampling_probability(mut self, p: f32) -> Self {
+        assert!(
+            (0.0..=1.0).contains(&p),
+            "p must be in [0.0, 1.0], got {}",
+            p
+        );
+        self.sampling_probability = p;
+        self
+    }
+
+    /// Set hash seed
+    pub fn seed(mut self, seed: u64) -> Self {
+        self.seed = seed;
+        self
+    }
+
+    /// Build the ThetaSketch
+    pub fn build(self) -> ThetaSketch {
+        let table = ThetaHashTable::new(
+            self.lg_k,
+            self.resize_factor,
+            self.sampling_probability,
+            self.seed,
+        );
+
+        ThetaSketch { table }
+    }
+}
+
/// Maps an `f64` to a canonical `i64` bit pattern for compatibility with Java.
///
/// * Every NaN becomes the canonical quiet NaN `0x7ff8000000000000`, the value Java's
///   `Double.doubleToLongBits` produces for NaN.
/// * `-0.0` is folded into `+0.0`.
/// * Any other value keeps its own bit pattern.
fn canonical_double(value: f64) -> i64 {
    const CANONICAL_NAN_BITS: i64 = 0x7ff8_0000_0000_0000;

    if value.is_nan() {
        return CANONICAL_NAN_BITS;
    }

    // Under IEEE 754 round-to-nearest (guaranteed by Rust), -0.0 + 0.0 == +0.0 while every
    // other value is unchanged, so this folds signed zero without a branch.
    let normalized = value + 0.0;
    normalized.to_bits() as i64
}
diff --git a/datasketches/tests/theta_sketch_test.rs b/datasketches/tests/theta_sketch_test.rs
new file mode 100644
index 0000000..24ff06f
--- /dev/null
+++ b/datasketches/tests/theta_sketch_test.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datasketches::theta::ThetaSketch;
+
+#[test]
+fn test_basic_update() {
+    let mut sk = ThetaSketch::builder().lg_k(12).build();
+
+    // Nothing inserted yet: the sketch is empty and estimates zero.
+    assert!(sk.is_empty());
+    assert_eq!(sk.estimate(), 0.0);
+
+    // First distinct value.
+    sk.update("value1");
+    assert!(!sk.is_empty());
+    assert_eq!(sk.estimate(), 1.0);
+
+    // Second distinct value.
+    sk.update("value2");
+    assert_eq!(sk.estimate(), 2.0);
+}
+
+#[test]
+fn test_update_various_types() {
+    let mut sketch = ThetaSketch::builder().lg_k(12).build();
+
+    sketch.update("string");
+    sketch.update(42i64);
+    sketch.update(42u64);
+    // Each floating-point value goes in twice; the repeat must not be counted again.
+    for _ in 0..2 {
+        sketch.update_f64(3.15);
+    }
+    for _ in 0..2 {
+        sketch.update_f32(3.15);
+    }
+    sketch.update([1u8, 2, 3]);
+
+    // NOTE(review): eight updates are expected to collapse to five distinct hashes.
+    // This presumably relies on 42i64 and 42u64 hashing identically; confirm
+    // against the hashing code.
+    assert!(!sketch.is_empty());
+    assert_eq!(sketch.estimate(), 5.0);
+}
+
+#[test]
+fn test_duplicate_updates() {
+    let mut sketch = ThetaSketch::builder().lg_k(12).build();
+
+    // The same value inserted 100 times is still a single distinct item.
+    (0..100).for_each(|_| {
+        sketch.update("same_value");
+    });
+
+    assert_eq!(sketch.estimate(), 1.0);
+}
+
+#[test]
+fn test_theta_reduction() {
+    // Small k (lg_k = 5, so k = 32) so that 1000 distinct values trigger theta reduction.
+    let mut sketch = ThetaSketch::builder().lg_k(5).build();
+    // A freshly built sketch must not be in estimation mode yet.
+    assert!(!sketch.is_estimation_mode());
+
+    // Insert many values to trigger theta reduction
+    for i in 0..1000 {
+        sketch.update(format!("value_{}", i));
+    }
+
+    // Now the sketch should be sampling: estimation mode with theta below 1.
+    assert!(sketch.is_estimation_mode());
+    assert!(sketch.theta() < 1.0);
+}
+
+#[test]
+fn test_trim() {
+    let mut sketch = ThetaSketch::builder().lg_k(5).build();
+
+    // Insert far more distinct values than k = 2^5 = 32.
+    for i in 0..1000 {
+        sketch.update(format!("value_{}", i));
+    }
+
+    let before_trim = sketch.num_retained();
+    sketch.trim();
+    let after_trim = sketch.num_retained();
+
+    // Trimming never grows the sketch and leaves exactly k = 32 retained entries.
+    assert!(after_trim <= before_trim);
+    assert_eq!(after_trim, 32);
+}
+
+#[test]
+fn test_reset() {
+    let mut sketch = ThetaSketch::builder().lg_k(5).build();
+
+    // First push the sketch well into estimation mode.
+    for n in 0..1000 {
+        let item = format!("value_{}", n);
+        sketch.update(item);
+    }
+    assert!(!sketch.is_empty());
+    assert!(sketch.is_estimation_mode());
+    assert!(sketch.num_retained() > 32);
+    assert!(sketch.theta() < 1.0);
+
+    // After reset, each property checked below must match a freshly built sketch.
+    sketch.reset();
+    assert!(sketch.is_empty());
+    assert_eq!(sketch.estimate(), 0.0);
+    assert_eq!(sketch.theta(), 1.0);
+    assert_eq!(sketch.num_retained(), 0);
+    assert!(!sketch.is_estimation_mode());
+}
+
+#[test]
+fn test_iterator() {
+    let mut sketch = ThetaSketch::builder().lg_k(12).build();
+
+    sketch.update("value1");
+    sketch.update("value2");
+    sketch.update("value3");
+
+    // Three distinct values are far below k = 4096, so all three should be retained.
+    // Pinning the count keeps this test from passing vacuously on an empty sketch.
+    let count: usize = sketch.iter().count();
+    assert_eq!(count, 3);
+    assert_eq!(count, sketch.num_retained());
+}
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 0942c3d..f79a9f4 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -26,6 +26,10 @@ rust-version.workspace = true
 name = "hll_update"
 path = "src/hll_update.rs"
 
+[[bin]]
+name = "theta_sketch"
+path = "src/theta_sketch.rs"
+
 [package.metadata.release]
 release = false
 
diff --git a/examples/src/theta_sketch.rs b/examples/src/theta_sketch.rs
new file mode 100644
index 0000000..d76d240
--- /dev/null
+++ b/examples/src/theta_sketch.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Example demonstrating theta sketch usage
+
+use datasketches::theta::ThetaSketch;
+
+/// Feeds a theta sketch first a small and then a larger stream of string items.
+/// After each stage it prints the estimate, theta, retained-entry count and mode.
+fn main() {
+    println!("=== Theta Sketch Example ===\n");
+
+    // Example 1: Basic usage. 101 distinct items is far fewer than
+    // k = 2^10 = 1024, so the estimate is expected to be exact here.
+    println!("1. Basic Theta Sketch Usage:");
+    let mut sketch = ThetaSketch::builder().lg_k(10).build();
+
+    for i in 0..100 {
+        sketch.update(format!("item_{}", i));
+    }
+    // Inserting the same item twice only counts it once.
+    sketch.update("duplicate_item");
+    sketch.update("duplicate_item");
+
+    println!("   Estimate: {:.2}", sketch.estimate());
+    println!("   Theta: {:.6}", sketch.theta());
+    println!("   Num retained: {}", sketch.num_retained());
+    println!("   Estimation mode: {}", sketch.is_estimation_mode());
+    println!();
+
+    // Example 2: Add more data to enter estimation mode. item_0..item_99 are
+    // already present, so this adds 4900 new distinct items (5001 in total).
+    println!("2. Add more data to enter estimation mode:");
+    for i in 0..5000 {
+        sketch.update(format!("item_{}", i));
+    }
+    println!("   Estimate: {:.2}", sketch.estimate());
+    println!("   Theta: {:.6}", sketch.theta());
+    println!("   Num retained: {}", sketch.num_retained());
+    println!("   Estimation mode: {}", sketch.is_estimation_mode());
+    println!();
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to