This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch cpcwrapper in repository https://gitbox.apache.org/repos/asf/datasketches-rust.git
commit 69a7529430ca93e180ac054310c84e7718f66ac9 Author: tison <[email protected]> AuthorDate: Fri Feb 20 00:16:45 2026 +0800 feat: impl CpcWrapper Signed-off-by: tison <[email protected]> --- CHANGELOG.md | 1 + datasketches/src/cpc/estimator.rs | 56 ++++++++--- datasketches/src/cpc/mod.rs | 3 + datasketches/src/cpc/serialization.rs | 48 +++++++++ datasketches/src/cpc/sketch.rs | 76 ++++++--------- datasketches/src/cpc/wrapper.rs | 173 +++++++++++++++++++++++++++++++++ datasketches/tests/cpc_wrapper_test.rs | 78 +++++++++++++++ 7 files changed, 373 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index af1928f..ce094cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ All significant changes to this project will be documented in this file. * `CountMinSketch` with unsigned values now supports `halve` and `decay` operations. * `CpcSketch` and `CpcUnion` are now available for cardinality estimation. +* `CpcWrapper` is now available for reading a `CpcSketch`'s estimate and confidence bounds directly from its serialized bytes without full deserialization. * `FrequentItemsSketch` now supports serde for any value implement `FrequentItemValue` (builtin supports for `i64`, `u64`, and `String`). * Expose `codec::SketchBytes`, `codec::SketchSlice`, and `FrequentItemValue` as public API. 
diff --git a/datasketches/src/cpc/estimator.rs b/datasketches/src/cpc/estimator.rs index 81a2252..ef00b0b 100644 --- a/datasketches/src/cpc/estimator.rs +++ b/datasketches/src/cpc/estimator.rs @@ -88,7 +88,43 @@ static HIP_HIGH_SIDE_DATA: [u16; 33] = [ 5880, 5914, 5953, // 14 1000297 ]; -pub(super) fn icon_confidence_lb(lg_k: u8, num_coupons: u32, kappa: NumStdDev) -> f64 { +pub(super) fn estimate(merge_flag: bool, hip_est_accum: f64, lg_k: u8, num_coupons: u32) -> f64 { + if !merge_flag { + hip_est_accum + } else { + icon_estimate(lg_k, num_coupons) + } +} + +pub(super) fn lower_bound( + merge_flag: bool, + hip_est_accum: f64, + lg_k: u8, + num_coupons: u32, + kappa: NumStdDev, +) -> f64 { + if !merge_flag { + hip_confidence_lb(lg_k, num_coupons, hip_est_accum, kappa) + } else { + icon_confidence_lb(lg_k, num_coupons, kappa) + } +} + +pub(super) fn upper_bound( + merge_flag: bool, + hip_est_accum: f64, + lg_k: u8, + num_coupons: u32, + kappa: NumStdDev, +) -> f64 { + if !merge_flag { + hip_confidence_ub(lg_k, num_coupons, hip_est_accum, kappa) + } else { + icon_confidence_ub(lg_k, num_coupons, kappa) + } +} + +fn icon_confidence_lb(lg_k: u8, num_coupons: u32, kappa: NumStdDev) -> f64 { if num_coupons == 0 { return 0.0; } @@ -112,7 +148,7 @@ pub(super) fn icon_confidence_lb(lg_k: u8, num_coupons: u32, kappa: NumStdDev) - } } -pub(super) fn icon_confidence_ub(lg_k: u8, num_coupons: u32, kappa: NumStdDev) -> f64 { +fn icon_confidence_ub(lg_k: u8, num_coupons: u32, kappa: NumStdDev) -> f64 { if num_coupons == 0 { return 0.0; } @@ -132,12 +168,7 @@ pub(super) fn icon_confidence_ub(lg_k: u8, num_coupons: u32, kappa: NumStdDev) - result.ceil() // slight widening of interval to be conservative } -pub(super) fn hip_confidence_lb( - lg_k: u8, - num_coupons: u32, - hip_est_accum: f64, - kappa: NumStdDev, -) -> f64 { +fn hip_confidence_lb(lg_k: u8, num_coupons: u32, hip_est_accum: f64, kappa: NumStdDev) -> f64 { if num_coupons == 0 { return 0.0; } @@ -160,12 +191,7 @@ 
pub(super) fn hip_confidence_lb( } } -pub(super) fn hip_confidence_ub( - lg_k: u8, - num_coupons: u32, - hip_est_accum: f64, - kappa: NumStdDev, -) -> f64 { +fn hip_confidence_ub(lg_k: u8, num_coupons: u32, hip_est_accum: f64, kappa: NumStdDev) -> f64 { if num_coupons == 0 { return 0.0; } @@ -362,7 +388,7 @@ fn icon_exponential_approximation(k: f64, c: f64) -> f64 { 0.7940236163830469 * k * 2f64.powf(c / k) } -pub(super) fn icon_estimate(lg_k: u8, num_coupons: u32) -> f64 { +fn icon_estimate(lg_k: u8, num_coupons: u32) -> f64 { let lg_k = lg_k as usize; assert!( (ICON_MIN_LOG_K..=ICON_MAX_LOG_K).contains(&lg_k), diff --git a/datasketches/src/cpc/mod.rs b/datasketches/src/cpc/mod.rs index bbc38f8..0313c98 100644 --- a/datasketches/src/cpc/mod.rs +++ b/datasketches/src/cpc/mod.rs @@ -40,11 +40,14 @@ mod compression_data; mod estimator; mod kxp_byte_lookup; mod pair_table; +mod serialization; mod sketch; mod union; +mod wrapper; pub use self::sketch::CpcSketch; pub use self::union::CpcUnion; +pub use self::wrapper::CpcWrapper; /// Default log2 of K. const DEFAULT_LG_K: u8 = 11; diff --git a/datasketches/src/cpc/serialization.rs b/datasketches/src/cpc/serialization.rs new file mode 100644 index 0000000..3267ec3 --- /dev/null +++ b/datasketches/src/cpc/serialization.rs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub(super) const SERIAL_VERSION: u8 = 1; +pub(super) const FLAG_COMPRESSED: u8 = 1; +pub(super) const FLAG_HAS_HIP: u8 = 2; +pub(super) const FLAG_HAS_TABLE: u8 = 3; +pub(super) const FLAG_HAS_WINDOW: u8 = 4; + +pub(super) fn make_preamble_ints( + num_coupons: u32, + has_hip: bool, + has_table: bool, + has_window: bool, +) -> u8 { + let mut preamble_ints = 2; + if num_coupons > 0 { + preamble_ints += 1; // number of coupons + if has_hip { + preamble_ints += 4; // HIP + } + if has_table { + preamble_ints += 1; // table data length + // number of values (if there is no window it is the same as number of coupons) + if has_window { + preamble_ints += 1; + } + } + if has_window { + preamble_ints += 1; // window length + } + } + preamble_ints +} diff --git a/datasketches/src/cpc/sketch.rs b/datasketches/src/cpc/sketch.rs index 581fce5..15c818f 100644 --- a/datasketches/src/cpc/sketch.rs +++ b/datasketches/src/cpc/sketch.rs @@ -33,13 +33,17 @@ use crate::cpc::compression::CompressedState; use crate::cpc::count_bits_set_in_matrix; use crate::cpc::determine_correct_offset; use crate::cpc::determine_flavor; -use crate::cpc::estimator::hip_confidence_lb; -use crate::cpc::estimator::hip_confidence_ub; -use crate::cpc::estimator::icon_confidence_lb; -use crate::cpc::estimator::icon_confidence_ub; -use crate::cpc::estimator::icon_estimate; +use crate::cpc::estimator::estimate; +use crate::cpc::estimator::lower_bound; +use crate::cpc::estimator::upper_bound; use crate::cpc::kxp_byte_lookup::KXP_BYTE_TABLE; use crate::cpc::pair_table::PairTable; +use 
crate::cpc::serialization::FLAG_COMPRESSED; +use crate::cpc::serialization::FLAG_HAS_HIP; +use crate::cpc::serialization::FLAG_HAS_TABLE; +use crate::cpc::serialization::FLAG_HAS_WINDOW; +use crate::cpc::serialization::SERIAL_VERSION; +use crate::cpc::serialization::make_preamble_ints; use crate::error::Error; use crate::error::ErrorKind; use crate::hash::DEFAULT_UPDATE_SEED; @@ -130,29 +134,34 @@ impl CpcSketch { /// Returns the best estimate of the cardinality of the sketch. pub fn estimate(&self) -> f64 { - if !self.merge_flag { - self.hip_est_accum - } else { - icon_estimate(self.lg_k, self.num_coupons) - } + estimate( + self.merge_flag, + self.hip_est_accum, + self.lg_k, + self.num_coupons, + ) } /// Returns the best estimate of the lower bound of the confidence interval given `kappa`. pub fn lower_bound(&self, kappa: NumStdDev) -> f64 { - if !self.merge_flag { - hip_confidence_lb(self.lg_k, self.num_coupons, self.hip_est_accum, kappa) - } else { - icon_confidence_lb(self.lg_k, self.num_coupons, kappa) - } + lower_bound( + self.merge_flag, + self.hip_est_accum, + self.lg_k, + self.num_coupons, + kappa, + ) } /// Returns the best estimate of the upper bound of the confidence interval given `kappa`. pub fn upper_bound(&self, kappa: NumStdDev) -> f64 { - if !self.merge_flag { - hip_confidence_ub(self.lg_k, self.num_coupons, self.hip_est_accum, kappa) - } else { - icon_confidence_ub(self.lg_k, self.num_coupons, kappa) - } + upper_bound( + self.merge_flag, + self.hip_est_accum, + self.lg_k, + self.num_coupons, + kappa, + ) } /// Returns true if the sketch is empty. @@ -437,12 +446,6 @@ impl CpcSketch { } } -const SERIAL_VERSION: u8 = 1; -const FLAG_COMPRESSED: u8 = 1; -const FLAG_HAS_HIP: u8 = 2; -const FLAG_HAS_TABLE: u8 = 3; -const FLAG_HAS_WINDOW: u8 = 4; - impl CpcSketch { /// Serializes this CpcSketch to bytes. 
pub fn serialize(&self) -> Vec<u8> { @@ -637,27 +640,6 @@ impl CpcSketch { } } -fn make_preamble_ints(num_coupons: u32, has_hip: bool, has_table: bool, has_window: bool) -> u8 { - let mut preamble_ints = 2; - if num_coupons > 0 { - preamble_ints += 1; // number of coupons - if has_hip { - preamble_ints += 4; // HIP - } - if has_table { - preamble_ints += 1; // table data length - // number of values (if there is no window it is the same as number of coupons) - if has_window { - preamble_ints += 1; - } - } - if has_window { - preamble_ints += 1; // window length - } - } - preamble_ints -} - impl CpcSketch { /// Returns the estimated maximum compressed serialized size of a sketch. /// diff --git a/datasketches/src/cpc/wrapper.rs b/datasketches/src/cpc/wrapper.rs new file mode 100644 index 0000000..05fd1ec --- /dev/null +++ b/datasketches/src/cpc/wrapper.rs @@ -0,0 +1,173 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::codec::SketchSlice; +use crate::codec::family::Family; +use crate::codec::utility::ensure_preamble_longs_in; +use crate::codec::utility::ensure_serial_version_is; +use crate::common::NumStdDev; +use crate::cpc::MAX_LG_K; +use crate::cpc::MIN_LG_K; +use crate::cpc::estimator::estimate; +use crate::cpc::estimator::lower_bound; +use crate::cpc::estimator::upper_bound; +use crate::cpc::serialization::FLAG_COMPRESSED; +use crate::cpc::serialization::FLAG_HAS_HIP; +use crate::cpc::serialization::FLAG_HAS_TABLE; +use crate::cpc::serialization::FLAG_HAS_WINDOW; +use crate::cpc::serialization::SERIAL_VERSION; +use crate::cpc::serialization::make_preamble_ints; +use crate::error::Error; +use crate::error::ErrorKind; + +/// A lightweight, read-only summary of a serialized (compressed) `CpcSketch` that provides its cardinality estimate and bounds without full deserialization. +#[derive(Debug, Clone)] +pub struct CpcWrapper { + lg_k: u8, + merge_flag: bool, + num_coupons: u32, + hip_est_accum: f64, +} + +impl CpcWrapper { + /// Creates a new `CpcWrapper` by parsing only the preamble of the given serialized sketch; the compressed data is neither copied nor decompressed.
+ pub fn new(bytes: &[u8]) -> Result<Self, Error> { + fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) -> Error { + move |_| Error::insufficient_data(tag) + } + + let mut cursor = SketchSlice::new(bytes); + let preamble_ints = cursor.read_u8().map_err(make_error("preamble_ints"))?; + let serial_version = cursor.read_u8().map_err(make_error("serial_version"))?; + let family_id = cursor.read_u8().map_err(make_error("family_id"))?; + Family::CPC.validate_id(family_id)?; + ensure_serial_version_is(SERIAL_VERSION, serial_version)?; + + let lg_k = cursor.read_u8().map_err(make_error("lg_k"))?; + let first_interesting_column = cursor + .read_u8() + .map_err(make_error("first_interesting_column"))?; + if !(MIN_LG_K..=MAX_LG_K).contains(&lg_k) { + return Err(Error::invalid_argument(format!( + "lg_k out of range; got {}", + lg_k + ))); + } + if first_interesting_column > 63 { + return Err(Error::invalid_argument(format!( + "first_interesting_column out of range; got {}", + first_interesting_column + ))); + } + + let flags = cursor.read_u8().map_err(make_error("flags"))?; + let is_compressed = flags & (1 << FLAG_COMPRESSED) != 0; + if !is_compressed { + return Err(Error::new( + ErrorKind::InvalidData, + "only compressed sketches are supported", + )); + } + let has_hip = flags & (1 << FLAG_HAS_HIP) != 0; + let has_table = flags & (1 << FLAG_HAS_TABLE) != 0; + let has_window = flags & (1 << FLAG_HAS_WINDOW) != 0; + + cursor.read_u16_le().map_err(make_error("seed_hash"))?; + + let mut num_coupons = 0; + let mut hip_est_accum = 0.0; + + if has_table || has_window { + num_coupons = cursor.read_u32_le().map_err(make_error("num_coupons"))?; + if has_table && has_window { + cursor + .read_u32_le() + .map_err(make_error("table_num_entries"))?; + if has_hip { + cursor.read_f64_le().map_err(make_error("kxp"))?; + hip_est_accum = cursor.read_f64_le().map_err(make_error("hip_est_accum"))?; + } + } + if has_table { + cursor + .read_u32_le() + 
.map_err(make_error("table_data_words"))? as usize; + } + if has_window { + cursor + .read_u32_le() + .map_err(make_error("window_data_words"))? as usize; + } + if has_hip && !(has_table && has_window) { + cursor.read_f64_le().map_err(make_error("kxp"))?; + hip_est_accum = cursor.read_f64_le().map_err(make_error("hip_est_accum"))?; + } + } + + let expected_preamble_ints = + make_preamble_ints(num_coupons, has_hip, has_table, has_window); + ensure_preamble_longs_in(&[expected_preamble_ints], preamble_ints)?; + Ok(CpcWrapper { + lg_k, + merge_flag: !has_hip, + num_coupons, + hip_est_accum, + }) + } + + /// Return the parameter lg_k. + pub fn lg_k(&self) -> u8 { + self.lg_k + } + + /// Returns the best estimate of the cardinality of the sketch. + pub fn estimate(&self) -> f64 { + estimate( + self.merge_flag, + self.hip_est_accum, + self.lg_k, + self.num_coupons, + ) + } + + /// Returns the best estimate of the lower bound of the confidence interval given `kappa`. + pub fn lower_bound(&self, kappa: NumStdDev) -> f64 { + lower_bound( + self.merge_flag, + self.hip_est_accum, + self.lg_k, + self.num_coupons, + kappa, + ) + } + + /// Returns the best estimate of the upper bound of the confidence interval given `kappa`. + pub fn upper_bound(&self, kappa: NumStdDev) -> f64 { + upper_bound( + self.merge_flag, + self.hip_est_accum, + self.lg_k, + self.num_coupons, + kappa, + ) + } + + /// Returns true if the sketch is empty. + pub fn is_empty(&self) -> bool { + self.num_coupons == 0 + } +} diff --git a/datasketches/tests/cpc_wrapper_test.rs b/datasketches/tests/cpc_wrapper_test.rs new file mode 100644 index 0000000..846048c --- /dev/null +++ b/datasketches/tests/cpc_wrapper_test.rs @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datasketches::common::NumStdDev; +use datasketches::cpc::CpcSketch; +use datasketches::cpc::CpcUnion; +use datasketches::cpc::CpcWrapper; +use googletest::assert_that; +use googletest::prelude::contains_substring; +use googletest::prelude::eq; + +#[test] +fn test_cpc_wrapper() { + let lg_k = 10; + let mut sk1 = CpcSketch::new(lg_k); + let mut sk2 = CpcSketch::new(lg_k); + let mut sk_dst = CpcSketch::new(lg_k); + + let n = 100000; + for i in 0..n { + sk1.update(i); + sk2.update(i + n); + sk_dst.update(i); + sk_dst.update(i + n); + } + + let dst_est = sk_dst.estimate(); + let dst_lb = sk_dst.lower_bound(NumStdDev::Two); + let dst_ub = sk_dst.upper_bound(NumStdDev::Two); + + let concat_bytes = sk_dst.serialize(); + let concat_wrapper = CpcWrapper::new(&concat_bytes).unwrap(); + assert_that!(concat_wrapper.lg_k(), eq(lg_k)); + assert_that!(concat_wrapper.estimate(), eq(dst_est)); + assert_that!(concat_wrapper.lower_bound(NumStdDev::Two), eq(dst_lb)); + assert_that!(concat_wrapper.upper_bound(NumStdDev::Two), eq(dst_ub)); + + let mut union = CpcUnion::new(lg_k); + union.update(&sk1); + union.update(&sk2); + let merged = union.to_sketch(); + let merged_est = merged.estimate(); + let merged_lb = merged.lower_bound(NumStdDev::Two); + let merged_ub = merged.upper_bound(NumStdDev::Two); + + let merged_bytes = merged.serialize(); + let merged_wrapper = 
CpcWrapper::new(&merged_bytes).unwrap(); + assert_that!(merged_wrapper.lg_k(), eq(lg_k)); + assert_that!(merged_wrapper.estimate(), eq(merged_est)); + assert_that!(merged_wrapper.lower_bound(NumStdDev::Two), eq(merged_lb)); + assert_that!(merged_wrapper.upper_bound(NumStdDev::Two), eq(merged_ub)); +} + +#[test] +fn test_is_compressed() { + let sketch = CpcSketch::new(10); + let mut bytes = sketch.serialize(); + bytes[5] &= (-3i8) as u8; // clear compressed flag + let err = CpcWrapper::new(&bytes).unwrap_err(); + assert_that!( + err.message(), + contains_substring("only compressed sketches are supported") + ); +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
