pitrou commented on code in PR #14191:
URL: https://github.com/apache/arrow/pull/14191#discussion_r1041141931


##########
cpp/src/arrow/util/bit_stream_utils.h:
##########
@@ -203,9 +203,10 @@ class BitReader {
 };
 
 inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
-  // TODO: revisit this limit if necessary (can be raised to 64 by fixing some edge cases)
-  DCHECK_LE(num_bits, 32);
-  DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits;
+  DCHECK_LE(num_bits, 64);
+  if (num_bits < 64) {
+    DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits;
+  }

Review Comment:
   It seems like we could expand the existing tests to exercise this? Some of 
the tests in `rle_encoding_test.cc` would apply.
   



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1276,5 +1282,108 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), 
ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  void InitBoundData(int nvalues, int repeats) {
+    num_values_ = nvalues * repeats;
+    input_bytes_.resize(num_values_ * sizeof(c_type));
+    output_bytes_.resize(num_values_ * sizeof(c_type));
+    draws_ = reinterpret_cast<c_type*>(input_bytes_.data());
+    decode_buf_ = reinterpret_cast<c_type*>(output_bytes_.data());
+    GenerateBoundData<c_type>(nvalues, draws_, -10, 10, &data_buffer_);
+
+    // add some repeated values
+    for (int j = 1; j < repeats; ++j) {
+      for (int i = 0; i < nvalues; ++i) {
+        draws_[nvalues * j + i] = draws_[i];
+      }
+    }
+  }
+
+  void ExecuteBound(int nvalues, int repeats) {
+    InitBoundData(nvalues, repeats);
+    CheckRoundtrip();
+  }
+
+  void ExecuteSpacedBound(int nvalues, int repeats, int64_t valid_bits_offset,
+                          double null_probability) {
+    InitBoundData(nvalues, repeats);
+
+    int64_t size = num_values_ + valid_bits_offset;
+    auto rand = ::arrow::random::RandomArrayGenerator(1923);
+    const auto array = rand.UInt8(size, 0, 100, null_probability);
+    const auto valid_bits = array->null_bitmap_data();
+    if (valid_bits) {
+      CheckRoundtripSpaced(valid_bits, valid_bits_offset);
+    }
+  }
+
+  void CheckRoundtrip() override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, 
descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, 
num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits,
+                            int64_t valid_bits_offset) override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, 
descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, 
null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, 
num_values_,
+                                                        valid_bits, 
valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+  std::vector<uint8_t> input_bytes_;
+  std::vector<uint8_t> output_bytes_;
+};
+
+using TestDeltaBitPackEncodingTypes = ::testing::Types<Int32Type, Int64Type>;
+TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes);
+
+TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 2000));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 
0.1));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(25000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(2000, 2000));

Review Comment:
   It doesn't seem very useful to vary the number of values and repeats if 
you're always using the same (small) bounds.
   It would be nice to test cases where the bit width ends up 0, 64, or some 
values in between.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,13 +2060,283 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * 
sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        bit_widths_(mini_blocks_per_block, 0) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block 
must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for largest 
possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<uint8_t> bit_widths_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    UT value = static_cast<UT>(src[idx]);
+    // Calculate deltas. The possible overflow is handled by use of unsigned 
integers
+    // making subtraction operations well defined and correct even in case of 
overflow.
+    // Encoded integes will wrap back around on decoding.

