This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch cpcwrapper
in repository https://gitbox.apache.org/repos/asf/datasketches-rust.git

commit 69a7529430ca93e180ac054310c84e7718f66ac9
Author: tison <[email protected]>
AuthorDate: Fri Feb 20 00:16:45 2026 +0800

    feat: impl CpcWrapper
    
    Signed-off-by: tison <[email protected]>
---
 CHANGELOG.md                           |   1 +
 datasketches/src/cpc/estimator.rs      |  56 ++++++++---
 datasketches/src/cpc/mod.rs            |   3 +
 datasketches/src/cpc/serialization.rs  |  48 +++++++++
 datasketches/src/cpc/sketch.rs         |  76 ++++++---------
 datasketches/src/cpc/wrapper.rs        | 173 +++++++++++++++++++++++++++++++++
 datasketches/tests/cpc_wrapper_test.rs |  78 +++++++++++++++
 7 files changed, 373 insertions(+), 62 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index af1928f..ce094cf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@ All significant changes to this project will be documented in 
this file.
 
 * `CountMinSketch` with unsigned values now supports `halve` and `decay` 
operations.
 * `CpcSketch` and `CpcUnion` are now available for cardinality estimation.
+* `CpcWrapper` is now available for reading `CpcSketch`'s estimation from 
a serialized sketch without full deserialization.
 * `FrequentItemsSketch` now supports serde for any value implementing 
`FrequentItemValue` (built-in support for `i64`, `u64`, and `String`).
 * Expose `codec::SketchBytes`, `codec::SketchSlice`, and `FrequentItemValue` 
as public API.
 
diff --git a/datasketches/src/cpc/estimator.rs 
b/datasketches/src/cpc/estimator.rs
index 81a2252..ef00b0b 100644
--- a/datasketches/src/cpc/estimator.rs
+++ b/datasketches/src/cpc/estimator.rs
@@ -88,7 +88,43 @@ static HIP_HIGH_SIDE_DATA: [u16; 33] = [
     5880, 5914, 5953, // 14 1000297
 ];
 
-pub(super) fn icon_confidence_lb(lg_k: u8, num_coupons: u32, kappa: NumStdDev) 
-> f64 {
+pub(super) fn estimate(merge_flag: bool, hip_est_accum: f64, lg_k: u8, 
num_coupons: u32) -> f64 {
+    if !merge_flag {
+        hip_est_accum
+    } else {
+        icon_estimate(lg_k, num_coupons)
+    }
+}
+
+pub(super) fn lower_bound(
+    merge_flag: bool,
+    hip_est_accum: f64,
+    lg_k: u8,
+    num_coupons: u32,
+    kappa: NumStdDev,
+) -> f64 {
+    if !merge_flag {
+        hip_confidence_lb(lg_k, num_coupons, hip_est_accum, kappa)
+    } else {
+        icon_confidence_lb(lg_k, num_coupons, kappa)
+    }
+}
+
+pub(super) fn upper_bound(
+    merge_flag: bool,
+    hip_est_accum: f64,
+    lg_k: u8,
+    num_coupons: u32,
+    kappa: NumStdDev,
+) -> f64 {
+    if !merge_flag {
+        hip_confidence_ub(lg_k, num_coupons, hip_est_accum, kappa)
+    } else {
+        icon_confidence_ub(lg_k, num_coupons, kappa)
+    }
+}
+
+fn icon_confidence_lb(lg_k: u8, num_coupons: u32, kappa: NumStdDev) -> f64 {
     if num_coupons == 0 {
         return 0.0;
     }
@@ -112,7 +148,7 @@ pub(super) fn icon_confidence_lb(lg_k: u8, num_coupons: 
u32, kappa: NumStdDev) -
     }
 }
 
