rok commented on code in PR #14191:
URL: https://github.com/apache/arrow/pull/14191#discussion_r1019578432


##########
cpp/src/parquet/encoding.cc:
##########
@@ -2062,6 +2064,202 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(128),
+        mini_blocks_per_block_(4),
+        values_per_mini_block_(values_per_block_ / mini_blocks_per_block_),
+        values_current_block_(0),
+        total_value_count_(0),
+        first_value_(0),
+        current_value_(0),
+        sink_(pool),
+        bits_buffer_(AllocateBuffer(pool, (4 + values_per_block_) * 
sizeof(T))),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        deltas_(std::vector<T>(values_per_block_)) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return 4 * sizeof(T) + 
sink_.length(); }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+ protected:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_;
+  uint32_t total_value_count_;
+  T first_value_;
+  T current_value_;
+  ::arrow::BufferBuilder sink_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<T> deltas_;
+
+ private:
+  void FlushBlock();
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  DCHECK(bit_writer_.PutZigZagVlqInt(min_delta));
+
+  uint8_t* bit_width_offsets = 
bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_offsets != nullptr);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    const uint32_t n = std::min(values_per_mini_block_, values_current_block_);
+    if (n == 0) {
+      DCHECK(bit_writer_.PutAlignedOffset<int8_t>(bit_width_offsets++, 
int8_t(32), 1));
+      for (uint32_t j = 0; j < values_per_mini_block_; j++) {
+        DCHECK(bit_writer_.PutAligned<T>(0, 4));
+      }
+      continue;
+    }
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta =
+        *std::max_element(deltas_.begin() + start, deltas_.begin() + start + 
n);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int8_t num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    const int32_t num_bytes =
+        static_cast<const int32_t>(bit_util::BytesForBits(num_bits));
+    DCHECK(bit_writer_.PutAlignedOffset<int8_t>(bit_width_offsets++, num_bits, 
1));
+
+    for (uint32_t j = start; j < start + n; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      DCHECK(bit_writer_.PutAligned<T>(value, num_bytes));
+    }
+    for (uint32_t j = n; j < values_per_mini_block_; j++) {
+      DCHECK(bit_writer_.PutAligned<T>(0, num_bytes));
+    }
+    values_current_block_ -= n;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ != 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<ResizableBuffer> header_buffer = AllocateBuffer(pool_, 32);
+  ::arrow::bit_util::BitWriter header_writer(header_buffer->mutable_data(),
+                                             
static_cast<int>(header_buffer->size()));
+  if (!header_writer.PutVlqInt(values_per_block_) ||
+      !header_writer.PutVlqInt(mini_blocks_per_block_) ||
+      !header_writer.PutVlqInt(total_value_count_) ||
+      !header_writer.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("cannot write");
+  }
+  header_writer.Flush(false);
+
+  ::arrow::BufferBuilder sink;
+  PARQUET_THROW_NOT_OK(
+      sink.Append(header_writer.buffer(), header_writer.bytes_written()));
+  header_writer.Clear();
+
+  std::shared_ptr<Buffer> bits_buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&bits_buffer, true));
+
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink.Append(bits_buffer->mutable_data(), 
bits_buffer->size()));
+  PARQUET_THROW_NOT_OK(sink.Finish(&buffer, true));
+  return buffer;
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {

Review Comment:
   Added:
   ```
     if (values.null_count() > 0) {
       ParquetException::NYI("DELTA_BINARY_PACKED encoding with null values");
     }
   ```
   I'm not sure if that's what you had in mind. I think 
[DELTA_BINARY_PACKED](https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5)
 doesn't accept null values.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2062,6 +2064,202 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(128),
+        mini_blocks_per_block_(4),
+        values_per_mini_block_(values_per_block_ / mini_blocks_per_block_),
+        values_current_block_(0),
+        total_value_count_(0),
+        first_value_(0),
+        current_value_(0),
+        sink_(pool),
+        bits_buffer_(AllocateBuffer(pool, (4 + values_per_block_) * 
sizeof(T))),
+        bit_writer_(bits_buffer_->mutable_data(), 
static_cast<int>(bits_buffer_->size())),
+        deltas_(std::vector<T>(values_per_block_)) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return 4 * sizeof(T) + 
sink_.length(); }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+ protected:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_;
+  uint32_t total_value_count_;
+  T first_value_;
+  T current_value_;
+  ::arrow::BufferBuilder sink_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<T> deltas_;
+
+ private:
+  void FlushBlock();
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  DCHECK(bit_writer_.PutZigZagVlqInt(min_delta));
+
+  uint8_t* bit_width_offsets = 
bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_offsets != nullptr);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    const uint32_t n = std::min(values_per_mini_block_, values_current_block_);
+    if (n == 0) {
+      DCHECK(bit_writer_.PutAlignedOffset<int8_t>(bit_width_offsets++, 
int8_t(32), 1));
+      for (uint32_t j = 0; j < values_per_mini_block_; j++) {
+        DCHECK(bit_writer_.PutAligned<T>(0, 4));
+      }
+      continue;
+    }
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta =
+        *std::max_element(deltas_.begin() + start, deltas_.begin() + start + 
n);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int8_t num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    const int32_t num_bytes =
+        static_cast<const int32_t>(bit_util::BytesForBits(num_bits));
+    DCHECK(bit_writer_.PutAlignedOffset<int8_t>(bit_width_offsets++, num_bits, 
1));
+
+    for (uint32_t j = start; j < start + n; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      DCHECK(bit_writer_.PutAligned<T>(value, num_bytes));
+    }
+    for (uint32_t j = n; j < values_per_mini_block_; j++) {
+      DCHECK(bit_writer_.PutAligned<T>(0, num_bytes));
+    }
+    values_current_block_ -= n;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), 
bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ != 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<ResizableBuffer> header_buffer = AllocateBuffer(pool_, 32);
+  ::arrow::bit_util::BitWriter header_writer(header_buffer->mutable_data(),
+                                             
static_cast<int>(header_buffer->size()));
+  if (!header_writer.PutVlqInt(values_per_block_) ||
+      !header_writer.PutVlqInt(mini_blocks_per_block_) ||
+      !header_writer.PutVlqInt(total_value_count_) ||
+      !header_writer.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("cannot write");
+  }
+  header_writer.Flush(false);
+
+  ::arrow::BufferBuilder sink;
+  PARQUET_THROW_NOT_OK(
+      sink.Append(header_writer.buffer(), header_writer.bytes_written()));
+  header_writer.Clear();
+
+  std::shared_ptr<Buffer> bits_buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&bits_buffer, true));
+
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink.Append(bits_buffer->mutable_data(), 
bits_buffer->size()));
+  PARQUET_THROW_NOT_OK(sink.Finish(&buffer, true));
+  return buffer;
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {

Review Comment:
   Added:
   ```cpp
     if (values.null_count() > 0) {
       ParquetException::NYI("DELTA_BINARY_PACKED encoding with null values");
     }
   ```
   I'm not sure if that's what you had in mind. I think 
[DELTA_BINARY_PACKED](https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5)
 doesn't accept null values.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to