Review Comment:
   ```suggestion
       // Encoded integers will wrap back around on decoding.
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,13 +2060,283 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * 
sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        bit_widths_(mini_blocks_per_block, 0) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block 
must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for largest 
possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<uint8_t> bit_widths_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    UT value = static_cast<UT>(src[idx]);
+    // Calculate deltas. The possible overflow is handled by use of unsigned 
integers
+    // making subtraction operations well defined and correct even in case of 
overflow.
+    // Encoded integes will wrap back around on decoding.
+    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  auto min_delta = static_cast<UT>(

Review Comment:
   Is the `static_cast` still needed?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,13 +2060,283 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * 
sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        bit_widths_(mini_blocks_per_block, 0) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block 
must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for largest 
possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<uint8_t> bit_widths_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    UT value = static_cast<UT>(src[idx]);
+    // Calculate deltas. The possible overflow is handled by use of unsigned 
integers
+    // making subtraction operations well defined and correct even in case of 
overflow.
+    // Encoded integes will wrap back around on decoding.
+    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  auto min_delta = static_cast<UT>(
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_));
+  bit_writer_.PutZigZagVlqInt(static_cast<T>(min_delta));
+
+  // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to 
write
+  // bit widths of miniblocks as they become known during the encoding.
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks =
+      static_cast<uint32_t>(std::ceil(static_cast<double>(values_current_block_) /
+                                      static_cast<double>(values_per_mini_block_)));
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    auto max_delta = static_cast<UT>(*std::max_element(

Review Comment:
   Is the `static_cast` required?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,13 +2060,283 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;

Review Comment:
   Can you add a comment explaining this one?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,13 +2060,283 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * 
sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        bit_widths_(mini_blocks_per_block, 0) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block 
must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for largest 
possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<uint8_t> bit_widths_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    UT value = static_cast<UT>(src[idx]);
+    // Calculate deltas. The possible overflow is handled by use of unsigned 
integers
+    // making subtraction operations well defined and correct even in case of 
overflow.
+    // Encoded integes will wrap back around on decoding.
+    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  auto min_delta = static_cast<UT>(

Review Comment:
   Also, make this `const`?



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1276,5 +1282,108 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), 
ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  void InitBoundData(int nvalues, int repeats) {
+    num_values_ = nvalues * repeats;
+    input_bytes_.resize(num_values_ * sizeof(c_type));
+    output_bytes_.resize(num_values_ * sizeof(c_type));
+    draws_ = reinterpret_cast<c_type*>(input_bytes_.data());
+    decode_buf_ = reinterpret_cast<c_type*>(output_bytes_.data());
+    GenerateBoundData<c_type>(nvalues, draws_, -10, 10, &data_buffer_);
+
+    // add some repeated values
+    for (int j = 1; j < repeats; ++j) {
+      for (int i = 0; i < nvalues; ++i) {
+        draws_[nvalues * j + i] = draws_[i];
+      }
+    }
+  }
+
+  void ExecuteBound(int nvalues, int repeats) {
+    InitBoundData(nvalues, repeats);
+    CheckRoundtrip();
+  }
+
+  void ExecuteSpacedBound(int nvalues, int repeats, int64_t valid_bits_offset,
+                          double null_probability) {
+    InitBoundData(nvalues, repeats);
+
+    int64_t size = num_values_ + valid_bits_offset;
+    auto rand = ::arrow::random::RandomArrayGenerator(1923);
+    const auto array = rand.UInt8(size, 0, 100, null_probability);
+    const auto valid_bits = array->null_bitmap_data();
+    if (valid_bits) {
+      CheckRoundtripSpaced(valid_bits, valid_bits_offset);
+    }

Review Comment:
   We should still try to roundtrip if there are only nulls.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,13 +2060,283 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * 
sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        bit_widths_(mini_blocks_per_block, 0) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block 
must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for largest 
possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<uint8_t> bit_widths_;

Review Comment:
   I'm curious, why use an `ArrowPoolVector` above but not here?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,13 +2060,283 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * 
sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        bit_widths_(mini_blocks_per_block, 0) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block 
must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for largest 
possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<uint8_t> bit_widths_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    UT value = static_cast<UT>(src[idx]);
+    // Calculate deltas. The possible overflow is handled by use of unsigned 
integers
+    // making subtraction operations well defined and correct even in case of 
overflow.
+    // Encoded integers will wrap back around on decoding.
+    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  auto min_delta = static_cast<UT>(
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_));
+  bit_writer_.PutZigZagVlqInt(static_cast<T>(min_delta));
+
+  // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to 
write
+  // bit widths of miniblocks as they become known during the encoding.
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks =
+      
static_cast<uint32_t>(std::ceil(static_cast<double>(values_current_block_) /
+                                      
static_cast<double>(values_per_mini_block_)));
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    auto max_delta = static_cast<UT>(*std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block));
+
+    // The minimum number of bits required to write any of values in deltas_ 
vector.
+    // See overflow comment above.
+    bit_widths_[i] = bit_util::NumRequiredBits(max_delta - min_delta);
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      // See overflow comment above.
+      const UT value = deltas_[j] - min_delta;
+      bit_writer_.PutValue(value, bit_widths_[i]);
+    }
+    // If there are not enough values to fill the last mini block, we pad the 
mini block
+    // with zeroes so that its length is the number of values in a full mini 
block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutValue(0, bit_widths_[i]);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_widths_[i];
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;

Review Comment:
   This statement is pointless, since `bit_width_data` ceases to exist just 
after.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,13 +2060,283 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * 
sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        bit_widths_(mini_blocks_per_block, 0) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block 
must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for largest 
possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<uint8_t> bit_widths_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    UT value = static_cast<UT>(src[idx]);
+    // Calculate deltas. The possible overflow is handled by use of unsigned 
integers
+    // making subtraction operations well defined and correct even in case of 
overflow.
+    // Encoded integers will wrap back around on decoding.
+    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  auto min_delta = static_cast<UT>(
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_));
+  bit_writer_.PutZigZagVlqInt(static_cast<T>(min_delta));
+
+  // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to 
write
+  // bit widths of miniblocks as they become known during the encoding.
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks =

Review Comment:
   Nit.
   ```suggestion
     const uint32_t num_miniblocks =
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,13 +2060,283 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * 
sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        bit_widths_(mini_blocks_per_block, 0) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block 
must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for largest 
possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<uint8_t> bit_widths_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    UT value = static_cast<UT>(src[idx]);
+    // Calculate deltas. The possible overflow is handled by use of unsigned 
integers
+    // making subtraction operations well defined and correct even in case of 
overflow.
+    // Encoded integers will wrap back around on decoding.
+    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  auto min_delta = static_cast<UT>(
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_));
+  bit_writer_.PutZigZagVlqInt(static_cast<T>(min_delta));
+
+  // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to 
write
+  // bit widths of miniblocks as they become known during the encoding.
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks =
+      
static_cast<uint32_t>(std::ceil(static_cast<double>(values_current_block_) /
+                                      
static_cast<double>(values_per_mini_block_)));
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    auto max_delta = static_cast<UT>(*std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block));
+
+    // The minimum number of bits required to write any of values in deltas_ 
vector.
+    // See overflow comment above.
+    bit_widths_[i] = bit_util::NumRequiredBits(max_delta - min_delta);
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      // See overflow comment above.
+      const UT value = deltas_[j] - min_delta;
+      bit_writer_.PutValue(value, bit_widths_[i]);
+    }
+    // If there are not enough values to fill the last mini block, we pad the 
mini block
+    // with zeroes so that its length is the number of values in a full mini 
block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutValue(0, bit_widths_[i]);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_widths_[i];
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true));
+
+  uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {};
+  bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_));
+  if (!header_writer.PutVlqInt(values_per_block_) ||
+      !header_writer.PutVlqInt(mini_blocks_per_block_) ||
+      !header_writer.PutVlqInt(total_value_count_) ||
+      !header_writer.PutZigZagVlqInt(static_cast<T>(first_value_))) {
+    throw ParquetException("header writing error");
+  }
+  header_writer.Flush();
+
+  // We reserved enough space at the beginning of the buffer for largest 
possible header
+  // and data was written immediately after. We now write the header data 
immediately
+  // before the end of reserved space.
+  const size_t offset_bytes = kMaxPageHeaderWriterSize - 
header_writer.bytes_written();
+  std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer_,
+              header_writer.bytes_written());
+
+  // Excess bytes at the beginning are sliced off and ignored.
+  return SliceBuffer(buffer, offset_bytes);
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+  const ::arrow::ArrayData& data = *values.data();
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int32_t>(1), static_cast<int>(data.length));
+  } else {
+    PutSpaced(data.GetValues<int32_t>(1), static_cast<int>(data.length),
+              data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+}
+
+template <>
+void DeltaBitPackEncoder<Int64Type>::Put(const ::arrow::Array& values) {
+  const ::arrow::ArrayData& data = *values.data();
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int64_t>(1), static_cast<int>(data.length));
+  } else {
+    PutSpaced(data.GetValues<int64_t>(1), static_cast<int>(data.length),
+              data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const ::arrow::Array& values) {

Review Comment:
   This one will never be instantiated, right? Is it required to declare it?



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1276,5 +1282,108 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), 
ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  void InitBoundData(int nvalues, int repeats) {
+    num_values_ = nvalues * repeats;
+    input_bytes_.resize(num_values_ * sizeof(c_type));
+    output_bytes_.resize(num_values_ * sizeof(c_type));
+    draws_ = reinterpret_cast<c_type*>(input_bytes_.data());
+    decode_buf_ = reinterpret_cast<c_type*>(output_bytes_.data());
+    GenerateBoundData<c_type>(nvalues, draws_, -10, 10, &data_buffer_);
+
+    // add some repeated values
+    for (int j = 1; j < repeats; ++j) {
+      for (int i = 0; i < nvalues; ++i) {
+        draws_[nvalues * j + i] = draws_[i];
+      }
+    }
+  }
+
+  void ExecuteBound(int nvalues, int repeats) {
+    InitBoundData(nvalues, repeats);
+    CheckRoundtrip();
+  }
+
+  void ExecuteSpacedBound(int nvalues, int repeats, int64_t valid_bits_offset,
+                          double null_probability) {
+    InitBoundData(nvalues, repeats);
+
+    int64_t size = num_values_ + valid_bits_offset;
+    auto rand = ::arrow::random::RandomArrayGenerator(1923);
+    const auto array = rand.UInt8(size, 0, 100, null_probability);
+    const auto valid_bits = array->null_bitmap_data();
+    if (valid_bits) {
+      CheckRoundtripSpaced(valid_bits, valid_bits_offset);
+    }
+  }
+
+  void CheckRoundtrip() override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, 
descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, 
num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits,
+                            int64_t valid_bits_offset) override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, 
descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, 
null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, 
num_values_,
+                                                        valid_bits, 
valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+  std::vector<uint8_t> input_bytes_;
+  std::vector<uint8_t> output_bytes_;
+};
+
+using TestDeltaBitPackEncodingTypes = ::testing::Types<Int32Type, Int64Type>;
+TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes);
+
+TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 2000));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 
0.1));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(25000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(2000, 2000));

