sdf-jkl commented on code in PR #9372: URL: https://github.com/apache/arrow-rs/pull/9372#discussion_r2927230325
########## parquet/src/encodings/decoding/alp.rs: ########## @@ -0,0 +1,1258 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::marker::PhantomData; +use std::ops::Range; + +use bytes::Bytes; + +use crate::basic::Encoding; +use crate::data_type::DataType; +use crate::encodings::decoding::Decoder; +use crate::errors::{ParquetError, Result}; +use crate::util::bit_util::{BitReader, FromBytes}; + +const ALP_HEADER_SIZE: usize = 8; +const ALP_VERSION: u8 = 1; +const ALP_COMPRESSION_MODE: u8 = 0; +const ALP_INTEGER_ENCODING_FOR_BIT_PACK: u8 = 0; +const ALP_MAX_LOG_VECTOR_SIZE: u8 = 16; +const ALP_MAX_EXPONENT_F32: u8 = 10; +const ALP_MAX_EXPONENT_F64: u8 = 18; + +/// Page-level ALP header (version 1, 8 bytes). +/// +/// Layout in bytes: +/// - `[0]` `version` +/// - `[1]` `compression_mode` +/// - `[2]` `integer_encoding` +/// - `[3]` `log_vector_size` +/// - `[4..8]` `num_elements` (little-endian `i32`) +#[derive(Debug, Clone, Copy)] +struct AlpHeader { + version: u8, + compression_mode: u8, + integer_encoding: u8, + log_vector_size: u8, + num_elements: i32, +} + +impl AlpHeader { + fn num_elements_usize(&self) -> usize { + self.num_elements as usize + } + + fn vector_size(&self) -> usize { + 1usize << self.log_vector_size + } + + fn num_vectors(&self) -> usize { + if self.num_elements == 0 { + 0 + } else { + self.num_elements_usize().div_ceil(self.vector_size()) + } + } + + fn vector_num_elements(&self, vector_index: usize) -> u16 { + let vector_size = self.vector_size(); + let num_full_vectors = self.num_elements_usize() / vector_size; + let remainder = self.num_elements_usize() % vector_size; + if vector_index < num_full_vectors { + vector_size as u16 + } else if vector_index == num_full_vectors && remainder > 0 { + remainder as u16 + } else { + 0 + } + } +} + +/// Per-vector ALP metadata (4 bytes), equivalent to C++ `AlpEncodedVectorInfo`. +#[derive(Debug, Clone, Copy)] +struct AlpEncodedVectorInfo { + exponent: u8, + factor: u8, + num_exceptions: u16, +} + +impl AlpEncodedVectorInfo { + const STORED_SIZE: usize = 4; +} + +/// Per-vector FOR metadata for exact integer type (`u32` for `f32`, `u64` for `f64`). +#[derive(Debug, Clone, Copy)] +struct AlpEncodedForVectorInfo<Exact: AlpExact> { + frame_of_reference: Exact, + bit_width: u8, +} + +impl<Exact: AlpExact> AlpEncodedForVectorInfo<Exact> { + fn stored_size() -> usize { + Exact::WIDTH + 1 + } + + fn get_bit_packed_size(&self, num_elements: u16) -> usize { + (self.bit_width as usize * num_elements as usize).div_ceil(8) + } + + fn get_data_stored_size(&self, num_elements: u16, num_exceptions: u16) -> usize { + let bit_packed_size = self.get_bit_packed_size(num_elements); + bit_packed_size + + num_exceptions as usize * std::mem::size_of::<u16>() + + num_exceptions as usize * Exact::WIDTH + } +} + +/// Parsed view of one vector's metadata and data slices. +/// +/// `packed_values` is a zero-copy range into page body bytes. +/// Exception positions/values are copied for straightforward decode handling. +#[derive(Debug)] +struct AlpEncodedVectorView<Exact: AlpExact> { + num_elements: u16, + alp_info: AlpEncodedVectorInfo, + for_info: AlpEncodedForVectorInfo<Exact>, + packed_values: Range<usize>, + exception_positions: Vec<u16>, + exception_values: Vec<Exact>, +} + +/// Parsed ALP page layout for one exact integer width (`u32` for float pages, +/// `u64` for double pages). +#[derive(Debug)] +struct AlpPageLayout { + header: AlpHeader, + body: Bytes, + offsets: Vec<u32>, +} + +/// Exact integer type used by FOR reconstruction. +/// +/// This mirrors C++: +/// - `float` -> `uint32_t` +/// - `double` -> `uint64_t` +/// +/// Why unsigned (not `i32`/`i64`)? +/// - FOR stores non-negative deltas optimized for bitpacking. +/// - Unsigned arithmetic avoids signed-overflow edge cases in FOR stage. +/// - Signed interpretation is applied later during decimal reconstruction. +pub(super) trait AlpExact: Copy + std::fmt::Debug { + const WIDTH: usize; + type Signed: Copy; + fn from_le_slice(slice: &[u8]) -> Self; + fn zero() -> Self; + fn wrapping_add(self, rhs: Self) -> Self; + fn reinterpret_as_signed(self) -> Self::Signed; +} + +impl AlpExact for u32 { + const WIDTH: usize = 4; + type Signed = i32; + + fn from_le_slice(slice: &[u8]) -> Self { + u32::from_le_bytes([slice[0], slice[1], slice[2], slice[3]]) + } + + fn zero() -> Self { + 0 + } + + fn wrapping_add(self, rhs: Self) -> Self { + self.wrapping_add(rhs) + } + + fn reinterpret_as_signed(self) -> Self::Signed { + i32::from_ne_bytes(self.to_ne_bytes()) + } +} + +impl AlpExact for u64 { + const WIDTH: usize = 8; + type Signed = i64; + + fn from_le_slice(slice: &[u8]) -> Self { + u64::from_le_bytes([ + slice[0], slice[1], slice[2], slice[3], slice[4], slice[5], slice[6], slice[7], + ]) + } + + fn zero() -> Self { + 0 + } + + fn wrapping_add(self, rhs: Self) -> Self { + self.wrapping_add(rhs) + } + + fn reinterpret_as_signed(self) -> Self::Signed { + i64::from_ne_bytes(self.to_ne_bytes()) + } +} + +const ALP_I64_POW10: [i64; 19] = [ + 1, + 10, + 100, + 1_000, + 10_000, + 100_000, + 1_000_000, + 10_000_000, + 100_000_000, + 1_000_000_000, + 10_000_000_000, + 100_000_000_000, + 1_000_000_000_000, + 10_000_000_000_000, + 100_000_000_000_000, + 1_000_000_000_000_000, + 10_000_000_000_000_000, + 100_000_000_000_000_000, + 1_000_000_000_000_000_000, +]; + +const ALP_NEG_POW10_F32: [f32; 11] = [ + 1.0, + 0.1, + 0.01, + 0.001, + 0.0001, + 0.00001, + 0.000001, + 0.0000001, + 0.00000001, + 0.000000001, + 0.0000000001, +]; + +const ALP_NEG_POW10_F64: [f64; 19] = [ + 1.0, + 0.1, + 0.01, + 0.001, + 0.0001, + 0.00001, + 0.000001, + 0.0000001, + 0.00000001, + 0.000000001, + 0.0000000001, + 0.00000000001, + 0.000000000001, + 0.0000000000001, + 0.00000000000001, + 0.000000000000001, + 0.0000000000000001, + 0.00000000000000001, + 0.000000000000000001, +]; + +pub(super) trait AlpFloat: Copy + Default { + type Exact: AlpExact + FromBytes; + + /// Precompute vector-level ALP decimal scale for: + /// `value = encoded * 10^(factor) * 10^(-exponent)`. + /// + /// Preconditions are validated during page parse. + fn decode_scale(exponent: u8, factor: u8) -> Self; + + /// Decode one signed exact integer using a precomputed scale. + fn decode_value(signed_encoded: <Self::Exact as AlpExact>::Signed, scale: Self) -> Self; + + fn from_exact_bits(bits: Self::Exact) -> Self; +} + +impl AlpFloat for f32 { + type Exact = u32; + + fn decode_scale(exponent: u8, factor: u8) -> Self { + debug_assert!(exponent <= ALP_MAX_EXPONENT_F32); + debug_assert!(factor <= exponent); + (ALP_I64_POW10[factor as usize] as f32) * ALP_NEG_POW10_F32[exponent as usize] + } + + fn decode_value(signed_encoded: i32, scale: Self) -> Self { + (signed_encoded as f32) * scale + } + + fn from_exact_bits(bits: Self::Exact) -> Self { + f32::from_bits(bits) + } +} + +impl AlpFloat for f64 { + type Exact = u64; + + fn decode_scale(exponent: u8, factor: u8) -> Self { + debug_assert!(exponent <= ALP_MAX_EXPONENT_F64); + debug_assert!(factor <= exponent); + (ALP_I64_POW10[factor as usize] as f64) * ALP_NEG_POW10_F64[exponent as usize] + } + + fn decode_value(signed_encoded: i64, scale: Self) -> Self { + (signed_encoded as f64) * scale + } + + fn from_exact_bits(bits: Self::Exact) -> Self { + f64::from_bits(bits) + } +} + +/// Parse and validate a full ALP-encoded page body. +/// +/// Validation includes: +/// - header fields/version/encoding +/// - non-negative `num_elements` +/// - offsets bounds + monotonicity +/// - per-vector metadata/data section lengths +fn parse_alp_page_layout<Exact: AlpExact>(data: Bytes) -> Result<AlpPageLayout> { + let data_ref = data.as_ref(); + if data_ref.len() < ALP_HEADER_SIZE { + return Err(general_err!( + "Invalid ALP page: expected at least {} bytes for header, got {}", + ALP_HEADER_SIZE, + data_ref.len() + )); + } + + let header = AlpHeader { + version: data_ref[0], + compression_mode: data_ref[1], + integer_encoding: data_ref[2], + log_vector_size: data_ref[3], + num_elements: i32::from_le_bytes([data_ref[4], data_ref[5], data_ref[6], data_ref[7]]), + }; + + if header.version != ALP_VERSION { + return Err(general_err!( + "Invalid ALP page: unsupported version {}, expected {}", + header.version, + ALP_VERSION + )); + } + + if header.compression_mode != ALP_COMPRESSION_MODE { + return Err(general_err!( + "Invalid ALP page: unsupported compression mode {}", + header.compression_mode + )); + } + + if header.integer_encoding != ALP_INTEGER_ENCODING_FOR_BIT_PACK { + return Err(general_err!( + "Invalid ALP page: unsupported integer encoding {}", + header.integer_encoding + )); + } + + if header.log_vector_size > ALP_MAX_LOG_VECTOR_SIZE { + return Err(general_err!( + "Invalid ALP page: log_vector_size {} exceeds max {}", + header.log_vector_size, + ALP_MAX_LOG_VECTOR_SIZE + )); + } + + if header.num_elements < 0 { + return Err(general_err!( + "Invalid ALP page: num_elements {} must be >= 0", + header.num_elements + )); + } + + let num_vectors = header.num_vectors(); + + let offsets_len = num_vectors + .checked_mul(std::mem::size_of::<u32>()) + .ok_or_else(|| general_err!("Invalid ALP page: offsets length overflow"))?; + let offsets_end = ALP_HEADER_SIZE + .checked_add(offsets_len) + .ok_or_else(|| general_err!("Invalid ALP page: header + offsets length overflow"))?; + + if data_ref.len() < offsets_end { + return Err(general_err!( + "Invalid ALP page: expected at least {} bytes for {} offsets, got {}", + offsets_end, + num_vectors, + data_ref.len() + )); + } + + let body = data.slice(ALP_HEADER_SIZE..); + let body_ref = body.as_ref(); + let body_len = body_ref.len(); + let offsets_section_size = num_vectors * std::mem::size_of::<u32>(); + + let mut offsets = Vec::with_capacity(num_vectors); + for i in 0..num_vectors { + let start = ALP_HEADER_SIZE + i * 4; + let offset = u32::from_le_bytes([ + data_ref[start], + data_ref[start + 1], + data_ref[start + 2], + data_ref[start + 3], + ]); + + if offset as usize >= body_len { + return Err(general_err!( + "Invalid ALP page: vector offset {} out of bounds for body length {}", + offset, + body_len + )); + } + + if (offset as usize) < offsets_section_size { + return Err(general_err!( + "Invalid ALP page: vector offset {} points into offsets section {}", + offset, + offsets_section_size + )); + } + + offsets.push(offset); + } + + for (vector_idx, vector_offset) in offsets.iter().enumerate() { + let vector_start = *vector_offset as usize; + let vector_end = if vector_idx + 1 < offsets.len() { + offsets[vector_idx + 1] as usize + } else { + body_len + }; + + if vector_end < vector_start { + return Err(general_err!( + "Invalid ALP page: vector offsets are not monotonic at index {}", + vector_idx + )); + } + + let vector_num_elements = header.vector_num_elements(vector_idx); + parse_vector_view::<Exact>(body_ref, vector_start, vector_end, vector_num_elements)?; + } + + Ok(AlpPageLayout { + header, + body, + offsets, + }) +} + +/// Parse a single vector section: +/// `[AlpInfo][ForInfo][PackedValues][ExceptionPositions][ExceptionValues]`. +fn parse_vector_view<Exact: AlpExact>( + body: &[u8], + vector_start: usize, + vector_end: usize, + num_elements: u16, +) -> Result<AlpEncodedVectorView<Exact>> { + let vector_bytes = &body[vector_start..vector_end]; + + let metadata_size = + AlpEncodedVectorInfo::STORED_SIZE + AlpEncodedForVectorInfo::<Exact>::stored_size(); + if vector_bytes.len() < metadata_size { + return Err(general_err!( + "Invalid ALP page: vector metadata too short, expected at least {} bytes, got {}", + metadata_size, + vector_bytes.len() + )); + } + + let alp_info = AlpEncodedVectorInfo { + exponent: vector_bytes[0], + factor: vector_bytes[1], + num_exceptions: u16::from_le_bytes([vector_bytes[2], vector_bytes[3]]), + }; + + let max_exponent = if Exact::WIDTH == 4 { + ALP_MAX_EXPONENT_F32 + } else { + ALP_MAX_EXPONENT_F64 + }; + + if alp_info.exponent > max_exponent { + return Err(general_err!( + "Invalid ALP page: exponent {} exceeds max {}", + alp_info.exponent, + max_exponent + )); + } + + if alp_info.factor > alp_info.exponent { + return Err(general_err!( + "Invalid ALP page: factor {} exceeds exponent {}", + alp_info.factor, + alp_info.exponent + )); + } + + if alp_info.num_exceptions > num_elements { + return Err(general_err!( + "Invalid ALP page: num_exceptions {} exceeds vector num_elements {}", + alp_info.num_exceptions, + num_elements + )); + } + + let for_start = AlpEncodedVectorInfo::STORED_SIZE; + let for_end = for_start + Exact::WIDTH; + let frame_of_reference = Exact::from_le_slice(&vector_bytes[for_start..for_end]); + let bit_width = vector_bytes[for_end]; + + if bit_width as usize > Exact::WIDTH * 8 { + return Err(general_err!( + "Invalid ALP page: bit width {} exceeds {}", + bit_width, + Exact::WIDTH * 8 + )); + } + + let for_info = AlpEncodedForVectorInfo::<Exact> { + frame_of_reference, + bit_width, + }; + + let data_size = for_info.get_data_stored_size(num_elements, alp_info.num_exceptions); + if vector_bytes.len() < metadata_size + data_size { + return Err(general_err!( + "Invalid ALP page: vector data too short, expected at least {} bytes, got {}", + metadata_size + data_size, + vector_bytes.len() + )); + } + + let data = &vector_bytes[metadata_size..metadata_size + data_size]; + let packed_size = for_info.get_bit_packed_size(num_elements); + let positions_size = alp_info.num_exceptions as usize * std::mem::size_of::<u16>(); + let values_size = alp_info.num_exceptions as usize * Exact::WIDTH; + + let packed_start = 0; + let packed_end = packed_start + packed_size; + let positions_start = packed_end; + let positions_end = positions_start + positions_size; + let values_start = positions_end; + let values_end = values_start + values_size; + + let mut exception_positions = Vec::with_capacity(alp_info.num_exceptions as usize); + for chunk in data[positions_start..positions_end].chunks_exact(2) { + let position = u16::from_le_bytes([chunk[0], chunk[1]]); + if position >= num_elements { + return Err(general_err!( + "Invalid ALP page: exception position {} out of bounds for vector length {}", + position, + num_elements + )); + } + exception_positions.push(position); + } + + let packed_values = + (vector_start + metadata_size + packed_start)..(vector_start + metadata_size + packed_end); + + let mut exception_values = Vec::with_capacity(alp_info.num_exceptions as usize); + for chunk in data[values_start..values_end].chunks_exact(Exact::WIDTH) { + exception_values.push(Exact::from_le_slice(chunk)); + } + + Ok(AlpEncodedVectorView { + num_elements, + alp_info, + for_info, + packed_values, + exception_positions, + exception_values, + }) +} + +/// Decode bit-packed deltas into exact integers. +fn bit_unpack_integers<Exact: AlpExact + FromBytes>( + packed_values: &[u8], + bit_width: u8, + num_elements: u16, +) -> Result<Vec<Exact>> { + if bit_width as usize > Exact::WIDTH * 8 { + return Err(general_err!( + "Invalid ALP page: bit width {} exceeds {}", + bit_width, + Exact::WIDTH * 8 + )); + } + + if bit_width == 0 { + return Ok(vec![Exact::zero(); num_elements as usize]); + } + + let mut out = vec![Exact::zero(); num_elements as usize]; + let mut reader = BitReader::new(Bytes::copy_from_slice(packed_values)); Review Comment: fixed https://github.com/apache/arrow-rs/pull/9372/commits/07ca6f06471c95a5198e4bf3be06d47fa2b88a4a -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
