This is an automated email from the ASF dual-hosted git repository.
placave pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datasketches-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 9bbe3ff feat(hll): implement HllUnion (#25)
9bbe3ff is described below
commit 9bbe3ff2844684447bad2022e2ce2873404de9cb
Author: Filippo Rossi <[email protected]>
AuthorDate: Wed Dec 17 15:08:52 2025 +0100
feat(hll): implement HllUnion (#25)
* bootstrap
* basic
* get_result
* cleanup
* assertions
* cleanup
* api
* docs
* crate to super visibility
* license header
---
src/hll/array4.rs | 34 ++
src/hll/array6.rs | 23 +-
src/hll/array8.rs | 269 +++++++++++++
src/hll/container.rs | 4 +
src/hll/list.rs | 2 +-
src/hll/mod.rs | 25 +-
src/hll/mode.rs | 32 ++
src/hll/sketch.rs | 89 +++--
src/hll/union.rs | 1022 ++++++++++++++++++++++++++++++++++++++++++++++++++
9 files changed, 1465 insertions(+), 35 deletions(-)
diff --git a/src/hll/array4.rs b/src/hll/array4.rs
index e493fdc..4224381 100644
--- a/src/hll/array4.rs
+++ b/src/hll/array4.rs
@@ -71,6 +71,35 @@ impl Array4 {
}
}
+    /// Returns the effective register value stored at `slot`.
+    ///
+    /// Slots hold a 4-bit offset relative to `cur_min`:
+    /// - an offset below `AUX_TOKEN` (15) decodes to `cur_min + offset`;
+    /// - an offset equal to `AUX_TOKEN` means the real value is kept in `aux_map`.
+    pub(super) fn get(&self, slot: u32) -> u8 {
+        let offset = self.get_raw(slot);
+        if offset >= AUX_TOKEN {
+            let from_aux = self.aux_map.as_ref().and_then(|aux| aux.get(slot));
+            // NOTE(review): a missing aux entry would mean inconsistent state;
+            // cur_min is returned here rather than panicking.
+            return from_aux.unwrap_or(self.cur_min);
+        }
+        self.cur_min + offset
+    }
+
+    /// Returns K, the total register count (`2^lg_config_k`).
+    pub(super) fn num_registers(&self) -> usize {
+        1usize << self.lg_config_k
+    }
+
+    /// Returns the HIP accumulator currently held by this array's estimator.
+    pub(super) fn hip_accum(&self) -> f64 {
+        let estimator = &self.estimator;
+        estimator.hip_accum()
+    }
+
/// Set raw 4-bit value in slot
#[inline]
fn put_raw(&mut self, slot: u32, value: u8) {
@@ -245,6 +274,11 @@ impl Array4 {
self.estimator.set_hip_accum(value);
}
+    /// Returns `true` when every slot is zero: the floor `cur_min` is still 0
+    /// and all K slots sit at that floor.
+    pub fn is_empty(&self) -> bool {
+        let all_at_floor = self.num_at_cur_min == (1 << self.lg_config_k);
+        self.cur_min == 0 && all_at_floor
+    }
+
/// Deserialize Array4 from HLL mode bytes
///
/// Expects full HLL preamble (40 bytes) followed by packed 4-bit data and optional aux map.
diff --git a/src/hll/array6.rs b/src/hll/array6.rs
index eee2c7f..3edd17a 100644
--- a/src/hll/array6.rs
+++ b/src/hll/array6.rs
@@ -68,6 +68,22 @@ impl Array6 {
((two_bytes >> shift) & VAL_MASK_6) as u8
}
+    /// Returns the register value at `slot`. Array6 stores values directly,
+    /// so this is simply the unpacked 6-bit field (0-63).
+    #[inline]
+    pub(super) fn get(&self, slot: u32) -> u8 {
+        Self::get_raw(self, slot)
+    }
+
+    /// Returns K, the total register count (`2^lg_config_k`).
+    pub(super) fn num_registers(&self) -> usize {
+        1usize << self.lg_config_k
+    }
+
+    /// Returns the HIP accumulator currently held by this array's estimator.
+    pub(super) fn hip_accum(&self) -> f64 {
+        let estimator = &self.estimator;
+        estimator.hip_accum()
+    }
+
/// Set value in a slot (6-bit value)
///
/// Uses read-modify-write on 16-bit window to preserve surrounding bits.
@@ -142,6 +158,11 @@ impl Array6 {
self.estimator.set_hip_accum(value);
}
+    /// Returns `true` when no register has been set (all K registers are zero).
+    pub fn is_empty(&self) -> bool {
+        let k = 1 << self.lg_config_k;
+        self.num_zeros == k
+    }
+
/// Deserialize Array6 from HLL mode bytes
///
/// Expects full HLL preamble (40 bytes) followed by packed 6-bit data.
@@ -247,8 +268,6 @@ impl Array6 {
}
}
-// Constants
-
/// Calculate number of bytes needed for k slots with 6 bits each
fn num_bytes_for_k(k: u32) -> usize {
// k slots * 6 bits = k * 6/8 bytes = k * 3/4 bytes
diff --git a/src/hll/array8.rs b/src/hll/array8.rs
index 7c56d0b..c329d21 100644
--- a/src/hll/array8.rs
+++ b/src/hll/array8.rs
@@ -112,6 +112,130 @@ impl Array8 {
self.estimator.set_hip_accum(value);
}
+    /// Returns `true` if all K registers are still zero.
+    pub fn is_empty(&self) -> bool {
+        let k = 1 << self.lg_config_k;
+        k == self.num_zeros
+    }
+
+    /// Borrows the raw register bytes (one byte per register).
+    pub(super) fn values(&self) -> &[u8] {
+        &self.bytes[..]
+    }
+
+    /// Returns K, the total register count (`2^lg_config_k`).
+    pub(super) fn num_registers(&self) -> usize {
+        1usize << self.lg_config_k
+    }
+
+    /// Returns the HIP accumulator currently held by this array's estimator.
+    pub(super) fn hip_accum(&self) -> f64 {
+        let estimator = &self.estimator;
+        estimator.hip_accum()
+    }
+
+    /// Overwrites the register at `slot` with `value`, bypassing the normal
+    /// update path (no `num_zeros` or estimator bookkeeping happens here).
+    ///
+    /// Callers must invoke `rebuild_estimator_from_registers()` once all
+    /// direct modifications are done.
+    pub(super) fn set_register(&mut self, slot: usize, value: u8) {
+        let register = &mut self.bytes[slot];
+        *register = value;
+    }
+
+    /// Re-derives the cached estimator inputs (`num_zeros`, `kxq0`, `kxq1`) from
+    /// the current registers and flags the estimator as out-of-order.
+    ///
+    /// Intended to run once after a batch of `set_register` calls.
+    pub(super) fn rebuild_estimator_from_registers(&mut self) {
+        Self::rebuild_cached_values(self);
+        self.estimator.set_out_of_order(true);
+    }
+
+    /// Merge another Array8 register set with the same lg_k into this one.
+    ///
+    /// Each register becomes the max of itself and the corresponding source
+    /// register. HIP cannot be maintained across a bulk merge, so the cached
+    /// values are rebuilt and the estimator is marked out-of-order.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `src.len()` differs from this array's length (different lg_k).
+    pub(super) fn merge_array_same_lgk(&mut self, src: &[u8]) {
+        assert_eq!(
+            src.len(),
+            self.bytes.len(),
+            "Source and destination must have same lg_k"
+        );
+
+        // The lengths are equal, so zip visits every register exactly once
+        // without a bounds check per element.
+        for (dst, &src_val) in self.bytes.iter_mut().zip(src) {
+            *dst = (*dst).max(src_val);
+        }
+
+        self.rebuild_cached_values();
+        self.estimator.set_out_of_order(true);
+    }
+
+    /// Merge a register array with a larger lg_k into this one (downsampling).
+    ///
+    /// Source slot `s` folds into destination slot `s & (K - 1)`, where
+    /// `K = 2^lg_config_k`. Each destination register keeps the max of itself
+    /// and every source register that maps onto it.
+    ///
+    /// # Parameters
+    ///
+    /// * `src` - Source register values (length must be 2^src_lg_k)
+    /// * `src_lg_k` - Log2 of source register count
+    ///
+    /// # Panics
+    ///
+    /// Panics if `src_lg_k <= self.lg_config_k` (not downsampling) or if
+    /// `src.len() != 2^src_lg_k`.
+    pub(super) fn merge_array_with_downsample(&mut self, src: &[u8], src_lg_k: u8) {
+        assert!(
+            src_lg_k > self.lg_config_k,
+            "Source lg_k must be greater than destination lg_k for downsampling"
+        );
+        assert_eq!(
+            src.len(),
+            1 << src_lg_k,
+            "Source length must match 2^src_lg_k"
+        );
+
+        let mask = (1usize << self.lg_config_k) - 1;
+        src.iter().enumerate().for_each(|(src_slot, &val)| {
+            let register = &mut self.bytes[src_slot & mask];
+            *register = (*register).max(val);
+        });
+
+        self.rebuild_cached_values();
+        self.estimator.set_out_of_order(true);
+    }
+
+    /// Recomputes cached estimator inputs from the raw registers.
+    ///
+    /// Bulk merges write registers without going through the incremental
+    /// update path, so `num_zeros`, `kxq0` and `kxq1` must be re-derived.
+    /// Registers below 32 contribute `2^-value` to `kxq0` (a zero register
+    /// contributes 1.0); all others contribute `2^-value` to `kxq1`.
+    fn rebuild_cached_values(&mut self) {
+        let mut zeros: u32 = 0;
+        let mut low = 0.0;
+        let mut high = 0.0;
+
+        // Single pass; the summation order matches a plain left-to-right scan.
+        for &val in self.bytes.iter() {
+            match val {
+                0 => {
+                    zeros += 1;
+                    low += 1.0;
+                }
+                1..=31 => low += 1.0 / (1u64 << val) as f64,
+                _ => high += 1.0 / (1u64 << val) as f64,
+            }
+        }
+
+        self.num_zeros = zeros;
+        self.estimator.set_kxq0(low);
+        self.estimator.set_kxq1(high);
+    }
+
/// Deserialize Array8 from HLL mode bytes
///
/// Expects full HLL preamble (40 bytes) followed by k bytes of data.
@@ -351,4 +475,149 @@ mod tests {
"kxq1 should be very small (1/2^50 ≈ 8.9e-16)"
);
}
+
+    #[test]
+    fn test_values_access() {
+        let mut array = Array8::new(4); // 16 registers
+        for (slot, value) in [(0, 10), (5, 25), (15, 63)] {
+            array.put(slot, value);
+        }
+
+        // values() exposes one byte per register; untouched registers stay 0
+        let registers = array.values();
+        assert_eq!(registers.len(), 16);
+        assert_eq!(registers[0], 10);
+        assert_eq!(registers[5], 25);
+        assert_eq!(registers[15], 63);
+        assert_eq!(registers[1], 0);
+    }
+
+    #[test]
+    fn test_merge_array_same_lgk() {
+        let mut dst = Array8::new(4); // 16 registers
+        let mut src = Array8::new(4); // 16 registers
+
+        for (slot, value) in [(0, 10), (1, 20), (2, 30)] {
+            dst.put(slot, value);
+        }
+        // slot 1 is smaller than dst (keep 20), slot 2 is larger (take 35), slot 3 is new
+        for (slot, value) in [(1, 15), (2, 35), (3, 40)] {
+            src.put(slot, value);
+        }
+
+        dst.merge_array_same_lgk(src.values());
+
+        assert_eq!(dst.get(0), 10, "dst[0] unchanged");
+        assert_eq!(dst.get(1), 20, "dst[1] kept max value");
+        assert_eq!(dst.get(2), 35, "dst[2] updated to larger value");
+        assert_eq!(dst.get(3), 40, "dst[3] got new value");
+
+        // A bulk merge invalidates HIP and recounts zeros (16 - 4 non-zero)
+        assert!(dst.estimator.is_out_of_order());
+        assert_eq!(dst.num_zeros, 12);
+    }
+
+    #[test]
+    fn test_merge_array_with_downsample() {
+        // Fold lg_k=5 (32 registers) down into lg_k=4 (16 registers)
+        let mut src = Array8::new(5);
+        let mut dst = Array8::new(4);
+
+        dst.put(0, 10);
+        dst.put(1, 20);
+
+        // Source slots s and s + 16 both land on destination slot s
+        src.put(0, 15);
+        src.put(16, 25);
+        src.put(1, 18);
+        src.put(17, 30);
+
+        dst.merge_array_with_downsample(src.values(), 5);
+
+        // Each destination register holds the max over everything mapped to it
+        assert_eq!(dst.get(0), 25, "dst[0] = max(10, 15, 25)");
+        assert_eq!(dst.get(1), 30, "dst[1] = max(20, 18, 30)");
+        assert!(dst.estimator.is_out_of_order());
+    }
+
+    #[test]
+    #[should_panic(expected = "Source and destination must have same lg_k")]
+    fn test_merge_same_lgk_panics_on_size_mismatch() {
+        // 16-register destination vs. 32-register source: sizes must differ
+        let mut dst = Array8::new(4);
+        dst.merge_array_same_lgk(Array8::new(5).values());
+    }
+
+    #[test]
+    #[should_panic(expected = "Source lg_k must be greater")]
+    fn test_merge_downsample_panics_if_not_downsampling() {
+        // Merging a 16-register source into 32 registers would be upsampling
+        let mut dst = Array8::new(5);
+        dst.merge_array_with_downsample(Array8::new(4).values(), 4);
+    }
+
+    #[test]
+    fn test_rebuild_cached_values() {
+        let mut arr = Array8::new(4); // 16 registers
+        for (slot, value) in [(0, 10), (1, 20), (2, 30)] {
+            arr.put(slot, value);
+        }
+
+        // Deliberately desynchronize the cached zero count...
+        arr.num_zeros = 999;
+        arr.rebuild_cached_values();
+
+        // ...and confirm it is recomputed: 16 registers - 3 non-zero = 13
+        assert_eq!(arr.num_zeros, 13);
+    }
+
+    #[test]
+    fn test_merge_preserves_max_semantics() {
+        let mut dst = Array8::new(4);
+        let mut src = Array8::new(4);
+
+        // dst ascends 0..=15 while src descends 15..=0
+        for slot in 0..16 {
+            dst.put(slot, slot as u8);
+            src.put(slot, (15 - slot) as u8);
+        }
+
+        dst.merge_array_same_lgk(src.values());
+
+        for slot in 0..16 {
+            let ascending = slot as u8;
+            let descending = (15 - slot) as u8;
+            let expected = ascending.max(descending);
+            assert_eq!(
+                dst.get(slot),
+                expected,
+                "slot {} should be max({}, {}) = {}",
+                slot,
+                slot,
+                15 - slot,
+                expected
+            );
+        }
+    }
}
diff --git a/src/hll/container.rs b/src/hll/container.rs
index a2356c5..4d358b1 100644
--- a/src/hll/container.rs
+++ b/src/hll/container.rs
@@ -96,6 +96,10 @@ impl Container {
self.len == self.coupons.len()
}
+    /// Returns `true` if the container currently holds no coupons.
+    pub fn is_empty(&self) -> bool {
+        0 == self.len
+    }
+
+    /// Returns the number of coupon slots in the backing storage. This is not
+    /// the number of coupons currently stored; that is `len`.
pub fn capacity(&self) -> usize {
self.coupons.len()
}
diff --git a/src/hll/list.rs b/src/hll/list.rs
index f3588a2..8ef726a 100644
--- a/src/hll/list.rs
+++ b/src/hll/list.rs
@@ -100,7 +100,7 @@ impl List {
/// Serialize a List to bytes
pub fn serialize(&self, lg_config_k: u8, hll_type: HllType) -> Vec<u8> {
let compact = true; // Always use compact format
- let empty = self.container.len() == 0;
+ let empty = self.container.is_empty();
let coupon_count = self.container.len();
let lg_arr = self.container.lg_size();
diff --git a/src/hll/mod.rs b/src/hll/mod.rs
index 39e5778..40d1b31 100644
--- a/src/hll/mod.rs
+++ b/src/hll/mod.rs
@@ -33,6 +33,13 @@
//! Mode transitions are automatic and transparent to the user. Each promotion preserves
//! all previously observed values and maintains estimation accuracy.
//!
+//! # Core Types
+//!
+//! The primary type for cardinality estimation is [`HllSketch`], which maintains a single
+//! sketch and provides methods to update it with new values and retrieve cardinality estimates.
+//! To combine multiple sketches, use [`HllUnion`], which efficiently merges sketches
+//! that may have different configurations.
+//!
//! # HLL Types
//!
//! Three target HLL types are supported, trading precision for memory:
@@ -41,6 +48,19 @@
//! - [`HllType::Hll6`]: 6 bits per bucket (balanced)
//! - [`HllType::Hll8`]: 8 bits per bucket (highest precision)
//!
+//! # Union Operations
+//!
+//! The [`HllUnion`] type combines multiple HLL sketches into a single unified estimate.
+//! It maintains an internal "gadget" sketch that accumulates the union of all input sketches
+//! and automatically handles:
+//!
+//! - Sketches with different `lg_k` precision levels (resizes/downsamples as needed)
+//! - Sketches in different modes (List, Set, or Array)
+//! - Sketches with different target HLL types
+//!
+//! The union preserves HLL estimation accuracy up to its own precision. That precision is
+//! capped by `lg_max_k` and drops to the smallest `lg_k` of any array-mode input it absorbs.
+//! This enables distributed computation patterns where sketches are built independently
+//! and merged later.
+//!
//! # Serialization
//!
//! Sketches can be serialized and deserialized while preserving all state,
including:
@@ -66,11 +86,14 @@ mod estimator;
mod harmonic_numbers;
mod hash_set;
mod list;
+mod mode;
mod serialization;
mod sketch;
+mod union;
pub use self::estimator::NumStdDev;
pub use self::sketch::HllSketch;
+pub use self::union::HllUnion;
/// Target HLL type.
///
@@ -135,7 +158,7 @@ fn coupon<H: Hash>(v: H) -> u32 {
let capped = lz.min(62);
let value = capped + 1;
- value << KEY_BITS_26 | addr26
+ (value << KEY_BITS_26) | addr26
}
#[cfg(test)]
diff --git a/src/hll/mode.rs b/src/hll/mode.rs
new file mode 100644
index 0000000..ad3cc16
--- /dev/null
+++ b/src/hll/mode.rs
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::hll::HllType;
+use crate::hll::array4::Array4;
+use crate::hll::array6::Array6;
+use crate::hll::array8::Array8;
+use crate::hll::hash_set::HashSet;
+use crate::hll::list::List;
+
+/// Storage mode of an HLL sketch.
+///
+/// `List` and `Set` hold raw coupons together with the sketch's target
+/// [`HllType`]. The `Array*` variants are dense register arrays, and the
+/// variant itself determines the target type.
+#[derive(Debug, Clone, PartialEq)]
+pub enum Mode {
+    /// Coupons kept in a list, plus the target HLL type.
+    List { list: List, hll_type: HllType },
+    /// Coupons kept in a hash set, plus the target HLL type.
+    Set { set: HashSet, hll_type: HllType },
+    /// Dense array with 4-bit registers (target `Hll4`).
+    Array4(Array4),
+    /// Dense array with 6-bit registers (target `Hll6`).
+    Array6(Array6),
+    /// Dense array with 8-bit registers (target `Hll8`).
+    Array8(Array8),
+}
diff --git a/src/hll/sketch.rs b/src/hll/sketch.rs
index b33cfb0..5aab18b 100644
--- a/src/hll/sketch.rs
+++ b/src/hll/sketch.rs
@@ -29,17 +29,10 @@ use crate::hll::array8::Array8;
use crate::hll::container::Container;
use crate::hll::hash_set::HashSet;
use crate::hll::list::List;
+use crate::hll::mode::Mode;
use crate::hll::serialization::*;
use crate::hll::{HllType, NumStdDev, RESIZE_DENOMINATOR, RESIZE_NUMERATOR,
coupon};
-/// Current sketch mode
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-enum CurMode {
- List = 0,
- Set = 1,
- Hll = 2,
-}
-
/// A HyperLogLog sketch.
///
/// See the [hll module level documentation](crate::hll) for more.
@@ -49,15 +42,6 @@ pub struct HllSketch {
mode: Mode,
}
-#[derive(Debug, Clone, PartialEq)]
-enum Mode {
- List { list: List, hll_type: HllType },
- Set { set: HashSet, hll_type: HllType },
- Array4(Array4),
- Array6(Array6),
- Array8(Array8),
-}
-
impl HllSketch {
/// Create a new HLL sketch
///
@@ -86,6 +70,55 @@ impl HllSketch {
}
}
+    /// Builds a sketch directly around an existing `Mode`.
+    ///
+    /// Used internally (e.g. by union operations) to construct a sketch in a
+    /// specific mode without starting from List mode.
+    ///
+    /// # Arguments
+    ///
+    /// * `lg_config_k` - Log2 of the number of buckets (K)
+    /// * `mode` - The mode to initialize the sketch with
+    pub(super) fn from_mode(lg_config_k: u8, mode: Mode) -> Self {
+        HllSketch { mode, lg_config_k }
+    }
+
+    /// Borrows the sketch's current storage mode.
+    pub(super) fn mode(&self) -> &Mode {
+        let current: &Mode = &self.mode;
+        current
+    }
+
+    /// Mutably borrows the sketch's current storage mode.
+    ///
+    /// # Invariants
+    ///
+    /// This method is safe to call (it is not `unsafe`). Callers that modify
+    /// the mode directly must keep its internal invariants (`num_zeros`,
+    /// estimator state) consistent themselves.
+    pub(super) fn mode_mut(&mut self) -> &mut Mode {
+        &mut self.mode
+    }
+
+    /// Returns `true` if no values have been added to the sketch.
+    ///
+    /// Coupon modes check their container; array modes check their registers.
+    pub fn is_empty(&self) -> bool {
+        let current = &self.mode;
+        match current {
+            Mode::Array8(array) => array.is_empty(),
+            Mode::Array6(array) => array.is_empty(),
+            Mode::Array4(array) => array.is_empty(),
+            Mode::Set { set, .. } => set.container().is_empty(),
+            Mode::List { list, .. } => list.container().is_empty(),
+        }
+    }
+
+    /// Returns the target HLL type. It is stored explicitly while in List/Set
+    /// mode and implied by the register width once in array mode.
+    pub fn target_type(&self) -> HllType {
+        match &self.mode {
+            Mode::List { hll_type, .. } | Mode::Set { hll_type, .. } => *hll_type,
+            Mode::Array4(_) => HllType::Hll4,
+            Mode::Array6(_) => HllType::Hll6,
+            Mode::Array8(_) => HllType::Hll8,
+        }
+    }
+
/// Get the configured lg_config_k
pub fn lg_config_k(&self) -> u8 {
self.lg_config_k
@@ -102,9 +135,8 @@ impl HllSketch {
/// Update the sketch with a raw coupon value
///
- /// This is useful when you've already computed the coupon externally,
- /// or when deserializing and replaying coupons.
- fn update_with_coupon(&mut self, coupon: u32) {
+ /// Maintains all sketch invariants, including mode transitions and estimator updates.
+ pub(super) fn update_with_coupon(&mut self, coupon: u32) {
match &mut self.mode {
Mode::List { list, hll_type } => {
list.update(coupon);
@@ -214,13 +246,6 @@ impl HllSketch {
)));
}
- // Extract mode and type
- let cur_mode = match extract_cur_mode(mode_byte) {
- CUR_MODE_LIST => CurMode::List,
- CUR_MODE_SET => CurMode::Set,
- CUR_MODE_HLL => CurMode::Hll,
- mode => return Err(SerdeError::MalformedData(format!("invalid
mode: {}", mode))),
- };
let hll_type = match extract_tgt_hll_type(mode_byte) {
TGT_HLL4 => HllType::Hll4,
TGT_HLL6 => HllType::Hll6,
@@ -232,14 +257,15 @@ impl HllSketch {
)));
}
};
+
let empty = (flags & EMPTY_FLAG_MASK) != 0;
let compact = (flags & COMPACT_FLAG_MASK) != 0;
let ooo = (flags & OUT_OF_ORDER_FLAG_MASK) != 0;
// Deserialize based on mode
let mode =
- match cur_mode {
- CurMode::List => {
+ match extract_cur_mode(mode_byte) {
+ CUR_MODE_LIST => {
if preamble_ints != LIST_PREINTS {
return Err(SerdeError::MalformedData(format!(
"LIST mode preamble: expected {}, got {}",
@@ -250,7 +276,7 @@ impl HllSketch {
let list = List::deserialize(bytes, empty, compact)?;
Mode::List { list, hll_type }
}
- CurMode::Set => {
+ CUR_MODE_SET => {
if preamble_ints != HASH_SET_PREINTS {
return Err(SerdeError::MalformedData(format!(
"SET mode preamble: expected {}, got {}",
@@ -261,7 +287,7 @@ impl HllSketch {
let set = HashSet::deserialize(bytes, compact)?;
Mode::Set { set, hll_type }
}
- CurMode::Hll => {
+ CUR_MODE_HLL => {
if preamble_ints != HLL_PREINTS {
return Err(SerdeError::MalformedData(format!(
"HLL mode preamble: expected {}, got {}",
@@ -278,6 +304,7 @@ impl HllSketch {
.map(Mode::Array8)?,
}
}
+ mode => return Err(SerdeError::MalformedData(format!("invalid
mode: {}", mode))),
};
Ok(HllSketch { lg_config_k, mode })
diff --git a/src/hll/union.rs b/src/hll/union.rs
new file mode 100644
index 0000000..89795a3
--- /dev/null
+++ b/src/hll/union.rs
@@ -0,0 +1,1022 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! HyperLogLog Union for combining multiple HLL sketches
+//!
+//! The HLL Union allows combining multiple HLL sketches into a single unified
+//! sketch, enabling set union operations for cardinality estimation.
+//!
+//! # Overview
+//!
+//! The union maintains an internal "gadget" sketch that accumulates the union
+//! of all input sketches. It can handle sketches with:
+//! - Different lg_k values (automatically resizes as needed)
+//! - Different modes (List, Set, Array4/6/8)
+//! - Different target HLL types
+
+use crate::hll::array4::Array4;
+use crate::hll::array6::Array6;
+use crate::hll::array8::Array8;
+use crate::hll::mode::Mode;
+use crate::hll::{HllSketch, HllType, NumStdDev, pack_coupon};
+use std::hash::Hash;
+
+/// An HLL Union for combining multiple HLL sketches.
+///
+/// The union maintains an internal sketch (the "gadget") that accumulates
+/// the union of all input sketches. It automatically handles sketches with
+/// different configurations and modes.
+///
+/// See the [hll module level documentation](crate::hll) for more.
+#[derive(Debug, Clone)]
+pub struct HllUnion {
+    /// Maximum lg_k that this union can handle
+    lg_max_k: u8,
+    /// Internal sketch that accumulates the union. It is created with target
+    /// `Hll8` and is expected to be in List/Set mode or, once promoted, in
+    /// `Array8` mode (never Array4/Array6).
+    gadget: HllSketch,
+}
+
+impl HllUnion {
+    /// Create a new HLL Union
+    ///
+    /// # Arguments
+    ///
+    /// * `lg_max_k` - Maximum log2 of the number of buckets. Must be in [4, 21].
+    ///   This determines the maximum precision the union can handle. Input sketches
+    ///   with larger lg_k will be down-sampled.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `lg_max_k` is not in the range [4, 21].
+    pub fn new(lg_max_k: u8) -> Self {
+        assert!(
+            (4..=21).contains(&lg_max_k),
+            "lg_max_k must be in [4, 21], got {}",
+            lg_max_k
+        );
+
+        // Start with an empty gadget at lg_max_k using Hll8
+        let gadget = HllSketch::new(lg_max_k, HllType::Hll8);
+
+        Self { lg_max_k, gadget }
+    }
+
+    /// Update the union's gadget with a value
+    ///
+    /// This accepts any type that implements `Hash`. The value is hashed
+    /// and converted to a coupon, which is then inserted into the sketch.
+    pub fn update_value<T: Hash>(&mut self, value: T) {
+        self.gadget.update(value);
+    }
+
+    /// Update the union with another sketch
+    ///
+    /// Merges the input sketch into the union's internal gadget, handling:
+    /// - Sketches with different lg_k values (resizes/downsamples as needed)
+    /// - Sketches in different modes (List, Set, Array4/6/8)
+    /// - Sketches with different target HLL types
+    ///
+    /// Empty input sketches are ignored.
+    pub fn update(&mut self, sketch: &HllSketch) {
+        if sketch.is_empty() {
+            return;
+        }
+
+        let src_lg_k = sketch.lg_config_k();
+        let dst_lg_k = self.gadget.lg_config_k();
+        let src_mode = sketch.mode();
+
+        match src_mode {
+            Mode::List { .. } | Mode::Set { .. } => {
+                self.update_from_list_or_set(sketch, src_mode, src_lg_k, dst_lg_k);
+            }
+            Mode::Array4(_) | Mode::Array6(_) | Mode::Array8(_) => {
+                self.update_from_array(src_mode, src_lg_k, dst_lg_k);
+            }
+        }
+    }
+
+    /// Update union from a List or Set mode sketch
+    ///
+    /// If the gadget is still empty and has the same lg_k, the source is adopted
+    /// directly (retargeted to Hll8 if needed). Otherwise its coupons are replayed
+    /// into the gadget.
+    fn update_from_list_or_set(
+        &mut self,
+        sketch: &HllSketch,
+        src_mode: &Mode,
+        src_lg_k: u8,
+        dst_lg_k: u8,
+    ) {
+        // Fast path: If gadget is empty and lg_k matches, directly copy as HLL_8
+        if self.gadget.is_empty() && src_lg_k == dst_lg_k {
+            self.gadget = if sketch.target_type() == HllType::Hll8 {
+                sketch.clone()
+            } else {
+                // Convert to Hll8 by changing target type
+                convert_coupon_mode_to_hll8(src_mode, src_lg_k)
+            };
+        } else {
+            // Regular path: merge coupons into gadget
+            merge_coupons_into_gadget(&mut self.gadget, src_mode);
+        }
+    }
+
+    /// Update union from an Array mode sketch
+    fn update_from_array(&mut self, src_mode: &Mode, src_lg_k: u8, dst_lg_k: u8) {
+        // Fast path: If gadget is empty, just copy/downsample source
+        if self.gadget.is_empty() {
+            let new_array = copy_or_downsample(src_mode, src_lg_k, self.lg_max_k);
+            let final_lg_k = new_array.num_registers().trailing_zeros() as u8;
+            self.gadget = HllSketch::from_mode(final_lg_k, Mode::Array8(new_array));
+            return;
+        }
+
+        let is_gadget_array = matches!(self.gadget.mode(), Mode::Array8(_));
+
+        if is_gadget_array {
+            self.merge_array_into_array_gadget(src_mode, src_lg_k, dst_lg_k);
+        } else {
+            self.promote_gadget_and_merge_array(src_mode, src_lg_k);
+        }
+    }
+
+    /// Merge an array source into an array gadget
+    ///
+    /// If the source has a smaller lg_k, the gadget is rebuilt at that smaller
+    /// lg_k (old gadget downsampled first, then the source merged in).
+    /// Otherwise the source is merged into the existing gadget.
+    fn merge_array_into_array_gadget(&mut self, src_mode: &Mode, src_lg_k: u8, dst_lg_k: u8) {
+        if src_lg_k < dst_lg_k {
+            // Source has lower precision - must downsize gadget
+            let mut new_array = Array8::new(src_lg_k);
+
+            match self.gadget.mode() {
+                Mode::Array8(old_gadget) => {
+                    // Borrow the gadget's registers directly instead of cloning
+                    // the whole array just to wrap it in a temporary `Mode`.
+                    new_array.merge_array_with_downsample(old_gadget.values(), dst_lg_k);
+                }
+                _ => {
+                    unreachable!("gadget mode changed unexpectedly; should never be Array4/Array6")
+                }
+            }
+
+            merge_array_same_lgk(&mut new_array, src_mode);
+            self.gadget = HllSketch::from_mode(src_lg_k, Mode::Array8(new_array));
+        } else {
+            // Standard merge: src_lg_k >= dst_lg_k
+            match self.gadget.mode_mut() {
+                Mode::Array8(dst_array) => {
+                    merge_array_into_array8(dst_array, dst_lg_k, src_mode, src_lg_k);
+                }
+                _ => {
+                    unreachable!("gadget mode changed unexpectedly; should never be Array4/Array6")
+                }
+            }
+        }
+    }
+
+    /// Promote gadget from List/Set to Array and merge array source
+    ///
+    /// The source array becomes the new gadget (capped at `lg_max_k`), and the
+    /// old gadget's coupons are replayed into it.
+    fn promote_gadget_and_merge_array(&mut self, src_mode: &Mode, src_lg_k: u8) {
+        let mut new_array = copy_or_downsample(src_mode, src_lg_k, self.lg_max_k);
+
+        let old_gadget_mode = self.gadget.mode();
+        merge_coupons_into_mode(&mut new_array, old_gadget_mode);
+
+        let final_lg_k = new_array.num_registers().trailing_zeros() as u8;
+        self.gadget = HllSketch::from_mode(final_lg_k, Mode::Array8(new_array));
+    }
+
+    /// Get the union result as a new sketch
+    ///
+    /// Returns a copy of the internal gadget sketch with the specified target HLL type.
+    /// If the requested type differs from the gadget's type, conversion is performed.
+    ///
+    /// # Arguments
+    ///
+    /// * `hll_type` - The target HLL type for the result sketch (Hll4, Hll6, or Hll8)
+    pub fn get_result(&self, hll_type: HllType) -> HllSketch {
+        let gadget_type = self.gadget.target_type();
+
+        if hll_type == gadget_type {
+            return self.gadget.clone();
+        }
+
+        match self.gadget.mode() {
+            Mode::List { list, .. } => HllSketch::from_mode(
+                self.gadget.lg_config_k(),
+                Mode::List {
+                    list: list.clone(),
+                    hll_type,
+                },
+            ),
+            Mode::Set { set, .. } => HllSketch::from_mode(
+                self.gadget.lg_config_k(),
+                Mode::Set {
+                    set: set.clone(),
+                    hll_type,
+                },
+            ),
+            Mode::Array8(array8) => {
+                convert_array8_to_type(array8, self.gadget.lg_config_k(), hll_type)
+            }
+            Mode::Array4(_) | Mode::Array6(_) => {
+                unreachable!("gadget mode changed unexpectedly; should never be Array4/Array6")
+            }
+        }
+    }
+
+    /// Get the current lg_config_k of the internal gadget
+    pub fn lg_config_k(&self) -> u8 {
+        self.gadget.lg_config_k()
+    }
+
+    /// Get the maximum lg_k this union can handle
+    pub fn lg_max_k(&self) -> u8 {
+        self.lg_max_k
+    }
+
+    /// Check if the union is empty
+    pub fn is_empty(&self) -> bool {
+        self.gadget.is_empty()
+    }
+
+    /// Reset the union to its initial empty state
+    ///
+    /// Clears all data from the internal gadget, allowing the union to be reused
+    /// for a new set of operations.
+    pub fn reset(&mut self) {
+        self.gadget = HllSketch::new(self.lg_max_k, HllType::Hll8);
+    }
+
+    /// Get the current cardinality estimate of the union
+    pub fn estimate(&self) -> f64 {
+        self.gadget.estimate()
+    }
+
+    /// Get upper bound for cardinality estimate of the union
+    ///
+    /// Returns the upper confidence bound for the cardinality estimate based on
+    /// the number of standard deviations requested.
+    pub fn upper_bound(&self, num_std_dev: NumStdDev) -> f64 {
+        self.gadget.upper_bound(num_std_dev)
+    }
+
+    /// Get lower bound for cardinality estimate of the union
+    ///
+    /// Returns the lower confidence bound for the cardinality estimate based on
+    /// the number of standard deviations requested.
+    pub fn lower_bound(&self, num_std_dev: NumStdDev) -> f64 {
+        self.gadget.lower_bound(num_std_dev)
+    }
+}
+
+/// Rebuilds a List or Set mode as a sketch whose target type is Hll8.
+///
+/// The coupons are cloned unchanged; only the recorded target type differs.
+fn convert_coupon_mode_to_hll8(src_mode: &Mode, src_lg_k: u8) -> HllSketch {
+    let retargeted = match src_mode {
+        Mode::List { list, .. } => Mode::List {
+            list: list.clone(),
+            hll_type: HllType::Hll8,
+        },
+        Mode::Set { set, .. } => Mode::Set {
+            set: set.clone(),
+            hll_type: HllType::Hll8,
+        },
+        _ => unreachable!("convert_coupon_mode_to_hll8 called with non-coupon mode"),
+    };
+    HllSketch::from_mode(src_lg_k, retargeted)
+}
+
+/// Replays every coupon of a List or Set mode into `gadget`.
+///
+/// Each coupon goes through `HllSketch::update_with_coupon`, so the gadget
+/// performs its usual mode transitions (List → Set → Array) on its own.
+fn merge_coupons_into_gadget(gadget: &mut HllSketch, src_mode: &Mode) {
+    match src_mode {
+        Mode::List { list, .. } => list
+            .container()
+            .iter()
+            .for_each(|coupon| gadget.update_with_coupon(coupon)),
+        Mode::Set { set, .. } => set
+            .container()
+            .iter()
+            .for_each(|coupon| gadget.update_with_coupon(coupon)),
+        Mode::Array4(_) | Mode::Array6(_) | Mode::Array8(_) => unreachable!(
+            "merge_coupons_into_gadget called with array mode; array modes should use merge_array_into_array8"
+        ),
+    }
+}
+
+/// Inserts every coupon of a List or Set mode directly into the Array8 `dst`.
+fn merge_coupons_into_mode(dst: &mut Array8, src_mode: &Mode) {
+    match src_mode {
+        Mode::List { list, .. } => list
+            .container()
+            .iter()
+            .for_each(|coupon| dst.update(coupon)),
+        Mode::Set { set, .. } => set
+            .container()
+            .iter()
+            .for_each(|coupon| dst.update(coupon)),
+        Mode::Array4(_) | Mode::Array6(_) | Mode::Array8(_) => unreachable!(
+            "merge_coupons_into_mode called with array mode; array modes should use copy_or_downsample"
+        ),
+    }
+}
+
+/// Merges an array-mode source (Array4, Array6 or Array8) into `dst_array8`.
+///
+/// - larger source lg_k: the source is downsampled into the destination
+/// - equal lg_k: register-by-register max merge
+/// - smaller source lg_k: not handled here; the caller must replace the gadget
+///
+/// # Panics
+///
+/// Panics if `src_lg_k < dst_lg_k`.
+fn merge_array_into_array8(dst_array8: &mut Array8, dst_lg_k: u8, src_mode: &Mode, src_lg_k: u8) {
+    assert!(
+        src_lg_k >= dst_lg_k,
+        "merge_array_into_array8 requires src_lg_k >= dst_lg_k (got src={}, dst={})",
+        src_lg_k,
+        dst_lg_k
+    );
+
+    if src_lg_k > dst_lg_k {
+        merge_array_with_downsample(dst_array8, dst_lg_k, src_mode, src_lg_k);
+    } else {
+        merge_array_same_lgk(dst_array8, src_mode);
+    }
+}
+
+/// Returns the HIP accumulator of an array-mode sketch.
+///
+/// # Panics
+///
+/// Panics (unreachable) when given a List or Set mode.
+fn get_array_hip_accum(mode: &Mode) -> f64 {
+    match mode {
+        Mode::List { .. } | Mode::Set { .. } => {
+            unreachable!("get_array_hip_accum called with non-array mode; List/Set not supported");
+        }
+        Mode::Array4(array) => array.hip_accum(),
+        Mode::Array6(array) => array.hip_accum(),
+        Mode::Array8(array) => array.hip_accum(),
+    }
+}
+
+/// Max-merges an Array4/Array6 source into an Array8 of the same lg_k.
+///
+/// `get_value(slot)` must return the decoded register value of the source at
+/// `slot`. The destination's estimator is rebuilt afterwards, which drops HIP.
+///
+/// # Panics
+///
+/// Panics if `num_registers` differs from the destination's register count.
+fn merge_array46_same_lgk(dst: &mut Array8, num_registers: usize, get_value: impl Fn(u32) -> u8) {
+    // Enforce the same-lg_k contract explicitly. Without this check, a larger
+    // source panics with an opaque index error and a smaller one silently
+    // merges only a prefix of the registers.
+    assert_eq!(
+        num_registers,
+        dst.num_registers(),
+        "Source and destination must have same lg_k"
+    );
+
+    for slot in 0..num_registers {
+        let val = get_value(slot as u32);
+        if val > dst.values()[slot] {
+            dst.set_register(slot, val);
+        }
+    }
+    dst.rebuild_estimator_from_registers();
+}
+
+/// Merge an array-mode source into `dst` when both have the same `lg_k`.
+///
+/// Each destination register becomes the max of itself and the matching
+/// source register. Array8 sources use `Array8::merge_array_same_lgk`;
+/// Array4/Array6 sources go through `merge_array46_same_lgk`, which rebuilds
+/// the estimator from the registers. The HIP accumulator is not carried over
+/// by the merge.
+///
+/// # Panics
+///
+/// Panics if `src_mode` is List or Set.
+fn merge_array_same_lgk(dst: &mut Array8, src_mode: &Mode) {
+    match src_mode {
+        Mode::Array4(arr4) => merge_array46_same_lgk(dst, arr4.num_registers(), |s| arr4.get(s)),
+        Mode::Array6(arr6) => merge_array46_same_lgk(dst, arr6.num_registers(), |s| arr6.get(s)),
+        Mode::Array8(arr8) => dst.merge_array_same_lgk(arr8.values()),
+        Mode::List { .. } | Mode::Set { .. } => {
+            unreachable!("merge_array_same_lgk called with non-array mode; List/Set not supported")
+        }
+    }
+}
+
+/// Merge a larger 4- or 6-bit register array into a smaller Array8.
+///
+/// Each source slot is mapped onto destination slot
+/// `src_slot & (2^dst_lg_k - 1)`. The destination keeps the maximum value
+/// seen for each slot, and zero-valued source registers are skipped. The
+/// destination estimator is rebuilt afterwards.
+fn merge_array46_with_downsample(
+    dst: &mut Array8,
+    dst_lg_k: u8,
+    num_registers: usize,
+    get_value: impl Fn(u32) -> u8,
+) {
+    let low_bits: u32 = (1 << dst_lg_k) - 1;
+    for slot in (0..num_registers).map(|s| s as u32) {
+        let val = get_value(slot);
+        if val == 0 {
+            continue;
+        }
+        let target = (slot & low_bits) as usize;
+        if val > dst.values()[target] {
+            dst.set_register(target, val);
+        }
+    }
+    dst.rebuild_estimator_from_registers();
+}
+
+/// Downsample an array-mode source into `dst`, which must have a smaller `lg_k`.
+///
+/// Several source registers land on each destination register.
+/// - Array4/Array6 sources use `merge_array46_with_downsample` (low-bit masking).
+/// - Array8 sources are delegated to `Array8::merge_array_with_downsample`.
+///
+/// The HIP accumulator is not carried over by the merge.
+///
+/// # Panics
+///
+/// Panics if `src_lg_k <= dst_lg_k`, or if `src_mode` is List or Set.
+fn merge_array_with_downsample(dst: &mut Array8, dst_lg_k: u8, src_mode: &Mode, src_lg_k: u8) {
+    assert!(
+        src_lg_k > dst_lg_k,
+        "merge_array_with_downsample requires src_lg_k > dst_lg_k (got src={}, dst={})",
+        src_lg_k,
+        dst_lg_k
+    );
+
+    match src_mode {
+        Mode::Array4(arr4) => {
+            let n = arr4.num_registers();
+            merge_array46_with_downsample(dst, dst_lg_k, n, |s| arr4.get(s));
+        }
+        Mode::Array6(arr6) => {
+            let n = arr6.num_registers();
+            merge_array46_with_downsample(dst, dst_lg_k, n, |s| arr6.get(s));
+        }
+        Mode::Array8(arr8) => dst.merge_array_with_downsample(arr8.values(), src_lg_k),
+        Mode::List { .. } | Mode::Set { .. } => unreachable!(
+            "merge_array_with_downsample called with non-array mode; List/Set not supported"
+        ),
+    }
+}
+
+/// Convert the union's Array8 gadget into a sketch of `target_type`.
+///
+/// - `HllType::Hll8`: the Array8 is cloned unchanged, with its registers and
+///   estimator state.
+/// - `HllType::Hll6` / `HllType::Hll4`: a new array of the target width is
+///   built by passing every non-zero register of `src` to `update` as a
+///   coupon. Hll6 values are clamped to 63, the largest 6-bit value.
+///
+/// For Hll6/Hll4 the HIP accumulator of `src` is *not* copied. Instead, when
+/// `src.estimate()` exceeds the rebuilt array's own `estimate()`, the new
+/// array's HIP accumulator is overwritten with `src.estimate()`. Otherwise
+/// the estimator state produced by replaying the coupons is left as is.
+///
+/// NOTE(review): `src.estimate()` is not necessarily a HIP value (for example,
+/// after out-of-order merges). Confirm that seeding the HIP accumulator with
+/// it is the intended behavior.
+fn convert_array8_to_type(src: &Array8, lg_config_k: u8, target_type: HllType) -> HllSketch {
+    match target_type {
+        HllType::Hll8 => HllSketch::from_mode(lg_config_k, Mode::Array8(src.clone())),
+        HllType::Hll6 => {
+            let mut array6 = Array6::new(lg_config_k);
+            for slot in 0..src.num_registers() {
+                let val = src.values()[slot];
+                if val > 0 {
+                    // A 6-bit register cannot represent values above 63.
+                    array6.update(pack_coupon(slot as u32, val.min(63)));
+                }
+            }
+
+            // Never let the converted sketch report less than the source did.
+            let src_est = src.estimate();
+            let arr6_est = array6.estimate();
+            if src_est > arr6_est {
+                array6.set_hip_accum(src_est);
+            }
+
+            HllSketch::from_mode(lg_config_k, Mode::Array6(array6))
+        }
+        HllType::Hll4 => {
+            let mut array4 = Array4::new(lg_config_k);
+            for slot in 0..src.num_registers() {
+                let val = src.values()[slot];
+                if val > 0 {
+                    // No clamp here: values are handed to Array4::update unchanged.
+                    array4.update(pack_coupon(slot as u32, val));
+                }
+            }
+
+            // Never let the converted sketch report less than the source did.
+            let src_est = src.estimate();
+            let arr4_est = array4.estimate();
+            if src_est > arr4_est {
+                array4.set_hip_accum(src_est);
+            }
+
+            HllSketch::from_mode(lg_config_k, Mode::Array4(array4))
+        }
+    }
+}
+
+/// Replay the non-zero registers of a 4- or 6-bit array into `dst` as coupons.
+///
+/// `get_value(slot)` supplies the source register value. Every non-zero value
+/// is packed with its slot index and passed to `Array8::update`.
+fn copy_array46_via_coupons(
+    dst: &mut Array8,
+    num_registers: usize,
+    get_value: impl Fn(u32) -> u8,
+) {
+    (0..num_registers)
+        .map(|idx| idx as u32)
+        .map(|slot| (slot, get_value(slot)))
+        .filter(|&(_, val)| val > 0)
+        .for_each(|(slot, val)| {
+            dst.update(pack_coupon(slot, val));
+        });
+}
+
+/// Build a new Array8 from an array-mode source, sized for `tgt_lg_k`.
+///
+/// - `src_lg_k > tgt_lg_k`: the result is created at `tgt_lg_k` and the
+///   source is downsampled into it. The source HIP accumulator is **not**
+///   copied.
+/// - `src_lg_k <= tgt_lg_k`: the result is created at `src_lg_k` (the smaller
+///   of the two) and receives a straight copy of the source registers. The
+///   source HIP accumulator is then copied onto the result.
+///
+/// Any out-of-order bookkeeping is left to the `Array8` methods used here;
+/// this function does not set such a flag itself.
+///
+/// # Panics
+///
+/// Panics if `src_mode` is List or Set.
+fn copy_or_downsample(src_mode: &Mode, src_lg_k: u8, tgt_lg_k: u8) -> Array8 {
+    if src_lg_k > tgt_lg_k {
+        let mut result = Array8::new(tgt_lg_k);
+        merge_array_with_downsample(&mut result, tgt_lg_k, src_mode, src_lg_k);
+        return result;
+    }
+
+    let mut result = Array8::new(src_lg_k);
+    match src_mode {
+        Mode::Array8(src) => {
+            result.merge_array_same_lgk(src.values());
+        }
+        Mode::Array6(src) => {
+            copy_array46_via_coupons(&mut result, src.num_registers(), |slot| src.get(slot));
+        }
+        Mode::Array4(src) => {
+            copy_array46_via_coupons(&mut result, src.num_registers(), |slot| src.get(slot));
+        }
+        Mode::List { .. } | Mode::Set { .. } => {
+            unreachable!("copy_or_downsample called with non-array mode; List/Set not supported");
+        }
+    }
+
+    // Read the HIP value only after the mode check above. A List/Set source
+    // then reports this function's panic message rather than the helper's.
+    // The source is never mutated, so reading it later yields the same value.
+    result.set_hip_accum(get_array_hip_accum(src_mode));
+    result
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_union_basic_list_mode() {
+ // Test the simplest case: union of two sketches in List mode
+ let mut union = HllUnion::new(12);
+
+ // Create first sketch and add some values
+ let mut sketch1 = HllSketch::new(12, HllType::Hll8);
+ sketch1.update("foo");
+ sketch1.update("bar");
+ sketch1.update("baz");
+
+ // Create second sketch with overlapping and new values
+ let mut sketch2 = HllSketch::new(12, HllType::Hll8);
+ sketch2.update("bar"); // duplicate
+ sketch2.update("qux"); // new
+ sketch2.update("quux"); // new
+
+ // Union them
+ union.update(&sketch1);
+ union.update(&sketch2);
+
+ // Get result
+ let result = union.get_result(HllType::Hll8);
+
+ // Should have ~5 unique values (foo, bar, baz, qux, quux)
+ let estimate = result.estimate();
+ assert!(
+ (4.0..=6.0).contains(&estimate),
+ "Expected estimate around 5, got {}",
+ estimate
+ );
+
+ // Should not be empty
+ assert!(!result.is_empty());
+ }
+
+ #[test]
+ fn test_union_empty_sketch() {
+ let mut union = HllUnion::new(10);
+ let empty_sketch = HllSketch::new(10, HllType::Hll8);
+
+ // Updating with empty sketch should not panic
+ union.update(&empty_sketch);
+
+ // Union should still be empty
+ assert!(union.is_empty());
+ }
+
+ #[test]
+ fn test_union_estimate_accuracy() {
+ let mut union = HllUnion::new(12);
+
+ // Add 1000 unique values across multiple sketches
+ // This will cause sketches to promote to Array mode
+ let mut sketch1 = HllSketch::new(12, HllType::Hll8);
+ for i in 0..500 {
+ sketch1.update(i);
+ }
+
+ let mut sketch2 = HllSketch::new(12, HllType::Hll8);
+ for i in 400..900 {
+ // 400-500 overlap with sketch1
+ sketch2.update(i);
+ }
+
+ union.update(&sketch1);
+ union.update(&sketch2);
+
+ let result = union.get_result(HllType::Hll8);
+ let estimate = result.estimate();
+
+ // Should estimate ~900 unique values (0-899)
+ // With lg_k=12, we expect ~1.6% relative error
+ // So estimate should be within 900 ± 50 or so
+ assert!(
+ estimate > 800.0 && estimate < 1000.0,
+ "Expected estimate around 900, got {}",
+ estimate
+ );
+ }
+
+ #[test]
+ fn test_union_array_to_array_same_lgk() {
+ // Test merging two Array mode sketches with same lg_k
+ let mut union = HllUnion::new(12);
+
+ // Create two sketches that will be in Array mode (add enough values)
+ let mut sketch1 = HllSketch::new(12, HllType::Hll8);
+ for i in 0..10_000 {
+ sketch1.update(i);
+ }
+
+ let mut sketch2 = HllSketch::new(12, HllType::Hll8);
+ for i in 5_000..15_000 {
+ sketch2.update(i);
+ }
+
+ // Both should be in Array mode now
+ assert!(matches!(sketch1.mode(), Mode::Array8(_)));
+ assert!(matches!(sketch2.mode(), Mode::Array8(_)));
+
+ union.update(&sketch1);
+ union.update(&sketch2);
+
+ let result = union.get_result(HllType::Hll8);
+ let estimate = result.estimate();
+
+ // Should estimate ~15,000 unique values (0-14,999)
+ // With lg_k=12, we expect ~1.6% relative error
+ assert!(
+ estimate > 14_000.0 && estimate < 16_000.0,
+ "Expected estimate around 15000, got {}",
+ estimate
+ );
+ }
+
+ #[test]
+ fn test_union_downsampling_src_larger() {
+ // Test src_lg_k > dst_lg_k (downsampling)
+ let mut union = HllUnion::new(10); // Union at lg_k=10
+
+ // Create sketch at lg_k=12 (higher precision)
+ let mut sketch = HllSketch::new(12, HllType::Hll8);
+ for i in 0..5_000 {
+ sketch.update(i);
+ }
+
+ // Union should downsample sketch to lg_k=10
+ union.update(&sketch);
+
+ let result = union.get_result(HllType::Hll8);
+ let estimate = result.estimate();
+
+ // Should still estimate ~5,000 unique values
+ assert!(
+ estimate > 4_000.0 && estimate < 6_000.0,
+ "Expected estimate around 5000, got {}",
+ estimate
+ );
+ assert_eq!(result.lg_config_k(), 10, "Result should be at lg_k=10");
+ }
+
+ #[test]
+ fn test_union_gadget_downsizing_src_smaller() {
+ // Test src_lg_k < dst_lg_k (gadget downsizing)
+ let mut union = HllUnion::new(12);
+
+ // First update with lg_k=12 sketch to establish gadget at lg_k=12
+ let mut sketch1 = HllSketch::new(12, HllType::Hll8);
+ for i in 0..10_000 {
+ sketch1.update(i);
+ }
+ union.update(&sketch1);
+ assert_eq!(union.lg_config_k(), 12, "Gadget should be at lg_k=12");
+
+ // Now update with lg_k=10 sketch (lower precision)
+ let mut sketch2 = HllSketch::new(10, HllType::Hll8);
+ for i in 5_000..15_000 {
+ sketch2.update(i);
+ }
+
+ // This should trigger gadget downsizing to lg_k=10
+ union.update(&sketch2);
+
+ let result = union.get_result(HllType::Hll8);
+ let estimate = result.estimate();
+
+ // Should estimate ~15,000 unique values
+ assert!(
+ estimate > 13_000.0 && estimate < 17_000.0,
+ "Expected estimate around 15000, got {}",
+ estimate
+ );
+ assert_eq!(
+ result.lg_config_k(),
+ 10,
+ "Gadget should have downsized to lg_k=10"
+ );
+ }
+
+ #[test]
+ fn test_union_list_to_array() {
+ // Test union where first sketch is List and second is Array
+ let mut union = HllUnion::new(12);
+
+ // First sketch: small (List mode)
+ let mut sketch1 = HllSketch::new(12, HllType::Hll8);
+ sketch1.update("a");
+ sketch1.update("b");
+ sketch1.update("c");
+ assert!(matches!(sketch1.mode(), Mode::List { .. }));
+
+ // Second sketch: large (Array mode)
+ let mut sketch2 = HllSketch::new(12, HllType::Hll8);
+ for i in 0..10_000 {
+ sketch2.update(i);
+ }
+ assert!(matches!(sketch2.mode(), Mode::Array8(_)));
+
+ union.update(&sketch1);
+ union.update(&sketch2);
+
+ let result = union.get_result(HllType::Hll8);
+ let estimate = result.estimate();
+
+ // Should estimate ~10,003 unique values
+ assert!(
+ estimate > 9_500.0 && estimate < 10_500.0,
+ "Expected estimate around 10000, got {}",
+ estimate
+ );
+ }
+
+ #[test]
+ fn test_union_array_to_list() {
+ // Test union where first sketch is Array and second is List
+ let mut union = HllUnion::new(12);
+
+ // First sketch: large (Array mode)
+ let mut sketch1 = HllSketch::new(12, HllType::Hll8);
+ for i in 0..10_000 {
+ sketch1.update(i);
+ }
+ assert!(matches!(sketch1.mode(), Mode::Array8(_)));
+
+ // Second sketch: small (List mode)
+ let mut sketch2 = HllSketch::new(12, HllType::Hll8);
+ sketch2.update("a");
+ sketch2.update("b");
+ sketch2.update("c");
+ assert!(matches!(sketch2.mode(), Mode::List { .. }));
+
+ union.update(&sketch1);
+ union.update(&sketch2);
+
+ let result = union.get_result(HllType::Hll8);
+ let estimate = result.estimate();
+
+ // Should estimate ~10,003 unique values
+ assert!(
+ estimate > 9_500.0 && estimate < 10_500.0,
+ "Expected estimate around 10000, got {}",
+ estimate
+ );
+ }
+
+ #[test]
+ fn test_union_mixed_hll_types() {
+ // Test union with different HLL types (Hll4, Hll6, Hll8)
+ let mut union = HllUnion::new(12);
+
+ // Sketch with Hll4
+ let mut sketch1 = HllSketch::new(12, HllType::Hll4);
+ for i in 0..3_000 {
+ sketch1.update(i);
+ }
+
+ // Sketch with Hll6
+ let mut sketch2 = HllSketch::new(12, HllType::Hll6);
+ for i in 2_000..5_000 {
+ sketch2.update(i);
+ }
+
+ // Sketch with Hll8
+ let mut sketch3 = HllSketch::new(12, HllType::Hll8);
+ for i in 4_000..7_000 {
+ sketch3.update(i);
+ }
+
+ union.update(&sketch1);
+ union.update(&sketch2);
+ union.update(&sketch3);
+
+ let result = union.get_result(HllType::Hll8);
+ let estimate = result.estimate();
+
+ // Should estimate ~7,000 unique values (0-6,999)
+ assert!(
+ estimate > 6_000.0 && estimate < 8_000.0,
+ "Expected estimate around 7000, got {}",
+ estimate
+ );
+ }
+
+ #[test]
+ fn test_union_multiple_downsizing_operations() {
+ // Test multiple gadget downsizing operations
+ let mut union = HllUnion::new(12);
+
+ // Start with lg_k=12
+ let mut sketch1 = HllSketch::new(12, HllType::Hll8);
+ for i in 0..5_000 {
+ sketch1.update(i);
+ }
+ union.update(&sketch1);
+ assert_eq!(union.lg_config_k(), 12);
+
+ // Downsize to lg_k=10
+ let mut sketch2 = HllSketch::new(10, HllType::Hll8);
+ for i in 4_000..8_000 {
+ sketch2.update(i);
+ }
+ union.update(&sketch2);
+ assert_eq!(union.lg_config_k(), 10);
+
+ // Downsize again to lg_k=8
+ let mut sketch3 = HllSketch::new(8, HllType::Hll8);
+ for i in 7_000..10_000 {
+ sketch3.update(i);
+ }
+ union.update(&sketch3);
+ assert_eq!(union.lg_config_k(), 8);
+
+ let result = union.get_result(HllType::Hll8);
+ let estimate = result.estimate();
+
+ // Should estimate ~10,000 unique values (0-9,999)
+ // Lower precision means higher error
+ assert!(
+ estimate > 8_000.0 && estimate < 12_000.0,
+ "Expected estimate around 10000, got {}",
+ estimate
+ );
+ }
+
+ #[test]
+ fn test_union_get_result_type_conversion_hll6() {
+ // Test getting result as Hll6
+ let mut union = HllUnion::new(12);
+
+ let mut sketch = HllSketch::new(12, HllType::Hll8);
+ for i in 0..5_000 {
+ sketch.update(i);
+ }
+
+ union.update(&sketch);
+
+ // Get result as Hll6
+ let result = union.get_result(HllType::Hll6);
+
+ // Verify it's Hll6
+ assert_eq!(result.target_type(), HllType::Hll6);
+
+ // Estimate should be similar
+ let estimate = result.estimate();
+ assert!(
+ estimate > 4_000.0 && estimate < 6_000.0,
+ "Expected estimate around 5000, got {}",
+ estimate
+ );
+ }
+
+ #[test]
+ fn test_union_get_result_type_conversion_hll4() {
+ // Test getting result as Hll4
+ let mut union = HllUnion::new(12);
+
+ let mut sketch = HllSketch::new(12, HllType::Hll8);
+ for i in 0..5_000 {
+ sketch.update(i);
+ }
+
+ union.update(&sketch);
+
+ // Get result as Hll4
+ let result = union.get_result(HllType::Hll4);
+
+ // Verify it's Hll4
+ assert_eq!(result.target_type(), HllType::Hll4);
+
+ // Estimate should be similar (Hll4 may have slightly different
precision)
+ let estimate = result.estimate();
+ assert!(
+ estimate > 4_000.0 && estimate < 6_000.0,
+ "Expected estimate around 5000, got {}",
+ estimate
+ );
+ }
+
+ #[test]
+ fn test_union_get_result_no_conversion_needed() {
+ // Test that requesting Hll8 when gadget is Hll8 just clones
+ let mut union = HllUnion::new(12);
+
+ let mut sketch = HllSketch::new(12, HllType::Hll8);
+ for i in 0..1_000 {
+ sketch.update(i);
+ }
+
+ union.update(&sketch);
+
+ // Get result as Hll8 (no conversion needed)
+ let result = union.get_result(HllType::Hll8);
+
+ // Verify it's Hll8
+ assert_eq!(result.target_type(), HllType::Hll8);
+
+ // Estimate should match
+ let estimate = result.estimate();
+ assert!(
+ estimate > 900.0 && estimate < 1_100.0,
+ "Expected estimate around 1000, got {}",
+ estimate
+ );
+ }
+
+ #[test]
+ fn test_union_get_result_from_list_mode() {
+ // Test type conversion when gadget is still in List mode
+ let mut union = HllUnion::new(12);
+
+ // Add just a few values so gadget stays in List mode
+ let mut sketch = HllSketch::new(12, HllType::Hll8);
+ sketch.update("a");
+ sketch.update("b");
+ sketch.update("c");
+
+ union.update(&sketch);
+
+ // Get result as Hll6 - should just change the target type
+ let result = union.get_result(HllType::Hll6);
+
+ assert_eq!(result.target_type(), HllType::Hll6);
+ assert!(matches!(result.mode(), Mode::List { .. }));
+
+ let estimate = result.estimate();
+ assert!(
+ (3.0..=5.0).contains(&estimate),
+ "Expected estimate around 3, got {}",
+ estimate
+ );
+ }
+
+ #[test]
+ fn test_union_hll6_arrays_with_overlap() {
+ // Test unioning Hll6 sketches (which will be in Array6 mode)
+ let mut union = HllUnion::new(12);
+
+ let mut sketch1 = HllSketch::new(12, HllType::Hll6);
+ for i in 0..10_000 {
+ sketch1.update(i);
+ }
+
+ let mut sketch2 = HllSketch::new(12, HllType::Hll6);
+ for i in 5_000..15_000 {
+ sketch2.update(i);
+ }
+
+ union.update(&sketch1);
+ union.update(&sketch2);
+
+ let result = union.get_result(HllType::Hll6);
+ let estimate = result.estimate();
+
+ // Should estimate ~15,000 unique values (0-14,999)
+ // Both sketches are Hll6, so we expect the full union
+ assert!(
+ estimate > 13_000.0 && estimate < 17_000.0,
+ "Expected estimate around 15000, got {}. This suggests sketch2
overwrote sketch1 instead of merging.",
+ estimate
+ );
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]