This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch compression in repository https://gitbox.apache.org/repos/asf/datasketches-rust.git
commit 862f287bf798c6ed3d64769bbec529566e1caf45 Author: tison <[email protected]> AuthorDate: Wed Feb 4 16:58:15 2026 +0800 CompressedState Signed-off-by: tison <[email protected]> --- datasketches/src/cpc/compression.rs | 311 +++++++++++++++++++++++++++++++++++- 1 file changed, 307 insertions(+), 4 deletions(-) diff --git a/datasketches/src/cpc/compression.rs b/datasketches/src/cpc/compression.rs index 6702b15..ac5b044 100644 --- a/datasketches/src/cpc/compression.rs +++ b/datasketches/src/cpc/compression.rs @@ -1,15 +1,318 @@ -use crate::cpc::pair_table::PairTable; +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::cpc::compression_data::LENGTH_LIMITED_UNARY_ENCODING_TABLE65; +use crate::cpc::pair_table::{PairTable, introspective_insertion_sort}; +use crate::cpc::{CpcSketch, Flavor}; +use std::cmp::Ordering; pub(super) struct CompressedState { table_data: Vec<u32>, - table_data_words: u32, + table_data_words: usize, // can be different from the number of entries in the sketch in hybrid mode table_num_entries: u32, window_data: Vec<u32>, - window_data_words: u32, + window_data_words: usize, +} + +impl CompressedState { + pub fn compress(&mut self, source: &CpcSketch) { + match source.flavor() { + Flavor::EMPTY => { + // do nothing + return; + } + Flavor::SPARSE => { + self.compress_sparse_flavor(source); + debug_assert!(self.window_data.is_empty(), "window is not expected"); + debug_assert!(!self.table_data.is_empty(), "table is expected"); + } + Flavor::HYBRID => { + self.compress_hybrid_flavor(source); + debug_assert!(self.window_data.is_empty(), "window is not expected"); + debug_assert!(!self.table_data.is_empty(), "table is expected"); + } + Flavor::PINNED => { + self.compress_pinned_flavor(source); + debug_assert!(!self.window_data.is_empty(), "window is expected"); + } + Flavor::SLIDING => { + self.compress_sliding_flavor(source); + debug_assert!(!self.window_data.is_empty(), "window is expected"); + } + } + } + + fn compress_sparse_flavor(&mut self, source: &CpcSketch) { + debug_assert!(source.sliding_window.is_empty()); + let mut pairs = source.surprising_value_table().unwrapping_get_items(); + introspective_insertion_sort(&mut pairs); + self.compress_surprising_values(&pairs, source.lg_k()); + } + + fn compress_hybrid_flavor(&mut self, source: &CpcSketch) { + debug_assert!(!source.sliding_window.is_empty()); + debug_assert_eq!(source.window_offset, 0); + + let k = 1 << source.lg_k(); + let mut pairs = source.surprising_value_table().unwrapping_get_items(); + if !pairs.is_empty() { + introspective_insertion_sort(&mut pairs); + } + let num_pairs_from_table = pairs.len() as u32; + let num_pairs_from_window = source.num_coupons() - num_pairs_from_table; + + let mut all_pairs = tricky_get_pairs_from_window( + &source.sliding_window, + k, + num_pairs_from_window, + num_pairs_from_table, + ); + // u32_table<A>::merge( + // pairs_from_table.data(), 0, pairs_from_table.size(), + // all_pairs.data(), num_pairs_from_table, num_pairs_from_window, + // all_pairs.data(), 0 + // ); // note the overlapping subarray trick + + self.compress_surprising_values(&all_pairs, source.lg_k()); + } + + fn compress_pinned_flavor(&mut self, source: &CpcSketch) {} + + fn compress_sliding_flavor(&mut self, source: &CpcSketch) {} + + fn compress_surprising_values(&mut self, pairs: &[u32], lg_k: u8) { + let k = 1 << lg_k; + let num_pairs = pairs.len() as u32; + let num_base_bits = golomb_choose_number_of_base_bits(k + num_pairs, num_pairs as u64); + let table_len = safe_length_for_compressed_pair_buf(k, num_pairs, num_base_bits); + self.table_data.truncate(table_len); + + let compressed_surprising_values = self.low_level_compress_pairs(&pairs, num_base_bits); + + // At this point we could free the unused portion of the compression output buffer, + // but it is not necessary if it is temporary + // Note: realloc caused strange timing spikes for lgK = 11 and 12. + + self.table_data_words = compressed_surprising_values; + self.table_num_entries = num_pairs; + } + + fn low_level_compress_pairs(&mut self, pairs: &[u32], num_base_bits: u8) -> usize { + let mut bitbuf = 0; + let mut bufbits = 0; + let mut next_word_index = 0; + let golomb_lo_mask = ((1 << num_base_bits) - 1) as u64; + let mut predicted_row_index = 0; + let mut predicted_col_index = 0; + + for pair_index in 0..pairs.len() { + let row_col = pairs[pair_index]; + let row_index = row_col >> 6; + let col_index = row_col & 63; + + if row_index != predicted_row_index { + predicted_col_index = 0; + } + + assert!(row_index >= predicted_row_index); + assert!(col_index >= predicted_col_index); + + let y_delta = row_index - predicted_row_index; + let x_delta = col_index - predicted_col_index; + + predicted_row_index = row_index; + predicted_col_index = col_index + 1; + + let code_info = LENGTH_LIMITED_UNARY_ENCODING_TABLE65[x_delta as usize]; + let code_val = code_info & 0xfff; + let code_len = (code_info >> 12) as u8; + bitbuf |= (code_val << bufbits) as u64; + bufbits += code_len; + + maybe_flush_bitbuf( + &mut bitbuf, + &mut bufbits, + &mut self.table_data, + &mut next_word_index, + ); + + let golomb_lo = (y_delta as u64) & golomb_lo_mask; + let golomb_hi = (y_delta as u64) >> num_base_bits; + write_unary( + &mut self.table_data, + &mut next_word_index, + &mut bitbuf, + &mut bufbits, + golomb_hi, + ); + + bitbuf |= golomb_lo << bufbits; + bufbits += num_base_bits; + maybe_flush_bitbuf( + &mut bitbuf, + &mut bufbits, + &mut self.table_data, + &mut next_word_index, + ); + } + + // Pad the bitstream so that the decompressor's 12-bit peek can't overrun its input. + let padding = 10u8.saturating_sub(num_base_bits); + bufbits += padding; + maybe_flush_bitbuf( + &mut bitbuf, + &mut bufbits, + &mut self.table_data, + &mut next_word_index, + ); + + if bufbits > 0 { + // We are done encoding now, so we flush the bit buffer + assert!(bufbits < 32); + self.table_data[next_word_index] = (bitbuf & 0xffffffff) as u32; + next_word_index += 1; + + // not really necessary unset + //bitbuf = 0; + //bufbits = 0; + } + + next_word_index + } } pub(super) struct UncompressedState { table: PairTable, window: Vec<u8>, -} \ No newline at end of file +} + +/// The empty space that this leaves at the beginning of the output array will be filled in later +/// by the caller. +fn tricky_get_pairs_from_window( + window: &[u8], + k: usize, + num_pairs_to_get: u32, + empty_space: u32, +) -> Vec<u32> { + let output_length = empty_space + num_pairs_to_get; + let mut pairs = vec![0; output_length as usize]; + let mut pair_index = empty_space; + for row_index in 0..k { + let mut byte = window[row_index]; + while byte != 0 { + let col_index = byte.trailing_zeros(); + byte = byte ^ (1 << col_index); // erase the 1 + pairs[pair_index as usize] = ((row_index << 6) as u32) | col_index; + pair_index += 1; + } + } + assert_eq!(pair_index, output_length); + pairs +} + +fn write_unary( + compressed_words: &mut [u32], + next_word_index: &mut usize, + bitbuf: &mut u64, + bufbits: &mut u8, + value: u64, +) { + assert!(*bufbits <= 31); + + let mut remaining = value; + while remaining >= 16 { + remaining -= 16; + // Here we output 16 zeros, but we don't need to physically write them into bitbuf + // because it already contains zeros in that region. + *bufbits += 16; // Record the fact that 16 bits of output have occurred. + maybe_flush_bitbuf(bitbuf, bufbits, compressed_words, next_word_index); + } + + let the_unary_code = 1 << remaining; + *bitbuf |= the_unary_code << *bufbits; + *bufbits += (remaining + 1) as u8; + maybe_flush_bitbuf(bitbuf, bufbits, compressed_words, next_word_index); +} + +fn maybe_flush_bitbuf( + bitbuf: &mut u64, + bufbits: &mut u8, + word: &mut [u32], + word_index: &mut usize, +) { + if *bufbits >= 32 { + word[*word_index] = (*bitbuf & 0xffffffff) as u32; + *word_index += 1; + *bitbuf >>= 32; + *bufbits -= 32; + } +} + +fn safe_length_for_compressed_pair_buf(k: u32, num_pairs: u32, num_base_bits: u8) -> usize { + // Long ybits = k + numPairs; // simpler and safer UB + // The following tighter UB on ybits is based on page 198 + // of the textbook "Managing Gigabytes" by Witten, Moffat, and Bell. + // Notice that if numBaseBits == 0 it coincides with (k + numPairs). + + let k = k as usize; + let num_pairs = num_pairs as usize; + let num_base_bits = num_base_bits as usize; + + let ybits = num_pairs * (1 + num_base_bits) + (k >> num_base_bits); + let xbits = 12 * (num_pairs); + let padding = 10usize.saturating_sub(num_base_bits); + divide_longs_rounding_up(xbits + ybits + padding, 32) +} + +fn divide_longs_rounding_up(x: usize, y: usize) -> usize { + debug_assert_ne!(y, 0); + let quotient = x / y; + if quotient * y == x { + quotient + } else { + quotient + 1 + } +} + +/// Returns an integer that is between zero and ceil(log_2(k)) - 1, inclusive. +fn golomb_choose_number_of_base_bits(k: u32, count: u64) -> u8 { + debug_assert!(k > 0); + debug_assert!(count > 0); + let quotient = ((k as u64) - count) / count; // integer division + if quotient == 0 { + 0 + } else { + floor_log2_of_long(quotient) + } +} + +fn floor_log2_of_long(x: u64) -> u8 { + debug_assert!(x > 0); + let mut p = 0u8; + let mut y = 1u64; + loop { + match u64::cmp(&y, &x) { + Ordering::Equal => return p, + Ordering::Greater => return p - 1, + Ordering::Less => { + p += 1; + y <<= 1; + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
