emkornfield commented on code in PR #48345:
URL: https://github.com/apache/arrow/pull/48345#discussion_r3411678642


##########
cpp/src/arrow/util/alp/alp.h:
##########
@@ -0,0 +1,795 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Adaptive Lossless floating-Point (ALP) compression implementation
+
+#pragma once
+
+#include <optional>
+#include <vector>
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/alp/alp_constants.h"
+#include "arrow/util/span.h"
+
+namespace arrow {
+namespace util {
+namespace alp {
+
+// ----------------------------------------------------------------------
+// ALP Overview
+//
+// IMPORTANT: For abstract interfaces or examples how to use ALP, consult
+// alp_codec.h.
+// This is our implementation of the adaptive lossless floating-point
+// compression for decimals (ALP) (https://dl.acm.org/doi/10.1145/3626717).
+// It works by converting a float into a decimal (if possible). The exponent
+// and factor are chosen per vector. Each float is converted using
+// c(f) = int64(f * 10^exponent * 10^-factor). The converted floats are then
+// encoded via a delta frame of reference and bitpacked. Every exception,
+// where the conversion/reconversion changes the value of the float, is stored
+// separately and has to be patched into the decompressed vector afterwards.
+//
+// ==========================================================================
+//                    ALP COMPRESSION/DECOMPRESSION PIPELINE
+// ==========================================================================
+//
+// COMPRESSION FLOW:
+// -----------------
+//
+//   Input: float/double array
+//        |
+//        v
+//   +------------------------------------------------------------------+
+//   | 1. SAMPLING & PRESET GENERATION                                  |
+//   |    * Sample vectors from dataset                                 |
+//   |    * Try all exponent/factor combinations (e, f)                 |
+//   |    * Select best k combinations for preset                       |
+//   +------------------------------------+-----------------------------+
+//                                        | preset.combinations
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 2. PER-VECTOR COMPRESSION                                        |
+//   |    a) Find best (e,f) from preset for this vector                |
+//   |    b) Encode: encoded[i] = int64(value[i] * 10^e * 10^-f)        |
+//   |    c) Verify: if decode(encoded[i]) != value[i] -> exception     |
+//   |    d) Replace exceptions with placeholder value                  |
+//   +------------------------------------+-----------------------------+
+//                                        | encoded integers + exceptions
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 3. FRAME OF REFERENCE (FOR)                                      |
+//   |    * Find min value in encoded integers                          |
+//   |    * Subtract min from all values: delta[i] = encoded[i] - min   |
+//   +------------------------------------+-----------------------------+
+//                                        | delta values (smaller range)
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 4. BIT PACKING                                                   |
+//   |    * Calculate bit_width = log2(max_delta)                       |
+//   |    * Pack each value into bit_width bits                         |
+//   |    * Result: tightly packed binary data                          |
+//   +------------------------------------+-----------------------------+
+//                                        | packed bytes
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 5. SERIALIZATION (offset-based interleaved layout)              |
+//   |    [Header][Offsets...][Vector₀][Vector₁]...                    |
+//   |    where each Vector = [AlpInfo|ForInfo|Data]                   |
+//   +------------------------------------------------------------------+
+//
+//
+// DECOMPRESSION FLOW:
+// -------------------
+//
+//   Serialized bytes -> AlpEncodedVector::Load()
+//        |
+//        v
+//   +------------------------------------------------------------------+
+//   | 1. BIT UNPACKING                                                 |
+//   |    * Extract bit_width from metadata                             |
+//   |    * Unpack each value from bit_width bits -> delta values       |
+//   +------------------------------------+-----------------------------+
+//                                        | delta values
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 2. REVERSE FRAME OF REFERENCE (unFOR)                            |
+//   |    * Add back min: encoded[i] = delta[i] + frame_of_reference    |
+//   +------------------------------------+-----------------------------+
+//                                        | encoded integers
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 3. DECODE                                                        |
+//   |    * Apply inverse formula: value[i] = encoded[i] * 10^-e * 10^f |
+//   +------------------------------------+-----------------------------+
+//                                        | decoded floats (with placeholders)
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 4. PATCH EXCEPTIONS                                              |
+//   |    * Replace values at exception_positions[] with exceptions[]   |
+//   +------------------------------------+-----------------------------+
+//                                        |
+//                                        v
+//   Output: Original float/double array (lossless!)
+//
+// ==========================================================================
+
+// ----------------------------------------------------------------------
+// AlpMode
+
+/// \brief ALP compression mode
+///
+/// Currently only ALP (decimal compression) is implemented.
+enum class AlpMode { kAlp = 0 };

Review Comment:
   If this might be used in serialization, we should consider static asserting 
the size is 1 byte.



##########
cpp/src/arrow/util/alp/alp.h:
##########
@@ -0,0 +1,795 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Adaptive Lossless floating-Point (ALP) compression implementation
+
+#pragma once
+
+#include <optional>
+#include <vector>
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/alp/alp_constants.h"
+#include "arrow/util/span.h"
+
+namespace arrow {
+namespace util {
+namespace alp {
+
+// ----------------------------------------------------------------------
+// ALP Overview
+//
+// IMPORTANT: For abstract interfaces or examples how to use ALP, consult
+// alp_codec.h.
+// This is our implementation of the adaptive lossless floating-point
+// compression for decimals (ALP) (https://dl.acm.org/doi/10.1145/3626717).
+// It works by converting a float into a decimal (if possible). The exponent
+// and factor are chosen per vector. Each float is converted using
+// c(f) = int64(f * 10^exponent * 10^-factor). The converted floats are then
+// encoded via a delta frame of reference and bitpacked. Every exception,
+// where the conversion/reconversion changes the value of the float, is stored
+// separately and has to be patched into the decompressed vector afterwards.
+//
+// ==========================================================================
+//                    ALP COMPRESSION/DECOMPRESSION PIPELINE
+// ==========================================================================
+//
+// COMPRESSION FLOW:
+// -----------------
+//
+//   Input: float/double array
+//        |
+//        v
+//   +------------------------------------------------------------------+
+//   | 1. SAMPLING & PRESET GENERATION                                  |
+//   |    * Sample vectors from dataset                                 |
+//   |    * Try all exponent/factor combinations (e, f)                 |
+//   |    * Select best k combinations for preset                       |
+//   +------------------------------------+-----------------------------+
+//                                        | preset.combinations
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 2. PER-VECTOR COMPRESSION                                        |
+//   |    a) Find best (e,f) from preset for this vector                |
+//   |    b) Encode: encoded[i] = int64(value[i] * 10^e * 10^-f)        |
+//   |    c) Verify: if decode(encoded[i]) != value[i] -> exception     |
+//   |    d) Replace exceptions with placeholder value                  |
+//   +------------------------------------+-----------------------------+
+//                                        | encoded integers + exceptions
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 3. FRAME OF REFERENCE (FOR)                                      |
+//   |    * Find min value in encoded integers                          |
+//   |    * Subtract min from all values: delta[i] = encoded[i] - min   |
+//   +------------------------------------+-----------------------------+
+//                                        | delta values (smaller range)
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 4. BIT PACKING                                                   |
+//   |    * Calculate bit_width = log2(max_delta)                       |
+//   |    * Pack each value into bit_width bits                         |
+//   |    * Result: tightly packed binary data                          |
+//   +------------------------------------+-----------------------------+
+//                                        | packed bytes
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 5. SERIALIZATION (offset-based interleaved layout)              |
+//   |    [Header][Offsets...][Vector₀][Vector₁]...                    |
+//   |    where each Vector = [AlpInfo|ForInfo|Data]                   |
+//   +------------------------------------------------------------------+
+//
+//
+// DECOMPRESSION FLOW:
+// -------------------
+//
+//   Serialized bytes -> AlpEncodedVector::Load()
+//        |
+//        v
+//   +------------------------------------------------------------------+
+//   | 1. BIT UNPACKING                                                 |
+//   |    * Extract bit_width from metadata                             |
+//   |    * Unpack each value from bit_width bits -> delta values       |
+//   +------------------------------------+-----------------------------+
+//                                        | delta values
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 2. REVERSE FRAME OF REFERENCE (unFOR)                            |
+//   |    * Add back min: encoded[i] = delta[i] + frame_of_reference    |
+//   +------------------------------------+-----------------------------+
+//                                        | encoded integers
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 3. DECODE                                                        |
+//   |    * Apply inverse formula: value[i] = encoded[i] * 10^-e * 10^f |
+//   +------------------------------------+-----------------------------+
+//                                        | decoded floats (with placeholders)
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 4. PATCH EXCEPTIONS                                              |
+//   |    * Replace values at exception_positions[] with exceptions[]   |
+//   +------------------------------------+-----------------------------+
+//                                        |
+//                                        v
+//   Output: Original float/double array (lossless!)
+//
+// ==========================================================================
+
+// ----------------------------------------------------------------------
+// AlpMode
+
+/// \brief ALP compression mode
+///
+/// Currently only ALP (decimal compression) is implemented.
+enum class AlpMode { kAlp = 0 };
+
+// ----------------------------------------------------------------------
+// AlpExponentAndFactor
+
+/// \brief Helper struct to encapsulate the exponent and factor
+struct AlpExponentAndFactor {
+  uint8_t exponent{0};
+  uint8_t factor{0};
+
+  bool operator==(const AlpExponentAndFactor& other) const {
+    return exponent == other.exponent && factor == other.factor;
+  }
+
+  /// \brief Comparison operator for deterministic std::map ordering
+  bool operator<(const AlpExponentAndFactor& other) const {
+    if (exponent != other.exponent) return exponent < other.exponent;
+    return factor < other.factor;
+  }
+};
+
+// ----------------------------------------------------------------------
+// AlpEncodedVectorInfo (non-templated, ALP core metadata)
+
+/// \brief ALP-specific metadata for an encoded vector (non-templated)
+///
+/// Contains the metadata specific to ALP's float-to-integer conversion:
+///   - exponent/factor: parameters for decimal encoding
+///   - num_exceptions: count of values that couldn't be losslessly encoded
+///
+/// This struct is the same size regardless of the floating-point type 
(float/double).
+/// It is separate from the integer encoding metadata (e.g., FOR) to allow
+/// different integer encodings to be used in the future.
+///
+/// Serialization format (4 bytes):
+///
+///   +------------------------------------------+
+///   |  AlpEncodedVectorInfo (4 bytes)          |
+///   +------------------------------------------+
+///   |  Offset |  Field              |  Size    |
+///   +---------+---------------------+----------+
+///   |    0    |  exponent (uint8_t) |  1 byte  |
+///   |    1    |  factor (uint8_t)   |  1 byte  |
+///   |    2    |  num_exceptions     |  2 bytes |
+///   +------------------------------------------+
+class AlpEncodedVectorInfo {
+ public:
+  AlpEncodedVectorInfo() = default;
+  AlpEncodedVectorInfo(uint8_t exponent, uint8_t factor, int16_t 
num_exceptions)
+      : exponent_(exponent), factor_(factor), num_exceptions_(num_exceptions) 
{}
+
+  uint8_t exponent() const { return exponent_; }
+  uint8_t factor() const { return factor_; }
+  int16_t num_exceptions() const { return num_exceptions_; }
+
+  void set_exponent(uint8_t exponent) { exponent_ = exponent; }
+  void set_factor(uint8_t factor) { factor_ = factor; }
+  void set_num_exceptions(int16_t num_exceptions) { num_exceptions_ = 
num_exceptions; }
+
+  /// Size of the serialized portion (4 bytes, fixed)
+  static constexpr int64_t kStoredSize =
+      sizeof(uint8_t) + sizeof(uint8_t) + sizeof(int16_t);

Review Comment:
   ```suggestion
         sizeof(exponent_) + sizeof(factor_) + sizeof(num_exceptions_);
   ```
   If this doesn't compile, I'm not sure the static_assert helps too much 
because we would need to keep it in sync with the members/serialization.



##########
cpp/src/arrow/util/alp/alp.h:
##########
@@ -0,0 +1,795 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Adaptive Lossless floating-Point (ALP) compression implementation
+
+#pragma once
+
+#include <optional>
+#include <vector>
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/alp/alp_constants.h"
+#include "arrow/util/span.h"
+
+namespace arrow {
+namespace util {
+namespace alp {
+
+// ----------------------------------------------------------------------
+// ALP Overview
+//
+// IMPORTANT: For abstract interfaces or examples how to use ALP, consult
+// alp_codec.h.
+// This is our implementation of the adaptive lossless floating-point
+// compression for decimals (ALP) (https://dl.acm.org/doi/10.1145/3626717).
+// It works by converting a float into a decimal (if possible). The exponent
+// and factor are chosen per vector. Each float is converted using
+// c(f) = int64(f * 10^exponent * 10^-factor). The converted floats are then
+// encoded via a delta frame of reference and bitpacked. Every exception,
+// where the conversion/reconversion changes the value of the float, is stored
+// separately and has to be patched into the decompressed vector afterwards.
+//
+// ==========================================================================
+//                    ALP COMPRESSION/DECOMPRESSION PIPELINE
+// ==========================================================================
+//
+// COMPRESSION FLOW:
+// -----------------
+//
+//   Input: float/double array
+//        |
+//        v
+//   +------------------------------------------------------------------+
+//   | 1. SAMPLING & PRESET GENERATION                                  |
+//   |    * Sample vectors from dataset                                 |
+//   |    * Try all exponent/factor combinations (e, f)                 |
+//   |    * Select best k combinations for preset                       |
+//   +------------------------------------+-----------------------------+
+//                                        | preset.combinations
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 2. PER-VECTOR COMPRESSION                                        |
+//   |    a) Find best (e,f) from preset for this vector                |
+//   |    b) Encode: encoded[i] = int64(value[i] * 10^e * 10^-f)        |
+//   |    c) Verify: if decode(encoded[i]) != value[i] -> exception     |
+//   |    d) Replace exceptions with placeholder value                  |
+//   +------------------------------------+-----------------------------+
+//                                        | encoded integers + exceptions
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 3. FRAME OF REFERENCE (FOR)                                      |
+//   |    * Find min value in encoded integers                          |
+//   |    * Subtract min from all values: delta[i] = encoded[i] - min   |
+//   +------------------------------------+-----------------------------+
+//                                        | delta values (smaller range)
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 4. BIT PACKING                                                   |
+//   |    * Calculate bit_width = log2(max_delta)                       |
+//   |    * Pack each value into bit_width bits                         |
+//   |    * Result: tightly packed binary data                          |
+//   +------------------------------------+-----------------------------+
+//                                        | packed bytes
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 5. SERIALIZATION (offset-based interleaved layout)              |
+//   |    [Header][Offsets...][Vector₀][Vector₁]...                    |
+//   |    where each Vector = [AlpInfo|ForInfo|Data]                   |
+//   +------------------------------------------------------------------+
+//
+//
+// DECOMPRESSION FLOW:
+// -------------------
+//
+//   Serialized bytes -> AlpEncodedVector::Load()
+//        |
+//        v
+//   +------------------------------------------------------------------+
+//   | 1. BIT UNPACKING                                                 |
+//   |    * Extract bit_width from metadata                             |
+//   |    * Unpack each value from bit_width bits -> delta values       |
+//   +------------------------------------+-----------------------------+
+//                                        | delta values
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 2. REVERSE FRAME OF REFERENCE (unFOR)                            |
+//   |    * Add back min: encoded[i] = delta[i] + frame_of_reference    |
+//   +------------------------------------+-----------------------------+
+//                                        | encoded integers
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 3. DECODE                                                        |
+//   |    * Apply inverse formula: value[i] = encoded[i] * 10^-e * 10^f |
+//   +------------------------------------+-----------------------------+
+//                                        | decoded floats (with placeholders)
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 4. PATCH EXCEPTIONS                                              |
+//   |    * Replace values at exception_positions[] with exceptions[]   |
+//   +------------------------------------+-----------------------------+
+//                                        |
+//                                        v
+//   Output: Original float/double array (lossless!)
+//
+// ==========================================================================
+
+// ----------------------------------------------------------------------
+// AlpMode
+
+/// \brief ALP compression mode
+///
+/// Currently only ALP (decimal compression) is implemented.
+enum class AlpMode { kAlp = 0 };
+
+// ----------------------------------------------------------------------
+// AlpExponentAndFactor
+
+/// \brief Helper struct to encapsulate the exponent and factor
+struct AlpExponentAndFactor {
+  uint8_t exponent{0};
+  uint8_t factor{0};
+
+  bool operator==(const AlpExponentAndFactor& other) const {
+    return exponent == other.exponent && factor == other.factor;
+  }
+
+  /// \brief Comparison operator for deterministic std::map ordering
+  bool operator<(const AlpExponentAndFactor& other) const {
+    if (exponent != other.exponent) return exponent < other.exponent;
+    return factor < other.factor;
+  }
+};
+
+// ----------------------------------------------------------------------
+// AlpEncodedVectorInfo (non-templated, ALP core metadata)
+
+/// \brief ALP-specific metadata for an encoded vector (non-templated)

Review Comment:
   ```suggestion
   /// \brief ALP-specific metadata for an encoded vector 
   ```



##########
cpp/src/parquet/encoding_alp_benchmark.cc:
##########
@@ -0,0 +1,1824 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <cmath>
+#include <cstring>
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <memory>
+#include <random>
+#include <sstream>
+#include <unistd.h>
+#include <unordered_set>
+#include <vector>
+
+#include <benchmark/benchmark.h>
+
+#include "arrow/buffer.h"
+#include "arrow/util/alp/alp_codec.h"
+#include "arrow/util/compression.h"
+#include "parquet/encoding.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
+// This file benchmarks multiple encoding schemes for floating point values in
+// Parquet. Structure mirrors Snowflake's FloatComprBenchmark.cpp
+//
+// It evaluates:
+// 1) Compression Ratio
+// 2) Encoding Speed
+// 3) Decoding Speed
+//
+// Encoding schemes:
+// 1) ALP encoding
+// 2) ByteStreamSplit encoding
+// 3) ZSTD compression
+//
+// On synthetic datasets:
+// 1) Constant Value
+// 2) Increasing values
+// 3) Small Range decimal
+// 4) Range decimal
+// 5) Large Range decimal
+// 6) Random values
+//
+// And real-world datasets:
+// 1) floatingpoint_spotify1.csv (9 columns)
+// 2) floatingpoint_spotify2.csv (9 columns)
+// 3) floatingpoint_citytemperature.csv (1 column)
+// 4) floatingpoint_poi.csv (2 columns)
+// 5) floatingpoint_birdmigration.csv (1 column)
+// 6) floatingpoint_commongovernment.csv (3 columns)
+// 7) floatingpoint_arade.csv (4 columns)
+// 8) floatingpoint_num_brain.csv (1 column)
+// 9) floatingpoint_num_comet.csv (1 column)
+// 10) floatingpoint_num_control.csv (1 column)
+// 11) floatingpoint_num_plasma.csv (1 column)
+// 12) floatingpoint_obs_error.csv (1 column)
+// 13) floatingpoint_obs_info.csv (1 column)
+// 14) floatingpoint_obs_spitzer.csv (1 column)
+// 15) floatingpoint_obs_temp.csv (1 column)
+// 16) floatingpoint_msg_bt.csv (1 column)
+// 17) floatingpoint_msg_lu.csv (1 column)
+// 18) floatingpoint_msg_sp.csv (1 column)
+// 19) floatingpoint_msg_sppm.csv (1 column)
+// 20) floatingpoint_msg_sweep3d.csv (1 column)
+
+namespace parquet {
+
+using schema::PrimitiveNode;
+
+// Helper function matching Snowflake's pow10
+constexpr uint64_t Pow10(uint64_t exp) {
+  uint64_t result = 1;
+  for (uint64_t i = 0; i < exp; ++i) {
+    result *= 10;
+  }
+  return result;
+}
+
+// Encoding type enum (matching Snowflake's ComprEngine pattern)

Review Comment:
   Snowflake?



##########
cpp/src/parquet/encoder.cc:
##########
@@ -997,6 +999,104 @@ class ByteStreamSplitEncoder<FLBAType> : public 
ByteStreamSplitEncoderBase<FLBAT
   }
 };
 
+// ----------------------------------------------------------------------
+// ALP encoder (Adaptive Lossless floating-Point)
+
+template <typename DType>
+class AlpEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using ArrowType = typename EncodingTraits<DType>::ArrowType;
+  using TypedEncoder<DType>::Put;
+
+  explicit AlpEncoder(const ColumnDescriptor* descr,
+                      ::arrow::MemoryPool* pool = 
::arrow::default_memory_pool(),
+                      int32_t vector_size = 
::arrow::util::alp::AlpConstants::kAlpVectorSize)
+      : EncoderImpl(descr, Encoding::ALP, pool),
+        sink_{pool},
+        vector_size_(vector_size) {
+    static_assert(std::is_same<T, float>::value || std::is_same<T, 
double>::value,
+                  "ALP only supports float and double types");
+    if (vector_size_ <= 0 || (vector_size_ & (vector_size_ - 1)) != 0) {
+      throw ParquetException("ALP vector_size must be a positive power of 2, 
got " +
+                             std::to_string(vector_size_));
+    }
+    if (vector_size_ > (1 << 
::arrow::util::alp::AlpConstants::kMaxLogVectorSize)) {
+      throw ParquetException("ALP vector_size exceeds maximum " +
+                             std::to_string(1 << 
::arrow::util::alp::AlpConstants::kMaxLogVectorSize) +
+                             ", got " + std::to_string(vector_size_));
+    }
+  }
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  std::shared_ptr<Buffer> FlushValues() override {
+    if (sink_.length() == 0) {
+      // Empty buffer case
+      PARQUET_ASSIGN_OR_THROW(auto buf, sink_.Finish());
+      return buf;
+    }
+
+    // Call AlpCodec::Encode() - it handles sampling, preset selection, and 
compression
+    const int64_t numElements = sink_.length() / 
static_cast<int64_t>(sizeof(T));

Review Comment:
   nit: num_elements.
   ```suggestion
       const int64_t num_elements = sink_.length() / 
static_cast<int64_t>(sizeof(T));
   ```



##########
cpp/src/parquet/encoding_alp_benchmark.cc:
##########
@@ -0,0 +1,1824 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <cmath>
+#include <cstring>
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <memory>
+#include <random>
+#include <sstream>
+#include <unistd.h>
+#include <unordered_set>
+#include <vector>
+
+#include <benchmark/benchmark.h>
+
+#include "arrow/buffer.h"
+#include "arrow/util/alp/alp_codec.h"
+#include "arrow/util/compression.h"
+#include "parquet/encoding.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
+// This file benchmarks multiple encoding schemes for floating point values in
+// Parquet. Structure mirrors Snowflake's FloatComprBenchmark.cpp
+//
+// It evaluates:
+// 1) Compression Ratio
+// 2) Encoding Speed
+// 3) Decoding Speed
+//
+// Encoding schemes:
+// 1) ALP encoding
+// 2) ByteStreamSplit encoding
+// 3) ZSTD compression
+//
+// On synthetic datasets:
+// 1) Constant Value
+// 2) Increasing values
+// 3) Small Range decimal
+// 4) Range decimal
+// 5) Large Range decimal
+// 6) Random values
+//
+// And real-world datasets:
+// 1) floatingpoint_spotify1.csv (9 columns)
+// 2) floatingpoint_spotify2.csv (9 columns)
+// 3) floatingpoint_citytemperature.csv (1 column)
+// 4) floatingpoint_poi.csv (2 columns)
+// 5) floatingpoint_birdmigration.csv (1 column)
+// 6) floatingpoint_commongovernment.csv (3 columns)
+// 7) floatingpoint_arade.csv (4 columns)
+// 8) floatingpoint_num_brain.csv (1 column)
+// 9) floatingpoint_num_comet.csv (1 column)
+// 10) floatingpoint_num_control.csv (1 column)
+// 11) floatingpoint_num_plasma.csv (1 column)
+// 12) floatingpoint_obs_error.csv (1 column)
+// 13) floatingpoint_obs_info.csv (1 column)
+// 14) floatingpoint_obs_spitzer.csv (1 column)
+// 15) floatingpoint_obs_temp.csv (1 column)
+// 16) floatingpoint_msg_bt.csv (1 column)
+// 17) floatingpoint_msg_lu.csv (1 column)
+// 18) floatingpoint_msg_sp.csv (1 column)
+// 19) floatingpoint_msg_sppm.csv (1 column)
+// 20) floatingpoint_msg_sweep3d.csv (1 column)
+
+namespace parquet {
+
+using schema::PrimitiveNode;
+
+// Helper function matching Snowflake's pow10
+constexpr uint64_t Pow10(uint64_t exp) {
+  uint64_t result = 1;
+  for (uint64_t i = 0; i < exp; ++i) {
+    result *= 10;
+  }
+  return result;
+}
+
+// Encoding type enum (matching Snowflake's ComprEngine pattern)
+enum class EncodingType {
+  kALP,
+  kByteStreamSplit,
+  kZSTD,
+};
+
+// Helper to create column descriptor for float/double
+template <typename DType>
+std::shared_ptr<ColumnDescriptor> MakeColumnDescriptor() {
+  auto node = PrimitiveNode::Make("column", Repetition::REQUIRED, 
DType::type_num);
+  return std::make_shared<ColumnDescriptor>(node, false, false);
+}
+
+// ============================================================================
+// Benchmark data base class
+// ============================================================================
+
+/// \brief Helper class to set up encoding benchmark data.
+///
+/// Matches Snowflake's RealComprBenchmarkData<T> structure with encoding 
parameter.
+template <typename T>
+struct RealComprBenchmarkData {
+  std::vector<T> input_uncompressed;
+  std::shared_ptr<Buffer> encoded_data;
+  std::vector<T> output_uncompressed;
+  uint64_t encoded_size = 0;
+  Encoding::type current_encoding;
+  std::unique_ptr<::arrow::util::Codec> codec;  // For ZSTD
+
+  virtual ~RealComprBenchmarkData() = default;
+
+  void PrepareBenchmarkData(uint64_t element_count, EncodingType 
encoding_type) {
+    FillUncompressedInput(element_count);
+
+    using DType =
+        typename std::conditional<std::is_same<T, float>::value, FloatType,
+                                  DoubleType>::type;
+    auto descr = MakeColumnDescriptor<DType>();
+
+    // Select encoding based on type
+    switch (encoding_type) {
+      case EncodingType::kALP:
+        current_encoding = Encoding::ALP;
+        break;
+      case EncodingType::kByteStreamSplit:
+        current_encoding = Encoding::BYTE_STREAM_SPLIT;
+        codec = 
::arrow::util::Codec::Create(::arrow::Compression::ZSTD).ValueOrDie();
+        break;
+      case EncodingType::kZSTD:
+        // ZSTD uses PLAIN encoding + compression
+        current_encoding = Encoding::PLAIN;
+        codec = 
::arrow::util::Codec::Create(::arrow::Compression::ZSTD).ValueOrDie();
+        break;
+    }
+
+    // Do initial encoding to size buffers
+    if (encoding_type == EncodingType::kALP) {
+      auto encoder = MakeTypedEncoder<DType>(Encoding::ALP, false, 
descr.get());
+      encoder->Put(input_uncompressed.data(),
+                   static_cast<int>(input_uncompressed.size()));
+      encoded_data = encoder->FlushValues();
+      encoded_size = encoded_data->size();
+    } else if (encoding_type == EncodingType::kZSTD) {
+      // For ZSTD: Plain encode then compress
+      auto encoder = MakeTypedEncoder<DType>(Encoding::PLAIN, false, 
descr.get());
+      encoder->Put(input_uncompressed.data(),
+                   static_cast<int>(input_uncompressed.size()));
+      auto plain_data = encoder->FlushValues();
+
+      // Compress with ZSTD - use AllocateBuffer to properly manage memory
+      int64_t max_compressed_len =
+          codec->MaxCompressedLen(plain_data->size(), plain_data->data());
+      auto compressed_buffer =
+          ::arrow::AllocateResizableBuffer(max_compressed_len).ValueOrDie();
+      int64_t actual_size =
+          codec
+              ->Compress(plain_data->size(), plain_data->data(), 
max_compressed_len,
+                         compressed_buffer->mutable_data())
+              .ValueOrDie();
+      // Resize to actual compressed size and move to shared_ptr
+      (void)compressed_buffer->Resize(actual_size);  // Resize can't fail for 
shrinking
+      encoded_data = std::shared_ptr<Buffer>(std::move(compressed_buffer));
+      encoded_size = actual_size;
+    } else {
+      // For ByteStreamSplit: Direct encoding
+      auto encoder = MakeTypedEncoder<DType>(current_encoding, false, 
descr.get());
+      encoder->Put(input_uncompressed.data(),
+                   static_cast<int>(input_uncompressed.size()));
+      auto byte_stream_split_data = encoder->FlushValues();
+      // Compress with ZSTD - use AllocateBuffer to properly manage memory
+      int64_t max_compressed_len = codec->MaxCompressedLen(
+          byte_stream_split_data->size(), byte_stream_split_data->data());
+      auto compressed_buffer =
+          ::arrow::AllocateResizableBuffer(max_compressed_len).ValueOrDie();
+      int64_t actual_size =
+          codec
+              ->Compress(byte_stream_split_data->size(), 
byte_stream_split_data->data(),
+                         max_compressed_len, compressed_buffer->mutable_data())
+              .ValueOrDie();
+      // Resize to actual compressed size and move to shared_ptr
+      (void)compressed_buffer->Resize(actual_size);  // Resize can't fail for 
shrinking
+      encoded_data = std::shared_ptr<Buffer>(std::move(compressed_buffer));
+      encoded_size = actual_size;
+    }
+
+    // Prepare output buffer
+    output_uncompressed.resize(input_uncompressed.size());
+  }
+
+  virtual void FillUncompressedInput(uint64_t element_count) = 0;
+};
+
+// ============================================================================
+// Synthetic Data Generators
+// ============================================================================
+
+template <typename T>
+struct ConstantValues : public RealComprBenchmarkData<T> {
+  void FillUncompressedInput(uint64_t element_count) override {
+    const T value = static_cast<T>(1.1);
+    this->input_uncompressed = std::vector<T>(element_count, value);
+  }
+};
+
+template <typename T>
+struct IncreasingValues : public RealComprBenchmarkData<T> {
+  void FillUncompressedInput(uint64_t element_count) override {
+    this->input_uncompressed.resize(element_count);
+    T current_value = 0.0;
+    for (uint64_t i = 0; i < element_count; i++) {
+      this->input_uncompressed[i] = current_value;
+      current_value += 1.0;
+    }
+  }
+};
+
+template <typename T>
+struct DecimalSmallRange : public RealComprBenchmarkData<T> {
+  void FillUncompressedInput(uint64_t element_count) override {
+    this->input_uncompressed.resize(element_count);
+    const uint64_t min_val = 100;
+    const uint64_t max_val = 1000;
+    const uint64_t decimal_places = 2;
+    const uint64_t mult = Pow10(decimal_places);
+
+    std::uniform_int_distribution<uint64_t> unif(min_val * mult, max_val * 
mult);
+    std::default_random_engine re;
+    for (uint64_t i = 0; i < element_count; i++) {
+      this->input_uncompressed[i] = unif(re) * 1.0 / mult;
+    }
+  }
+};
+
+template <typename T>
+struct DecimalRange : public RealComprBenchmarkData<T> {
+  void FillUncompressedInput(uint64_t element_count) override {
+    this->input_uncompressed.resize(element_count);
+    const uint64_t min_val = 1000;
+    const uint64_t max_val = 100000;
+    const uint64_t decimal_places = 6;
+    const uint64_t mult = Pow10(decimal_places);
+
+    std::uniform_int_distribution<uint64_t> unif(min_val * mult, max_val * 
mult);
+    std::default_random_engine re;
+    for (uint64_t i = 0; i < element_count; i++) {
+      this->input_uncompressed[i] = unif(re) * 1.0 / mult;
+    }
+  }
+};
+
+template <typename T>
+struct DecimalLargeRange : public RealComprBenchmarkData<T> {
+  void FillUncompressedInput(uint64_t element_count) override {
+    this->input_uncompressed.resize(element_count);
+    const uint64_t min_val = 1000;
+    const uint64_t max_val = 1000000;
+    const uint64_t decimal_places = 6;
+    const uint64_t mult = Pow10(decimal_places);
+
+    std::uniform_int_distribution<uint64_t> unif(min_val * mult, max_val * 
mult);
+    std::default_random_engine re;
+    for (uint64_t i = 0; i < element_count; i++) {
+      this->input_uncompressed[i] = unif(re) * 1.0 / mult;
+    }
+  }
+};
+
+template <typename T>
+struct RandomValues : public RealComprBenchmarkData<T> {
+  void FillUncompressedInput(uint64_t element_count) override {
+    this->input_uncompressed.resize(element_count);
+    std::uniform_real_distribution<T> unif(std::numeric_limits<T>::min(),
+                                           std::numeric_limits<T>::max());
+    std::default_random_engine re;
+    for (uint64_t i = 0; i < element_count; i++) {
+      this->input_uncompressed[i] = unif(re);
+    }
+  }
+};
+
+// ============================================================================
+// CSV Loading Infrastructure (for real-world datasets)
+// ============================================================================
+
+// Extract tarball once and return the data directory path
+std::string GetDataDirectory() {
+  static std::string data_dir;
+  static bool initialized = false;
+
+  if (!initialized) {
+    // Find the tarball location relative to this source file
+    std::string tarball_path = std::string(__FILE__);
+    tarball_path = tarball_path.substr(0, tarball_path.find_last_of("/\\"));
+    tarball_path = tarball_path.substr(0, tarball_path.find_last_of("/\\"));
+
+    tarball_path += 
"/../submodules/parquet-testing/data/floatingpoint_data.tar.gz";
+
+    // Use a fixed extraction directory that can be reused across runs
+    data_dir = "/tmp/parquet_alp_benchmark_data";
+
+    // Check if tarball exists
+    std::ifstream tarball_check(tarball_path);
+    if (!tarball_check.good()) {
+      // Fall back to original directory if tarball not found
+      data_dir = std::string(__FILE__);
+      data_dir = data_dir.substr(0, data_dir.find_last_of("/\\"));
+      data_dir = data_dir.substr(0, data_dir.find_last_of("/\\"));
+      data_dir += "/../submodules/parquet-testing/data";
+      initialized = true;
+      return data_dir;
+    }
+
+    // Check if extraction directory already exists and has files
+    std::ifstream check_file(data_dir + "/floatingpoint_spotify1.csv");
+    if (check_file.good()) {
+      // Directory already exists with data, reuse it
+      initialized = true;
+      return data_dir;
+    }
+
+    // Create extraction directory and extract tarball
+    std::string mkdir_cmd = "mkdir -p " + data_dir;
+    std::string extract_cmd = "tar -xzf " + tarball_path + " -C " + data_dir;
+
+    if (system(mkdir_cmd.c_str()) == 0 && system(extract_cmd.c_str()) == 0) {
+      initialized = true;
+    } else {
+      // Extraction failed, fall back to original directory
+      data_dir = std::string(__FILE__);
+      data_dir = data_dir.substr(0, data_dir.find_last_of("/\\"));
+      data_dir = data_dir.substr(0, data_dir.find_last_of("/\\"));
+      data_dir += "/../submodules/parquet-testing/data";
+      initialized = true;
+    }
+  }
+
+  return data_dir;
+}
+
+std::vector<std::string> SplitCsvRow(const std::string& line, char delimiter = 
',') {

Review Comment:
   I would guess we want to reuse the CSV parsing machinery already present 
in Arrow to load the data. Is there a reason that wouldn't work?



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -2660,4 +2595,407 @@ TEST(DeltaByteArrayEncodingAdHoc, ArrowDirectPut) {
   }
 }
 
+// ----------------------------------------------------------------------
+// ALP encoding tests for float/double
+
+template <typename Type>
+class TestAlpEncoding : public TestEncodingBase<Type> {

Review Comment:
   I commented in a few places below, but I think most, if not all, of the 
ad hoc tests should be run for both float and double through the 
type-parameterized test. Is there a reason not to do so?



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -2660,4 +2595,407 @@ TEST(DeltaByteArrayEncodingAdHoc, ArrowDirectPut) {
   }
 }
 
+// ----------------------------------------------------------------------
+// ALP encoding tests for float/double
+
+template <typename Type>
+class TestAlpEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+  static constexpr size_t kNumRoundTrips = 3;
+
+  void CheckRoundtrip() override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::ALP, /*use_dictionary=*/false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::ALP, descr_.get());
+
+    for (size_t i = 0; i < kNumRoundTrips; ++i) {
+      encoder->Put(draws_, num_values_);
+      encode_buffer_ = encoder->FlushValues();
+
+      decoder->SetData(num_values_, encode_buffer_->data(),
+                       static_cast<int>(encode_buffer_->size()));
+      int values_decoded = decoder->Decode(decode_buf_, num_values_);
+      ASSERT_EQ(num_values_, values_decoded);
+
+      // Use memcmp for bit-exact comparison (important for -0.0, NaN bit 
patterns)
+      ASSERT_EQ(0, std::memcmp(draws_, decode_buf_, num_values_ * 
sizeof(c_type)));
+    }
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits,
+                            int64_t valid_bits_offset) override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::ALP, /*use_dictionary=*/false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::ALP, descr_.get());
+
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    for (size_t i = 0; i < kNumRoundTrips; ++i) {
+      encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+      encode_buffer_ = encoder->FlushValues();
+
+      decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                       static_cast<int>(encode_buffer_->size()));
+      auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, 
null_count,
+                                                  valid_bits, 
valid_bits_offset);
+      ASSERT_EQ(num_values_, values_decoded);
+
+      // Verify only valid values
+      for (int j = 0; j < num_values_; ++j) {
+        if (bit_util::GetBit(valid_bits, valid_bits_offset + j)) {
+          ASSERT_EQ(0, std::memcmp(&draws_[j], &decode_buf_[j], 
sizeof(c_type))) << j;
+        }
+      }
+    }
+  }
+
+  void InitDataWithSpecialValues(int nvalues, int repeats) {
+    num_values_ = nvalues * repeats;
+    this->input_bytes_.resize(num_values_ * sizeof(c_type));
+    this->output_bytes_.resize(num_values_ * sizeof(c_type));
+    draws_ = reinterpret_cast<c_type*>(this->input_bytes_.data());
+    decode_buf_ = reinterpret_cast<c_type*>(this->output_bytes_.data());
+
+    // Fill with mix of normal and special values
+    for (int i = 0; i < nvalues; ++i) {
+      if (i % 20 == 0) {
+        draws_[i] = std::numeric_limits<c_type>::quiet_NaN();
+      } else if (i % 20 == 5) {
+        draws_[i] = std::numeric_limits<c_type>::infinity();
+      } else if (i % 20 == 10) {
+        draws_[i] = -std::numeric_limits<c_type>::infinity();
+      } else if (i % 20 == 15) {
+        draws_[i] = static_cast<c_type>(-0.0);
+      } else {
+        draws_[i] = static_cast<c_type>(i) * static_cast<c_type>(0.123);
+      }
+    }
+
+    // Repeat pattern
+    for (int j = 1; j < repeats; ++j) {
+      for (int i = 0; i < nvalues; ++i) {
+        draws_[nvalues * j + i] = draws_[i];
+      }
+    }
+  }
+
+  void InitDataDecimalPattern(int nvalues, int repeats) {
+    num_values_ = nvalues * repeats;
+    this->input_bytes_.resize(num_values_ * sizeof(c_type));
+    this->output_bytes_.resize(num_values_ * sizeof(c_type));
+    draws_ = reinterpret_cast<c_type*>(this->input_bytes_.data());
+    decode_buf_ = reinterpret_cast<c_type*>(this->output_bytes_.data());
+
+    // Decimal-like values that ALP compresses well
+    for (int i = 0; i < nvalues; ++i) {
+      draws_[i] = static_cast<c_type>(100.0 + i * 0.01);
+    }
+
+    for (int j = 1; j < repeats; ++j) {
+      for (int i = 0; i < nvalues; ++i) {
+        draws_[nvalues * j + i] = draws_[i];
+      }
+    }
+  }
+
+  void ExecuteSpecialValues(int nvalues, int repeats) {
+    InitDataWithSpecialValues(nvalues, repeats);
+    CheckRoundtrip();
+  }
+
+  void ExecuteDecimalPattern(int nvalues, int repeats) {
+    InitDataDecimalPattern(nvalues, repeats);
+    CheckRoundtrip();
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+using AlpEncodedTypes = ::testing::Types<FloatType, DoubleType>;
+TYPED_TEST_SUITE(TestAlpEncoding, AlpEncodedTypes);
+
+TYPED_TEST(TestAlpEncoding, BasicRoundTrip) {
+  // Test various sizes including edge cases
+  for (int values = 1; values < 32; ++values) {
+    ASSERT_NO_FATAL_FAILURE(this->Execute(values, 1));
+  }
+
+  // Test exactly vector size (1024)
+  ASSERT_NO_FATAL_FAILURE(this->Execute(1024, 1));
+
+  // Test just under and over vector size
+  ASSERT_NO_FATAL_FAILURE(this->Execute(1023, 1));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(1025, 1));
+
+  // Test multiple vectors
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2048, 1));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(3000, 1));
+}
+
+TYPED_TEST(TestAlpEncoding, RoundTripWithRepeats) {
+  // Test with repeated patterns
+  ASSERT_NO_FATAL_FAILURE(this->Execute(100, 10));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(1024, 3));
+}
+
+TYPED_TEST(TestAlpEncoding, SpecialValues) {
+  // Test NaN, Inf, -Inf, -0.0 (these become exceptions in ALP)
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpecialValues(100, 1));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpecialValues(1024, 1));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpecialValues(2000, 1));
+}
+
+TYPED_TEST(TestAlpEncoding, DecimalPatterns) {
+  // Test decimal-like values that ALP compresses efficiently
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteDecimalPattern(100, 1));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteDecimalPattern(1024, 1));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteDecimalPattern(5000, 1));
+}
+
+TYPED_TEST(TestAlpEncoding, SpacedRoundTrip) {
+  // Test with null values at various probabilities
+  for (double null_prob : {0.0, 0.1, 0.5, 0.9}) {
+    ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(100, 1, 0, null_prob));
+    ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(1024, 1, 0, null_prob));
+    ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(2000, 1, 0, null_prob));
+  }
+
+  // Test with offset
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(1024, 1, 7, 0.3));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(1024, 1, 64, 0.5));
+}
+
+TYPED_TEST(TestAlpEncoding, LargeDataset) {
+  // Test with large dataset (multiple pages worth)
+  ASSERT_NO_FATAL_FAILURE(this->Execute(100000, 1));
+}
+
+TYPED_TEST(TestAlpEncoding, RandomData) {
+  using c_type = typename TypeParam::c_type;
+  ::arrow::random::RandomArrayGenerator rag(42);
+
+  // Generate random float/double array
+  std::shared_ptr<::arrow::Array> arr;
+  if constexpr (std::is_same_v<c_type, float>) {
+    arr = rag.Float32(10000, -1000.0f, 1000.0f);
+  } else {
+    arr = rag.Float64(10000, -1000.0, 1000.0);
+  }
+
+  auto encoder = MakeTypedEncoder<TypeParam>(Encoding::ALP, false, 
this->descr_.get());
+  ASSERT_NO_THROW(encoder->Put(*arr));
+  auto buffer = encoder->FlushValues();
+
+  auto decoder = MakeTypedDecoder<TypeParam>(Encoding::ALP, 
this->descr_.get());
+  decoder->SetData(static_cast<int>(arr->length()), buffer->data(),
+                   static_cast<int>(buffer->size()));
+
+  std::vector<c_type> output(arr->length());
+  int decoded = decoder->Decode(output.data(), 
static_cast<int>(arr->length()));
+  ASSERT_EQ(decoded, arr->length());
+
+  // Verify round-trip
+  auto typed_arr = std::static_pointer_cast<
+      typename std::conditional<std::is_same_v<c_type, float>,
+                                ::arrow::FloatArray, 
::arrow::DoubleArray>::type>(arr);
+  ASSERT_EQ(0, std::memcmp(output.data(), typed_arr->raw_values(),
+                           arr->length() * sizeof(c_type)));
+}
+
+TEST(AlpEncodingAdHoc, InvalidDataTypes) {
+  // ALP only supports float and double
+  ASSERT_THROW(MakeTypedEncoder<Int32Type>(Encoding::ALP), ParquetException);
+  ASSERT_THROW(MakeTypedEncoder<Int64Type>(Encoding::ALP), ParquetException);
+  ASSERT_THROW(MakeTypedEncoder<BooleanType>(Encoding::ALP), ParquetException);
+  ASSERT_THROW(MakeTypedEncoder<ByteArrayType>(Encoding::ALP), 
ParquetException);
+
+  ASSERT_THROW(MakeTypedDecoder<Int32Type>(Encoding::ALP), ParquetException);
+  ASSERT_THROW(MakeTypedDecoder<Int64Type>(Encoding::ALP), ParquetException);
+  ASSERT_THROW(MakeTypedDecoder<BooleanType>(Encoding::ALP), ParquetException);
+  ASSERT_THROW(MakeTypedDecoder<ByteArrayType>(Encoding::ALP), 
ParquetException);
+}
+
+TEST(AlpEncodingAdHoc, ConstantValues) {
+  // Test all same values (should compress to bit_width=0)
+  auto descr = ExampleDescr<DoubleType>();
+  std::vector<double> data(1024, 123.456);
+
+  auto encoder = MakeTypedEncoder<DoubleType>(Encoding::ALP, false, 
descr.get());
+  encoder->Put(data.data(), static_cast<int>(data.size()));
+  auto buffer = encoder->FlushValues();
+
+  auto decoder = MakeTypedDecoder<DoubleType>(Encoding::ALP, descr.get());
+  decoder->SetData(static_cast<int>(data.size()), buffer->data(),
+                   static_cast<int>(buffer->size()));
+
+  std::vector<double> output(data.size());
+  int decoded = decoder->Decode(output.data(), static_cast<int>(data.size()));
+  ASSERT_EQ(decoded, static_cast<int>(data.size()));
+
+  for (size_t i = 0; i < data.size(); ++i) {
+    ASSERT_EQ(data[i], output[i]) << i;
+  }
+}
+
+TEST(AlpEncodingAdHoc, AllExceptions) {

Review Comment:
   Same question: can this be folded into the type-parameterized test?



##########
cpp/src/parquet/encoder.cc:
##########
@@ -1828,6 +1918,15 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type 
type_num, Encoding::type encodin
             "BYTE_STREAM_SPLIT only supports FLOAT, DOUBLE, INT32, INT64 "
             "and FIXED_LEN_BYTE_ARRAY");
     }
