pitrou commented on code in PR #14191:
URL: https://github.com/apache/arrow/pull/14191#discussion_r1028282051


##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),

Review Comment:
   Uh... cast to `double` instead? I know that, given these quantities are 
small, `float` ideally shouldn't incur any precision loss here, but still, 
let's avoid pointless micro-optimizations.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;

Review Comment:
   Why not put these into `DeltaBitPackEncoder`?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);

Review Comment:
   `mini_blocks_per_block_` is typically 4, so the overhead of 
`arrow::stl::allocator` is probably overkill here.
   And/or, you can cache this container on the encoder instead of doing a 
temporary heap allocation every 128 values.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }

Review Comment:
   Can we also ensure that `values_per_block % mini_blocks_per_block == 0`?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+

Review Comment:
   Why 32 above?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);

Review Comment:
   You're using `SafeSignedSubtract` here... but what would happen if the 
subtraction overflow?
   
   Say `T` is `INT32`, you have current_value_ = -2147483647 (0x80000001) and 
value = 2147483647 (0x7fffffff)... then the computed delta is -2 (!).
   
   I see the Java implementation [claims 
that](https://github.com/apache/parquet-mr/blob/d057b39d93014fe40f5067ee4a33621e65c91552/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLong.java#L86-L88):
   > The algorithm is correct because Java long is working as a modalar ring 
with base 2^64 and because of the plus and minus properties of a ring.
   
   The Rust Parquet reader also seems to take specific steps to handle such 
input properly:
   
https://github.com/apache/arrow-rs/blob/6f8187fb34effc79b20a03cb1d0d164448fb5ba8/parquet/src/encodings/decoding.rs#L712-L717
   
   Worth adding a comment about this?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(

Review Comment:
   Is the call to `std::min` required or even ok?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);

Review Comment:
   Should add a comment here explaining that you are skipping over the "slots" 
for the miniblock bitwidths.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }

Review Comment:
   Perhaps this might be simplified:
   ```c++
   const int num_bits = 
bit_util::NumRequiredBits(static_cast<std::make_unsigned_t<T>>(max_delta_diff));
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any of value in deltas_ 
vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last mini block, we pad the 
mini block
+    // with zeroes so that its length is the number of values in a full mini 
block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_util::ToLittleEndian(bit_widths[i]);

Review Comment:
   I would say the call to `ToLittleEndian` is pedantic, given that bit widths 
are 8-bit quantities. :-)



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any of value in deltas_ 
vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);

Review Comment:
   Hmm... the encoded deltas are supposed to be bit-packed, but here you are 
aligning them over byte boundaries?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any of value in deltas_ 
vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last mini block, we pad the 
mini block
+    // with zeroes so that its length is the number of values in a full mini 
block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);

Review Comment:
   (same question)



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any of value in deltas_ 
vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last mini block, we pad the 
mini block
+    // with zeroes so that its length is the number of values in a full mini 
block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_util::ToLittleEndian(bit_widths[i]);
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<Buffer> bit_buffer;
+  PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+  sink_.Reset();
+
+  if (!bit_writer_.PutVlqInt(values_per_block_) ||
+      !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+      !bit_writer_.PutVlqInt(total_value_count_) ||
+      !bit_writer_.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("header writing error");
+  }

Review Comment:
   This seems to assume that the `bits_buffer_` is large enough, once reset, to 
write the header into... which I assume it is, but perhaps add a check just to 
be sure?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any of value in deltas_ 
vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last mini block, we pad the 
mini block
+    // with zeroes so that its length is the number of values in a full mini 
block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_util::ToLittleEndian(bit_widths[i]);
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<Buffer> bit_buffer;
+  PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+  sink_.Reset();
+
+  if (!bit_writer_.PutVlqInt(values_per_block_) ||
+      !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+      !bit_writer_.PutVlqInt(total_value_count_) ||
+      !bit_writer_.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("header writing error");
+  }
+  bit_writer_.Flush();
+
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(), 
bit_buffer->size()));
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+  return buffer;

Review Comment:
   I think this can spelled more concisely
   ```suggestion
     PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/ 
true));
     return buffer;
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any of value in deltas_ 
vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last mini block, we pad the 
mini block
+    // with zeroes so that its length is the number of values in a full mini 
block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_util::ToLittleEndian(bit_widths[i]);
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<Buffer> bit_buffer;
+  PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+  sink_.Reset();
+
+  if (!bit_writer_.PutVlqInt(values_per_block_) ||
+      !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+      !bit_writer_.PutVlqInt(total_value_count_) ||
+      !bit_writer_.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("header writing error");
+  }
+  bit_writer_.Flush();
+
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(), 
bit_buffer->size()));

Review Comment:
   It's a bit of a pity to copy again the generated blocks here... 
Unfortunately the header's byte length cannot be computed until `FlushValues` 
AFAIU, since `total_value_count_` is varlen-encoded. However, you could perhaps 
at least presize `sink_` here to avoid a pointless reallocation?



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1276,5 +1276,66 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), 
ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, 
descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, 
num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t 
valid_bits_offset) {

Review Comment:
   `override` here as well?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any of value in deltas_ 
vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last mini block, we pad the 
mini block
+    // with zeroes so that its length is the number of values in a full mini 
block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_util::ToLittleEndian(bit_widths[i]);
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<Buffer> bit_buffer;
+  PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+  sink_.Reset();

Review Comment:
   Perhaps
   ```suggestion
     PARQUET_ASSIGN_OR_THROW(auto bit_buffer, sink_.Finish(/*shrink_to_fit=*/ 
true));
   ```
   
   Also, note that `BufferBuilder::Finish` calls `Reset` implicitly.



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1276,5 +1276,66 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), 
ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {

Review Comment:
   Why is this `virtual`? Did you mean to mark this `override`?



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1276,5 +1276,66 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), 
ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, 
descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, 
num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t 
valid_bits_offset) {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, 
descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, 
null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, 
num_values_,
+                                                        valid_bits, 
valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<Int32Type, Int64Type> TestDeltaBitPackEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes);
+
+TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 2000));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 
0.1));
+}

Review Comment:
   Also, currently `GenerateData` uses `std::numeric_limits<T>` for the random 
distribution of values, probably ensuring that the max bitwidth is always 
emitted?
   
   I think we want to improve this.



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1276,5 +1276,66 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), 
ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, 
descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, 
num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t 
valid_bits_offset) {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, 
descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, 
null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, 
num_values_,
+                                                        valid_bits, 
valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<Int32Type, Int64Type> TestDeltaBitPackEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes);
+
+TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 2000));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 
0.1));
+}

Review Comment:
   I think we would like to check with all data sizes between, say, 0 and 256 
values, to exercise the blocking logic a bit more.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any of value in deltas_ 
vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);

Review Comment:
   If this is really an error, I would like the tests to be able to catch this!



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1276,5 +1276,66 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), 
ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, 
descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, 
num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t 
valid_bits_offset) {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, 
descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, 
null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, 
num_values_,
+                                                        valid_bits, 
valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<Int32Type, Int64Type> TestDeltaBitPackEncodingTypes;

Review Comment:
   Style nit
   ```suggestion
   using TestDeltaBitPackEncodingTypes = ::testing::Types<Int32Type, Int64Type>;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to