Review Comment:
   Also, test with numbers of values that are:
   * a multiple of the block size
   * a multiple of the miniblock size
   * neither
   



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,13 +2060,283 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * 
sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        bit_widths_(mini_blocks_per_block, 0) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block 
must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for largest 
possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<uint8_t> bit_widths_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    UT value = static_cast<UT>(src[idx]);
+    // Calculate deltas. The possible overflow is handled by use of unsigned 
integers
+    // making subtraction operations well defined and correct even in case of 
overflow.
+    // Encoded integes will wrap back around on decoding.
+    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  auto min_delta = static_cast<UT>(
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_));
+  bit_writer_.PutZigZagVlqInt(static_cast<T>(min_delta));
+
+  // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to 
write
+  // bit widths of miniblocks as they become known during the encoding.
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks =
+      
static_cast<uint32_t>(std::ceil(static_cast<double>(values_current_block_) /
+                                      
static_cast<double>(values_per_mini_block_)));
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    auto max_delta = static_cast<UT>(*std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block));
+
+    // The minimum number of bits required to write any of values in deltas_ 
vector.
+    // See overflow comment above.
+    bit_widths_[i] = bit_util::NumRequiredBits(max_delta - min_delta);
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      // See overflow comment above.
+      const UT value = deltas_[j] - min_delta;
+      bit_writer_.PutValue(value, bit_widths_[i]);
+    }
+    // If there are not enough values to fill the last mini block, we pad the 
mini block
+    // with zeroes so that its length is the number of values in a full mini 
block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutValue(0, bit_widths_[i]);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_widths_[i];
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true));
+
+  uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {};
+  bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_));
+  if (!header_writer.PutVlqInt(values_per_block_) ||
+      !header_writer.PutVlqInt(mini_blocks_per_block_) ||
+      !header_writer.PutVlqInt(total_value_count_) ||
+      !header_writer.PutZigZagVlqInt(static_cast<T>(first_value_))) {
+    throw ParquetException("header writing error");
+  }
+  header_writer.Flush();
+
+  // We reserved enough space at the beginning of the buffer for largest 
possible header
+  // and data was written immediately after. We now write the header data 
immediately
+  // before the end of reserved space.
+  const size_t offset_bytes = kMaxPageHeaderWriterSize - 
header_writer.bytes_written();
+  std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer_,
+              header_writer.bytes_written());
+
+  // Excess bytes at the beginning will are sliced off and ignored.
+  return SliceBuffer(buffer, offset_bytes);
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+  const ::arrow::ArrayData& data = *values.data();

