pitrou commented on code in PR #14191:
URL: https://github.com/apache/arrow/pull/14191#discussion_r985814027


##########
cpp/src/parquet/encoding.cc:
##########
@@ -2062,6 +2062,184 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(128),
+        mini_blocks_per_block_(4),
+        values_per_mini_block_(values_per_block_ / mini_blocks_per_block_),
+        values_current_block_(0),
+        total_value_count_(0),
+        first_value_(0),
+        current_value_(0),
+        sink_(pool),
+        bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        deltas_(std::vector<T>(values_per_block_)) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return 4 * sizeof(uint64_t) + sink_.length();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    ParquetException::NYI("put spaced");
+  }
+
+ protected:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_;
+  uint32_t total_value_count_;
+  T first_value_;
+  T current_value_;
+  ::arrow::BufferBuilder sink_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<T> deltas_;
+
+ private:
+  void FlushBlock();
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  uint32_t idx = 0;
+  if (total_value_count_ == 0) {
+    first_value_ = src[0];
+    current_value_ = first_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  int increment = total_value_count_ * sizeof(T);
+  if (ARROW_PREDICT_FALSE(sink_.capacity() + increment > sink_.capacity())) {

Review Comment:
   This seems wrong, for the record.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2062,6 +2062,184 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(128),
+        mini_blocks_per_block_(4),
+        values_per_mini_block_(values_per_block_ / mini_blocks_per_block_),
+        values_current_block_(0),
+        total_value_count_(0),
+        first_value_(0),
+        current_value_(0),
+        sink_(pool),
+        bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        deltas_(std::vector<T>(values_per_block_)) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return 4 * sizeof(uint64_t) + sink_.length();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    ParquetException::NYI("put spaced");
+  }
+
+ protected:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_;
+  uint32_t total_value_count_;
+  T first_value_;
+  T current_value_;
+  ::arrow::BufferBuilder sink_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<T> deltas_;
+
+ private:
+  void FlushBlock();
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  uint32_t idx = 0;
+  if (total_value_count_ == 0) {
+    first_value_ = src[0];
+    current_value_ = first_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  int increment = total_value_count_ * sizeof(T);

Review Comment:
   This does not look like an increment?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2062,6 +2062,184 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(128),
+        mini_blocks_per_block_(4),
+        values_per_mini_block_(values_per_block_ / mini_blocks_per_block_),
+        values_current_block_(0),
+        total_value_count_(0),
+        first_value_(0),
+        current_value_(0),
+        sink_(pool),
+        bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        deltas_(std::vector<T>(values_per_block_)) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return 4 * sizeof(uint64_t) + sink_.length();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    ParquetException::NYI("put spaced");
+  }
+
+ protected:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_;
+  uint32_t total_value_count_;
+  T first_value_;
+  T current_value_;
+  ::arrow::BufferBuilder sink_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<T> deltas_;
+
+ private:
+  void FlushBlock();
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  uint32_t idx = 0;
+  if (total_value_count_ == 0) {
+    first_value_ = src[0];
+    current_value_ = first_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  int increment = total_value_count_ * sizeof(T);
+  if (ARROW_PREDICT_FALSE(sink_.capacity() + increment > sink_.capacity())) {
+    PARQUET_THROW_NOT_OK(sink_.Resize(increment, false));

Review Comment:
   Are you sure about resizing to the increment?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2062,6 +2062,184 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(128),
+        mini_blocks_per_block_(4),
+        values_per_mini_block_(values_per_block_ / mini_blocks_per_block_),
+        values_current_block_(0),
+        total_value_count_(0),
+        first_value_(0),
+        current_value_(0),
+        sink_(pool),
+        bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        deltas_(std::vector<T>(values_per_block_)) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return 4 * sizeof(uint64_t) + sink_.length();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    ParquetException::NYI("put spaced");
+  }
+
+ protected:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_;
+  uint32_t total_value_count_;
+  T first_value_;
+  T current_value_;
+  ::arrow::BufferBuilder sink_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<T> deltas_;
+
+ private:
+  void FlushBlock();
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  uint32_t idx = 0;
+  if (total_value_count_ == 0) {
+    first_value_ = src[0];
+    current_value_ = first_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  int increment = total_value_count_ * sizeof(T);
+  if (ARROW_PREDICT_FALSE(sink_.capacity() + increment > sink_.capacity())) {

Review Comment:
   (Unless this is a way of testing for `increment != 0`?)



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2062,6 +2062,184 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(128),
+        mini_blocks_per_block_(4),
+        values_per_mini_block_(values_per_block_ / mini_blocks_per_block_),
+        values_current_block_(0),
+        total_value_count_(0),
+        first_value_(0),
+        current_value_(0),
+        sink_(pool),
+        bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        deltas_(std::vector<T>(values_per_block_)) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return 4 * sizeof(uint64_t) + sink_.length();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    ParquetException::NYI("put spaced");
+  }
+
+ protected:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_;
+  uint32_t total_value_count_;
+  T first_value_;
+  T current_value_;
+  ::arrow::BufferBuilder sink_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<T> deltas_;
+
+ private:
+  void FlushBlock();
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  uint32_t idx = 0;
+  if (total_value_count_ == 0) {
+    first_value_ = src[0];
+    current_value_ = first_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  int increment = total_value_count_ * sizeof(T);
+  if (ARROW_PREDICT_FALSE(sink_.capacity() + increment > sink_.capacity())) {
+    PARQUET_THROW_NOT_OK(sink_.Resize(increment, false));
+  }
+
+  while (idx < static_cast<uint32_t>(num_values)) {
+    T value = src[idx];
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+
+  if (values_current_block_ != 0) {
+    FlushBlock();
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const auto min_delta =
+      static_cast<int64_t>(*std::min_element(deltas_.begin(), deltas_.end()));

Review Comment:
   What if `deltas_` was not filled up to its end? (See the end of `Put()` above.)



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2062,6 +2062,184 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(128),
+        mini_blocks_per_block_(4),
+        values_per_mini_block_(values_per_block_ / mini_blocks_per_block_),
+        values_current_block_(0),
+        total_value_count_(0),
+        first_value_(0),
+        current_value_(0),
+        sink_(pool),
+        bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        deltas_(std::vector<T>(values_per_block_)) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return 4 * sizeof(uint64_t) + sink_.length();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    ParquetException::NYI("put spaced");
+  }
+
+ protected:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_;
+  uint32_t total_value_count_;
+  T first_value_;
+  T current_value_;
+  ::arrow::BufferBuilder sink_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<T> deltas_;
+
+ private:
+  void FlushBlock();
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  uint32_t idx = 0;
+  if (total_value_count_ == 0) {
+    first_value_ = src[0];
+    current_value_ = first_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  int increment = total_value_count_ * sizeof(T);
+  if (ARROW_PREDICT_FALSE(sink_.capacity() + increment > sink_.capacity())) {
+    PARQUET_THROW_NOT_OK(sink_.Resize(increment, false));
+  }
+
+  while (idx < static_cast<uint32_t>(num_values)) {
+    T value = src[idx];
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+
+  if (values_current_block_ != 0) {
+    FlushBlock();
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const auto min_delta =
+      static_cast<int64_t>(*std::min_element(deltas_.begin(), deltas_.end()));
+  DCHECK(bit_writer_.PutZigZagVlqInt(min_delta));

Review Comment:
   `min_delta` is an iterator (returned by `std::min_element`)... what does it mean to pack it as bits?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2062,6 +2062,184 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(128),
+        mini_blocks_per_block_(4),
+        values_per_mini_block_(values_per_block_ / mini_blocks_per_block_),
+        values_current_block_(0),
+        total_value_count_(0),
+        first_value_(0),
+        current_value_(0),
+        sink_(pool),
+        bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        deltas_(std::vector<T>(values_per_block_)) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return 4 * sizeof(uint64_t) + sink_.length();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    ParquetException::NYI("put spaced");
+  }
+
+ protected:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_;
+  uint32_t total_value_count_;
+  T first_value_;
+  T current_value_;
+  ::arrow::BufferBuilder sink_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<T> deltas_;
+
+ private:
+  void FlushBlock();
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  uint32_t idx = 0;
+  if (total_value_count_ == 0) {
+    first_value_ = src[0];
+    current_value_ = first_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  int increment = total_value_count_ * sizeof(T);
+  if (ARROW_PREDICT_FALSE(sink_.capacity() + increment > sink_.capacity())) {
+    PARQUET_THROW_NOT_OK(sink_.Resize(increment, false));
+  }
+
+  while (idx < static_cast<uint32_t>(num_values)) {
+    T value = src[idx];
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+
+  if (values_current_block_ != 0) {
+    FlushBlock();
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const auto min_delta =
+      static_cast<int64_t>(*std::min_element(deltas_.begin(), deltas_.end()));
+  DCHECK(bit_writer_.PutZigZagVlqInt(min_delta));

Review Comment:
   (Perhaps you meant `std::min` rather than `std::min_element`.)



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2062,6 +2062,184 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(128),
+        mini_blocks_per_block_(4),
+        values_per_mini_block_(values_per_block_ / mini_blocks_per_block_),
+        values_current_block_(0),
+        total_value_count_(0),
+        first_value_(0),
+        current_value_(0),
+        sink_(pool),
+        bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        deltas_(std::vector<T>(values_per_block_)) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return 4 * sizeof(uint64_t) + sink_.length();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    ParquetException::NYI("put spaced");
+  }
+
+ protected:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_;
+  uint32_t total_value_count_;
+  T first_value_;
+  T current_value_;
+  ::arrow::BufferBuilder sink_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<T> deltas_;
+
+ private:
+  void FlushBlock();
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  uint32_t idx = 0;
+  if (total_value_count_ == 0) {
+    first_value_ = src[0];
+    current_value_ = first_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  int increment = total_value_count_ * sizeof(T);
+  if (ARROW_PREDICT_FALSE(sink_.capacity() + increment > sink_.capacity())) {
+    PARQUET_THROW_NOT_OK(sink_.Resize(increment, false));
+  }
+
+  while (idx < static_cast<uint32_t>(num_values)) {
+    T value = src[idx];
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+
+  if (values_current_block_ != 0) {
+    FlushBlock();
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const auto min_delta =
+      static_cast<int64_t>(*std::min_element(deltas_.begin(), deltas_.end()));

Review Comment:
   What if `deltas_` was not filled up to its end? (See the end of `Put()` above.)



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2062,6 +2062,184 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(128),
+        mini_blocks_per_block_(4),
+        values_per_mini_block_(values_per_block_ / mini_blocks_per_block_),
+        values_current_block_(0),
+        total_value_count_(0),
+        first_value_(0),
+        current_value_(0),
+        sink_(pool),
+        bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        deltas_(std::vector<T>(values_per_block_)) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return 4 * sizeof(uint64_t) + sink_.length();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    ParquetException::NYI("put spaced");
+  }
+
+ protected:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_;
+  uint32_t total_value_count_;
+  T first_value_;
+  T current_value_;
+  ::arrow::BufferBuilder sink_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<T> deltas_;
+
+ private:
+  void FlushBlock();
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  uint32_t idx = 0;
+  if (total_value_count_ == 0) {
+    first_value_ = src[0];
+    current_value_ = first_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  int increment = total_value_count_ * sizeof(T);
+  if (ARROW_PREDICT_FALSE(sink_.capacity() + increment > sink_.capacity())) {
+    PARQUET_THROW_NOT_OK(sink_.Resize(increment, false));
+  }
+
+  while (idx < static_cast<uint32_t>(num_values)) {
+    T value = src[idx];
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+
+  if (values_current_block_ != 0) {
+    FlushBlock();
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const auto min_delta =
+      static_cast<int64_t>(*std::min_element(deltas_.begin(), deltas_.end()));
+  DCHECK(bit_writer_.PutZigZagVlqInt(min_delta));
+
+  uint8_t* bit_width_offsets = 
bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_offsets != NULL);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    const uint32_t n = std::min(values_per_mini_block_, values_current_block_);
+    if (n == 0) {
+      DCHECK(
+          bit_writer_.PutAlignedOffset<uint32_t>(bit_width_offsets + i, 
uint32_t(1), 1));
+      continue;
+    }
+
+    const uint32_t start = i * values_per_mini_block_;
+    const auto max_delta = static_cast<int64_t>(
+        *std::max_element(deltas_.begin() + start, deltas_.begin() + start + 
n));
+
+    const uint32_t num_bits = bit_util::NumRequiredBits(max_delta - min_delta);
+    DCHECK(bit_writer_.PutAlignedOffset<uint32_t>(bit_width_offsets + i, 
num_bits, 1));
+
+    for (uint64_t j = start; j < start + n; j++) {
+      DCHECK(
+          bit_writer_.PutValue(static_cast<uint64_t>(deltas_[j] - min_delta), 
num_bits));
+    }
+    for (uint64_t j = n; j < values_per_mini_block_; j++) {
+      DCHECK(bit_writer_.PutValue(0, num_bits));
+    }
+    values_current_block_ -= n;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  std::shared_ptr<ResizableBuffer> header_buffer = AllocateBuffer(pool_, 32);
+  ::arrow::bit_util::BitWriter header_writer(header_buffer->mutable_data(),
+                                             
static_cast<int>(header_buffer->size()));
+  if (!header_writer.PutVlqInt(values_per_block_) ||
+      !header_writer.PutVlqInt(mini_blocks_per_block_) ||
+      !header_writer.PutVlqInt(total_value_count_) ||
+      !header_writer.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("cannot write");
+  }
+  header_writer.Flush(false);
+
+  ::arrow::BufferBuilder sink;
+  PARQUET_THROW_NOT_OK(
+      sink.Append(header_writer.buffer(), header_writer.bytes_written()));
+  header_writer.Clear();
+
+  std::shared_ptr<Buffer> bits_buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&bits_buffer, true));
+
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink.Append(bits_buffer->mutable_data(), 
bits_buffer->size()));
+  PARQUET_THROW_NOT_OK(sink.Finish(&buffer, true));
+  return buffer;
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+  auto src = 
reinterpret_cast<int32_t*>(values.data()->buffers[0]->mutable_data());

Review Comment:
   Hmm, why `buffers[0]`? That would be the null bitmap, no?
   
   Probably something like `values.data()->GetValues<int32_t>(1)` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to