alamb commented on code in PR #9372:
URL: https://github.com/apache/arrow-rs/pull/9372#discussion_r3208770338


##########
parquet/src/encodings/decoding/alp.rs:
##########
@@ -0,0 +1,1258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use bytes::Bytes;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::encodings::decoding::Decoder;
+use crate::errors::{ParquetError, Result};
+use crate::util::bit_util::{BitReader, FromBytes};
+
+const ALP_HEADER_SIZE: usize = 8;
+const ALP_VERSION: u8 = 1;
+const ALP_COMPRESSION_MODE: u8 = 0;
+const ALP_INTEGER_ENCODING_FOR_BIT_PACK: u8 = 0;
+const ALP_MAX_LOG_VECTOR_SIZE: u8 = 16;
+const ALP_MAX_EXPONENT_F32: u8 = 10;
+const ALP_MAX_EXPONENT_F64: u8 = 18;
+
+/// Page-level ALP header (version 1, 8 bytes).
+///
+/// Layout in bytes:
+/// - `[0]` `version`
+/// - `[1]` `compression_mode`
+/// - `[2]` `integer_encoding`
+/// - `[3]` `log_vector_size`
+/// - `[4..8]` `num_elements` (little-endian `i32`)
+#[derive(Debug, Clone, Copy)]
+struct AlpHeader {
+    version: u8,
+    compression_mode: u8,
+    integer_encoding: u8,
+    log_vector_size: u8,
+    num_elements: i32,
+}
+
+impl AlpHeader {
+    fn num_elements_usize(&self) -> usize {
+        self.num_elements as usize
+    }
+
+    fn vector_size(&self) -> usize {
+        1usize << self.log_vector_size
+    }
+
+    fn num_vectors(&self) -> usize {
+        if self.num_elements == 0 {
+            0
+        } else {
+            self.num_elements_usize().div_ceil(self.vector_size())
+        }
+    }
+
+    fn vector_num_elements(&self, vector_index: usize) -> u16 {
+        let vector_size = self.vector_size();
+        let num_full_vectors = self.num_elements_usize() / vector_size;
+        let remainder = self.num_elements_usize() % vector_size;
+        if vector_index < num_full_vectors {
+            vector_size as u16

Review Comment:
   I personally recommend changing all computation of offsets to use `usize` -- 
e.g. here return a usize
   
   Then we can carefully check overflow, etc on write / read once 
   



##########
parquet/src/encodings/decoding/alp.rs:
##########
@@ -0,0 +1,1524 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use bytes::Bytes;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::encodings::decoding::Decoder;
+use crate::errors::{ParquetError, Result};
+use crate::util::bit_util::{BitReader, FromBytes};
+
+const ALP_HEADER_SIZE: usize = 7;
+const ALP_COMPRESSION_MODE: u8 = 0;
+const ALP_INTEGER_ENCODING_FOR_BIT_PACK: u8 = 0;
+const ALP_MIN_LOG_VECTOR_SIZE: u8 = 3;
+const ALP_MAX_LOG_VECTOR_SIZE: u8 = 15;
+const ALP_MAX_EXPONENT_F32: u8 = 10;
+const ALP_MAX_EXPONENT_F64: u8 = 18;
+
+/// Page-level ALP header (7 bytes).
+///
+/// Layout in bytes:
+/// - `[0]` `compression_mode`
+/// - `[1]` `integer_encoding`
+/// - `[2]` `log_vector_size`
+/// - `[3..7]` `num_elements` (little-endian `i32`)
+#[derive(Debug, Clone, Copy)]
+struct AlpHeader {
+    compression_mode: u8,
+    integer_encoding: u8,
+    log_vector_size: u8,
+    num_elements: i32,
+}
+
+impl AlpHeader {
+    fn num_elements_usize(&self) -> usize {
+        self.num_elements as usize
+    }
+
+    fn vector_size(&self) -> usize {
+        1usize << self.log_vector_size
+    }
+
+    fn num_vectors(&self) -> usize {
+        if self.num_elements == 0 {
+            0
+        } else {
+            self.num_elements_usize().div_ceil(self.vector_size())
+        }
+    }
+
+    fn vector_num_elements(&self, vector_index: usize) -> u16 {
+        let vector_size = self.vector_size();
+        let num_full_vectors = self.num_elements_usize() / vector_size;
+        let remainder = self.num_elements_usize() % vector_size;
+        if vector_index < num_full_vectors {
+            vector_size as u16
+        } else if vector_index == num_full_vectors && remainder > 0 {
+            remainder as u16
+        } else {
+            0
+        }
+    }
+}
+
+/// Per-vector ALP metadata (4 bytes), equivalent to C++ 
`AlpEncodedVectorInfo`.
+#[derive(Debug, Clone, Copy)]
+struct AlpEncodedVectorInfo {
+    exponent: u8,
+    factor: u8,
+    num_exceptions: u16,
+}
+
+impl AlpEncodedVectorInfo {
+    const STORED_SIZE: usize = 4;
+}
+
+/// Per-vector FOR metadata for exact integer type (`u32` for `f32`, `u64` for 
`f64`).
+#[derive(Debug, Clone, Copy)]
+struct AlpEncodedForVectorInfo<Exact: AlpExact> {

Review Comment:
   Ithink the spec calls this `ForInfo` so it might make sense to use that 
terminology here too (e.g `AlpEncodedForVectorInfo` --> `AlpForInfo` or just 
`ForInfo`)



##########
parquet/src/encodings/decoding/alp.rs:
##########
@@ -0,0 +1,1524 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use bytes::Bytes;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::encodings::decoding::Decoder;
+use crate::errors::{ParquetError, Result};
+use crate::util::bit_util::{BitReader, FromBytes};
+
+const ALP_HEADER_SIZE: usize = 7;
+const ALP_COMPRESSION_MODE: u8 = 0;
+const ALP_INTEGER_ENCODING_FOR_BIT_PACK: u8 = 0;
+const ALP_MIN_LOG_VECTOR_SIZE: u8 = 3;
+const ALP_MAX_LOG_VECTOR_SIZE: u8 = 15;
+const ALP_MAX_EXPONENT_F32: u8 = 10;
+const ALP_MAX_EXPONENT_F64: u8 = 18;
+
+/// Page-level ALP header (7 bytes).
+///
+/// Layout in bytes:
+/// - `[0]` `compression_mode`
+/// - `[1]` `integer_encoding`
+/// - `[2]` `log_vector_size`
+/// - `[3..7]` `num_elements` (little-endian `i32`)
+#[derive(Debug, Clone, Copy)]
+struct AlpHeader {
+    compression_mode: u8,
+    integer_encoding: u8,
+    log_vector_size: u8,
+    num_elements: i32,
+}
+
+impl AlpHeader {
+    fn num_elements_usize(&self) -> usize {

Review Comment:
   A stylistic thing might be to have these struct fields as the actual values 
needed (e.g. num_elements as usize) and then have some sort of `serialize` and 
`deserialize` methods that converted them to/from the on disk format (e.g. 
using i32, vectori siz witu u8, etc)



##########
parquet/src/encodings/decoding/alp.rs:
##########
@@ -0,0 +1,1524 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use bytes::Bytes;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::encodings::decoding::Decoder;
+use crate::errors::{ParquetError, Result};
+use crate::util::bit_util::{BitReader, FromBytes};
+
+const ALP_HEADER_SIZE: usize = 7;
+const ALP_COMPRESSION_MODE: u8 = 0;
+const ALP_INTEGER_ENCODING_FOR_BIT_PACK: u8 = 0;
+const ALP_MIN_LOG_VECTOR_SIZE: u8 = 3;
+const ALP_MAX_LOG_VECTOR_SIZE: u8 = 15;
+const ALP_MAX_EXPONENT_F32: u8 = 10;
+const ALP_MAX_EXPONENT_F64: u8 = 18;
+
+/// Page-level ALP header (7 bytes).
+///
+/// Layout in bytes:
+/// - `[0]` `compression_mode`
+/// - `[1]` `integer_encoding`
+/// - `[2]` `log_vector_size`
+/// - `[3..7]` `num_elements` (little-endian `i32`)
+#[derive(Debug, Clone, Copy)]
+struct AlpHeader {
+    compression_mode: u8,
+    integer_encoding: u8,
+    log_vector_size: u8,
+    num_elements: i32,
+}
+
+impl AlpHeader {
+    fn num_elements_usize(&self) -> usize {
+        self.num_elements as usize
+    }
+
+    fn vector_size(&self) -> usize {
+        1usize << self.log_vector_size
+    }
+
+    fn num_vectors(&self) -> usize {
+        if self.num_elements == 0 {
+            0
+        } else {
+            self.num_elements_usize().div_ceil(self.vector_size())
+        }
+    }
+
+    fn vector_num_elements(&self, vector_index: usize) -> u16 {
+        let vector_size = self.vector_size();
+        let num_full_vectors = self.num_elements_usize() / vector_size;
+        let remainder = self.num_elements_usize() % vector_size;
+        if vector_index < num_full_vectors {
+            vector_size as u16
+        } else if vector_index == num_full_vectors && remainder > 0 {
+            remainder as u16
+        } else {
+            0
+        }
+    }
+}
+
+/// Per-vector ALP metadata (4 bytes), equivalent to C++ 
`AlpEncodedVectorInfo`.
+#[derive(Debug, Clone, Copy)]
+struct AlpEncodedVectorInfo {
+    exponent: u8,
+    factor: u8,
+    num_exceptions: u16,
+}
+
+impl AlpEncodedVectorInfo {
+    const STORED_SIZE: usize = 4;
+}
+
+/// Per-vector FOR metadata for exact integer type (`u32` for `f32`, `u64` for 
`f64`).
+#[derive(Debug, Clone, Copy)]
+struct AlpEncodedForVectorInfo<Exact: AlpExact> {
+    frame_of_reference: Exact,
+    bit_width: u8,
+}
+
+impl<Exact: AlpExact> AlpEncodedForVectorInfo<Exact> {
+    fn stored_size() -> usize {
+        Exact::WIDTH + 1
+    }
+
+    fn get_bit_packed_size(&self, num_elements: u16) -> usize {
+        (self.bit_width as usize * num_elements as usize).div_ceil(8)
+    }
+
+    fn get_data_stored_size(&self, num_elements: u16, num_exceptions: u16) -> 
usize {
+        let bit_packed_size = self.get_bit_packed_size(num_elements);
+        bit_packed_size
+            + num_exceptions as usize * std::mem::size_of::<u16>()
+            + num_exceptions as usize * Exact::WIDTH
+    }
+}
+
+/// Parsed view of one vector's metadata and data slices.
+///
+/// `packed_values` is a zero-copy range into page body bytes.
+/// Exception positions/values are copied for straightforward decode handling.
+#[derive(Debug)]
+struct AlpEncodedVectorView<Exact: AlpExact> {
+    num_elements: u16,
+    alp_info: AlpEncodedVectorInfo,
+    for_info: AlpEncodedForVectorInfo<Exact>,
+    packed_values: Range<usize>,
+    exception_positions: Vec<u16>,
+    exception_values: Vec<Exact>,
+}
+
+impl<Exact: AlpExact> AlpEncodedVectorView<Exact> {
+    fn expected_stored_size(&self) -> usize {
+        AlpEncodedVectorInfo::STORED_SIZE
+            + AlpEncodedForVectorInfo::<Exact>::stored_size()
+            + self
+                .for_info
+                .get_data_stored_size(self.num_elements, 
self.alp_info.num_exceptions)
+    }
+}
+
+/// Parsed ALP page layout for one exact integer width (`u32` for float pages,
+/// `u64` for double pages).
+#[derive(Debug)]
+struct AlpPageLayout<Exact: AlpExact> {
+    header: AlpHeader,
+    body: Bytes,
+    vectors: Vec<AlpEncodedVectorView<Exact>>,
+}
+
+/// Exact integer type used by FOR reconstruction.
+///
+/// This mirrors C++:
+/// - `float`  -> `uint32_t`
+/// - `double` -> `uint64_t`
+///
+/// Why unsigned (not `i32`/`i64`)?
+/// - FOR stores non-negative deltas optimized for bitpacking.
+/// - Unsigned arithmetic avoids signed-overflow edge cases in FOR stage.
+/// - Signed interpretation is applied later during decimal reconstruction.
+pub(super) trait AlpExact: Copy + std::fmt::Debug {
+    const WIDTH: usize;
+    type Signed: Copy;
+    fn from_le_slice(slice: &[u8]) -> Self;
+    fn zero() -> Self;
+    fn wrapping_add(self, rhs: Self) -> Self;
+    fn reinterpret_as_signed(self) -> Self::Signed;
+}
+
+impl AlpExact for u32 {
+    const WIDTH: usize = 4;
+    type Signed = i32;
+
+    fn from_le_slice(slice: &[u8]) -> Self {
+        u32::from_le_bytes([slice[0], slice[1], slice[2], slice[3]])
+    }
+
+    fn zero() -> Self {
+        0
+    }
+
+    fn wrapping_add(self, rhs: Self) -> Self {
+        self.wrapping_add(rhs)
+    }
+
+    fn reinterpret_as_signed(self) -> Self::Signed {
+        i32::from_ne_bytes(self.to_ne_bytes())
+    }
+}
+
+impl AlpExact for u64 {
+    const WIDTH: usize = 8;
+    type Signed = i64;
+
+    fn from_le_slice(slice: &[u8]) -> Self {
+        u64::from_le_bytes([
+            slice[0], slice[1], slice[2], slice[3], slice[4], slice[5], 
slice[6], slice[7],
+        ])
+    }
+
+    fn zero() -> Self {
+        0
+    }
+
+    fn wrapping_add(self, rhs: Self) -> Self {
+        self.wrapping_add(rhs)
+    }
+
+    fn reinterpret_as_signed(self) -> Self::Signed {
+        i64::from_ne_bytes(self.to_ne_bytes())
+    }
+}
+
+const ALP_POW10_F32: [f32; 11] = [
+    1.0,
+    10.0,
+    100.0,
+    1000.0,
+    10000.0,
+    100000.0,
+    1000000.0,
+    10000000.0,
+    100000000.0,
+    1000000000.0,
+    10000000000.0,
+];
+
+const ALP_POW10_F64: [f64; 19] = [
+    1.0,
+    10.0,
+    100.0,
+    1000.0,
+    10000.0,
+    100000.0,
+    1000000.0,
+    10000000.0,
+    100000000.0,
+    1000000000.0,
+    10000000000.0,
+    100000000000.0,
+    1000000000000.0,
+    10000000000000.0,
+    100000000000000.0,
+    1000000000000000.0,
+    10000000000000000.0,
+    100000000000000000.0,
+    1000000000000000000.0,
+];
+
+const ALP_NEG_POW10_F32: [f32; 11] = [
+    1.0,
+    0.1,
+    0.01,
+    0.001,
+    0.0001,
+    0.00001,
+    0.000001,
+    0.0000001,
+    0.00000001,
+    0.000000001,
+    0.0000000001,
+];
+
+const ALP_NEG_POW10_F64: [f64; 19] = [
+    1.0,
+    0.1,
+    0.01,
+    0.001,
+    0.0001,
+    0.00001,
+    0.000001,
+    0.0000001,
+    0.00000001,
+    0.000000001,
+    0.0000000001,
+    0.00000000001,
+    0.000000000001,
+    0.0000000000001,
+    0.00000000000001,
+    0.000000000000001,
+    0.0000000000000001,
+    0.00000000000000001,
+    0.000000000000000001,
+];
+
+pub(super) trait AlpFloat: Copy + Default {
+    type Exact: AlpExact + FromBytes;
+    type Scale: Copy;
+
+    /// Precompute vector-level ALP decimal scale constants for:
+    /// `value = (encoded * 10^(factor)) * 10^(-exponent)`.
+    ///
+    /// Preconditions are validated during page parse.
+    fn decode_scale(exponent: u8, factor: u8) -> Self::Scale;
+
+    /// Decode one signed exact integer using a precomputed two-step scale.
+    fn decode_value(signed_encoded: <Self::Exact as AlpExact>::Signed, scale: 
Self::Scale) -> Self;
+
+    fn from_exact_bits(bits: Self::Exact) -> Self;
+}
+
+impl AlpFloat for f32 {
+    type Exact = u32;
+    type Scale = (f32, f32);
+
+    fn decode_scale(exponent: u8, factor: u8) -> Self::Scale {
+        debug_assert!(exponent <= ALP_MAX_EXPONENT_F32);
+        debug_assert!(factor <= exponent);
+        (
+            ALP_POW10_F32[factor as usize],
+            ALP_NEG_POW10_F32[exponent as usize],
+        )
+    }
+
+    fn decode_value(signed_encoded: i32, scale: Self::Scale) -> Self {
+        ((signed_encoded as f32) * scale.0) * scale.1
+    }
+
+    fn from_exact_bits(bits: Self::Exact) -> Self {
+        f32::from_bits(bits)
+    }
+}
+
+impl AlpFloat for f64 {
+    type Exact = u64;
+    type Scale = (f64, f64);
+
+    fn decode_scale(exponent: u8, factor: u8) -> Self::Scale {
+        debug_assert!(exponent <= ALP_MAX_EXPONENT_F64);
+        debug_assert!(factor <= exponent);
+        (
+            ALP_POW10_F64[factor as usize],
+            ALP_NEG_POW10_F64[exponent as usize],
+        )
+    }
+
+    fn decode_value(signed_encoded: i64, scale: Self::Scale) -> Self {
+        ((signed_encoded as f64) * scale.0) * scale.1
+    }
+
+    fn from_exact_bits(bits: Self::Exact) -> Self {
+        f64::from_bits(bits)
+    }
+}
+
+/// Parse and validate a full ALP-encoded page body.
+///
+/// Validation includes:
+/// - header fields/encoding
+/// - non-negative `num_elements`
+/// - offsets bounds + monotonicity
+/// - per-vector metadata/data section lengths
+fn parse_alp_page_layout<Exact: AlpExact>(data: Bytes) -> 
Result<AlpPageLayout<Exact>> {
+    let data_ref = data.as_ref();
+    if data_ref.len() < ALP_HEADER_SIZE {
+        return Err(general_err!(
+            "Invalid ALP page: expected at least {} bytes for header, got {}",
+            ALP_HEADER_SIZE,
+            data_ref.len()
+        ));
+    }
+
+    let header = AlpHeader {
+        compression_mode: data_ref[0],
+        integer_encoding: data_ref[1],
+        log_vector_size: data_ref[2],
+        num_elements: i32::from_le_bytes([data_ref[3], data_ref[4], 
data_ref[5], data_ref[6]]),
+    };
+
+    if header.compression_mode != ALP_COMPRESSION_MODE {
+        return Err(general_err!(
+            "Invalid ALP page: unsupported compression mode {}",
+            header.compression_mode
+        ));
+    }
+
+    if header.integer_encoding != ALP_INTEGER_ENCODING_FOR_BIT_PACK {
+        return Err(general_err!(
+            "Invalid ALP page: unsupported integer encoding {}",
+            header.integer_encoding
+        ));
+    }
+
+    if header.log_vector_size < ALP_MIN_LOG_VECTOR_SIZE {
+        return Err(general_err!(
+            "Invalid ALP page: log_vector_size {} below min {}",
+            header.log_vector_size,
+            ALP_MIN_LOG_VECTOR_SIZE
+        ));
+    }
+
+    if header.log_vector_size > ALP_MAX_LOG_VECTOR_SIZE {
+        return Err(general_err!(
+            "Invalid ALP page: log_vector_size {} exceeds max {}",
+            header.log_vector_size,
+            ALP_MAX_LOG_VECTOR_SIZE
+        ));
+    }
+
+    if header.num_elements < 0 {
+        return Err(general_err!(
+            "Invalid ALP page: num_elements {} must be >= 0",
+            header.num_elements
+        ));
+    }
+
+    let num_vectors = header.num_vectors();
+
+    let offsets_len = num_vectors
+        .checked_mul(std::mem::size_of::<u32>())
+        .ok_or_else(|| general_err!("Invalid ALP page: offsets length 
overflow"))?;
+    let offsets_end = ALP_HEADER_SIZE
+        .checked_add(offsets_len)
+        .ok_or_else(|| general_err!("Invalid ALP page: header + offsets length 
overflow"))?;
+
+    if data_ref.len() < offsets_end {
+        return Err(general_err!(
+            "Invalid ALP page: expected at least {} bytes for {} offsets, got 
{}",
+            offsets_end,
+            num_vectors,
+            data_ref.len()
+        ));
+    }
+
+    let body = data.slice(ALP_HEADER_SIZE..);
+    let body_ref = body.as_ref();
+    let body_len = body_ref.len();
+    let offsets_section_size = num_vectors * std::mem::size_of::<u32>();
+
+    let mut offsets = Vec::with_capacity(num_vectors);
+    for i in 0..num_vectors {
+        let start = ALP_HEADER_SIZE + i * 4;
+        let offset = u32::from_le_bytes([
+            data_ref[start],
+            data_ref[start + 1],
+            data_ref[start + 2],
+            data_ref[start + 3],
+        ]);
+
+        if offset as usize >= body_len {
+            return Err(general_err!(
+                "Invalid ALP page: vector offset {} out of bounds for body 
length {}",
+                offset,
+                body_len
+            ));
+        }
+
+        if (offset as usize) < offsets_section_size {
+            return Err(general_err!(
+                "Invalid ALP page: vector offset {} points into offsets 
section {}",
+                offset,
+                offsets_section_size
+            ));
+        }
+
+        offsets.push(offset);
+    }
+
+    let mut vectors = Vec::with_capacity(num_vectors);
+    let mut expected_next_offset = offsets_section_size;
+    for (vector_idx, vector_offset) in offsets.iter().enumerate() {
+        let vector_start = *vector_offset as usize;
+        if vector_start != expected_next_offset {
+            return Err(general_err!(
+                "Invalid ALP page: vector offset {} at index {} does not match 
expected {}",
+                vector_start,
+                vector_idx,
+                expected_next_offset
+            ));
+        }
+
+        let vector_end = if vector_idx + 1 < offsets.len() {
+            offsets[vector_idx + 1] as usize
+        } else {
+            body_len
+        };
+
+        if vector_end < vector_start {
+            return Err(general_err!(
+                "Invalid ALP page: vector offsets are not monotonic at index 
{}",
+                vector_idx
+            ));
+        }
+
+        let vector_num_elements = header.vector_num_elements(vector_idx);
+        let vector =
+            parse_vector_view::<Exact>(body_ref, vector_start, vector_end, 
vector_num_elements)?;
+        expected_next_offset = vector_start
+            .checked_add(vector.expected_stored_size())
+            .ok_or_else(|| {
+                general_err!("Invalid ALP page: expected next vector offset 
overflow")
+            })?;
+        vectors.push(vector);
+    }
+
+    if expected_next_offset != body_len {
+        return Err(general_err!(
+            "Invalid ALP page: body size {} does not match expected {} 
(offsets + vectors)",
+            body_len,
+            expected_next_offset
+        ));
+    }
+
+    Ok(AlpPageLayout {
+        header,
+        body,
+        vectors,
+    })
+}
+
+/// Parse a single vector section:
+/// `[AlpInfo][ForInfo][PackedValues][ExceptionPositions][ExceptionValues]`.
+fn parse_vector_view<Exact: AlpExact>(
+    body: &[u8],
+    vector_start: usize,
+    vector_end: usize,
+    num_elements: u16,
+) -> Result<AlpEncodedVectorView<Exact>> {
+    let vector_bytes = &body[vector_start..vector_end];
+
+    let metadata_size =
+        AlpEncodedVectorInfo::STORED_SIZE + 
AlpEncodedForVectorInfo::<Exact>::stored_size();
+    if vector_bytes.len() < metadata_size {
+        return Err(general_err!(
+            "Invalid ALP page: vector metadata too short, expected at least {} 
bytes, got {}",
+            metadata_size,
+            vector_bytes.len()
+        ));
+    }
+
+    let alp_info = AlpEncodedVectorInfo {
+        exponent: vector_bytes[0],
+        factor: vector_bytes[1],
+        num_exceptions: u16::from_le_bytes([vector_bytes[2], vector_bytes[3]]),
+    };
+
+    let max_exponent = if Exact::WIDTH == 4 {
+        ALP_MAX_EXPONENT_F32
+    } else {
+        ALP_MAX_EXPONENT_F64
+    };
+
+    if alp_info.exponent > max_exponent {
+        return Err(general_err!(
+            "Invalid ALP page: exponent {} exceeds max {}",
+            alp_info.exponent,
+            max_exponent
+        ));
+    }
+
+    if alp_info.factor > alp_info.exponent {
+        return Err(general_err!(
+            "Invalid ALP page: factor {} exceeds exponent {}",
+            alp_info.factor,
+            alp_info.exponent
+        ));
+    }
+
+    if alp_info.num_exceptions > num_elements {
+        return Err(general_err!(
+            "Invalid ALP page: num_exceptions {} exceeds vector num_elements 
{}",
+            alp_info.num_exceptions,
+            num_elements
+        ));
+    }
+
+    let for_start = AlpEncodedVectorInfo::STORED_SIZE;
+    let for_end = for_start + Exact::WIDTH;
+    let frame_of_reference = 
Exact::from_le_slice(&vector_bytes[for_start..for_end]);
+    let bit_width = vector_bytes[for_end];
+
+    if bit_width as usize > Exact::WIDTH * 8 {
+        return Err(general_err!(
+            "Invalid ALP page: bit width {} exceeds {}",
+            bit_width,
+            Exact::WIDTH * 8
+        ));
+    }
+
+    let for_info = AlpEncodedForVectorInfo::<Exact> {
+        frame_of_reference,
+        bit_width,
+    };
+
+    let data_size = for_info.get_data_stored_size(num_elements, 
alp_info.num_exceptions);
+    let expected_size = metadata_size + data_size;
+    if vector_bytes.len() < expected_size {

Review Comment:
   FWIW the inclusion of the actual offset was intended to allow for fast 
single row lookups (aka avoid having to parse all the FORInfo headers in a 
page). 
   
   To really take advantage of that in this code, I think we would have to 
restructure things to it incrementally parsed the header as the decode 
progressed, rather than parse it all up front (more of a streaming situation). 
   
   



##########
parquet/tests/arrow_reader/alp.rs:
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::compute::concat_batches;
+use arrow::util::test_util::parquet_test_data;
+use arrow_array::cast::as_primitive_array;
+use arrow_array::types::Float32Type;
+use arrow_array::{Array, ArrayRef, Float32Array, RecordBatch};
+use arrow_schema::{DataType, Field, Schema};
+use parquet::arrow::arrow_reader::ArrowReaderBuilder;
+use std::fs::File;
+use std::io::{BufRead, BufReader};
+use std::path::PathBuf;
+use std::sync::Arc;
+
+#[test]
+fn test_read_f32_alp() {
+    let data_dir = PathBuf::from(parquet_test_data());
+    let parquet_path = data_dir.join("alp_float_arade.parquet");
+    let expected_csv_path = data_dir.join("alp_arade_expect.csv");
+    if !parquet_path.exists() || !expected_csv_path.exists() {
+        eprintln!("Skipping ALP test files not found");

Review Comment:
   we probably should error the test hard if this file doesn't exist, rather 
than just eprint and contnuing



##########
parquet/tests/arrow_reader/alp.rs:
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::compute::concat_batches;
+use arrow::util::test_util::parquet_test_data;
+use arrow_array::cast::as_primitive_array;
+use arrow_array::types::Float32Type;
+use arrow_array::{Array, ArrayRef, Float32Array, RecordBatch};
+use arrow_schema::{DataType, Field, Schema};
+use parquet::arrow::arrow_reader::ArrowReaderBuilder;
+use std::fs::File;
+use std::io::{BufRead, BufReader};
+use std::path::PathBuf;
+use std::sync::Arc;
+
+#[test]
+fn test_read_f32_alp() {
+    let data_dir = PathBuf::from(parquet_test_data());
+    let parquet_path = data_dir.join("alp_float_arade.parquet");
+    let expected_csv_path = data_dir.join("alp_arade_expect.csv");
+    if !parquet_path.exists() || !expected_csv_path.exists() {
+        eprintln!("Skipping ALP test files not found");
+        return;
+    }
+
+    let expected = read_expected_csv_batch(&expected_csv_path);
+    let actual = read_parquet_batch(&parquet_path);
+
+    assert_eq!(actual.schema(), expected.schema(), "schema mismatch");
+    assert_eq!(
+        actual.num_columns(),
+        expected.num_columns(),
+        "column mismatch"
+    );
+    assert_eq!(actual.num_rows(), expected.num_rows(), "row count mismatch");
+
+    for col_idx in 0..actual.num_columns() {
+        let col_name = actual.schema().field(col_idx).name().clone();
+        let actual_col = 
as_primitive_array::<Float32Type>(actual.column(col_idx).as_ref());
+        let expected_col = 
as_primitive_array::<Float32Type>(expected.column(col_idx).as_ref());
+
+        for row_idx in 0..actual.num_rows() {
+            assert_eq!(
+                actual_col.is_valid(row_idx),
+                expected_col.is_valid(row_idx),
+                "null mismatch at column {col_name} row {row_idx}"
+            );
+            if actual_col.is_valid(row_idx) {
+                let actual_value = actual_col.value(row_idx);
+                let expected_value = expected_col.value(row_idx);
+                assert!(
+                    actual_value.to_bits() == expected_value.to_bits(),
+                    "bit mismatch at column {col_name} row {row_idx}: 
expected={expected_value} actual={actual_value}"
+                );
+            }
+        }
+    }
+}
+
+fn alp_schema() -> Arc<Schema> {
+    Arc::new(Schema::new(vec![
+        Field::new("value1", DataType::Float32, true),
+        Field::new("value2", DataType::Float32, true),
+        Field::new("value3", DataType::Float32, true),
+        Field::new("value4", DataType::Float32, true),
+    ]))
+}
+
+fn read_parquet_batch(path: &PathBuf) -> RecordBatch {
+    let file = File::open(path).unwrap();
+    let reader = ArrowReaderBuilder::try_new(file).unwrap().build().unwrap();
+    let mut batches = Vec::new();
+    for batch in reader {
+        batches.push(batch.unwrap());
+    }
+    assert!(!batches.is_empty(), "expected non-empty parquet batch set");
+    concat_batches(batches[0].schema_ref(), &batches).unwrap()
+}
+
+fn read_expected_csv_batch(path: &PathBuf) -> RecordBatch {
+    let file = File::open(path).unwrap();

Review Comment:
   We could probably use the arrow-csv reader here: 
https://docs.rs/arrow-csv/latest/arrow_csv/reader/index.html



##########
parquet/src/encodings/decoding/alp.rs:
##########
@@ -0,0 +1,1524 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use bytes::Bytes;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::encodings::decoding::Decoder;
+use crate::errors::{ParquetError, Result};
+use crate::util::bit_util::{BitReader, FromBytes};
+
+const ALP_HEADER_SIZE: usize = 7;
+const ALP_COMPRESSION_MODE: u8 = 0;
+const ALP_INTEGER_ENCODING_FOR_BIT_PACK: u8 = 0;
+const ALP_MIN_LOG_VECTOR_SIZE: u8 = 3;
+const ALP_MAX_LOG_VECTOR_SIZE: u8 = 15;
+const ALP_MAX_EXPONENT_F32: u8 = 10;
+const ALP_MAX_EXPONENT_F64: u8 = 18;
+
+/// Page-level ALP header (7 bytes).
+///
+/// Layout in bytes:

Review Comment:
   There is some nice ASCII art in @prtkgaur format spec PR that we can pull 
eventually 
   -  https://github.com/apache/parquet-format/pull/557 
   
   For example
   
   ```
   
+-------------+-----------------------------+--------------------------------------+
   |   Header    |        Offset Array         |            Vector Data         
      |
   |  (7 bytes)  |   (num_vectors * 4 bytes)   |            (variable)          
      |
   
+-------------+------+------+-----+---------+----------+----------+-----+----------+
   | Page Header | off0 | off1 | ... | off N-1 | Vector 0 | Vector 1 | ... | 
Vec N-1  |
   |  (7 bytes)  | (4B) | (4B) |     |  (4B)   |(variable)|(variable)|     
|(variable)|
   
+-------------+------+------+-----+---------+----------+----------+-----+----------+
   ```
   
   And 
   
   ##### Header (7 bytes)
   
   All multi-byte values are stored in little-endian order.
   
   ```
    Byte:    0              1               2              3    4    5    6
          +----------------+---------------+--------------+----+----+----+----+
          | compression    | integer       | log_vector   |     num_elements  |
          | _mode          | _encoding     | _size        |     (int32 LE)    |
          +----------------+---------------+--------------+----+----+----+----+
   ```



##########
parquet/src/encodings/decoding/alp.rs:
##########
@@ -0,0 +1,1524 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use bytes::Bytes;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::encodings::decoding::Decoder;
+use crate::errors::{ParquetError, Result};
+use crate::util::bit_util::{BitReader, FromBytes};
+
+const ALP_HEADER_SIZE: usize = 7;
+const ALP_COMPRESSION_MODE: u8 = 0;
+const ALP_INTEGER_ENCODING_FOR_BIT_PACK: u8 = 0;
+const ALP_MIN_LOG_VECTOR_SIZE: u8 = 3;
+const ALP_MAX_LOG_VECTOR_SIZE: u8 = 15;
+const ALP_MAX_EXPONENT_F32: u8 = 10;
+const ALP_MAX_EXPONENT_F64: u8 = 18;
+
+/// Page-level ALP header (7 bytes).
+///
+/// Layout in bytes:
+/// - `[0]` `compression_mode`
+/// - `[1]` `integer_encoding`
+/// - `[2]` `log_vector_size`
+/// - `[3..7]` `num_elements` (little-endian `i32`)
+#[derive(Debug, Clone, Copy)]
+struct AlpHeader {
+    compression_mode: u8,
+    integer_encoding: u8,
+    log_vector_size: u8,
+    num_elements: i32,
+}
+
+impl AlpHeader {
+    fn num_elements_usize(&self) -> usize {
+        self.num_elements as usize
+    }
+
+    fn vector_size(&self) -> usize {
+        1usize << self.log_vector_size
+    }
+
+    fn num_vectors(&self) -> usize {
+        if self.num_elements == 0 {
+            0
+        } else {
+            self.num_elements_usize().div_ceil(self.vector_size())
+        }
+    }
+
+    fn vector_num_elements(&self, vector_index: usize) -> u16 {
+        let vector_size = self.vector_size();
+        let num_full_vectors = self.num_elements_usize() / vector_size;
+        let remainder = self.num_elements_usize() % vector_size;
+        if vector_index < num_full_vectors {
+            vector_size as u16
+        } else if vector_index == num_full_vectors && remainder > 0 {
+            remainder as u16
+        } else {
+            0
+        }
+    }
+}
+
+/// Per-vector ALP metadata (4 bytes), equivalent to C++ 
`AlpEncodedVectorInfo`.
+#[derive(Debug, Clone, Copy)]
+struct AlpEncodedVectorInfo {
+    exponent: u8,
+    factor: u8,
+    num_exceptions: u16,
+}
+
+impl AlpEncodedVectorInfo {
+    const STORED_SIZE: usize = 4;
+}
+
+/// Per-vector FOR metadata for exact integer type (`u32` for `f32`, `u64` for 
`f64`).
+#[derive(Debug, Clone, Copy)]
+struct AlpEncodedForVectorInfo<Exact: AlpExact> {
+    frame_of_reference: Exact,
+    bit_width: u8,
+}
+
+impl<Exact: AlpExact> AlpEncodedForVectorInfo<Exact> {
+    fn stored_size() -> usize {
+        Exact::WIDTH + 1
+    }
+
+    fn get_bit_packed_size(&self, num_elements: u16) -> usize {
+        (self.bit_width as usize * num_elements as usize).div_ceil(8)
+    }
+
+    fn get_data_stored_size(&self, num_elements: u16, num_exceptions: u16) -> 
usize {
+        let bit_packed_size = self.get_bit_packed_size(num_elements);
+        bit_packed_size
+            + num_exceptions as usize * std::mem::size_of::<u16>()
+            + num_exceptions as usize * Exact::WIDTH
+    }
+}
+
+/// Parsed view of one vector's metadata and data slices.
+///
+/// `packed_values` is a zero-copy range into page body bytes.
+/// Exception positions/values are copied for straightforward decode handling.
+#[derive(Debug)]
+struct AlpEncodedVectorView<Exact: AlpExact> {
+    num_elements: u16,
+    alp_info: AlpEncodedVectorInfo,
+    for_info: AlpEncodedForVectorInfo<Exact>,
+    packed_values: Range<usize>,
+    exception_positions: Vec<u16>,
+    exception_values: Vec<Exact>,
+}
+
+impl<Exact: AlpExact> AlpEncodedVectorView<Exact> {
+    fn expected_stored_size(&self) -> usize {
+        AlpEncodedVectorInfo::STORED_SIZE
+            + AlpEncodedForVectorInfo::<Exact>::stored_size()
+            + self
+                .for_info
+                .get_data_stored_size(self.num_elements, 
self.alp_info.num_exceptions)
+    }
+}
+
+/// Parsed ALP page layout for one exact integer width (`u32` for float pages,
+/// `u64` for double pages).
+#[derive(Debug)]
+struct AlpPageLayout<Exact: AlpExact> {
+    header: AlpHeader,
+    body: Bytes,
+    vectors: Vec<AlpEncodedVectorView<Exact>>,
+}
+
+/// Exact integer type used by FOR reconstruction.
+///
+/// This mirrors C++:
+/// - `float`  -> `uint32_t`
+/// - `double` -> `uint64_t`
+///
+/// Why unsigned (not `i32`/`i64`)?
+/// - FOR stores non-negative deltas optimized for bitpacking.
+/// - Unsigned arithmetic avoids signed-overflow edge cases in FOR stage.
+/// - Signed interpretation is applied later during decimal reconstruction.
+pub(super) trait AlpExact: Copy + std::fmt::Debug {
+    const WIDTH: usize;
+    type Signed: Copy;
+    fn from_le_slice(slice: &[u8]) -> Self;
+    fn zero() -> Self;
+    fn wrapping_add(self, rhs: Self) -> Self;
+    fn reinterpret_as_signed(self) -> Self::Signed;
+}
+
+impl AlpExact for u32 {
+    const WIDTH: usize = 4;
+    type Signed = i32;
+
+    fn from_le_slice(slice: &[u8]) -> Self {
+        u32::from_le_bytes([slice[0], slice[1], slice[2], slice[3]])
+    }
+
+    fn zero() -> Self {
+        0
+    }
+
+    fn wrapping_add(self, rhs: Self) -> Self {
+        self.wrapping_add(rhs)
+    }
+
+    fn reinterpret_as_signed(self) -> Self::Signed {
+        i32::from_ne_bytes(self.to_ne_bytes())
+    }
+}
+
+impl AlpExact for u64 {
+    const WIDTH: usize = 8;
+    type Signed = i64;
+
+    fn from_le_slice(slice: &[u8]) -> Self {
+        u64::from_le_bytes([
+            slice[0], slice[1], slice[2], slice[3], slice[4], slice[5], 
slice[6], slice[7],
+        ])
+    }
+
+    fn zero() -> Self {
+        0
+    }
+
+    fn wrapping_add(self, rhs: Self) -> Self {
+        self.wrapping_add(rhs)
+    }
+
+    fn reinterpret_as_signed(self) -> Self::Signed {
+        i64::from_ne_bytes(self.to_ne_bytes())
+    }
+}
+
+const ALP_POW10_F32: [f32; 11] = [
+    1.0,
+    10.0,
+    100.0,
+    1000.0,
+    10000.0,
+    100000.0,
+    1000000.0,
+    10000000.0,
+    100000000.0,
+    1000000000.0,
+    10000000000.0,
+];
+
+const ALP_POW10_F64: [f64; 19] = [
+    1.0,
+    10.0,
+    100.0,
+    1000.0,
+    10000.0,
+    100000.0,
+    1000000.0,
+    10000000.0,
+    100000000.0,
+    1000000000.0,
+    10000000000.0,
+    100000000000.0,
+    1000000000000.0,
+    10000000000000.0,
+    100000000000000.0,
+    1000000000000000.0,
+    10000000000000000.0,
+    100000000000000000.0,
+    1000000000000000000.0,
+];
+
+const ALP_NEG_POW10_F32: [f32; 11] = [
+    1.0,
+    0.1,
+    0.01,
+    0.001,
+    0.0001,
+    0.00001,
+    0.000001,
+    0.0000001,
+    0.00000001,
+    0.000000001,
+    0.0000000001,
+];
+
+const ALP_NEG_POW10_F64: [f64; 19] = [
+    1.0,
+    0.1,
+    0.01,
+    0.001,
+    0.0001,
+    0.00001,
+    0.000001,
+    0.0000001,
+    0.00000001,
+    0.000000001,
+    0.0000000001,
+    0.00000000001,
+    0.000000000001,
+    0.0000000000001,
+    0.00000000000001,
+    0.000000000000001,
+    0.0000000000000001,
+    0.00000000000000001,
+    0.000000000000000001,
+];
+
+pub(super) trait AlpFloat: Copy + Default {
+    type Exact: AlpExact + FromBytes;
+    type Scale: Copy;
+
+    /// Precompute vector-level ALP decimal scale constants for:
+    /// `value = (encoded * 10^(factor)) * 10^(-exponent)`.
+    ///
+    /// Preconditions are validated during page parse.
+    fn decode_scale(exponent: u8, factor: u8) -> Self::Scale;
+
+    /// Decode one signed exact integer using a precomputed two-step scale.
+    fn decode_value(signed_encoded: <Self::Exact as AlpExact>::Signed, scale: 
Self::Scale) -> Self;
+
+    fn from_exact_bits(bits: Self::Exact) -> Self;
+}
+
+impl AlpFloat for f32 {
+    type Exact = u32;
+    type Scale = (f32, f32);
+
+    fn decode_scale(exponent: u8, factor: u8) -> Self::Scale {
+        debug_assert!(exponent <= ALP_MAX_EXPONENT_F32);
+        debug_assert!(factor <= exponent);
+        (
+            ALP_POW10_F32[factor as usize],
+            ALP_NEG_POW10_F32[exponent as usize],
+        )
+    }
+
+    fn decode_value(signed_encoded: i32, scale: Self::Scale) -> Self {
+        ((signed_encoded as f32) * scale.0) * scale.1
+    }
+
+    fn from_exact_bits(bits: Self::Exact) -> Self {
+        f32::from_bits(bits)
+    }
+}
+
+impl AlpFloat for f64 {
+    type Exact = u64;
+    type Scale = (f64, f64);
+
+    fn decode_scale(exponent: u8, factor: u8) -> Self::Scale {
+        debug_assert!(exponent <= ALP_MAX_EXPONENT_F64);
+        debug_assert!(factor <= exponent);
+        (
+            ALP_POW10_F64[factor as usize],
+            ALP_NEG_POW10_F64[exponent as usize],
+        )
+    }
+
+    fn decode_value(signed_encoded: i64, scale: Self::Scale) -> Self {
+        ((signed_encoded as f64) * scale.0) * scale.1
+    }
+
+    fn from_exact_bits(bits: Self::Exact) -> Self {
+        f64::from_bits(bits)
+    }
+}
+
+/// Parse and validate a full ALP-encoded page body.
+///
+/// Validation includes:
+/// - header fields/encoding
+/// - non-negative `num_elements`
+/// - offsets bounds + monotonicity
+/// - per-vector metadata/data section lengths
+fn parse_alp_page_layout<Exact: AlpExact>(data: Bytes) -> 
Result<AlpPageLayout<Exact>> {
+    let data_ref = data.as_ref();
+    if data_ref.len() < ALP_HEADER_SIZE {
+        return Err(general_err!(
+            "Invalid ALP page: expected at least {} bytes for header, got {}",
+            ALP_HEADER_SIZE,
+            data_ref.len()
+        ));
+    }
+
+    let header = AlpHeader {
+        compression_mode: data_ref[0],
+        integer_encoding: data_ref[1],
+        log_vector_size: data_ref[2],
+        num_elements: i32::from_le_bytes([data_ref[3], data_ref[4], 
data_ref[5], data_ref[6]]),
+    };
+
+    if header.compression_mode != ALP_COMPRESSION_MODE {
+        return Err(general_err!(
+            "Invalid ALP page: unsupported compression mode {}",
+            header.compression_mode
+        ));
+    }
+
+    if header.integer_encoding != ALP_INTEGER_ENCODING_FOR_BIT_PACK {
+        return Err(general_err!(
+            "Invalid ALP page: unsupported integer encoding {}",
+            header.integer_encoding
+        ));
+    }
+
+    if header.log_vector_size < ALP_MIN_LOG_VECTOR_SIZE {
+        return Err(general_err!(
+            "Invalid ALP page: log_vector_size {} below min {}",
+            header.log_vector_size,
+            ALP_MIN_LOG_VECTOR_SIZE
+        ));
+    }
+
+    if header.log_vector_size > ALP_MAX_LOG_VECTOR_SIZE {
+        return Err(general_err!(
+            "Invalid ALP page: log_vector_size {} exceeds max {}",
+            header.log_vector_size,
+            ALP_MAX_LOG_VECTOR_SIZE
+        ));
+    }
+
+    if header.num_elements < 0 {
+        return Err(general_err!(
+            "Invalid ALP page: num_elements {} must be >= 0",
+            header.num_elements
+        ));
+    }
+
+    let num_vectors = header.num_vectors();
+
+    let offsets_len = num_vectors
+        .checked_mul(std::mem::size_of::<u32>())
+        .ok_or_else(|| general_err!("Invalid ALP page: offsets length 
overflow"))?;
+    let offsets_end = ALP_HEADER_SIZE
+        .checked_add(offsets_len)
+        .ok_or_else(|| general_err!("Invalid ALP page: header + offsets length 
overflow"))?;
+
+    if data_ref.len() < offsets_end {
+        return Err(general_err!(
+            "Invalid ALP page: expected at least {} bytes for {} offsets, got 
{}",
+            offsets_end,
+            num_vectors,
+            data_ref.len()
+        ));
+    }
+
+    let body = data.slice(ALP_HEADER_SIZE..);
+    let body_ref = body.as_ref();
+    let body_len = body_ref.len();
+    let offsets_section_size = num_vectors * std::mem::size_of::<u32>();
+
+    let mut offsets = Vec::with_capacity(num_vectors);
+    for i in 0..num_vectors {
+        let start = ALP_HEADER_SIZE + i * 4;
+        let offset = u32::from_le_bytes([
+            data_ref[start],
+            data_ref[start + 1],
+            data_ref[start + 2],
+            data_ref[start + 3],
+        ]);
+
+        if offset as usize >= body_len {
+            return Err(general_err!(
+                "Invalid ALP page: vector offset {} out of bounds for body 
length {}",
+                offset,
+                body_len
+            ));
+        }
+
+        if (offset as usize) < offsets_section_size {
+            return Err(general_err!(
+                "Invalid ALP page: vector offset {} points into offsets 
section {}",
+                offset,
+                offsets_section_size
+            ));
+        }
+
+        offsets.push(offset);
+    }
+
+    let mut vectors = Vec::with_capacity(num_vectors);
+    let mut expected_next_offset = offsets_section_size;
+    for (vector_idx, vector_offset) in offsets.iter().enumerate() {
+        let vector_start = *vector_offset as usize;
+        if vector_start != expected_next_offset {
+            return Err(general_err!(
+                "Invalid ALP page: vector offset {} at index {} does not match 
expected {}",
+                vector_start,
+                vector_idx,
+                expected_next_offset
+            ));
+        }
+
+        let vector_end = if vector_idx + 1 < offsets.len() {
+            offsets[vector_idx + 1] as usize
+        } else {
+            body_len
+        };
+
+        if vector_end < vector_start {
+            return Err(general_err!(
+                "Invalid ALP page: vector offsets are not monotonic at index 
{}",
+                vector_idx
+            ));
+        }
+
+        let vector_num_elements = header.vector_num_elements(vector_idx);
+        let vector =
+            parse_vector_view::<Exact>(body_ref, vector_start, vector_end, 
vector_num_elements)?;
+        expected_next_offset = vector_start
+            .checked_add(vector.expected_stored_size())
+            .ok_or_else(|| {
+                general_err!("Invalid ALP page: expected next vector offset 
overflow")
+            })?;
+        vectors.push(vector);
+    }
+
+    if expected_next_offset != body_len {
+        return Err(general_err!(
+            "Invalid ALP page: body size {} does not match expected {} 
(offsets + vectors)",
+            body_len,
+            expected_next_offset
+        ));
+    }
+
+    Ok(AlpPageLayout {
+        header,
+        body,
+        vectors,
+    })
+}
+
+/// Parse a single vector section:
+/// `[AlpInfo][ForInfo][PackedValues][ExceptionPositions][ExceptionValues]`.
+fn parse_vector_view<Exact: AlpExact>(
+    body: &[u8],
+    vector_start: usize,
+    vector_end: usize,
+    num_elements: u16,
+) -> Result<AlpEncodedVectorView<Exact>> {
+    let vector_bytes = &body[vector_start..vector_end];
+
+    let metadata_size =
+        AlpEncodedVectorInfo::STORED_SIZE + 
AlpEncodedForVectorInfo::<Exact>::stored_size();
+    if vector_bytes.len() < metadata_size {
+        return Err(general_err!(
+            "Invalid ALP page: vector metadata too short, expected at least {} 
bytes, got {}",
+            metadata_size,
+            vector_bytes.len()
+        ));
+    }
+
+    let alp_info = AlpEncodedVectorInfo {
+        exponent: vector_bytes[0],
+        factor: vector_bytes[1],
+        num_exceptions: u16::from_le_bytes([vector_bytes[2], vector_bytes[3]]),
+    };
+
+    let max_exponent = if Exact::WIDTH == 4 {
+        ALP_MAX_EXPONENT_F32
+    } else {
+        ALP_MAX_EXPONENT_F64
+    };
+
+    if alp_info.exponent > max_exponent {
+        return Err(general_err!(
+            "Invalid ALP page: exponent {} exceeds max {}",
+            alp_info.exponent,
+            max_exponent
+        ));
+    }
+
+    if alp_info.factor > alp_info.exponent {
+        return Err(general_err!(
+            "Invalid ALP page: factor {} exceeds exponent {}",
+            alp_info.factor,
+            alp_info.exponent
+        ));
+    }
+
+    if alp_info.num_exceptions > num_elements {
+        return Err(general_err!(
+            "Invalid ALP page: num_exceptions {} exceeds vector num_elements 
{}",
+            alp_info.num_exceptions,
+            num_elements
+        ));
+    }
+
+    let for_start = AlpEncodedVectorInfo::STORED_SIZE;
+    let for_end = for_start + Exact::WIDTH;
+    let frame_of_reference = 
Exact::from_le_slice(&vector_bytes[for_start..for_end]);
+    let bit_width = vector_bytes[for_end];
+
+    if bit_width as usize > Exact::WIDTH * 8 {
+        return Err(general_err!(
+            "Invalid ALP page: bit width {} exceeds {}",
+            bit_width,
+            Exact::WIDTH * 8
+        ));
+    }
+
+    let for_info = AlpEncodedForVectorInfo::<Exact> {
+        frame_of_reference,
+        bit_width,
+    };
+
+    let data_size = for_info.get_data_stored_size(num_elements, 
alp_info.num_exceptions);
+    let expected_size = metadata_size + data_size;
+    if vector_bytes.len() < expected_size {
+        return Err(general_err!(
+            "Invalid ALP page: vector data too short, expected at least {} 
bytes, got {}",
+            expected_size,
+            vector_bytes.len()
+        ));
+    }
+    if vector_bytes.len() > expected_size {
+        return Err(general_err!(
+            "Invalid ALP page: vector data too long, expected {} bytes, got 
{}",
+            expected_size,
+            vector_bytes.len()
+        ));
+    }
+
+    let data = &vector_bytes[metadata_size..expected_size];
+    let packed_size = for_info.get_bit_packed_size(num_elements);
+    let positions_size = alp_info.num_exceptions as usize * 
std::mem::size_of::<u16>();
+    let values_size = alp_info.num_exceptions as usize * Exact::WIDTH;
+
+    let packed_start = 0;
+    let packed_end = packed_start + packed_size;
+    let positions_start = packed_end;
+    let positions_end = positions_start + positions_size;
+    let values_start = positions_end;
+    let values_end = values_start + values_size;
+
+    let mut exception_positions = Vec::with_capacity(alp_info.num_exceptions 
as usize);
+    for chunk in data[positions_start..positions_end].chunks_exact(2) {
+        let position = u16::from_le_bytes([chunk[0], chunk[1]]);
+        if position >= num_elements {
+            return Err(general_err!(
+                "Invalid ALP page: exception position {} out of bounds for 
vector length {}",
+                position,
+                num_elements
+            ));
+        }
+        exception_positions.push(position);
+    }
+
+    let packed_values =
+        (vector_start + metadata_size + packed_start)..(vector_start + 
metadata_size + packed_end);
+
+    let mut exception_values = Vec::with_capacity(alp_info.num_exceptions as 
usize);
+    for chunk in data[values_start..values_end].chunks_exact(Exact::WIDTH) {
+        exception_values.push(Exact::from_le_slice(chunk));
+    }
+
+    Ok(AlpEncodedVectorView {
+        num_elements,
+        alp_info,
+        for_info,
+        packed_values,
+        exception_positions,
+        exception_values,
+    })
+}
+
+/// Decode bit-packed deltas into exact integers.
+fn bit_unpack_integers<Exact: AlpExact + FromBytes>(
+    packed_values: Bytes,
+    bit_width: u8,
+    num_elements: u16,
+) -> Result<Vec<Exact>> {
+    if bit_width as usize > Exact::WIDTH * 8 {
+        return Err(general_err!(
+            "Invalid ALP page: bit width {} exceeds {}",
+            bit_width,
+            Exact::WIDTH * 8
+        ));
+    }
+
+    if bit_width == 0 {
+        return Ok(vec![Exact::zero(); num_elements as usize]);
+    }
+
+    let mut out = vec![Exact::zero(); num_elements as usize];
+    let mut reader = BitReader::new(packed_values);
+    let read = reader.get_batch::<Exact>(&mut out, bit_width as usize);
+    if read != out.len() {
+        return Err(general_err!(
+            "Invalid ALP page: bit unpack read {} values, expected {}",
+            read,
+            out.len()
+        ));
+    }
+
+    Ok(out)
+}
+
+/// Apply inverse FOR: `decoded = delta + frame_of_reference`.
+fn inverse_for<Exact: AlpExact>(deltas: &mut [Exact], frame_of_reference: 
Exact) {
+    for value in deltas {
+        *value = value.wrapping_add(frame_of_reference);
+    }
+}
+
+/// Decode one vector into output floating values:
+/// bit-unpack -> inverse FOR -> decimal decode -> patch exceptions.
+fn decode_vector_values<Value: AlpFloat>(
+    body: &Bytes,
+    vector: &AlpEncodedVectorView<Value::Exact>,
+) -> Result<Vec<Value>> {
+    let mut exact_values = bit_unpack_integers(
+        body.slice(vector.packed_values.clone()),
+        vector.for_info.bit_width,
+        vector.num_elements,
+    )?;
+    inverse_for(&mut exact_values, vector.for_info.frame_of_reference);
+
+    let scale = Value::decode_scale(vector.alp_info.exponent, 
vector.alp_info.factor);
+
+    let mut out = Vec::with_capacity(vector.num_elements as usize);

Review Comment:
   Eventually it would be faster to pass in the output slice here (rather than 
allocate a vector and then have to copy it on return)



##########
parquet/src/encodings/decoding/alp.rs:
##########
@@ -0,0 +1,1524 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use bytes::Bytes;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::encodings::decoding::Decoder;
+use crate::errors::{ParquetError, Result};
+use crate::util::bit_util::{BitReader, FromBytes};
+
+const ALP_HEADER_SIZE: usize = 7;
+const ALP_COMPRESSION_MODE: u8 = 0;
+const ALP_INTEGER_ENCODING_FOR_BIT_PACK: u8 = 0;
+const ALP_MIN_LOG_VECTOR_SIZE: u8 = 3;
+const ALP_MAX_LOG_VECTOR_SIZE: u8 = 15;
+const ALP_MAX_EXPONENT_F32: u8 = 10;
+const ALP_MAX_EXPONENT_F64: u8 = 18;
+
+/// Page-level ALP header (7 bytes).
+///
+/// Layout in bytes:
+/// - `[0]` `compression_mode`
+/// - `[1]` `integer_encoding`
+/// - `[2]` `log_vector_size`
+/// - `[3..7]` `num_elements` (little-endian `i32`)
+#[derive(Debug, Clone, Copy)]
+struct AlpHeader {
+    compression_mode: u8,
+    integer_encoding: u8,
+    log_vector_size: u8,
+    num_elements: i32,
+}
+
+impl AlpHeader {
+    fn num_elements_usize(&self) -> usize {
+        self.num_elements as usize
+    }
+
+    fn vector_size(&self) -> usize {
+        1usize << self.log_vector_size
+    }
+
+    fn num_vectors(&self) -> usize {
+        if self.num_elements == 0 {
+            0
+        } else {
+            self.num_elements_usize().div_ceil(self.vector_size())
+        }
+    }
+
+    fn vector_num_elements(&self, vector_index: usize) -> u16 {
+        let vector_size = self.vector_size();
+        let num_full_vectors = self.num_elements_usize() / vector_size;
+        let remainder = self.num_elements_usize() % vector_size;
+        if vector_index < num_full_vectors {
+            vector_size as u16
+        } else if vector_index == num_full_vectors && remainder > 0 {
+            remainder as u16
+        } else {
+            0
+        }
+    }
+}
+
+/// Per-vector ALP metadata (4 bytes), equivalent to C++ 
`AlpEncodedVectorInfo`.
+#[derive(Debug, Clone, Copy)]
+struct AlpEncodedVectorInfo {
+    exponent: u8,
+    factor: u8,
+    num_exceptions: u16,
+}
+
+impl AlpEncodedVectorInfo {
+    const STORED_SIZE: usize = 4;
+}
+
+/// Per-vector FOR metadata for exact integer type (`u32` for `f32`, `u64` for 
`f64`).
+#[derive(Debug, Clone, Copy)]
+struct AlpEncodedForVectorInfo<Exact: AlpExact> {
+    frame_of_reference: Exact,
+    bit_width: u8,
+}
+
+impl<Exact: AlpExact> AlpEncodedForVectorInfo<Exact> {
+    fn stored_size() -> usize {
+        Exact::WIDTH + 1
+    }
+
+    fn get_bit_packed_size(&self, num_elements: u16) -> usize {
+        (self.bit_width as usize * num_elements as usize).div_ceil(8)
+    }
+
+    fn get_data_stored_size(&self, num_elements: u16, num_exceptions: u16) -> 
usize {
+        let bit_packed_size = self.get_bit_packed_size(num_elements);
+        bit_packed_size
+            + num_exceptions as usize * std::mem::size_of::<u16>()
+            + num_exceptions as usize * Exact::WIDTH
+    }
+}
+
+/// Parsed view of one vector's metadata and data slices.
+///
+/// `packed_values` is a zero-copy range into page body bytes.
+/// Exception positions/values are copied for straightforward decode handling.

Review Comment:
   I think the copy (and allocation required for Vec) would be nice to avoid. 
For example, we could make this thing a view (`AlpEncodedVectorView<'a>) maybe:
   
   ```rust
   struct AlpEncodedVectorView<'a, Exact: AlpExact> {
       num_elements: u16,
       alp_info: AlpEncodedVectorInfo,
       for_info: AlpEncodedForVectorInfo<Exact>,
       packed_values: &'a [usize], // just slice
       exception_positions: &'a [u16], // just raw slice
       exception_values: &'a[Exact],
   }
   ```
   
   This could be a follow on performance optimizeation, no need to do it now
   
   Looks like maybe given the representation below (`Bytes`) perhaps keeping 
offsets here might be another good way
   
   ```
   struct AlpEncodedVectorView<'a, Exact: AlpExact> {
       num_elements: u16,
       alp_info: AlpEncodedVectorInfo,
       for_info: AlpEncodedForVectorInfo<Exact>,
       packed_values: usize, // offset to values
       exception_positions: usize, // offset to positions
       exception_values: usize, // offset to exception values
   }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to