This is an automated email from the ASF dual-hosted git repository.
leerho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datasketches-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0a8543e feat: add theta sketch (part 1) (#45)
0a8543e is described below
commit 0a8543e304adb7bd741be0c494967b1469bd1861
Author: ZENOTME <[email protected]>
AuthorDate: Wed Dec 31 00:15:02 2025 +0800
feat: add theta sketch (part 1) (#45)
* feat: add theta sketch (part 1)
- add hash table for theta sketch
- add ThetaSketch
* refine some function name and comment
* refine: use unified default seed DEFAULT_UPDATE_SEED
* refine some style
---
datasketches/src/lib.rs | 1 +
datasketches/src/theta/hash_table.rs | 698 ++++++++++++++++++++++++++++++
datasketches/src/{lib.rs => theta/mod.rs} | 31 +-
datasketches/src/theta/sketch.rs | 209 +++++++++
datasketches/tests/theta_sketch_test.rs | 125 ++++++
examples/Cargo.toml | 4 +
examples/src/theta_sketch.rs | 49 +++
7 files changed, 1100 insertions(+), 17 deletions(-)
diff --git a/datasketches/src/lib.rs b/datasketches/src/lib.rs
index c7c2a14..fc4679e 100644
--- a/datasketches/src/lib.rs
+++ b/datasketches/src/lib.rs
@@ -34,5 +34,6 @@ pub mod countmin;
pub mod error;
pub mod hll;
pub mod tdigest;
+pub mod theta;
mod hash;
diff --git a/datasketches/src/theta/hash_table.rs
b/datasketches/src/theta/hash_table.rs
new file mode 100644
index 0000000..634f232
--- /dev/null
+++ b/datasketches/src/theta/hash_table.rs
@@ -0,0 +1,698 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::hash::Hash;
+
+use crate::hash::MurmurHash3X64128;
+
+/// Maximum theta value (signed max for compatibility with Java)
+pub const MAX_THETA: u64 = i64::MAX as u64;
+
+/// Minimum log2 of K
+pub const MIN_LG_K: u8 = 5;
+
+/// Maximum log2 of K
+pub const MAX_LG_K: u8 = 26;
+
+/// Default log2 of K
+pub const DEFAULT_LG_K: u8 = 12;
+
+/// Hash table resize factor
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum ResizeFactor {
+ /// Resize by factor of 2
+ X2 = 2,
+ /// Resize by factor of 4
+ X4 = 4,
+ /// Resize by factor of 8
+ #[default]
+ X8 = 8,
+}
+
+impl ResizeFactor {
+ pub fn lg_value(&self) -> u8 {
+ match self {
+ ResizeFactor::X2 => 1,
+ ResizeFactor::X4 => 2,
+ ResizeFactor::X8 => 3,
+ }
+ }
+}
+
+/// Resize threshold (0.5 = 50% load factor)
+const RESIZE_THRESHOLD: f64 = 0.5;
+
+/// Rebuild threshold (15/16 = 93.75% load factor)
+const REBUILD_THRESHOLD: f64 = 15.0 / 16.0;
+
+/// Stride hash bits (7 bits for stride calculation)
+const STRIDE_HASH_BITS: u8 = 7;
+
+/// Stride mask
+const STRIDE_MASK: u64 = (1 << STRIDE_HASH_BITS) - 1;
+
+/// Specialized hash table for the theta sketch.
+///
+/// It maintains an array whose capacity is at most 2^lg_max_size:
+/// * Until the array reaches the maximum capacity, it is grown according to resize_factor.
+/// * Once the capacity exceeds 2^lg_nom_size, every time the number of entries exceeds the
+///   threshold the table is rebuilt: only the smallest 2^lg_nom_size entries are kept and
+///   theta is updated to the k-th smallest entry.
+#[derive(Debug)]
+pub(crate) struct ThetaHashTable {
+ lg_cur_size: u8,
+ lg_nom_size: u8,
+ lg_max_size: u8,
+ resize_factor: ResizeFactor,
+ sampling_probability: f32,
+ hash_seed: u64,
+
+ theta: u64,
+
+ entries: Vec<u64>,
+ num_entries: usize,
+}
+
+impl ThetaHashTable {
+ /// Create a new hash table
+ pub fn new(
+ lg_nom_size: u8,
+ resize_factor: ResizeFactor,
+ sampling_probability: f32,
+ hash_seed: u64,
+ ) -> Self {
+ let lg_max_size = lg_nom_size + 1;
+ let lg_cur_size = starting_sub_multiple(lg_max_size, MIN_LG_K,
resize_factor.lg_value());
+ let size = if lg_cur_size > 0 { 1 << lg_cur_size } else { 0 };
+ let entries = vec![0u64; size];
+
+ Self {
+ lg_cur_size,
+ lg_nom_size,
+ lg_max_size,
+ resize_factor,
+ sampling_probability,
+ theta:
starting_theta_from_sampling_probability(sampling_probability),
+ hash_seed,
+ entries,
+ num_entries: 0,
+ }
+ }
+
+ /// Hash and screen a value
+ ///
+ /// Returns the hash value if it passes the theta threshold, otherwise 0.
+ pub fn hash_and_screen<T: Hash>(&mut self, value: T) -> u64 {
+ let mut hasher = MurmurHash3X64128::with_seed(self.hash_seed);
+ value.hash(&mut hasher);
+ let (h1, _) = hasher.finish128();
+ let hash = h1 >> 1; // To make it compatible with Java version
+ if hash >= self.theta {
+ return 0; // hash == 0 is reserved for empty slots
+ }
+ hash
+ }
+
+ /// Find the slot for `key` in the current hash table.
+ ///
+ /// Returns the index of the first probed slot that either already holds `key` or is empty,
+ /// or None if no such slot is found.
+ fn find_in_curr_entries(&self, key: u64) -> Option<usize> {
+ Self::find_in_entries(&self.entries, key, self.lg_cur_size)
+ }
+
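+ /// Find the slot for `key` in the given entries.
+ ///
+ /// Returns the index of the first probed slot that either already holds `key` or is empty,
+ /// or None if `entries` is empty or probing wraps around to its starting slot without
+ /// finding one.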
+ fn find_in_entries(entries: &[u64], key: u64, lg_size: u8) ->
Option<usize> {
+ if entries.is_empty() {
+ return None;
+ }
+
+ let size = entries.len();
+ let mask = size - 1;
+ let stride = Self::get_stride(key, lg_size);
+ let mut index = (key as usize) & mask;
+ let loop_index = index;
+
+ loop {
+ let probe = entries[index];
+ if probe == 0 || probe == key {
+ return Some(index);
+ }
+ index = (index + stride) & mask;
+ if index == loop_index {
+ return None;
+ }
+ }
+ }
+
+ /// Insert a hash value into the table
+ ///
+ /// Returns true if the value was inserted (new), false otherwise.
+ pub fn try_insert(&mut self, hash: u64) -> bool {
+ if hash == 0 {
+ return false;
+ }
+
+ let Some(index) = self.find_in_curr_entries(hash) else {
+ unreachable!(
+ "Resize or rebuild should be called to make sure it always can
find the entry."
+ );
+ };
+
+ // Already exists
+ if self.entries[index] == hash {
+ return false;
+ }
+
+ assert_eq!(self.entries[index], 0, "Entry should be empty");
+ self.entries[index] = hash;
+ self.num_entries += 1;
+
+ // Check if we need to resize or rebuild
+ let capacity = self.get_capacity();
+ if self.num_entries > capacity {
+ if self.lg_cur_size <= self.lg_nom_size {
+ self.resize();
+ } else {
+ self.rebuild();
+ }
+ }
+ true
+ }
+
+ /// Get capacity threshold
+ fn get_capacity(&self) -> usize {
+ let fraction = if self.lg_cur_size <= self.lg_nom_size {
+ RESIZE_THRESHOLD
+ } else {
+ REBUILD_THRESHOLD
+ };
+ (fraction * self.entries.len() as f64) as usize
+ }
+
+ /// Resize the hash table
+ fn resize(&mut self) {
+ let new_lg_size = std::cmp::min(
+ self.lg_cur_size + self.resize_factor.lg_value(),
+ self.lg_max_size,
+ );
+ let new_size = 1 << new_lg_size;
+
+ // Get new entries and rehash all entries
+ let mut new_entries = vec![0u64; new_size];
+ for &entry in &self.entries {
+ if entry != 0 {
+ let new_index = Self::find_in_entries(&new_entries, entry,
new_lg_size);
+ if let Some(idx) = new_index {
+ new_entries[idx] = entry;
+ } else {
+ unreachable!(
+ "find_in_entries should always return Some if the
entry is not empty."
+ );
+ }
+ }
+ }
+
+ self.entries = new_entries;
+ self.lg_cur_size = new_lg_size;
+ }
+
+ /// Rebuild the hash table:
+ /// The number of entries will be reduced to the nominal size k.
+ fn rebuild(&mut self) {
+ // Select the k-th smallest entry as new theta and keep the lesser
entries.
+ self.entries.retain(|&e| e != 0);
+ let k = 1u64 << self.lg_nom_size;
+ let (lesser, kth, _) = self.entries.select_nth_unstable(k as usize);
+ self.theta = *kth;
+
+ // Rebuild the table with the lesser entries.
+ let size = 1 << self.lg_cur_size;
+ let mut new_entries = vec![0u64; size];
+ let mut num_inserted = 0;
+ for entry in lesser {
+ if let Some(idx) = Self::find_in_entries(&new_entries, *entry,
self.lg_cur_size) {
+ new_entries[idx] = *entry;
+ num_inserted += 1;
+ } else {
+ unreachable!(
+ "find_in_entries should always return Some if the entry is
not empty."
+ );
+ }
+ }
+
+ assert_eq!(
+ num_inserted, k as usize,
+ "Number of inserted entries should be equal to k."
+ );
+ self.num_entries = num_inserted;
+ self.entries = new_entries;
+ }
+
+ /// Trim the table to nominal size k
+ pub fn trim(&mut self) {
+ if self.num_entries > (1 << self.lg_nom_size) {
+ self.rebuild();
+ }
+ }
+
+ /// Reset the table to empty state
+ pub fn reset(&mut self) {
+ let init_theta =
starting_theta_from_sampling_probability(self.sampling_probability);
+ let init_lg_cur = starting_sub_multiple(
+ self.lg_nom_size + 1,
+ MIN_LG_K,
+ self.resize_factor.lg_value(),
+ );
+
+ // clear entries
+ if self.entries.len() != 1 << init_lg_cur {
+ self.entries.resize(1 << init_lg_cur, 0);
+ }
+ self.entries.fill(0);
+ self.num_entries = 0;
+ self.theta = init_theta;
+ self.lg_cur_size = init_lg_cur;
+ }
+
+ /// Get number of entries
+ pub fn num_entries(&self) -> usize {
+ self.num_entries
+ }
+
+ /// Get theta
+ pub fn theta(&self) -> u64 {
+ self.theta
+ }
+
+ /// Check if empty
+ pub fn is_empty(&self) -> bool {
+ self.num_entries == 0
+ }
+
+ /// Get iterator over entries
+ pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
+ self.entries.iter().copied().filter(|&e| e != 0)
+ }
+
+ /// Get log2 of nominal size
+ pub fn lg_nom_size(&self) -> u8 {
+ self.lg_nom_size
+ }
+
+ /// Get stride for hash table probing
+ fn get_stride(key: u64, lg_size: u8) -> usize {
+ (2 * ((key >> (lg_size)) & STRIDE_MASK) + 1) as usize
+ }
+}
+
+/// Compute the initial lg_size for the hash table from the target lg_size, the minimum lg_size,
+/// and the resize factor. When `lg_target > lg_min`, the result satisfies
+/// `lg_target = lg_init + n * lg_resize_factor` for some integer `n >= 0`, with
+/// `lg_init >= lg_min`; otherwise `lg_min` is returned.
+fn starting_sub_multiple(lg_target: u8, lg_min: u8, lg_resize_factor: u8) ->
u8 {
+ if lg_target <= lg_min {
+ lg_min
+ } else {
+ if lg_resize_factor == 0 {
+ lg_target
+ } else {
+ ((lg_target - lg_min) % lg_resize_factor) + lg_min
+ }
+ }
+}
+
+/// Compute initial theta for hash table based on sampling probability.
+fn starting_theta_from_sampling_probability(sampling_probability: f32) -> u64 {
+ if sampling_probability < 1.0 {
+ (MAX_THETA as f64 * sampling_probability as f64) as u64
+ } else {
+ MAX_THETA
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::hash::DEFAULT_UPDATE_SEED;
+
+ #[test]
+ fn test_new_hash_table() {
+ let table = ThetaHashTable::new(8, ResizeFactor::X8, 1.0,
DEFAULT_UPDATE_SEED);
+
+ assert_eq!(
+ table.lg_cur_size,
+ starting_sub_multiple(8 + 1, MIN_LG_K, ResizeFactor::X8.lg_value())
+ );
+ assert_eq!(table.theta, starting_theta_from_sampling_probability(1.0));
+ assert_eq!(table.num_entries(), 0);
+ assert!(table.is_empty());
+ assert_eq!(table.iter().count(), 0);
+ }
+
+ #[test]
+ fn test_hash_and_screen() {
+ let mut table = ThetaHashTable::new(8, ResizeFactor::X8, 1.0,
DEFAULT_UPDATE_SEED);
+
+ // With MAX_THETA, all hashes should pass
+ let hash1 = table.hash_and_screen("test1");
+ let hash2 = table.hash_and_screen("test2");
+ assert_ne!(hash1, 0);
+ assert_ne!(hash2, 0);
+ assert_ne!(hash1, hash2);
+
+ // With theta = 0, every hash should be filtered out
+ table.theta = 0;
+ let hash3 = table.hash_and_screen("test3");
+ assert_eq!(hash3, 0);
+ }
+
+ #[test]
+ fn test_try_insert() {
+ let mut table = ThetaHashTable::new(5, ResizeFactor::X8, 1.0,
DEFAULT_UPDATE_SEED);
+
+ // Insert a hash value
+ let hash = table.hash_and_screen("test_value");
+ assert_ne!(hash, 0);
+ assert!(table.try_insert(hash));
+ assert_eq!(table.num_entries(), 1);
+ assert!(!table.is_empty());
+
+ // Try to insert the same hash again (should fail)
+ assert!(!table.try_insert(hash));
+ assert_eq!(table.num_entries(), 1);
+
+ // Try to insert 0 (should fail)
+ assert!(!table.try_insert(0));
+ assert_eq!(table.num_entries(), 1);
+ }
+
+ #[test]
+ fn test_insert_multiple_values() {
+ let mut table = ThetaHashTable::new(8, ResizeFactor::X8, 1.0,
DEFAULT_UPDATE_SEED);
+
+ // Insert multiple distinct values
+ let mut inserted_count = 0;
+ for i in 0..10 {
+ let hash = table.hash_and_screen(format!("value_{}", i));
+ if hash != 0 && table.try_insert(hash) {
+ inserted_count += 1;
+ }
+ }
+
+ assert_eq!(table.num_entries(), inserted_count);
+ assert!(!table.is_empty());
+ assert_eq!(table.iter().count(), inserted_count);
+ }
+
+ #[test]
+ fn test_resize() {
+ {
+ let mut table = ThetaHashTable::new(8, ResizeFactor::X2, 1.0,
DEFAULT_UPDATE_SEED);
+
+ assert_eq!(table.entries.len(), 32);
+
+ // Insert enough values to trigger resize (50% threshold)
+ // Capacity = 32 * 0.5 = 16
+ let mut inserted = 0;
+ for i in 0..20 {
+ let hash = table.hash_and_screen(format!("value_{}", i));
+ if hash != 0 && table.try_insert(hash) {
+ inserted += 1;
+ }
+ }
+
+ // Table should have resized and all values should be inserted
+ assert!(table.num_entries() > 0);
+ assert_eq!(table.num_entries(), inserted);
+ assert_eq!(table.entries.len(), 64);
+ }
+
+ // Test different resize factors
+ {
+ let mut table = ThetaHashTable::new(8, ResizeFactor::X4, 1.0,
DEFAULT_UPDATE_SEED);
+
+ assert_eq!(table.entries.len(), 32);
+
+ // Insert enough values to trigger resize (50% threshold)
+ // Capacity = 32 * 0.5 = 16
+ let mut inserted = 0;
+ for i in 0..20 {
+ let hash = table.hash_and_screen(format!("value_{}", i));
+ if hash != 0 && table.try_insert(hash) {
+ inserted += 1;
+ }
+ }
+
+ // Table should have resized and all values should be inserted
+ assert!(table.num_entries() > 0);
+ assert_eq!(table.num_entries(), inserted);
+ assert_eq!(table.entries.len(), 128);
+ }
+ }
+
+ #[test]
+ fn test_rebuild() {
+ let mut table = ThetaHashTable::new(5, ResizeFactor::X8, 1.0,
DEFAULT_UPDATE_SEED);
+
+ assert_eq!(table.lg_cur_size, 6);
+ assert_eq!(table.entries.len(), 64);
+ assert_eq!(table.theta, MAX_THETA);
+
+ // Insert many values to trigger rebuild
+ for i in 0..100 {
+ let hash = table.hash_and_screen(format!("value_{}", i));
+ if hash != 0 {
+ table.try_insert(hash);
+ }
+ }
+
+ // After rebuild, theta should be reduced (rebuild is called
automatically during insert)
+ let new_theta = table.theta();
+ assert!(
+ new_theta < MAX_THETA,
+ "Theta should be reduced after rebuild"
+ );
+
+ // Continue to insert values to trigger rebuild again
+ for i in 100..200 {
+ let hash = table.hash_and_screen(format!("value_{}", i));
+ if hash != 0 {
+ table.try_insert(hash);
+ }
+ }
+
+ assert_eq!(table.lg_cur_size, 6);
+ assert!(table.entries.len() >= 64);
+ assert!(table.theta < new_theta);
+ }
+
+ #[test]
+ fn test_trim() {
+ let mut table = ThetaHashTable::new(5, ResizeFactor::X8, 1.0,
DEFAULT_UPDATE_SEED);
+
+ // Insert more than k values
+ for i in 0..100 {
+ let hash = table.hash_and_screen(format!("value_{}", i));
+ if hash != 0 {
+ table.try_insert(hash);
+ }
+ }
+
+ let before_trim = table.num_entries();
+ assert!(before_trim > 32);
+
+ table.trim();
+ let after_trim = table.num_entries();
+ assert!(after_trim <= 32);
+ assert!(table.theta() < MAX_THETA);
+ }
+
+ #[test]
+ fn test_trim_when_not_needed() {
+ let mut table = ThetaHashTable::new(8, ResizeFactor::X8, 1.0,
DEFAULT_UPDATE_SEED);
+
+ // Insert fewer than k values
+ for i in 0..10 {
+ let hash = table.hash_and_screen(format!("value_{}", i));
+ if hash != 0 {
+ table.try_insert(hash);
+ }
+ }
+
+ let before_trim = table.num_entries();
+ let before_theta = table.theta();
+ table.trim();
+ let after_trim = table.num_entries();
+
+ // Should not change if already <= k
+ assert_eq!(before_trim, after_trim);
+ assert_eq!(before_theta, table.theta());
+ }
+
+ #[test]
+ fn test_reset() {
+ let mut table = ThetaHashTable::new(8, ResizeFactor::X8, 1.0,
DEFAULT_UPDATE_SEED);
+ let init_theta = table.theta();
+ let init_lg_cur = table.lg_cur_size;
+ let init_entries = table.entries.len();
+
+ // Insert some values
+ for i in 0..10 {
+ let hash = table.hash_and_screen(format!("value_{}", i));
+ if hash != 0 {
+ table.try_insert(hash);
+ }
+ }
+
+ assert!(!table.is_empty());
+ assert!(table.num_entries() > 0);
+
+ // Reset
+ table.reset();
+
+ assert!(table.is_empty());
+ assert_eq!(table.num_entries(), 0);
+ assert_eq!(table.theta(), init_theta);
+ assert_eq!(table.lg_cur_size, init_lg_cur);
+ assert_eq!(table.entries.len(), init_entries);
+ assert_eq!(table.iter().count(), 0);
+ }
+
+ #[test]
+ fn test_table_with_sampling() {
+ let mut table = ThetaHashTable::new(
+ 8,
+ ResizeFactor::X8,
+ 0.5, // sampling_probability = 0.5
+ DEFAULT_UPDATE_SEED,
+ );
+ assert_eq!(table.theta(), (MAX_THETA as f64 * 0.5) as u64);
+
+ // Insert some values
+ for i in 0..10 {
+ let hash = table.hash_and_screen(format!("value_{}", i));
+ if hash != 0 {
+ table.try_insert(hash);
+ }
+ }
+
+ table.reset();
+
+ // With sampling_probability = 0.5, theta should be MAX_THETA * 0.5
+ assert_eq!(table.theta(), (MAX_THETA as f64 * 0.5) as u64);
+ assert!(table.is_empty());
+ }
+
+ #[test]
+ fn test_iterator() {
+ let mut table = ThetaHashTable::new(8, ResizeFactor::X8, 1.0,
DEFAULT_UPDATE_SEED);
+
+ // Insert some values
+ let mut inserted_hashes = Vec::new();
+ for i in 0..10 {
+ let hash = table.hash_and_screen(format!("value_{}", i));
+ if hash != 0 && table.try_insert(hash) {
+ inserted_hashes.push(hash);
+ }
+ }
+
+ // Check iterator
+ let iter_hashes: Vec<u64> = table.iter().collect();
+ assert_eq!(iter_hashes.len(), table.num_entries());
+ assert_eq!(iter_hashes.len(), inserted_hashes.len());
+
+ // All inserted hashes should be in iterator
+ for hash in &inserted_hashes {
+ assert!(iter_hashes.contains(hash));
+ }
+
+ // Iterator should not contain 0
+ assert!(!iter_hashes.contains(&0));
+ }
+
+ #[test]
+ fn test_empty_table_operations() {
+ let mut table = ThetaHashTable::new(8, ResizeFactor::X8, 1.0,
DEFAULT_UPDATE_SEED);
+
+ assert!(table.is_empty());
+ assert_eq!(table.num_entries(), 0);
+ assert_eq!(table.iter().count(), 0);
+
+ // Trim on empty table should not panic
+ table.trim();
+ assert!(table.is_empty());
+
+ // Reset on empty table should not panic
+ table.reset();
+ assert!(table.is_empty());
+ }
+
+ #[test]
+ fn test_rebuild_preserves_entries_less_than_kth() {
+ let mut table = ThetaHashTable::new(5, ResizeFactor::X8, 1.0,
DEFAULT_UPDATE_SEED);
+ let k = 1u64 << 5; // k = 32
+
+ // Insert many values to trigger rebuild
+ let mut i = 0;
+ let mut inserted_hashes = Vec::new();
+ loop {
+ let hash = table.hash_and_screen(format!("value_{}", i));
+ i += 1;
+ if hash != 0 {
+ table.try_insert(hash);
+ inserted_hashes.push(hash);
+ }
+ if table.num_entries() >= k as usize {
+ break;
+ }
+ }
+
+ let rebuild_threshold = table.get_capacity();
+
+ loop {
+ let hash = table.hash_and_screen(format!("value_{}", i));
+ i += 1;
+ if hash != 0 {
+ table.try_insert(hash);
+ inserted_hashes.push(hash);
+ }
+ if table.num_entries() >= rebuild_threshold {
+ break;
+ }
+ }
+
+ // trigger rebuild
+ loop {
+ let hash = table.hash_and_screen(format!("value_{}", i));
+ i += 1;
+ if hash != 0 {
+ table.try_insert(hash);
+ inserted_hashes.push(hash);
+ break;
+ }
+ }
+
+ // assert all entries are less than kth
+ inserted_hashes.sort();
+ let kth = inserted_hashes[k as usize];
+ assert!(table.iter().all(|e| e < kth));
+ assert_eq!(table.theta(), kth);
+ }
+}
diff --git a/datasketches/src/lib.rs b/datasketches/src/theta/mod.rs
similarity index 53%
copy from datasketches/src/lib.rs
copy to datasketches/src/theta/mod.rs
index c7c2a14..e34d30e 100644
--- a/datasketches/src/lib.rs
+++ b/datasketches/src/theta/mod.rs
@@ -15,24 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-//! # Apache® DataSketches™ Core Rust Library Component
+//! Theta sketch implementation for cardinality estimation.
//!
-//! The Sketching Core Library provides a range of stochastic streaming
algorithms and closely
-//! related Rust technologies that are particularly useful when integrating
this technology into
-//! systems that must deal with massive data.
+//! Theta sketch is a generalization of the Kth Minimum Value (KMV) sketch that uses
+//! a hash table to store retained entries and a theta parameter (sampling threshold)
+//! to control memory usage. When the hash table reaches capacity, theta is reduced
+//! to maintain the nominal size k.
//!
-//! This library is divided into modules that constitute distinct groups of
functionality.
-
-#![cfg_attr(docsrs, feature(doc_cfg))]
-#![deny(missing_docs)]
-
-// See https://github.com/apache/datasketches-rust/issues/28 for more
information.
-#[cfg(target_endian = "big")]
-compile_error!("datasketches does not support big-endian targets");
+//! # Overview
+//!
+//! Theta sketches provide approximate distinct count (cardinality) estimation with
+//! configurable accuracy and memory usage. The implementation supports:
+//!
+//! - **ThetaSketch**: Mutable sketch for building from input data
-pub mod countmin;
-pub mod error;
-pub mod hll;
-pub mod tdigest;
+mod hash_table;
+mod sketch;
-mod hash;
+pub use self::sketch::ThetaSketch;
diff --git a/datasketches/src/theta/sketch.rs b/datasketches/src/theta/sketch.rs
new file mode 100644
index 0000000..5f67a2e
--- /dev/null
+++ b/datasketches/src/theta/sketch.rs
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Theta sketch implementation
+//!
+//! This module provides the mutable ThetaSketch and its builder, ThetaSketchBuilder,
+//! for cardinality estimation.
+
+use std::hash::Hash;
+
+use crate::hash::DEFAULT_UPDATE_SEED;
+use crate::theta::hash_table::DEFAULT_LG_K;
+use crate::theta::hash_table::MAX_LG_K;
+use crate::theta::hash_table::MAX_THETA;
+use crate::theta::hash_table::MIN_LG_K;
+use crate::theta::hash_table::ResizeFactor;
+use crate::theta::hash_table::ThetaHashTable;
+
+/// Mutable theta sketch for building from input data
+#[derive(Debug)]
+pub struct ThetaSketch {
+ table: ThetaHashTable,
+}
+
+impl ThetaSketch {
+ /// Create a new builder for ThetaSketch
+ pub fn builder() -> ThetaSketchBuilder {
+ ThetaSketchBuilder::default()
+ }
+
+ /// Update the sketch with a hashable value
+ pub fn update<T: Hash>(&mut self, value: T) {
+ let hash = self.table.hash_and_screen(value);
+ if hash != 0 {
+ self.table.try_insert(hash);
+ }
+ }
+
+ /// Update the sketch with a f64 value
+ pub fn update_f64(&mut self, value: f64) {
+ // Canonicalize double for compatibility with Java
+ let canonical = canonical_double(value);
+ self.update(canonical);
+ }
+
+ /// Update the sketch with a f32 value
+ pub fn update_f32(&mut self, value: f32) {
+ self.update_f64(value as f64);
+ }
+
+ /// Return cardinality estimate
+ pub fn estimate(&self) -> f64 {
+ if self.is_empty() {
+ return 0.0;
+ }
+ let num_retained = self.table.num_entries() as f64;
+ let theta = self.table.theta() as f64 / MAX_THETA as f64;
+ num_retained / theta
+ }
+
+ /// Return theta as a fraction (0.0 to 1.0)
+ pub fn theta(&self) -> f64 {
+ self.table.theta() as f64 / MAX_THETA as f64
+ }
+
+ /// Return theta as u64
+ pub fn theta64(&self) -> u64 {
+ self.table.theta()
+ }
+
+ /// Check if sketch is empty
+ pub fn is_empty(&self) -> bool {
+ self.table.is_empty()
+ }
+
+ /// Check if sketch is in estimation mode
+ pub fn is_estimation_mode(&self) -> bool {
+ self.table.theta() < MAX_THETA
+ }
+
+ /// Return number of retained entries
+ pub fn num_retained(&self) -> usize {
+ self.table.num_entries()
+ }
+
+ /// Return lg_k
+ pub fn lg_k(&self) -> u8 {
+ self.table.lg_nom_size()
+ }
+
+ /// Trim the sketch to nominal size k
+ pub fn trim(&mut self) {
+ self.table.trim();
+ }
+
+ /// Reset the sketch to empty state
+ pub fn reset(&mut self) {
+ self.table.reset();
+ }
+
+ /// Return iterator over hash values
+ pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
+ self.table.iter()
+ }
+}
+
+/// Builder for ThetaSketch
+#[derive(Debug)]
+pub struct ThetaSketchBuilder {
+ lg_k: u8,
+ resize_factor: ResizeFactor,
+ sampling_probability: f32,
+ seed: u64,
+}
+
+impl Default for ThetaSketchBuilder {
+ fn default() -> Self {
+ Self {
+ lg_k: DEFAULT_LG_K,
+ resize_factor: ResizeFactor::default(),
+ sampling_probability: 1.0,
+ seed: DEFAULT_UPDATE_SEED,
+ }
+ }
+}
+
+impl ThetaSketchBuilder {
+ /// Set lg_k (log2 of nominal size k)
+ ///
+ /// # Panics
+ ///
+ /// If lg_k is not in range [5, 26]
+ pub fn lg_k(mut self, lg_k: u8) -> Self {
+ assert!(
+ (MIN_LG_K..=MAX_LG_K).contains(&lg_k),
+ "lg_k must be in [{}, {}], got {}",
+ MIN_LG_K,
+ MAX_LG_K,
+ lg_k
+ );
+ self.lg_k = lg_k;
+ self
+ }
+
+ /// Set resize factor
+ pub fn resize_factor(mut self, rf: ResizeFactor) -> Self {
+ self.resize_factor = rf;
+ self
+ }
+
+ /// Set sampling probability p
+ ///
+ /// # Panics
+ ///
+ /// If p is not in range [0.0, 1.0]
+ pub fn sampling_probability(mut self, p: f32) -> Self {
+ assert!(
+ (0.0..=1.0).contains(&p),
+ "p must be in [0.0, 1.0], got {}",
+ p
+ );
+ self.sampling_probability = p;
+ self
+ }
+
+ /// Set hash seed
+ pub fn seed(mut self, seed: u64) -> Self {
+ self.seed = seed;
+ self
+ }
+
+ /// Build the ThetaSketch
+ pub fn build(self) -> ThetaSketch {
+ let table = ThetaHashTable::new(
+ self.lg_k,
+ self.resize_factor,
+ self.sampling_probability,
+ self.seed,
+ );
+
+ ThetaSketch { table }
+ }
+}
+
+/// Canonicalize double value for compatibility with Java
+fn canonical_double(value: f64) -> i64 {
+ if value.is_nan() {
+ 0x7ff8000000000000i64 // Java's Double.doubleToLongBits() NaN value
+ } else {
+ // -0.0 + 0.0 == +0.0 under IEEE754 roundTiesToEven rounding mode,
+ // which Rust guarantees. Thus by adding a positive zero we
+ // canonicalize signed zero without any branches in one instruction.
+ (value + 0.0).to_bits() as i64
+ }
+}
diff --git a/datasketches/tests/theta_sketch_test.rs
b/datasketches/tests/theta_sketch_test.rs
new file mode 100644
index 0000000..24ff06f
--- /dev/null
+++ b/datasketches/tests/theta_sketch_test.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datasketches::theta::ThetaSketch;
+
+#[test]
+fn test_basic_update() {
+ let mut sketch = ThetaSketch::builder().lg_k(12).build();
+ assert!(sketch.is_empty());
+ assert_eq!(sketch.estimate(), 0.0);
+
+ sketch.update("value1");
+ assert!(!sketch.is_empty());
+ assert_eq!(sketch.estimate(), 1.0);
+
+ sketch.update("value2");
+ assert_eq!(sketch.estimate(), 2.0);
+}
+
+#[test]
+fn test_update_various_types() {
+ let mut sketch = ThetaSketch::builder().lg_k(12).build();
+
+ sketch.update("string");
+ sketch.update(42i64);
+ sketch.update(42u64);
+ sketch.update_f64(3.15);
+ sketch.update_f64(3.15);
+ sketch.update_f32(3.15);
+ sketch.update_f32(3.15);
+ sketch.update([1u8, 2, 3]);
+
+ assert!(!sketch.is_empty());
+ assert_eq!(sketch.estimate(), 5.0);
+}
+
+#[test]
+fn test_duplicate_updates() {
+ let mut sketch = ThetaSketch::builder().lg_k(12).build();
+
+ for _ in 0..100 {
+ sketch.update("same_value");
+ }
+
+ assert_eq!(sketch.estimate(), 1.0);
+}
+
+#[test]
+fn test_theta_reduction() {
+ let mut sketch = ThetaSketch::builder().lg_k(5).build(); // Small k to
trigger theta reduction
+ assert!(!sketch.is_estimation_mode()); // Should not be in estimation mode yet
+
+ // Insert many values to trigger theta reduction
+ for i in 0..1000 {
+ sketch.update(format!("value_{}", i));
+ }
+
+ assert!(sketch.is_estimation_mode()); // Should be in estimation mode
+ assert!(sketch.theta() < 1.0);
+}
+
+#[test]
+fn test_trim() {
+ let mut sketch = ThetaSketch::builder().lg_k(5).build();
+
+ // Insert many values
+ for i in 0..1000 {
+ sketch.update(format!("value_{}", i));
+ }
+
+ let before_trim = sketch.num_retained();
+ sketch.trim();
+ let after_trim = sketch.num_retained();
+
+ // After trim, should have exactly k = 32 entries
+ assert!(after_trim <= before_trim);
+ assert_eq!(sketch.num_retained(), 32);
+}
+
+#[test]
+fn test_reset() {
+ let mut sketch = ThetaSketch::builder().lg_k(5).build();
+
+ // Insert many values
+ for i in 0..1000 {
+ sketch.update(format!("value_{}", i));
+ }
+ assert!(!sketch.is_empty());
+ assert!(sketch.is_estimation_mode());
+ assert!(sketch.num_retained() > 32);
+ assert!(sketch.theta() < 1.0);
+
+ sketch.reset();
+ assert!(sketch.is_empty());
+ assert_eq!(sketch.estimate(), 0.0);
+ assert_eq!(sketch.theta(), 1.0);
+ assert_eq!(sketch.num_retained(), 0);
+ assert!(!sketch.is_estimation_mode());
+}
+
+#[test]
+fn test_iterator() {
+ let mut sketch = ThetaSketch::builder().lg_k(12).build();
+
+ sketch.update("value1");
+ sketch.update("value2");
+ sketch.update("value3");
+
+ let count: usize = sketch.iter().count();
+ assert_eq!(count, sketch.num_retained());
+}
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 0942c3d..f79a9f4 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -26,6 +26,10 @@ rust-version.workspace = true
name = "hll_update"
path = "src/hll_update.rs"
+[[bin]]
+name = "theta_sketch"
+path = "src/theta_sketch.rs"
+
[package.metadata.release]
release = false
diff --git a/examples/src/theta_sketch.rs b/examples/src/theta_sketch.rs
new file mode 100644
index 0000000..d76d240
--- /dev/null
+++ b/examples/src/theta_sketch.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Example demonstrating theta sketch usage
+
+use datasketches::theta::ThetaSketch;
+
+fn main() {
+ println!("=== Theta Sketch Example ===\n");
+
+ // Example 1: Basic usage
+ println!("1. Basic Theta Sketch Usage:");
+ let mut sketch = ThetaSketch::builder().lg_k(10).build();
+
+ for i in 0..100 {
+ sketch.update(format!("item_{}", i));
+ }
+ sketch.update("duplicatee_item");
+ sketch.update("duplicatee_item");
+
+ println!(" Estimate: {:.2}", sketch.estimate());
+ println!(" Theta: {:.6}", sketch.theta());
+ println!(" Num retained: {}", sketch.num_retained());
+ println!();
+
+ // Example 2: Add more data to enter estimation mode
+ println!("2. Add more data to enter estimation mode:");
+ for i in 0..5000 {
+ sketch.update(format!("item_{}", i));
+ }
+ println!(" Estimate: {:.2}", sketch.estimate());
+ println!(" Theta: {:.6}", sketch.theta());
+ println!(" Num retained: {}", sketch.num_retained());
+ println!();
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]