Review Comment:
   As the other implementations of `Put(const ::arrow::Array&)` do, you should
   check the array type before using its buffers below.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,13 +2060,283 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * 
sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        bit_widths_(mini_blocks_per_block, 0) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block 
must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for largest 
possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<uint8_t> bit_widths_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    UT value = static_cast<UT>(src[idx]);
+    // Calculate deltas. The possible overflow is handled by use of unsigned 
integers
+    // making subtraction operations well defined and correct even in case of 
overflow.
+    // Encoded integes will wrap back around on decoding.
+    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  auto min_delta = static_cast<UT>(
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_));
+  bit_writer_.PutZigZagVlqInt(static_cast<T>(min_delta));
+
+  // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to 
write
+  // bit widths of miniblocks as they become known during the encoding.
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks =
+      
static_cast<uint32_t>(std::ceil(static_cast<double>(values_current_block_) /
+                                      
static_cast<double>(values_per_mini_block_)));
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    auto max_delta = static_cast<UT>(*std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block));
+
+    // The minimum number of bits required to write any of values in deltas_ 
vector.
+    // See overflow comment above.
+    bit_widths_[i] = bit_util::NumRequiredBits(max_delta - min_delta);
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      // See overflow comment above.
+      const UT value = deltas_[j] - min_delta;
+      bit_writer_.PutValue(value, bit_widths_[i]);
+    }
+    // If there are not enough values to fill the last mini block, we pad the 
mini block
+    // with zeroes so that its length is the number of values in a full mini 
block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutValue(0, bit_widths_[i]);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_widths_[i];
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true));
+
+  uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {};
+  bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_));
+  if (!header_writer.PutVlqInt(values_per_block_) ||
+      !header_writer.PutVlqInt(mini_blocks_per_block_) ||
+      !header_writer.PutVlqInt(total_value_count_) ||
+      !header_writer.PutZigZagVlqInt(static_cast<T>(first_value_))) {
+    throw ParquetException("header writing error");
+  }
+  header_writer.Flush();
+
+  // We reserved enough space at the beginning of the buffer for largest 
possible header
+  // and data was written immediately after. We now write the header data 
immediately
+  // before the end of reserved space.
+  const size_t offset_bytes = kMaxPageHeaderWriterSize - 
header_writer.bytes_written();
+  std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer_,
+              header_writer.bytes_written());
+
+  // Excess bytes at the beginning will are sliced off and ignored.
+  return SliceBuffer(buffer, offset_bytes);
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+  const ::arrow::ArrayData& data = *values.data();
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int32_t>(1), static_cast<int>(data.length));