+  } else if (encoding == Encoding::ALP) {
+    switch (type_num) {
+      case Type::FLOAT:
+        return std::make_unique<AlpEncoder<FloatType>>(descr, pool);

Review Comment:
   For encoding, it would be nice to add a new property to specify the vector 
size; I'm OK with this as a follow-up.



##########
cpp/src/parquet/encoding_alp_benchmark.cc:
##########
@@ -0,0 +1,1824 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <cmath>
+#include <cstring>
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <memory>
+#include <random>
+#include <sstream>
+#include <unistd.h>
+#include <unordered_set>
+#include <vector>
+
+#include <benchmark/benchmark.h>
+
+#include "arrow/buffer.h"
+#include "arrow/util/alp/alp_codec.h"
+#include "arrow/util/compression.h"
+#include "parquet/encoding.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
+// This file benchmarks multiple encoding schemes for floating point values in
+// Parquet. Structure mirrors Snowflake's FloatComprBenchmark.cpp

Review Comment:
   Snowflake?



##########
cpp/src/parquet/encoder.cc:
##########
@@ -997,6 +999,104 @@ class ByteStreamSplitEncoder<FLBAType> : public 
ByteStreamSplitEncoderBase<FLBAT
   }
 };
 
+// ----------------------------------------------------------------------
+// ALP encoder (Adaptive Lossless floating-Point)
+
+template <typename DType>
+class AlpEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using ArrowType = typename EncodingTraits<DType>::ArrowType;
+  using TypedEncoder<DType>::Put;
+
+  explicit AlpEncoder(const ColumnDescriptor* descr,
+                      ::arrow::MemoryPool* pool = 
::arrow::default_memory_pool(),
+                      int32_t vector_size = 
::arrow::util::alp::AlpConstants::kAlpVectorSize)
+      : EncoderImpl(descr, Encoding::ALP, pool),
+        sink_{pool},
+        vector_size_(vector_size) {
+    static_assert(std::is_same<T, float>::value || std::is_same<T, 
double>::value,
+                  "ALP only supports float and double types");
+    if (vector_size_ <= 0 || (vector_size_ & (vector_size_ - 1)) != 0) {

Review Comment:
   At least for the second expression, we should add it to 
[bit_util](https://github.com/apache/arrow/blob/4fce185a1a34dd6b409260c142e6e4e148f43995/cpp/src/arrow/util/bit_util.h#L72)
 instead of using raw bit manipulation.



##########
cpp/src/arrow/util/alp/alp.h:
##########
@@ -0,0 +1,795 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Adaptive Lossless floating-Point (ALP) compression implementation
+
+#pragma once
+
+#include <optional>
+#include <vector>
+
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/alp/alp_constants.h"
+#include "arrow/util/span.h"
+
+namespace arrow {
+namespace util {
+namespace alp {
+
+// ----------------------------------------------------------------------
+// ALP Overview
+//
+// IMPORTANT: For abstract interfaces or examples how to use ALP, consult
+// alp_codec.h.
+// This is our implementation of the adaptive lossless floating-point
+// compression for decimals (ALP) (https://dl.acm.org/doi/10.1145/3626717).
+// It works by converting a float into a decimal (if possible). The exponent
+// and factor are chosen per vector. Each float is converted using
+// c(f) = int64(f * 10^exponent * 10^-factor). The converted floats are then
+// encoded via a delta frame of reference and bitpacked. Every exception,
+// where the conversion/reconversion changes the value of the float, is stored
+// separately and has to be patched into the decompressed vector afterwards.
+//
+// ==========================================================================
+//                    ALP COMPRESSION/DECOMPRESSION PIPELINE
+// ==========================================================================
+//
+// COMPRESSION FLOW:
+// -----------------
+//
+//   Input: float/double array
+//        |
+//        v
+//   +------------------------------------------------------------------+
+//   | 1. SAMPLING & PRESET GENERATION                                  |
+//   |    * Sample vectors from dataset                                 |
+//   |    * Try all exponent/factor combinations (e, f)                 |
+//   |    * Select best k combinations for preset                       |
+//   +------------------------------------+-----------------------------+
+//                                        | preset.combinations
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 2. PER-VECTOR COMPRESSION                                        |
+//   |    a) Find best (e,f) from preset for this vector                |
+//   |    b) Encode: encoded[i] = int64(value[i] * 10^e * 10^-f)        |
+//   |    c) Verify: if decode(encoded[i]) != value[i] -> exception     |
+//   |    d) Replace exceptions with placeholder value                  |
+//   +------------------------------------+-----------------------------+
+//                                        | encoded integers + exceptions
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 3. FRAME OF REFERENCE (FOR)                                      |
+//   |    * Find min value in encoded integers                          |
+//   |    * Subtract min from all values: delta[i] = encoded[i] - min   |
+//   +------------------------------------+-----------------------------+
+//                                        | delta values (smaller range)
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 4. BIT PACKING                                                   |
+//   |    * Calculate bit_width = log2(max_delta)                       |
+//   |    * Pack each value into bit_width bits                         |
+//   |    * Result: tightly packed binary data                          |
+//   +------------------------------------+-----------------------------+
+//                                        | packed bytes
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 5. SERIALIZATION (offset-based interleaved layout)              |
+//   |    [Header][Offsets...][Vector₀][Vector₁]...                    |
+//   |    where each Vector = [AlpInfo|ForInfo|Data]                   |
+//   +------------------------------------------------------------------+
+//
+//
+// DECOMPRESSION FLOW:
+// -------------------
+//
+//   Serialized bytes -> AlpEncodedVector::Load()
+//        |
+//        v
+//   +------------------------------------------------------------------+
+//   | 1. BIT UNPACKING                                                 |
+//   |    * Extract bit_width from metadata                             |
+//   |    * Unpack each value from bit_width bits -> delta values       |
+//   +------------------------------------+-----------------------------+
+//                                        | delta values
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 2. REVERSE FRAME OF REFERENCE (unFOR)                            |
+//   |    * Add back min: encoded[i] = delta[i] + frame_of_reference    |
+//   +------------------------------------+-----------------------------+
+//                                        | encoded integers
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 3. DECODE                                                        |
+//   |    * Apply inverse formula: value[i] = encoded[i] * 10^-e * 10^f |
+//   +------------------------------------+-----------------------------+
+//                                        | decoded floats (with placeholders)
+//                                        v
+//   +------------------------------------------------------------------+
+//   | 4. PATCH EXCEPTIONS                                              |
+//   |    * Replace values at exception_positions[] with exceptions[]   |
+//   +------------------------------------+-----------------------------+
+//                                        |
+//                                        v
+//   Output: Original float/double array (lossless!)
+//
+// ==========================================================================
+
+// ----------------------------------------------------------------------
+// AlpMode
+
+/// \brief ALP compression mode
+///
+/// Currently only ALP (decimal compression) is implemented.
+enum class AlpMode { kAlp = 0 };
+
+// ----------------------------------------------------------------------
+// AlpExponentAndFactor
+
+/// \brief Helper struct to encapsulate the exponent and factor
+struct AlpExponentAndFactor {
+  uint8_t exponent{0};
+  uint8_t factor{0};
+
+  bool operator==(const AlpExponentAndFactor& other) const {
+    return exponent == other.exponent && factor == other.factor;
+  }
+
+  /// \brief Comparison operator for deterministic std::map ordering
+  bool operator<(const AlpExponentAndFactor& other) const {
+    if (exponent != other.exponent) return exponent < other.exponent;
+    return factor < other.factor;
+  }
+};
+
+// ----------------------------------------------------------------------
+// AlpEncodedVectorInfo (non-templated, ALP core metadata)
+
+/// \brief ALP-specific metadata for an encoded vector (non-templated)
+///
+/// Contains the metadata specific to ALP's float-to-integer conversion:
+///   - exponent/factor: parameters for decimal encoding
+///   - num_exceptions: count of values that couldn't be losslessly encoded
+///
+/// This struct is the same size regardless of the floating-point type 
(float/double).
+/// It is separate from the integer encoding metadata (e.g., FOR) to allow
+/// different integer encodings to be used in the future.
+///
+/// Serialization format (4 bytes):
+///
+///   +------------------------------------------+
+///   |  AlpEncodedVectorInfo (4 bytes)          |
+///   +------------------------------------------+
+///   |  Offset |  Field              |  Size    |
+///   +---------+---------------------+----------+
+///   |    0    |  exponent (uint8_t) |  1 byte  |
+///   |    1    |  factor (uint8_t)   |  1 byte  |
+///   |    2    |  num_exceptions     |  2 bytes |
+///   +------------------------------------------+
+class AlpEncodedVectorInfo {
+ public:
+  AlpEncodedVectorInfo() = default;
+  AlpEncodedVectorInfo(uint8_t exponent, uint8_t factor, int16_t 
num_exceptions)
+      : exponent_(exponent), factor_(factor), num_exceptions_(num_exceptions) 
{}
+
+  uint8_t exponent() const { return exponent_; }
+  uint8_t factor() const { return factor_; }
+  int16_t num_exceptions() const { return num_exceptions_; }
+
+  void set_exponent(uint8_t exponent) { exponent_ = exponent; }
+  void set_factor(uint8_t factor) { factor_ = factor; }
+  void set_num_exceptions(int16_t num_exceptions) { num_exceptions_ = 
num_exceptions; }
+
+  /// Size of the serialized portion (4 bytes, fixed)
+  static constexpr int64_t kStoredSize =
+      sizeof(uint8_t) + sizeof(uint8_t) + sizeof(int16_t);
+  static_assert(kStoredSize == 4, "AlpEncodedVectorInfo stored size must be 4 
bytes");
+
+  /// \brief Store the ALP metadata into an output buffer
+  ///
+  /// \pre output_buffer.size() >= kStoredSize
+  void Store(arrow::util::span<uint8_t> output_buffer) const;
+
+  /// \brief Load ALP metadata from an input buffer
+  ///
+  /// \return the loaded metadata, or Status::Invalid if the buffer is too 
small
+  static Result<AlpEncodedVectorInfo> Load(arrow::util::span<const uint8_t> 
input_buffer);
+
+  /// \brief Get serialized size of the ALP metadata
+  static int64_t GetStoredSize() { return kStoredSize; }
+
+  /// \brief Get exponent and factor as a combined struct
+  AlpExponentAndFactor GetExponentAndFactor() const {
+    return AlpExponentAndFactor{exponent_, factor_};
+  }
+
+  bool operator==(const AlpEncodedVectorInfo& other) const {
+    return exponent_ == other.exponent_ && factor_ == other.factor_ &&
+           num_exceptions_ == other.num_exceptions_;
+  }
+
+  bool operator!=(const AlpEncodedVectorInfo& other) const { return !(*this == 
other); }
+
+ private:
+  uint8_t exponent_ = 0;
+  uint8_t factor_ = 0;
+  int16_t num_exceptions_ = 0;
+};
+
+// ----------------------------------------------------------------------
+// AlpEncodedForVectorInfo (templated, FOR integer encoding metadata)
+
+/// \brief FOR (Frame of Reference) encoding metadata for an encoded vector
+///
+/// Contains the metadata specific to FOR bit-packing integer encoding:
+///   - frame_of_reference: minimum value subtracted from all encoded integers
+///   - bit_width: number of bits used to pack each delta value
+///
+/// This struct is templated because frame_of_reference size depends on T:
+///   - float:  uint32_t frame_of_reference (4 bytes)
+///   - double: uint64_t frame_of_reference (8 bytes)
+///
+/// Serialization format for float (5 bytes):
+///
+///   +------------------------------------------+
+///   |  AlpEncodedForVectorInfo<float> (5B)     |
+///   +------------------------------------------+
+///   |  Offset |  Field              |  Size    |
+///   +---------+---------------------+----------+
+///   |    0    |  frame_of_reference |  4 bytes |
+///   |    4    |  bit_width (uint8_t)|  1 byte  |
+///   +------------------------------------------+
+///
+/// Serialization format for double (9 bytes):
+///
+///   +------------------------------------------+
+///   |  AlpEncodedForVectorInfo<double> (9B)    |
+///   +------------------------------------------+
+///   |  Offset |  Field              |  Size    |
+///   +---------+---------------------+----------+
+///   |    0    |  frame_of_reference |  8 bytes |
+///   |    8    |  bit_width (uint8_t)|  1 byte  |
+///   +------------------------------------------+
+///
+/// \tparam T the floating point type (float or double)
+template <typename T>
+class AlpEncodedForVectorInfo {
+  static_assert(std::is_same_v<T, float> || std::is_same_v<T, double>,
+                "AlpEncodedForVectorInfo only supports float and double");
+
+ public:
+  /// Use uint32_t for float, uint64_t for double (matches encoded integer 
size)
+  using ExactType = typename AlpTypedConstants<T>::FloatingToExact;
+
+  AlpEncodedForVectorInfo() = default;
+  AlpEncodedForVectorInfo(ExactType frame_of_reference, uint8_t bit_width)
+      : frame_of_reference_(frame_of_reference), bit_width_(bit_width) {}
+
+  ExactType frame_of_reference() const { return frame_of_reference_; }
+  uint8_t bit_width() const { return bit_width_; }
+
+  void set_frame_of_reference(ExactType frame_of_reference) {
+    frame_of_reference_ = frame_of_reference;
+  }
+  void set_bit_width(uint8_t bit_width) { bit_width_ = bit_width; }
+
+  /// Size of the serialized portion (5 bytes for float, 9 for double)
+  static constexpr int64_t kStoredSize = sizeof(ExactType) + 1;

Review Comment:
   ```suggestion
     static constexpr int64_t kStoredSize = sizeof(ExactType) + 
sizeof(bit_width_);
   ```



##########
cpp/src/parquet/encoding_alp_benchmark.cc:
##########


Review Comment:
   maybe we can pull this out into a separate PR to avoid churn.



##########
cpp/src/parquet/encoder.cc:
##########
@@ -997,6 +999,104 @@ class ByteStreamSplitEncoder<FLBAType> : public 
ByteStreamSplitEncoderBase<FLBAT
   }
 };
 
+// ----------------------------------------------------------------------
+// ALP encoder (Adaptive Lossless floating-Point)
+
+template <typename DType>
+class AlpEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using ArrowType = typename EncodingTraits<DType>::ArrowType;
+  using TypedEncoder<DType>::Put;
+
+  explicit AlpEncoder(const ColumnDescriptor* descr,
+                      ::arrow::MemoryPool* pool = 
::arrow::default_memory_pool(),
+                      int32_t vector_size = 
::arrow::util::alp::AlpConstants::kAlpVectorSize)
+      : EncoderImpl(descr, Encoding::ALP, pool),
+        sink_{pool},
+        vector_size_(vector_size) {
+    static_assert(std::is_same<T, float>::value || std::is_same<T, 
double>::value,
+                  "ALP only supports float and double types");
+    if (vector_size_ <= 0 || (vector_size_ & (vector_size_ - 1)) != 0) {
+      throw ParquetException("ALP vector_size must be a positive power of 2, 
got " +
+                             std::to_string(vector_size_));
+    }
+    if (vector_size_ > (1 << 
::arrow::util::alp::AlpConstants::kMaxLogVectorSize)) {
+      throw ParquetException("ALP vector_size exceeds maximum " +
+                             std::to_string(1 << 
::arrow::util::alp::AlpConstants::kMaxLogVectorSize) +
+                             ", got " + std::to_string(vector_size_));
+    }
+  }
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  std::shared_ptr<Buffer> FlushValues() override {
+    if (sink_.length() == 0) {
+      // Empty buffer case
+      PARQUET_ASSIGN_OR_THROW(auto buf, sink_.Finish());
+      return buf;
+    }
+
+    // Call AlpCodec::Encode() - it handles sampling, preset selection, and 
compression
+    const int64_t numElements = sink_.length() / 
static_cast<int64_t>(sizeof(T));
+    int64_t compSize =

Review Comment:
   ```suggestion
       int64_t comp_size =
   ```



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -2660,4 +2595,407 @@ TEST(DeltaByteArrayEncodingAdHoc, ArrowDirectPut) {
   }
 }
 
+// ----------------------------------------------------------------------
+// ALP encoding tests for float/double
+
+template <typename Type>
+class TestAlpEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+  static constexpr size_t kNumRoundTrips = 3;
+
+  void CheckRoundtrip() override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::ALP, /*use_dictionary=*/false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::ALP, descr_.get());
+
+    for (size_t i = 0; i < kNumRoundTrips; ++i) {
+      encoder->Put(draws_, num_values_);
+      encode_buffer_ = encoder->FlushValues();
+
+      decoder->SetData(num_values_, encode_buffer_->data(),
+                       static_cast<int>(encode_buffer_->size()));
+      int values_decoded = decoder->Decode(decode_buf_, num_values_);
+      ASSERT_EQ(num_values_, values_decoded);
+
+      // Use memcmp for bit-exact comparison (important for -0.0, NaN bit 
patterns)
+      ASSERT_EQ(0, std::memcmp(draws_, decode_buf_, num_values_ * 
sizeof(c_type)));
+    }
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits,
+                            int64_t valid_bits_offset) override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::ALP, /*use_dictionary=*/false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::ALP, descr_.get());
+
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    for (size_t i = 0; i < kNumRoundTrips; ++i) {
+      encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+      encode_buffer_ = encoder->FlushValues();
+
+      decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                       static_cast<int>(encode_buffer_->size()));
+      auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, 
null_count,
+                                                  valid_bits, 
valid_bits_offset);
+      ASSERT_EQ(num_values_, values_decoded);
+
+      // Verify only valid values
+      for (int j = 0; j < num_values_; ++j) {
+        if (bit_util::GetBit(valid_bits, valid_bits_offset + j)) {
+          ASSERT_EQ(0, std::memcmp(&draws_[j], &decode_buf_[j], 
sizeof(c_type))) << j;
+        }
+      }
+    }
+  }
+
+  void InitDataWithSpecialValues(int nvalues, int repeats) {
+    num_values_ = nvalues * repeats;
+    this->input_bytes_.resize(num_values_ * sizeof(c_type));
+    this->output_bytes_.resize(num_values_ * sizeof(c_type));
+    draws_ = reinterpret_cast<c_type*>(this->input_bytes_.data());
+    decode_buf_ = reinterpret_cast<c_type*>(this->output_bytes_.data());
+
+    // Fill with mix of normal and special values
+    for (int i = 0; i < nvalues; ++i) {
+      if (i % 20 == 0) {
+        draws_[i] = std::numeric_limits<c_type>::quiet_NaN();
+      } else if (i % 20 == 5) {
+        draws_[i] = std::numeric_limits<c_type>::infinity();
+      } else if (i % 20 == 10) {
+        draws_[i] = -std::numeric_limits<c_type>::infinity();
+      } else if (i % 20 == 15) {
+        draws_[i] = static_cast<c_type>(-0.0);
+      } else {
+        draws_[i] = static_cast<c_type>(i) * static_cast<c_type>(0.123);
+      }
+    }
+
+    // Repeat pattern
+    for (int j = 1; j < repeats; ++j) {
+      for (int i = 0; i < nvalues; ++i) {
+        draws_[nvalues * j + i] = draws_[i];
+      }
+    }
+  }
+
+  void InitDataDecimalPattern(int nvalues, int repeats) {
+    num_values_ = nvalues * repeats;
+    this->input_bytes_.resize(num_values_ * sizeof(c_type));
+    this->output_bytes_.resize(num_values_ * sizeof(c_type));
+    draws_ = reinterpret_cast<c_type*>(this->input_bytes_.data());
+    decode_buf_ = reinterpret_cast<c_type*>(this->output_bytes_.data());
+
+    // Decimal-like values that ALP compresses well
+    for (int i = 0; i < nvalues; ++i) {
+      draws_[i] = static_cast<c_type>(100.0 + i * 0.01);
+    }
+
+    for (int j = 1; j < repeats; ++j) {
+      for (int i = 0; i < nvalues; ++i) {
+        draws_[nvalues * j + i] = draws_[i];
+      }
+    }
+  }
+
+  void ExecuteSpecialValues(int nvalues, int repeats) {
+    InitDataWithSpecialValues(nvalues, repeats);
+    CheckRoundtrip();
+  }
+
+  void ExecuteDecimalPattern(int nvalues, int repeats) {
+    InitDataDecimalPattern(nvalues, repeats);
+    CheckRoundtrip();
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+using AlpEncodedTypes = ::testing::Types<FloatType, DoubleType>;
+TYPED_TEST_SUITE(TestAlpEncoding, AlpEncodedTypes);
+
+TYPED_TEST(TestAlpEncoding, BasicRoundTrip) {
+  // Test various sizes including edge cases
+  for (int values = 1; values < 32; ++values) {
+    ASSERT_NO_FATAL_FAILURE(this->Execute(values, 1));
+  }
+
+  // Test exactly vector size (1024)
+  ASSERT_NO_FATAL_FAILURE(this->Execute(1024, 1));
+
+  // Test just under and over vector size
+  ASSERT_NO_FATAL_FAILURE(this->Execute(1023, 1));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(1025, 1));
+
+  // Test multiple vectors
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2048, 1));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(3000, 1));
+}
+
+TYPED_TEST(TestAlpEncoding, RoundTripWithRepeats) {
+  // Test with repeated patterns
+  ASSERT_NO_FATAL_FAILURE(this->Execute(100, 10));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(1024, 3));
+}
+
+TYPED_TEST(TestAlpEncoding, SpecialValues) {
+  // Test NaN, Inf, -Inf, -0.0 (these become exceptions in ALP)
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpecialValues(100, 1));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpecialValues(1024, 1));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpecialValues(2000, 1));
+}
+
+TYPED_TEST(TestAlpEncoding, DecimalPatterns) {
+  // Test decimal-like values that ALP compresses efficiently
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteDecimalPattern(100, 1));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteDecimalPattern(1024, 1));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteDecimalPattern(5000, 1));
+}
+
+TYPED_TEST(TestAlpEncoding, SpacedRoundTrip) {
+  // Test with null values at various probabilities
+  for (double null_prob : {0.0, 0.1, 0.5, 0.9}) {
+    ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(100, 1, 0, null_prob));
+    ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(1024, 1, 0, null_prob));
+    ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(2000, 1, 0, null_prob));
+  }
+
+  // Test with offset
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(1024, 1, 7, 0.3));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(1024, 1, 64, 0.5));
+}
+
+TYPED_TEST(TestAlpEncoding, LargeDataset) {
+  // Test with large dataset (multiple pages worth)
+  ASSERT_NO_FATAL_FAILURE(this->Execute(100000, 1));
+}
+
+TYPED_TEST(TestAlpEncoding, RandomData) {
+  using c_type = typename TypeParam::c_type;
+  ::arrow::random::RandomArrayGenerator rag(42);
+
+  // Generate random float/double array
+  std::shared_ptr<::arrow::Array> arr;
+  if constexpr (std::is_same_v<c_type, float>) {
+    arr = rag.Float32(10000, -1000.0f, 1000.0f);

Review Comment:
   Don't we want these to cover the full range of floating-point values?



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -2660,4 +2595,407 @@ TEST(DeltaByteArrayEncodingAdHoc, ArrowDirectPut) {
   }
 }
 
+// ----------------------------------------------------------------------
+// ALP encoding tests for float/double
+
+template <typename Type>
+class TestAlpEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+  static constexpr size_t kNumRoundTrips = 3;
+
+  void CheckRoundtrip() override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::ALP, /*use_dictionary=*/false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::ALP, descr_.get());
+
+    for (size_t i = 0; i < kNumRoundTrips; ++i) {
+      encoder->Put(draws_, num_values_);
+      encode_buffer_ = encoder->FlushValues();
+
+      decoder->SetData(num_values_, encode_buffer_->data(),
+                       static_cast<int>(encode_buffer_->size()));
+      int values_decoded = decoder->Decode(decode_buf_, num_values_);
+      ASSERT_EQ(num_values_, values_decoded);
+
+      // Use memcmp for bit-exact comparison (important for -0.0, NaN bit 
patterns)
+      ASSERT_EQ(0, std::memcmp(draws_, decode_buf_, num_values_ * 
sizeof(c_type)));
+    }
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits,
+                            int64_t valid_bits_offset) override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::ALP, /*use_dictionary=*/false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::ALP, descr_.get());
+
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    for (size_t i = 0; i < kNumRoundTrips; ++i) {
+      encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+      encode_buffer_ = encoder->FlushValues();
+
+      decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                       static_cast<int>(encode_buffer_->size()));
+      auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, 
null_count,
+                                                  valid_bits, 
valid_bits_offset);
+      ASSERT_EQ(num_values_, values_decoded);
+
+      // Verify only valid values
+      for (int j = 0; j < num_values_; ++j) {
+        if (bit_util::GetBit(valid_bits, valid_bits_offset + j)) {
+          ASSERT_EQ(0, std::memcmp(&draws_[j], &decode_buf_[j], 
sizeof(c_type))) << j;
+        }
+      }
+    }
+  }
+
+  void InitDataWithSpecialValues(int nvalues, int repeats) {
+    num_values_ = nvalues * repeats;
+    this->input_bytes_.resize(num_values_ * sizeof(c_type));
+    this->output_bytes_.resize(num_values_ * sizeof(c_type));
+    draws_ = reinterpret_cast<c_type*>(this->input_bytes_.data());
+    decode_buf_ = reinterpret_cast<c_type*>(this->output_bytes_.data());
+
+    // Fill with mix of normal and special values
+    for (int i = 0; i < nvalues; ++i) {
+      if (i % 20 == 0) {
+        draws_[i] = std::numeric_limits<c_type>::quiet_NaN();
+      } else if (i % 20 == 5) {
+        draws_[i] = std::numeric_limits<c_type>::infinity();
+      } else if (i % 20 == 10) {
+        draws_[i] = -std::numeric_limits<c_type>::infinity();
+      } else if (i % 20 == 15) {
+        draws_[i] = static_cast<c_type>(-0.0);
+      } else {
+        draws_[i] = static_cast<c_type>(i) * static_cast<c_type>(0.123);
+      }
+    }
+
+    // Repeat pattern
+    for (int j = 1; j < repeats; ++j) {
+      for (int i = 0; i < nvalues; ++i) {
+        draws_[nvalues * j + i] = draws_[i];
+      }
+    }
+  }
+
+  void InitDataDecimalPattern(int nvalues, int repeats) {
+    num_values_ = nvalues * repeats;
+    this->input_bytes_.resize(num_values_ * sizeof(c_type));
+    this->output_bytes_.resize(num_values_ * sizeof(c_type));
+    draws_ = reinterpret_cast<c_type*>(this->input_bytes_.data());
+    decode_buf_ = reinterpret_cast<c_type*>(this->output_bytes_.data());
+
+    // Decimal-like values that ALP compresses well
+    for (int i = 0; i < nvalues; ++i) {
+      draws_[i] = static_cast<c_type>(100.0 + i * 0.01);
+    }
+
+    for (int j = 1; j < repeats; ++j) {
+      for (int i = 0; i < nvalues; ++i) {
+        draws_[nvalues * j + i] = draws_[i];
+      }
+    }
+  }
+
+  void ExecuteSpecialValues(int nvalues, int repeats) {
+    InitDataWithSpecialValues(nvalues, repeats);
+    CheckRoundtrip();
+  }
+
+  void ExecuteDecimalPattern(int nvalues, int repeats) {
+    InitDataDecimalPattern(nvalues, repeats);
+    CheckRoundtrip();
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+using AlpEncodedTypes = ::testing::Types<FloatType, DoubleType>;
+TYPED_TEST_SUITE(TestAlpEncoding, AlpEncodedTypes);
+
+TYPED_TEST(TestAlpEncoding, BasicRoundTrip) {
+  // Test various sizes including edge cases
+  for (int values = 1; values < 32; ++values) {
+    ASSERT_NO_FATAL_FAILURE(this->Execute(values, 1));
+  }
+
+  // Test exactly vector size (1024)
+  ASSERT_NO_FATAL_FAILURE(this->Execute(1024, 1));
+
+  // Test just under and over vector size
+  ASSERT_NO_FATAL_FAILURE(this->Execute(1023, 1));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(1025, 1));
+
+  // Test multiple vectors
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2048, 1));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(3000, 1));
+}
+
+TYPED_TEST(TestAlpEncoding, RoundTripWithRepeats) {
+  // Test with repeated patterns
+  ASSERT_NO_FATAL_FAILURE(this->Execute(100, 10));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(1024, 3));
+}
+
+TYPED_TEST(TestAlpEncoding, SpecialValues) {
+  // Test NaN, Inf, -Inf, -0.0 (these become exceptions in ALP)
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpecialValues(100, 1));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpecialValues(1024, 1));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpecialValues(2000, 1));
+}
+
+TYPED_TEST(TestAlpEncoding, DecimalPatterns) {
+  // Test decimal-like values that ALP compresses efficiently
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteDecimalPattern(100, 1));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteDecimalPattern(1024, 1));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteDecimalPattern(5000, 1));
+}
+
+TYPED_TEST(TestAlpEncoding, SpacedRoundTrip) {
+  // Test with null values at various probabilities
+  for (double null_prob : {0.0, 0.1, 0.5, 0.9}) {
+    ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(100, 1, 0, null_prob));
+    ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(1024, 1, 0, null_prob));
+    ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(2000, 1, 0, null_prob));
+  }
+
+  // Test with offset
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(1024, 1, 7, 0.3));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(1024, 1, 64, 0.5));
+}
+
+TYPED_TEST(TestAlpEncoding, LargeDataset) {
+  // Test with large dataset (multiple pages worth)
+  ASSERT_NO_FATAL_FAILURE(this->Execute(100000, 1));
+}
+
+TYPED_TEST(TestAlpEncoding, RandomData) {
+  using c_type = typename TypeParam::c_type;
+  ::arrow::random::RandomArrayGenerator rag(42);
+
+  // Generate random float/double array
+  std::shared_ptr<::arrow::Array> arr;
+  if constexpr (std::is_same_v<c_type, float>) {
+    arr = rag.Float32(10000, -1000.0f, 1000.0f);
+  } else {
+    arr = rag.Float64(10000, -1000.0, 1000.0);
+  }
+
+  auto encoder = MakeTypedEncoder<TypeParam>(Encoding::ALP, false, 
this->descr_.get());
+  ASSERT_NO_THROW(encoder->Put(*arr));
+  auto buffer = encoder->FlushValues();
+
+  auto decoder = MakeTypedDecoder<TypeParam>(Encoding::ALP, 
this->descr_.get());
+  decoder->SetData(static_cast<int>(arr->length()), buffer->data(),
+                   static_cast<int>(buffer->size()));
+
+  std::vector<c_type> output(arr->length());
+  int decoded = decoder->Decode(output.data(), 
static_cast<int>(arr->length()));
+  ASSERT_EQ(decoded, arr->length());
+
+  // Verify round-trip
+  auto typed_arr = std::static_pointer_cast<
+      typename std::conditional<std::is_same_v<c_type, float>,
+                                ::arrow::FloatArray, 
::arrow::DoubleArray>::type>(arr);
+  ASSERT_EQ(0, std::memcmp(output.data(), typed_arr->raw_values(),
+                           arr->length() * sizeof(c_type)));
+}
+
+TEST(AlpEncodingAdHoc, InvalidDataTypes) {
+  // ALP only supports float and double
+  ASSERT_THROW(MakeTypedEncoder<Int32Type>(Encoding::ALP), ParquetException);
+  ASSERT_THROW(MakeTypedEncoder<Int64Type>(Encoding::ALP), ParquetException);
+  ASSERT_THROW(MakeTypedEncoder<BooleanType>(Encoding::ALP), ParquetException);
+  ASSERT_THROW(MakeTypedEncoder<ByteArrayType>(Encoding::ALP), 
ParquetException);
+
+  ASSERT_THROW(MakeTypedDecoder<Int32Type>(Encoding::ALP), ParquetException);
+  ASSERT_THROW(MakeTypedDecoder<Int64Type>(Encoding::ALP), ParquetException);
+  ASSERT_THROW(MakeTypedDecoder<BooleanType>(Encoding::ALP), ParquetException);
+  ASSERT_THROW(MakeTypedDecoder<ByteArrayType>(Encoding::ALP), 
ParquetException);
+}
+
+TEST(AlpEncodingAdHoc, ConstantValues) {

Review Comment:
   Can this be folded into the type-parameterized test?



##########
cpp/src/parquet/decoder.cc:
##########
@@ -2372,6 +2327,125 @@ class ByteStreamSplitDecoder<FLBAType> : public 
ByteStreamSplitDecoderBase<FLBAT
   }
 };
 