-pub(super) fn icon_confidence_ub(lg_k: u8, num_coupons: u32, kappa: NumStdDev) 
-> f64 {
+fn icon_confidence_ub(lg_k: u8, num_coupons: u32, kappa: NumStdDev) -> f64 {
     if num_coupons == 0 {
         return 0.0;
     }
@@ -132,12 +168,7 @@ pub(super) fn icon_confidence_ub(lg_k: u8, num_coupons: 
u32, kappa: NumStdDev) -
     result.ceil() // slight widening of interval to be conservative
 }
 
-pub(super) fn hip_confidence_lb(
-    lg_k: u8,
-    num_coupons: u32,
-    hip_est_accum: f64,
-    kappa: NumStdDev,
-) -> f64 {
+fn hip_confidence_lb(lg_k: u8, num_coupons: u32, hip_est_accum: f64, kappa: 
NumStdDev) -> f64 {
     if num_coupons == 0 {
         return 0.0;
     }
@@ -160,12 +191,7 @@ pub(super) fn hip_confidence_lb(
     }
 }
 
-pub(super) fn hip_confidence_ub(
-    lg_k: u8,
-    num_coupons: u32,
-    hip_est_accum: f64,
-    kappa: NumStdDev,
-) -> f64 {
+fn hip_confidence_ub(lg_k: u8, num_coupons: u32, hip_est_accum: f64, kappa: 
NumStdDev) -> f64 {
     if num_coupons == 0 {
         return 0.0;
     }
@@ -362,7 +388,7 @@ fn icon_exponential_approximation(k: f64, c: f64) -> f64 {
     0.7940236163830469 * k * 2f64.powf(c / k)
 }
 
-pub(super) fn icon_estimate(lg_k: u8, num_coupons: u32) -> f64 {
+fn icon_estimate(lg_k: u8, num_coupons: u32) -> f64 {
     let lg_k = lg_k as usize;
     assert!(
         (ICON_MIN_LOG_K..=ICON_MAX_LOG_K).contains(&lg_k),
diff --git a/datasketches/src/cpc/mod.rs b/datasketches/src/cpc/mod.rs
index bbc38f8..0313c98 100644
--- a/datasketches/src/cpc/mod.rs
+++ b/datasketches/src/cpc/mod.rs
@@ -40,11 +40,14 @@ mod compression_data;
 mod estimator;
 mod kxp_byte_lookup;
 mod pair_table;
+mod serialization;
 mod sketch;
 mod union;
+mod wrapper;
 
 pub use self::sketch::CpcSketch;
 pub use self::union::CpcUnion;
+pub use self::wrapper::CpcWrapper;
 
 /// Default log2 of K.
 const DEFAULT_LG_K: u8 = 11;
diff --git a/datasketches/src/cpc/serialization.rs 
b/datasketches/src/cpc/serialization.rs
new file mode 100644
index 0000000..3267ec3
--- /dev/null
+++ b/datasketches/src/cpc/serialization.rs
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub(super) const SERIAL_VERSION: u8 = 1;
+pub(super) const FLAG_COMPRESSED: u8 = 1;
+pub(super) const FLAG_HAS_HIP: u8 = 2;
+pub(super) const FLAG_HAS_TABLE: u8 = 3;
+pub(super) const FLAG_HAS_WINDOW: u8 = 4;
+
+pub(super) fn make_preamble_ints(
+    num_coupons: u32,
+    has_hip: bool,
+    has_table: bool,
+    has_window: bool,
+) -> u8 {
+    let mut preamble_ints = 2;
+    if num_coupons > 0 {
+        preamble_ints += 1; // number of coupons
+        if has_hip {
+            preamble_ints += 4; // HIP
+        }
+        if has_table {
+            preamble_ints += 1; // table data length
+            // number of values (if there is no window it is the same as 
number of coupons)
+            if has_window {
+                preamble_ints += 1;
+            }
+        }
+        if has_window {
+            preamble_ints += 1; // window length
+        }
+    }
+    preamble_ints
+}
diff --git a/datasketches/src/cpc/sketch.rs b/datasketches/src/cpc/sketch.rs
index 581fce5..15c818f 100644
--- a/datasketches/src/cpc/sketch.rs
+++ b/datasketches/src/cpc/sketch.rs
@@ -33,13 +33,17 @@ use crate::cpc::compression::CompressedState;
 use crate::cpc::count_bits_set_in_matrix;
 use crate::cpc::determine_correct_offset;
 use crate::cpc::determine_flavor;
-use crate::cpc::estimator::hip_confidence_lb;
-use crate::cpc::estimator::hip_confidence_ub;
-use crate::cpc::estimator::icon_confidence_lb;
-use crate::cpc::estimator::icon_confidence_ub;
-use crate::cpc::estimator::icon_estimate;
+use crate::cpc::estimator::estimate;
+use crate::cpc::estimator::lower_bound;
+use crate::cpc::estimator::upper_bound;
 use crate::cpc::kxp_byte_lookup::KXP_BYTE_TABLE;
 use crate::cpc::pair_table::PairTable;
+use crate::cpc::serialization::FLAG_COMPRESSED;
+use crate::cpc::serialization::FLAG_HAS_HIP;
+use crate::cpc::serialization::FLAG_HAS_TABLE;
+use crate::cpc::serialization::FLAG_HAS_WINDOW;
+use crate::cpc::serialization::SERIAL_VERSION;
+use crate::cpc::serialization::make_preamble_ints;
 use crate::error::Error;
 use crate::error::ErrorKind;
 use crate::hash::DEFAULT_UPDATE_SEED;
@@ -130,29 +134,34 @@ impl CpcSketch {
 
     /// Returns the best estimate of the cardinality of the sketch.
     pub fn estimate(&self) -> f64 {
-        if !self.merge_flag {
-            self.hip_est_accum
-        } else {
-            icon_estimate(self.lg_k, self.num_coupons)
-        }
+        estimate(
+            self.merge_flag,
+            self.hip_est_accum,
+            self.lg_k,
+            self.num_coupons,
+        )
     }
 
     /// Returns the best estimate of the lower bound of the confidence 
interval given `kappa`.
     pub fn lower_bound(&self, kappa: NumStdDev) -> f64 {
-        if !self.merge_flag {
-            hip_confidence_lb(self.lg_k, self.num_coupons, self.hip_est_accum, 
kappa)
-        } else {
-            icon_confidence_lb(self.lg_k, self.num_coupons, kappa)
-        }
+        lower_bound(
+            self.merge_flag,
+            self.hip_est_accum,
+            self.lg_k,
+            self.num_coupons,
+            kappa,
+        )
     }
 
     /// Returns the best estimate of the upper bound of the confidence 
interval given `kappa`.
     pub fn upper_bound(&self, kappa: NumStdDev) -> f64 {
-        if !self.merge_flag {
-            hip_confidence_ub(self.lg_k, self.num_coupons, self.hip_est_accum, 
kappa)
-        } else {
-            icon_confidence_ub(self.lg_k, self.num_coupons, kappa)
-        }
+        upper_bound(
+            self.merge_flag,
+            self.hip_est_accum,
+            self.lg_k,
+            self.num_coupons,
+            kappa,
+        )
     }
 
     /// Returns true if the sketch is empty.
@@ -437,12 +446,6 @@ impl CpcSketch {
     }
 }
 
-const SERIAL_VERSION: u8 = 1;
-const FLAG_COMPRESSED: u8 = 1;
-const FLAG_HAS_HIP: u8 = 2;
-const FLAG_HAS_TABLE: u8 = 3;
-const FLAG_HAS_WINDOW: u8 = 4;
-
 impl CpcSketch {
     /// Serializes this CpcSketch to bytes.
     pub fn serialize(&self) -> Vec<u8> {
@@ -637,27 +640,6 @@ impl CpcSketch {
     }
 }
 
-fn make_preamble_ints(num_coupons: u32, has_hip: bool, has_table: bool, 
has_window: bool) -> u8 {
-    let mut preamble_ints = 2;
-    if num_coupons > 0 {
-        preamble_ints += 1; // number of coupons
-        if has_hip {
-            preamble_ints += 4; // HIP
-        }
-        if has_table {
-            preamble_ints += 1; // table data length
-            // number of values (if there is no window it is the same as 
number of coupons)
-            if has_window {
-                preamble_ints += 1;
-            }
-        }
-        if has_window {
-            preamble_ints += 1; // window length
-        }
-    }
-    preamble_ints
-}
-
 impl CpcSketch {
     /// Returns the estimated maximum compressed serialized size of a sketch.
     ///
diff --git a/datasketches/src/cpc/wrapper.rs b/datasketches/src/cpc/wrapper.rs
new file mode 100644
index 0000000..05fd1ec
--- /dev/null
+++ b/datasketches/src/cpc/wrapper.rs
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::codec::SketchSlice;
+use crate::codec::family::Family;
+use crate::codec::utility::ensure_preamble_longs_in;
+use crate::codec::utility::ensure_serial_version_is;
+use crate::common::NumStdDev;
+use crate::cpc::MAX_LG_K;
+use crate::cpc::MIN_LG_K;
+use crate::cpc::estimator::estimate;
+use crate::cpc::estimator::lower_bound;
+use crate::cpc::estimator::upper_bound;
+use crate::cpc::serialization::FLAG_COMPRESSED;
+use crate::cpc::serialization::FLAG_HAS_HIP;
+use crate::cpc::serialization::FLAG_HAS_TABLE;
+use crate::cpc::serialization::FLAG_HAS_WINDOW;
+use crate::cpc::serialization::SERIAL_VERSION;
+use crate::cpc::serialization::make_preamble_ints;
+use crate::error::Error;
+use crate::error::ErrorKind;
+
+/// A read-only view of a serialized image of a CpcSketch.
+#[derive(Debug, Clone)]
+pub struct CpcWrapper {
+    lg_k: u8,
+    merge_flag: bool,
+    num_coupons: u32,
+    hip_est_accum: f64,
+}
+
+impl CpcWrapper {
+    /// Creates a new `CpcWrapper` from the given byte slice without copying.
+    pub fn new(bytes: &[u8]) -> Result<Self, Error> {
+        fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) -> 
Error {
+            move |_| Error::insufficient_data(tag)
+        }
+
+        let mut cursor = SketchSlice::new(bytes);
+        let preamble_ints = 
cursor.read_u8().map_err(make_error("preamble_ints"))?;
+        let serial_version = 
cursor.read_u8().map_err(make_error("serial_version"))?;
+        let family_id = cursor.read_u8().map_err(make_error("family_id"))?;
+        Family::CPC.validate_id(family_id)?;
+        ensure_serial_version_is(SERIAL_VERSION, serial_version)?;
+
+        let lg_k = cursor.read_u8().map_err(make_error("lg_k"))?;
+        let first_interesting_column = cursor
+            .read_u8()
+            .map_err(make_error("first_interesting_column"))?;
+        if !(MIN_LG_K..=MAX_LG_K).contains(&lg_k) {
+            return Err(Error::invalid_argument(format!(
+                "lg_k out of range; got {}",
+                lg_k
+            )));
+        }
+        if first_interesting_column > 63 {
+            return Err(Error::invalid_argument(format!(
+                "first_interesting_column out of range; got {}",
+                first_interesting_column
+            )));
+        }
+
+        let flags = cursor.read_u8().map_err(make_error("flags"))?;
+        let is_compressed = flags & (1 << FLAG_COMPRESSED) != 0;
+        if !is_compressed {
+            return Err(Error::new(
+                ErrorKind::InvalidData,
+                "only compressed sketches are supported",
+            ));
+        }
+        let has_hip = flags & (1 << FLAG_HAS_HIP) != 0;
+        let has_table = flags & (1 << FLAG_HAS_TABLE) != 0;
+        let has_window = flags & (1 << FLAG_HAS_WINDOW) != 0;
+
+        cursor.read_u16_le().map_err(make_error("seed_hash"))?;
+
+        let mut num_coupons = 0;
+        let mut hip_est_accum = 0.0;
+
+        if has_table || has_window {
+            num_coupons = 
cursor.read_u32_le().map_err(make_error("num_coupons"))?;
+            if has_table && has_window {
+                cursor
+                    .read_u32_le()
+                    .map_err(make_error("table_num_entries"))?;
+                if has_hip {
+                    cursor.read_f64_le().map_err(make_error("kxp"))?;
+                    hip_est_accum = 
cursor.read_f64_le().map_err(make_error("hip_est_accum"))?;
+                }
+            }
+            if has_table {
+                cursor
+                    .read_u32_le()
+                    .map_err(make_error("table_data_words"))? as usize;
+            }
+            if has_window {
+                cursor
+                    .read_u32_le()
+                    .map_err(make_error("window_data_words"))? as usize;
+            }
+            if has_hip && !(has_table && has_window) {
+                cursor.read_f64_le().map_err(make_error("kxp"))?;
+                hip_est_accum = 
cursor.read_f64_le().map_err(make_error("hip_est_accum"))?;
+            }
+        }
+
+        let expected_preamble_ints =
+            make_preamble_ints(num_coupons, has_hip, has_table, has_window);
+        ensure_preamble_longs_in(&[expected_preamble_ints], preamble_ints)?;
+        Ok(CpcWrapper {
+            lg_k,
+            merge_flag: !has_hip,
+            num_coupons,
+            hip_est_accum,
+        })
+    }
+
+    /// Returns the parameter lg_k.
+    pub fn lg_k(&self) -> u8 {
+        self.lg_k
+    }
+
+    /// Returns the best estimate of the cardinality of the sketch.
+    pub fn estimate(&self) -> f64 {
+        estimate(
+            self.merge_flag,
+            self.hip_est_accum,
+            self.lg_k,
+            self.num_coupons,
+        )
+    }
+
+    /// Returns the best estimate of the lower bound of the confidence 
interval given `kappa`.
+    pub fn lower_bound(&self, kappa: NumStdDev) -> f64 {
+        lower_bound(
+            self.merge_flag,
+            self.hip_est_accum,
+            self.lg_k,
+            self.num_coupons,
+            kappa,
+        )
+    }
+
+    /// Returns the best estimate of the upper bound of the confidence 
interval given `kappa`.
+    pub fn upper_bound(&self, kappa: NumStdDev) -> f64 {
+        upper_bound(
+            self.merge_flag,
+            self.hip_est_accum,
+            self.lg_k,
+            self.num_coupons,
+            kappa,
+        )
+    }
+
+    /// Returns true if the sketch is empty.
+    pub fn is_empty(&self) -> bool {
+        self.num_coupons == 0
+    }
+}
diff --git a/datasketches/tests/cpc_wrapper_test.rs 
b/datasketches/tests/cpc_wrapper_test.rs
new file mode 100644
index 0000000..846048c
--- /dev/null
+++ b/datasketches/tests/cpc_wrapper_test.rs
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datasketches::common::NumStdDev;
+use datasketches::cpc::CpcSketch;
+use datasketches::cpc::CpcUnion;
+use datasketches::cpc::CpcWrapper;
+use googletest::assert_that;
+use googletest::prelude::contains_substring;
+use googletest::prelude::eq;
+
+#[test]
+fn test_cpc_wrapper() {
+    let lg_k = 10;
+    let mut sk1 = CpcSketch::new(lg_k);
+    let mut sk2 = CpcSketch::new(lg_k);
+    let mut sk_dst = CpcSketch::new(lg_k);
+
+    let n = 100000;
+    for i in 0..n {
+        sk1.update(i);
+        sk2.update(i + n);
+        sk_dst.update(i);
+        sk_dst.update(i + n);
+    }
+
+    let dst_est = sk_dst.estimate();
+    let dst_lb = sk_dst.lower_bound(NumStdDev::Two);
+    let dst_ub = sk_dst.upper_bound(NumStdDev::Two);
+
+    let concat_bytes = sk_dst.serialize();
+    let concat_wrapper = CpcWrapper::new(&concat_bytes).unwrap();
+    assert_that!(concat_wrapper.lg_k(), eq(lg_k));
+    assert_that!(concat_wrapper.estimate(), eq(dst_est));
+    assert_that!(concat_wrapper.lower_bound(NumStdDev::Two), eq(dst_lb));
+    assert_that!(concat_wrapper.upper_bound(NumStdDev::Two), eq(dst_ub));
+
+    let mut union = CpcUnion::new(lg_k);
+    union.update(&sk1);
+    union.update(&sk2);
+    let merged = union.to_sketch();
+    let merged_est = merged.estimate();
+    let merged_lb = merged.lower_bound(NumStdDev::Two);
+    let merged_ub = merged.upper_bound(NumStdDev::Two);
+
+    let merged_bytes = merged.serialize();
+    let merged_wrapper = CpcWrapper::new(&merged_bytes).unwrap();
+    assert_that!(merged_wrapper.lg_k(), eq(lg_k));
+    assert_that!(merged_wrapper.estimate(), eq(merged_est));
+    assert_that!(merged_wrapper.lower_bound(NumStdDev::Two), eq(merged_lb));
+    assert_that!(merged_wrapper.upper_bound(NumStdDev::Two), eq(merged_ub));
+}
+
+#[test]
+fn test_is_compressed() {
+    let sketch = CpcSketch::new(10);
+    let mut bytes = sketch.serialize();
+    bytes[5] &= (-3i8) as u8; // clear compressed flag
+    let err = CpcWrapper::new(&bytes).unwrap_err();
+    assert_that!(
+        err.message(),
+        contains_substring("only compressed sketches are supported")
+    );
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to