Review Comment:
   What if `data.length` doesn't fit in an `int`? We would probably want to 
throw an exception in that case.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,13 +2060,283 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * 
sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        bit_widths_(mini_blocks_per_block, 0) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block 
must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for largest 
possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<uint8_t> bit_widths_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    UT value = static_cast<UT>(src[idx]);
+    // Calculate deltas. The possible overflow is handled by use of unsigned 
integers
+    // making subtraction operations well defined and correct even in case of 
overflow.
+    // Encoded integes will wrap back around on decoding.
+    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  auto min_delta = static_cast<UT>(
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_));
+  bit_writer_.PutZigZagVlqInt(static_cast<T>(min_delta));
+
+  // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to 
write
+  // bit widths of miniblocks as they become known during the encoding.
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks =
+      
static_cast<uint32_t>(std::ceil(static_cast<double>(values_current_block_) /
+                                      
static_cast<double>(values_per_mini_block_)));
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    auto max_delta = static_cast<UT>(*std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block));
+
+    // The minimum number of bits required to write any of values in deltas_ 
vector.
+    // See overflow comment above.
+    bit_widths_[i] = bit_util::NumRequiredBits(max_delta - min_delta);
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      // See overflow comment above.
+      const UT value = deltas_[j] - min_delta;
+      bit_writer_.PutValue(value, bit_widths_[i]);
+    }
+    // If there are not enough values to fill the last mini block, we pad the 
mini block
+    // with zeroes so that its length is the number of values in a full mini 
block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutValue(0, bit_widths_[i]);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_widths_[i];
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true));
+
+  uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {};
+  bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_));
+  if (!header_writer.PutVlqInt(values_per_block_) ||
+      !header_writer.PutVlqInt(mini_blocks_per_block_) ||
+      !header_writer.PutVlqInt(total_value_count_) ||
+      !header_writer.PutZigZagVlqInt(static_cast<T>(first_value_))) {
+    throw ParquetException("header writing error");
+  }
+  header_writer.Flush();
+
+  // We reserved enough space at the beginning of the buffer for largest 
possible header
+  // and data was written immediately after. We now write the header data 
immediately
+  // before the end of reserved space.
+  const size_t offset_bytes = kMaxPageHeaderWriterSize - 
header_writer.bytes_written();
+  std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer_,
+              header_writer.bytes_written());
+
+  // Excess bytes at the beginning will are sliced off and ignored.