+// ----------------------------------------------------------------------
+// ALP decoder (Adaptive Lossless floating-Point)
+
+template <typename DType>
+class AlpDecoder : public TypedDecoderImpl<DType> {
+ public:
+  using Base = TypedDecoderImpl<DType>;
+  using T = typename DType::c_type;
+
+  explicit AlpDecoder(const ColumnDescriptor* descr)
+      : Base(descr, Encoding::ALP), current_offset_{0}, needs_decode_{false} {
+    static_assert(std::is_same<T, float>::value || std::is_same<T, 
double>::value,
+                  "ALP only supports float and double types");
+  }
+
+  void SetData(int num_values, const uint8_t* data, int len) final {
+    Base::SetData(num_values, data, len);
+    current_offset_ = 0;
+    if (num_values > 0 && len <= 0) {
+      throw ParquetException("ALP SetData: num_values=" + 
std::to_string(num_values) +
+                             " but len=" + std::to_string(len));
+    }
+    needs_decode_ = (num_values > 0);
+    decoded_buffer_.clear();
+  }
+
+  int Decode(T* buffer, int max_values) override {
+    // Fast path: decode directly into output buffer if requesting all values
+    if (needs_decode_ && max_values >= this->num_values_) {
+      PARQUET_THROW_NOT_OK(::arrow::util::alp::AlpCodec<T>::Decode(
+          this->num_values_, this->data_, this->len_,
+          buffer));
+
+      const int decoded = this->num_values_;
+      this->num_values_ = 0;
+      needs_decode_ = false;
+      return decoded;
+    }
+
+    // Slow path: partial read - decode to intermediate buffer
+    // ALP Bit unpacker needs batches of 64
+    if (needs_decode_) {
+      decoded_buffer_.resize(this->num_values_);
+      PARQUET_THROW_NOT_OK(::arrow::util::alp::AlpCodec<T>::Decode(
+          this->num_values_, this->data_, this->len_,
+          decoded_buffer_.data()));
+      needs_decode_ = false;
+    }
+
+    // Copy from intermediate buffer
+    const int values_to_decode = std::min(
+        max_values,
+        static_cast<int>(decoded_buffer_.size() - current_offset_));
+
+    if (values_to_decode > 0) {
+      std::memcpy(buffer, decoded_buffer_.data() + current_offset_,
+                  values_to_decode * sizeof(T));
+      current_offset_ += values_to_decode;
+      this->num_values_ -= values_to_decode;
+    }
+
+    return values_to_decode;
+  }
+
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<DType>::Accumulator* builder) 
override {
+    const int values_to_decode = num_values - null_count;
+    if (ARROW_PREDICT_FALSE(this->num_values_ < values_to_decode)) {
+      ParquetException::EofException("ALP DecodeArrow: Not enough values 
available. "

Review Comment:
   Need to double-check whether this just creates the exception or actually
   throws it.



##########
cpp/src/parquet/encoder.cc:
##########
@@ -995,6 +999,90 @@ class ByteStreamSplitEncoder<FLBAType> : public 
ByteStreamSplitEncoderBase<FLBAT
   }
 };
 
+// ----------------------------------------------------------------------
+// ALP encoder (Adaptive Lossless floating-Point)
+
+template <typename DType>
+class AlpEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using ArrowType = typename EncodingTraits<DType>::ArrowType;
+  using TypedEncoder<DType>::Put;
+
+  explicit AlpEncoder(const ColumnDescriptor* descr,
+                      ::arrow::MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::ALP, pool),
+        sink_{pool} {
+    static_assert(std::is_same<T, float>::value || std::is_same<T, 
double>::value,
+                  "ALP only supports float and double types");
+  }
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  std::shared_ptr<Buffer> FlushValues() override {
+    if (sink_.length() == 0) {
+      // Empty buffer case
+      PARQUET_ASSIGN_OR_THROW(auto buf, sink_.Finish());
+      return buf;
+    }
+
+    // Call AlpWrapper::Encode() - it handles sampling, preset selection, and 
compression
+    const size_t decompSize = sink_.length();
+    size_t compSize = 
::arrow::util::alp::AlpWrapper<T>::GetMaxCompressedSize(decompSize);
+
+    PARQUET_ASSIGN_OR_THROW(
+        auto compressed_buffer,
+        ::arrow::AllocateResizableBuffer(compSize, this->memory_pool()));
+
+    ::arrow::util::alp::AlpWrapper<T>::Encode(
+        reinterpret_cast<const T*>(sink_.data()),
+        decompSize,
+        reinterpret_cast<char*>(compressed_buffer->mutable_data()),
+        &compSize);
+
+    PARQUET_THROW_NOT_OK(compressed_buffer->Resize(compSize));
+    sink_.Reset();
+
+    return std::shared_ptr<Buffer>(std::move(compressed_buffer));
+  }
+
+  void Put(const T* buffer, int num_values) override {
+    if (num_values > 0) {
+      PARQUET_THROW_NOT_OK(

Review Comment:
   Let's put a TODO here so we understand this isn't the final intended state.



##########
cpp/src/arrow/util/alp/alp_sampler.h:
##########
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// ALP sampler for collecting samples and creating encoding presets
+
+#pragma once
+
+#include <optional>
+#include <vector>
+
+#include "arrow/util/alp/alp.h"
+#include "arrow/util/span.h"
+
+namespace arrow {
+namespace util {
+namespace alp {
+
+// ----------------------------------------------------------------------
+// AlpSampler
+
+/// \class AlpSampler
+/// \brief Collects samples from data to be compressed with ALP
+///
+/// Usage: Call AddSample() or AddSampleVector() multiple times to collect
+/// samples, then call Finalize() to retrieve the resulting preset.
+///
+/// \tparam T the floating point type (float or double) to sample
+template <typename T>
+class AlpSampler {
+ public:
+  /// \brief Default constructor
+  AlpSampler();
+
+  /// \brief Helper struct containing the preset for ALP compression
+  struct AlpSamplerResult {
+    AlpEncodingParameters alp_parameters;
+  };
+
+  /// \brief Add a sample of arbitrary size
+  ///
+  /// The sample is internally separated into vectors on which 
AddSampleVector()
+  /// is called.
+  ///
+  /// \param[in] input the input data to sample from
+  void AddSample(arrow::util::span<const T> input);
+
+  /// \brief Add a single vector as a sample
+  ///
+  /// \param[in] input the input vector to add.
+  ///            Size should be <= AlpConstants::kAlpVectorSize.
+  void AddSampleVector(arrow::util::span<const T> input);
+
+  /// \brief Finalize sampling and generate the encoding preset
+  ///
+  /// \return an AlpSamplerResult containing the generated encoding preset
+  AlpSamplerResult Finalize();
+
+ private:
+  /// \brief Helper struct to encapsulate settings used for sampling
+  struct AlpSamplingParameters {
+    uint64_t num_lookup_value;
+    uint64_t num_sampled_increments;
+    uint64_t num_sampled_values;
+  };
+
+  /// \brief Calculate sampling parameters for the current vector
+  ///
+  /// \param[in] num_current_vector_values number of values in current vector
+  /// \return the sampling parameters to use
+  AlpSamplingParameters GetAlpSamplingParameters(uint64_t 
num_current_vector_values);
+
+  /// \brief Check if the current vector must be ignored for sampling
+  ///
+  /// \param[in] vectors_count the total number of vectors processed so far
+  /// \param[in] vectors_sampled_count the number of vectors sampled so far
+  /// \param[in] num_current_vector_values number of values in current vector
+  /// \return true if the current vector should be skipped, false otherwise
+  bool MustSkipSamplingFromCurrentVector(uint64_t vectors_count,
+                                         uint64_t vectors_sampled_count,
+                                         uint64_t num_current_vector_values);
+
+  /// Count of vectors that have been sampled
+  uint64_t vectors_sampled_count_ = 0;

Review Comment:
   nit: int64_t for all the uint64_t variables here?



##########
cpp/src/parquet/decoder.cc:
##########
@@ -2372,6 +2327,125 @@ class ByteStreamSplitDecoder<FLBAType> : public 
ByteStreamSplitDecoderBase<FLBAT
   }
 };
 
+// ----------------------------------------------------------------------
+// ALP decoder (Adaptive Lossless floating-Point)
+
+template <typename DType>
+class AlpDecoder : public TypedDecoderImpl<DType> {
+ public:
+  using Base = TypedDecoderImpl<DType>;
+  using T = typename DType::c_type;
+
+  explicit AlpDecoder(const ColumnDescriptor* descr)
+      : Base(descr, Encoding::ALP), current_offset_{0}, needs_decode_{false} {
+    static_assert(std::is_same<T, float>::value || std::is_same<T, 
double>::value,
+                  "ALP only supports float and double types");
+  }
+
+  void SetData(int num_values, const uint8_t* data, int len) final {
+    Base::SetData(num_values, data, len);
+    current_offset_ = 0;
+    if (num_values > 0 && len <= 0) {
+      throw ParquetException("ALP SetData: num_values=" + 
std::to_string(num_values) +
+                             " but len=" + std::to_string(len));
+    }
+    needs_decode_ = (num_values > 0);
+    decoded_buffer_.clear();
+  }
+
+  int Decode(T* buffer, int max_values) override {
+    // Fast path: decode directly into output buffer if requesting all values
+    if (needs_decode_ && max_values >= this->num_values_) {
+      PARQUET_THROW_NOT_OK(::arrow::util::alp::AlpCodec<T>::Decode(
+          this->num_values_, this->data_, this->len_,
+          buffer));
+
+      const int decoded = this->num_values_;
+      this->num_values_ = 0;
+      needs_decode_ = false;
+      return decoded;
+    }
+
+    // Slow path: partial read - decode to intermediate buffer
+    // ALP Bit unpacker needs batches of 64
+    if (needs_decode_) {
+      decoded_buffer_.resize(this->num_values_);
+      PARQUET_THROW_NOT_OK(::arrow::util::alp::AlpCodec<T>::Decode(
+          this->num_values_, this->data_, this->len_,
+          decoded_buffer_.data()));
+      needs_decode_ = false;
+    }
+
+    // Copy from intermediate buffer
+    const int values_to_decode = std::min(
+        max_values,
+        static_cast<int>(decoded_buffer_.size() - current_offset_));
+
+    if (values_to_decode > 0) {
+      std::memcpy(buffer, decoded_buffer_.data() + current_offset_,
+                  values_to_decode * sizeof(T));
+      current_offset_ += values_to_decode;
+      this->num_values_ -= values_to_decode;
+    }
+
+    return values_to_decode;
+  }
+
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<DType>::Accumulator* builder) 
override {
+    const int values_to_decode = num_values - null_count;
+    if (ARROW_PREDICT_FALSE(this->num_values_ < values_to_decode)) {
+      ParquetException::EofException("ALP DecodeArrow: Not enough values 
available. "
+                                      "Available: " + 
std::to_string(this->num_values_) +
+                                      ", Requested: " + 
std::to_string(values_to_decode));
+    }
+
+    // Decode if needed (DecodeArrow always needs intermediate buffer for 
nulls)
+    if (needs_decode_) {
+      decoded_buffer_.resize(this->num_values_);
+      PARQUET_THROW_NOT_OK(::arrow::util::alp::AlpCodec<T>::Decode(
+          this->num_values_, this->data_, this->len_,
+          decoded_buffer_.data()));
+      needs_decode_ = false;
+    }
+
+    if (null_count == 0) {

Review Comment:
   I think we should also have a TODO someplace around here to reflect that this
   isn't the final state and that we should handle incremental decoding.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to