wjones127 commented on code in PR #14191:
URL: https://github.com/apache/arrow/pull/14191#discussion_r1024423662


##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};

Review Comment:
   Question: what does the `{0}` mean? a default value?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =

Review Comment:
   nit: call this `length` or `mini_block_length` instead?
   ```suggestion
       const uint32_t length =
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);

Review Comment:
   Should we compute `num_miniblocks` first, and then use `num_miniblocks` as 
the length instead? Or is there a reason we might want extra slots? (If the 
latter, worth a comment IMO)



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder

Review Comment:
   For future readers, I'd appreciate some more comments on the encoding. It 
would be nice to know before getting into the code:
   
    * What is a block?
    * What is a miniblock?
    * Is there a website where the full specification of this encoding is 
provided?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));

Review Comment:
   Perhaps you could add a comment describing what `num_bytes` represents in 
the context of the encoding? IIUC each value is represented by a diff that 
takes up exactly `num_bytes` in a bitmap. Is that right?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last miniblock, we pad the 
miniblock
+    // with zeroes so that its length is the number of values in a full 
miniblock
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    T val = bit_util::ToLittleEndian(bit_widths[i]);
+    memcpy(bit_width_data + i, &val, 1);
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<Buffer> bit_buffer;
+  PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+  sink_.Reset();
+
+  if (!bit_writer_.PutVlqInt(values_per_block_) ||
+      !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+      !bit_writer_.PutVlqInt(total_value_count_) ||
+      !bit_writer_.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("header writing error");
+  }
+  bit_writer_.Flush();
+
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(), 
bit_buffer->size()));
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+  return buffer;
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+  const auto& data = *values.data();

Review Comment:
   ```suggestion
     const ArrayData& data = *values.data();
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last miniblock, we pad the 
miniblock
+    // with zeroes so that its length is the number of values in a full 
miniblock
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    T val = bit_util::ToLittleEndian(bit_widths[i]);
+    memcpy(bit_width_data + i, &val, 1);
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<Buffer> bit_buffer;
+  PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+  sink_.Reset();
+
+  if (!bit_writer_.PutVlqInt(values_per_block_) ||
+      !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+      !bit_writer_.PutVlqInt(total_value_count_) ||
+      !bit_writer_.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("header writing error");
+  }
+  bit_writer_.Flush();
+
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(), 
bit_buffer->size()));
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+  return buffer;
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+  const auto& data = *values.data();
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int32_t>(1), static_cast<int>(values.length()));
+  } else {
+    PutSpaced(data.GetValues<int32_t>(1), static_cast<int>(data.length),
+              data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+}
+
+template <>
+void DeltaBitPackEncoder<Int64Type>::Put(const ::arrow::Array& values) {
+  const auto& data = *values.data();

Review Comment:
   ```suggestion
     const ArrayData& data = *values.data();
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last miniblock, we pad the 
miniblock
+    // with zeroes so that its length is the number of values in a full 
miniblock
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    T val = bit_util::ToLittleEndian(bit_widths[i]);
+    memcpy(bit_width_data + i, &val, 1);
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<Buffer> bit_buffer;
+  PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+  sink_.Reset();
+
+  if (!bit_writer_.PutVlqInt(values_per_block_) ||
+      !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+      !bit_writer_.PutVlqInt(total_value_count_) ||
+      !bit_writer_.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("header writing error");
+  }
+  bit_writer_.Flush();
+
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(), 
bit_buffer->size()));
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+  return buffer;
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+  const auto& data = *values.data();
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int32_t>(1), static_cast<int>(values.length()));

Review Comment:
   To be more consistent with line below (and possibly more optimal?)
   ```suggestion
       Put(data.GetValues<int32_t>(1), static_cast<int>(data.length));
   ```



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1276,5 +1276,64 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), 
ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, 
descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, 
num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t 
valid_bits_offset) {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, 
descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, 
null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, 
num_values_,
+                                                        valid_bits, 
valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<Int32Type, Int64Type> TestDeltaBitPackEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes);
+
+TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200));

Review Comment:
   Would it make sense to test the edge cases of an empty array or one that's 
all repeats?
   
   ```suggestion
     ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200));
     ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
     ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 2000));
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};

Review Comment:
   Also nit: could we use the term `length` instead of `values`? When I see the 
name `values` I think it should be a pointer into an array of values, not the 
number of values.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),

Review Comment:
   `(values_per_block + 3) * sizeof(T)` seems like some odd magic that could 
use a comment justifying it.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last miniblock, we pad the 
miniblock
+    // with zeroes so that its length is the number of values in a full 
miniblock
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    T val = bit_util::ToLittleEndian(bit_widths[i]);
+    memcpy(bit_width_data + i, &val, 1);
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<Buffer> bit_buffer;
+  PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+  sink_.Reset();
+
+  if (!bit_writer_.PutVlqInt(values_per_block_) ||
+      !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+      !bit_writer_.PutVlqInt(total_value_count_) ||
+      !bit_writer_.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("header writing error");
+  }
+  bit_writer_.Flush();
+
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(), 
bit_buffer->size()));
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+  return buffer;
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+  const auto& data = *values.data();
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int32_t>(1), static_cast<int>(values.length()));
+  } else {
+    PutSpaced(data.GetValues<int32_t>(1), static_cast<int>(data.length),
+              data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+}
+
+template <>
+void DeltaBitPackEncoder<Int64Type>::Put(const ::arrow::Array& values) {
+  const auto& data = *values.data();
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int64_t>(1), static_cast<int>(values.length()));

Review Comment:
   ```suggestion
       Put(data.GetValues<int64_t>(1), static_cast<int>(data.length));
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last miniblock, we pad the 
miniblock
+    // with zeroes so that its length is the number of values in a full 
miniblock
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    T val = bit_util::ToLittleEndian(bit_widths[i]);
+    memcpy(bit_width_data + i, &val, 1);

Review Comment:
   Perhaps I don't understand, but does this not work?
   
   ```suggestion
       bit_width_data[i] = bit_util::ToLittleEndian(bit_widths[i]);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to