Review Comment:
   ```suggestion
     // Excess bytes at the beginning are sliced off and ignored.
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,13 +2060,283 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * 
sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        bit_widths_(mini_blocks_per_block, 0) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block 
must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for largest 
possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<uint8_t> bit_widths_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    UT value = static_cast<UT>(src[idx]);
+    // Calculate deltas. The possible overflow is handled by use of unsigned 
integers
+    // making subtraction operations well defined and correct even in case of 
overflow.
+    // Encoded integes will wrap back around on decoding.
+    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  auto min_delta = static_cast<UT>(
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_));
+  bit_writer_.PutZigZagVlqInt(static_cast<T>(min_delta));
+
+  // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to 
write
+  // bit widths of miniblocks as they become known during the encoding.
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks =
+      
static_cast<uint32_t>(std::ceil(static_cast<double>(values_current_block_) /
+                                      
static_cast<double>(values_per_mini_block_)));
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    auto max_delta = static_cast<UT>(*std::max_element(

Review Comment:
   Also, make this `const`?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,13 +2060,283 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * 
sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        bit_widths_(mini_blocks_per_block, 0) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block 
must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for largest 
possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<uint8_t> bit_widths_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    UT value = static_cast<UT>(src[idx]);
+    // Calculate deltas. The possible overflow is handled by use of unsigned 
integers
+    // making subtraction operations well defined and correct even in case of 
overflow.
+    // Encoded integes will wrap back around on decoding.
+    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  auto min_delta = static_cast<UT>(
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_));
+  bit_writer_.PutZigZagVlqInt(static_cast<T>(min_delta));
+
+  // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to 
write
+  // bit widths of miniblocks as they become known during the encoding.
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks =
+      
static_cast<uint32_t>(std::ceil(static_cast<double>(values_current_block_) /
+                                      
static_cast<double>(values_per_mini_block_)));
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    auto max_delta = static_cast<UT>(*std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block));
+
+    // The minimum number of bits required to write any of values in deltas_ 
vector.
+    // See overflow comment above.
+    bit_widths_[i] = bit_util::NumRequiredBits(max_delta - min_delta);

Review Comment:
   I'm not sure the compiler is able to correctly infer that this won't change
   afterwards, so I would also store it in a local variable:
   ```suggestion
       const auto bit_width = bit_widths_[i] = bit_util::NumRequiredBits(max_delta - min_delta);
   ```
   
   And then below you can:
   ```c++
         bit_writer_.PutValue(value, bit_width);
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,13 +2060,283 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * 
sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        bit_widths_(mini_blocks_per_block, 0) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block 
must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for largest 
possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<uint8_t> bit_widths_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    UT value = static_cast<UT>(src[idx]);
+    // Calculate deltas. The possible overflow is handled by use of unsigned 
integers
+    // making subtraction operations well defined and correct even in case of 
overflow.
+    // Encoded integes will wrap back around on decoding.
+    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  auto min_delta = static_cast<UT>(
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_));
+  bit_writer_.PutZigZagVlqInt(static_cast<T>(min_delta));
+
+  // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to 
write
+  // bit widths of miniblocks as they become known during the encoding.
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks =
+      
static_cast<uint32_t>(std::ceil(static_cast<double>(values_current_block_) /
+                                      
static_cast<double>(values_per_mini_block_)));
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    auto max_delta = static_cast<UT>(*std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block));
+
+    // The minimum number of bits required to write any of values in deltas_ 
vector.
+    // See overflow comment above.
+    bit_widths_[i] = bit_util::NumRequiredBits(max_delta - min_delta);
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      // See overflow comment above.
+      const UT value = deltas_[j] - min_delta;
+      bit_writer_.PutValue(value, bit_widths_[i]);
+    }
+    // If there are not enough values to fill the last mini block, we pad the 
mini block
+    // with zeroes so that its length is the number of values in a full mini 
block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; 
j++) {
+      bit_writer_.PutValue(0, bit_widths_[i]);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_widths_[i];

Review Comment:
   Why not write directly to `bit_width_data` above instead of going through
   the temporary `bit_widths_`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to