rok commented on code in PR #14191:
URL: https://github.com/apache/arrow/pull/14191#discussion_r1028480893


##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// 
https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary 
packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] 
[first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will 
be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first 
element in the
+/// block, use the last element in the previous block or, in the case of the 
first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the 
block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all 
values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int 
followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) 
bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = 
kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = 
kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " 
+
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but 
it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + 
values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> 
miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the 
unneeded
+  // miniblocks are still present, their value should be zero, but readers 
must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      
static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + 
values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = 
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any of value in deltas_ 
vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);

Review Comment:
   By assumption was that we're packing byte aligned. I'll look into it.
   I do think current tests generate integers close to numeric limit so maybe 
that is hiding the issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to