wjones127 commented on code in PR #14191:
URL: https://github.com/apache/arrow/pull/14191#discussion_r1024423662
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
}
};
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
+ public:
+ using T = typename DType::c_type;
+ using TypedEncoder<DType>::Put;
+
+ explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+ const uint32_t values_per_block =
kValuesPerBlock,
+ const uint32_t mini_blocks_per_block =
kMiniBlocksPerBlock)
+ : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+ values_per_block_(values_per_block),
+ mini_blocks_per_block_(mini_blocks_per_block),
+ values_per_mini_block_(values_per_block / mini_blocks_per_block),
+ deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+ bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+ sink_(pool),
+ bit_writer_(bits_buffer_->mutable_data(),
+ static_cast<int>(bits_buffer_->size())) {
+ if (values_per_mini_block_ % 32 != 0) {
+ throw ParquetException(
+ "the number of values in a miniblock must be multiple of 32, but
it's " +
+ std::to_string(values_per_mini_block_));
+ }
+ }
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override;
+
+ void FlushBlock();
+
+ private:
+ const uint32_t values_per_block_;
+ const uint32_t mini_blocks_per_block_;
+ const uint32_t values_per_mini_block_;
+ uint32_t values_current_block_{0};
Review Comment:
Question: what does the `{0}` mean? Is it a default value?
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
}
};
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
+ public:
+ using T = typename DType::c_type;
+ using TypedEncoder<DType>::Put;
+
+ explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+ const uint32_t values_per_block =
kValuesPerBlock,
+ const uint32_t mini_blocks_per_block =
kMiniBlocksPerBlock)
+ : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+ values_per_block_(values_per_block),
+ mini_blocks_per_block_(mini_blocks_per_block),
+ values_per_mini_block_(values_per_block / mini_blocks_per_block),
+ deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+ bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+ sink_(pool),
+ bit_writer_(bits_buffer_->mutable_data(),
+ static_cast<int>(bits_buffer_->size())) {
+ if (values_per_mini_block_ % 32 != 0) {
+ throw ParquetException(
+ "the number of values in a miniblock must be multiple of 32, but
it's " +
+ std::to_string(values_per_mini_block_));
+ }
+ }
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override;
+
+ void FlushBlock();
+
+ private:
+ const uint32_t values_per_block_;
+ const uint32_t mini_blocks_per_block_;
+ const uint32_t values_per_mini_block_;
+ uint32_t values_current_block_{0};
+ uint32_t total_value_count_{0};
+ T first_value_{0};
+ T current_value_{0};
+ ArrowPoolVector<T> deltas_;
+ std::shared_ptr<ResizableBuffer> bits_buffer_;
+ ::arrow::BufferBuilder sink_;
+ ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+
+ int idx = 0;
+ if (total_value_count_ == 0) {
+ current_value_ = src[0];
+ first_value_ = current_value_;
+ idx = 1;
+ }
+ total_value_count_ += num_values;
+
+ while (idx < num_values) {
+ T value = src[idx];
+ deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+ current_value_ = value;
+ idx++;
+ values_current_block_++;
+ if (values_current_block_ == values_per_block_) {
+ FlushBlock();
+ }
+ }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+ if (values_current_block_ == 0) {
+ return;
+ }
+
+ const T min_delta =
+ *std::min_element(deltas_.begin(), deltas_.begin() +
values_current_block_);
+ bit_writer_.PutZigZagVlqInt(min_delta);
+
+ std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+ mini_blocks_per_block_, 0);
+ uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+ DCHECK(bit_width_data != nullptr);
+
+ uint32_t num_miniblocks = std::min(
+
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+
static_cast<float>(values_per_mini_block_))),
+ mini_blocks_per_block_);
+ for (uint32_t i = 0; i < num_miniblocks; i++) {
+ const uint32_t values_current_mini_block =
Review Comment:
nit: call this `length` or `mini_block_length` instead?
```suggestion
const uint32_t length =
```
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
}
};
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
+ public:
+ using T = typename DType::c_type;
+ using TypedEncoder<DType>::Put;
+
+ explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+ const uint32_t values_per_block =
kValuesPerBlock,
+ const uint32_t mini_blocks_per_block =
kMiniBlocksPerBlock)
+ : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+ values_per_block_(values_per_block),
+ mini_blocks_per_block_(mini_blocks_per_block),
+ values_per_mini_block_(values_per_block / mini_blocks_per_block),
+ deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+ bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+ sink_(pool),
+ bit_writer_(bits_buffer_->mutable_data(),
+ static_cast<int>(bits_buffer_->size())) {
+ if (values_per_mini_block_ % 32 != 0) {
+ throw ParquetException(
+ "the number of values in a miniblock must be multiple of 32, but
it's " +
+ std::to_string(values_per_mini_block_));
+ }
+ }
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override;
+
+ void FlushBlock();
+
+ private:
+ const uint32_t values_per_block_;
+ const uint32_t mini_blocks_per_block_;
+ const uint32_t values_per_mini_block_;
+ uint32_t values_current_block_{0};
+ uint32_t total_value_count_{0};
+ T first_value_{0};
+ T current_value_{0};
+ ArrowPoolVector<T> deltas_;
+ std::shared_ptr<ResizableBuffer> bits_buffer_;
+ ::arrow::BufferBuilder sink_;
+ ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+
+ int idx = 0;
+ if (total_value_count_ == 0) {
+ current_value_ = src[0];
+ first_value_ = current_value_;
+ idx = 1;
+ }
+ total_value_count_ += num_values;
+
+ while (idx < num_values) {
+ T value = src[idx];
+ deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+ current_value_ = value;
+ idx++;
+ values_current_block_++;
+ if (values_current_block_ == values_per_block_) {
+ FlushBlock();
+ }
+ }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+ if (values_current_block_ == 0) {
+ return;
+ }
+
+ const T min_delta =
+ *std::min_element(deltas_.begin(), deltas_.begin() +
values_current_block_);
+ bit_writer_.PutZigZagVlqInt(min_delta);
+
+ std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+ mini_blocks_per_block_, 0);
Review Comment:
Should we compute `num_miniblocks` first, and then use `num_miniblocks` as
the length instead? Or is there a reason we might want extra slots? (If the
latter, worth a comment IMO)
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
}
};
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
Review Comment:
For future readers, I'd appreciate some more comments on the encoding. It
would be nice to know before getting into the code:
* What is a block?
* What is a miniblock?
* Is there a website where the full specification of this encoding is
provided?
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
}
};
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
+ public:
+ using T = typename DType::c_type;
+ using TypedEncoder<DType>::Put;
+
+ explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+ const uint32_t values_per_block =
kValuesPerBlock,
+ const uint32_t mini_blocks_per_block =
kMiniBlocksPerBlock)
+ : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+ values_per_block_(values_per_block),
+ mini_blocks_per_block_(mini_blocks_per_block),
+ values_per_mini_block_(values_per_block / mini_blocks_per_block),
+ deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+ bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+ sink_(pool),
+ bit_writer_(bits_buffer_->mutable_data(),
+ static_cast<int>(bits_buffer_->size())) {
+ if (values_per_mini_block_ % 32 != 0) {
+ throw ParquetException(
+ "the number of values in a miniblock must be multiple of 32, but
it's " +
+ std::to_string(values_per_mini_block_));
+ }
+ }
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override;
+
+ void FlushBlock();
+
+ private:
+ const uint32_t values_per_block_;
+ const uint32_t mini_blocks_per_block_;
+ const uint32_t values_per_mini_block_;
+ uint32_t values_current_block_{0};
+ uint32_t total_value_count_{0};
+ T first_value_{0};
+ T current_value_{0};
+ ArrowPoolVector<T> deltas_;
+ std::shared_ptr<ResizableBuffer> bits_buffer_;
+ ::arrow::BufferBuilder sink_;
+ ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+
+ int idx = 0;
+ if (total_value_count_ == 0) {
+ current_value_ = src[0];
+ first_value_ = current_value_;
+ idx = 1;
+ }
+ total_value_count_ += num_values;
+
+ while (idx < num_values) {
+ T value = src[idx];
+ deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+ current_value_ = value;
+ idx++;
+ values_current_block_++;
+ if (values_current_block_ == values_per_block_) {
+ FlushBlock();
+ }
+ }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+ if (values_current_block_ == 0) {
+ return;
+ }
+
+ const T min_delta =
+ *std::min_element(deltas_.begin(), deltas_.begin() +
values_current_block_);
+ bit_writer_.PutZigZagVlqInt(min_delta);
+
+ std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+ mini_blocks_per_block_, 0);
+ uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+ DCHECK(bit_width_data != nullptr);
+
+ uint32_t num_miniblocks = std::min(
+
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+
static_cast<float>(values_per_mini_block_))),
+ mini_blocks_per_block_);
+ for (uint32_t i = 0; i < num_miniblocks; i++) {
+ const uint32_t values_current_mini_block =
+ std::min(values_per_mini_block_, values_current_block_);
+
+ const uint32_t start = i * values_per_mini_block_;
+ const T max_delta = *std::max_element(
+ deltas_.begin() + start, deltas_.begin() + start +
values_current_mini_block);
+
+ const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+ int num_bits;
+ if constexpr (std::is_same<T, int64_t>::value) {
+ num_bits = bit_util::NumRequiredBits(max_delta_diff);
+ } else {
+ num_bits =
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+ }
+ const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
Review Comment:
Perhaps you could add a comment describing what `num_bytes` represents in
the context of the encoding? IIUC each value is represented by a diff that
takes up exactly `num_bytes` in a bitmap. Is that right?
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
}
};
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
+ public:
+ using T = typename DType::c_type;
+ using TypedEncoder<DType>::Put;
+
+ explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+ const uint32_t values_per_block =
kValuesPerBlock,
+ const uint32_t mini_blocks_per_block =
kMiniBlocksPerBlock)
+ : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+ values_per_block_(values_per_block),
+ mini_blocks_per_block_(mini_blocks_per_block),
+ values_per_mini_block_(values_per_block / mini_blocks_per_block),
+ deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+ bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+ sink_(pool),
+ bit_writer_(bits_buffer_->mutable_data(),
+ static_cast<int>(bits_buffer_->size())) {
+ if (values_per_mini_block_ % 32 != 0) {
+ throw ParquetException(
+ "the number of values in a miniblock must be multiple of 32, but
it's " +
+ std::to_string(values_per_mini_block_));
+ }
+ }
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override;
+
+ void FlushBlock();
+
+ private:
+ const uint32_t values_per_block_;
+ const uint32_t mini_blocks_per_block_;
+ const uint32_t values_per_mini_block_;
+ uint32_t values_current_block_{0};
+ uint32_t total_value_count_{0};
+ T first_value_{0};
+ T current_value_{0};
+ ArrowPoolVector<T> deltas_;
+ std::shared_ptr<ResizableBuffer> bits_buffer_;
+ ::arrow::BufferBuilder sink_;
+ ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+
+ int idx = 0;
+ if (total_value_count_ == 0) {
+ current_value_ = src[0];
+ first_value_ = current_value_;
+ idx = 1;
+ }
+ total_value_count_ += num_values;
+
+ while (idx < num_values) {
+ T value = src[idx];
+ deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+ current_value_ = value;
+ idx++;
+ values_current_block_++;
+ if (values_current_block_ == values_per_block_) {
+ FlushBlock();
+ }
+ }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+ if (values_current_block_ == 0) {
+ return;
+ }
+
+ const T min_delta =
+ *std::min_element(deltas_.begin(), deltas_.begin() +
values_current_block_);
+ bit_writer_.PutZigZagVlqInt(min_delta);
+
+ std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+ mini_blocks_per_block_, 0);
+ uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+ DCHECK(bit_width_data != nullptr);
+
+ uint32_t num_miniblocks = std::min(
+
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+
static_cast<float>(values_per_mini_block_))),
+ mini_blocks_per_block_);
+ for (uint32_t i = 0; i < num_miniblocks; i++) {
+ const uint32_t values_current_mini_block =
+ std::min(values_per_mini_block_, values_current_block_);
+
+ const uint32_t start = i * values_per_mini_block_;
+ const T max_delta = *std::max_element(
+ deltas_.begin() + start, deltas_.begin() + start +
values_current_mini_block);
+
+ const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+ int num_bits;
+ if constexpr (std::is_same<T, int64_t>::value) {
+ num_bits = bit_util::NumRequiredBits(max_delta_diff);
+ } else {
+ num_bits =
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+ }
+ const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+ bit_widths[i] = num_bits;
+
+ for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+ const T value = SafeSignedSubtract(deltas_[j], min_delta);
+ bit_writer_.PutAligned<T>(value, num_bytes);
+ }
+ // If there are not enough values to fill the last miniblock, we pad the
miniblock
+ // with zeroes so that its length is the number of values in a full
miniblock
+ // multiplied by the bit width.
+ for (uint32_t j = values_current_mini_block; j < values_per_mini_block_;
j++) {
+ bit_writer_.PutAligned<T>(0, num_bytes);
+ }
+ values_current_block_ -= values_current_mini_block;
+ }
+ DCHECK_EQ(values_current_block_, 0);
+
+ for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+ T val = bit_util::ToLittleEndian(bit_widths[i]);
+ memcpy(bit_width_data + i, &val, 1);
+ }
+
+ bit_writer_.Flush();
+ PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(),
bit_writer_.bytes_written()));
+ bit_writer_.Clear();
+ bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+ if (values_current_block_ > 0) {
+ FlushBlock();
+ }
+
+ std::shared_ptr<Buffer> bit_buffer;
+ PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+ sink_.Reset();
+
+ if (!bit_writer_.PutVlqInt(values_per_block_) ||
+ !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+ !bit_writer_.PutVlqInt(total_value_count_) ||
+ !bit_writer_.PutZigZagVlqInt(first_value_)) {
+ throw ParquetException("header writing error");
+ }
+ bit_writer_.Flush();
+
+ PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(),
bit_writer_.bytes_written()));
+ PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(),
bit_buffer->size()));
+ std::shared_ptr<Buffer> buffer;
+ PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+ return buffer;
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+ const auto& data = *values.data();
Review Comment:
```suggestion
const ArrayData& data = *values.data();
```
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
}
};
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
+ public:
+ using T = typename DType::c_type;
+ using TypedEncoder<DType>::Put;
+
+ explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+ const uint32_t values_per_block =
kValuesPerBlock,
+ const uint32_t mini_blocks_per_block =
kMiniBlocksPerBlock)
+ : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+ values_per_block_(values_per_block),
+ mini_blocks_per_block_(mini_blocks_per_block),
+ values_per_mini_block_(values_per_block / mini_blocks_per_block),
+ deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+ bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+ sink_(pool),
+ bit_writer_(bits_buffer_->mutable_data(),
+ static_cast<int>(bits_buffer_->size())) {
+ if (values_per_mini_block_ % 32 != 0) {
+ throw ParquetException(
+ "the number of values in a miniblock must be multiple of 32, but
it's " +
+ std::to_string(values_per_mini_block_));
+ }
+ }
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override;
+
+ void FlushBlock();
+
+ private:
+ const uint32_t values_per_block_;
+ const uint32_t mini_blocks_per_block_;
+ const uint32_t values_per_mini_block_;
+ uint32_t values_current_block_{0};
+ uint32_t total_value_count_{0};
+ T first_value_{0};
+ T current_value_{0};
+ ArrowPoolVector<T> deltas_;
+ std::shared_ptr<ResizableBuffer> bits_buffer_;
+ ::arrow::BufferBuilder sink_;
+ ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+
+ int idx = 0;
+ if (total_value_count_ == 0) {
+ current_value_ = src[0];
+ first_value_ = current_value_;
+ idx = 1;
+ }
+ total_value_count_ += num_values;
+
+ while (idx < num_values) {
+ T value = src[idx];
+ deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+ current_value_ = value;
+ idx++;
+ values_current_block_++;
+ if (values_current_block_ == values_per_block_) {
+ FlushBlock();
+ }
+ }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+ if (values_current_block_ == 0) {
+ return;
+ }
+
+ const T min_delta =
+ *std::min_element(deltas_.begin(), deltas_.begin() +
values_current_block_);
+ bit_writer_.PutZigZagVlqInt(min_delta);
+
+ std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+ mini_blocks_per_block_, 0);
+ uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+ DCHECK(bit_width_data != nullptr);
+
+ uint32_t num_miniblocks = std::min(
+
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+
static_cast<float>(values_per_mini_block_))),
+ mini_blocks_per_block_);
+ for (uint32_t i = 0; i < num_miniblocks; i++) {
+ const uint32_t values_current_mini_block =
+ std::min(values_per_mini_block_, values_current_block_);
+
+ const uint32_t start = i * values_per_mini_block_;
+ const T max_delta = *std::max_element(
+ deltas_.begin() + start, deltas_.begin() + start +
values_current_mini_block);
+
+ const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+ int num_bits;
+ if constexpr (std::is_same<T, int64_t>::value) {
+ num_bits = bit_util::NumRequiredBits(max_delta_diff);
+ } else {
+ num_bits =
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+ }
+ const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+ bit_widths[i] = num_bits;
+
+ for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+ const T value = SafeSignedSubtract(deltas_[j], min_delta);
+ bit_writer_.PutAligned<T>(value, num_bytes);
+ }
+ // If there are not enough values to fill the last miniblock, we pad the
miniblock
+ // with zeroes so that its length is the number of values in a full
miniblock
+ // multiplied by the bit width.
+ for (uint32_t j = values_current_mini_block; j < values_per_mini_block_;
j++) {
+ bit_writer_.PutAligned<T>(0, num_bytes);
+ }
+ values_current_block_ -= values_current_mini_block;
+ }
+ DCHECK_EQ(values_current_block_, 0);
+
+ for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+ T val = bit_util::ToLittleEndian(bit_widths[i]);
+ memcpy(bit_width_data + i, &val, 1);
+ }
+
+ bit_writer_.Flush();
+ PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(),
bit_writer_.bytes_written()));
+ bit_writer_.Clear();
+ bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+ if (values_current_block_ > 0) {
+ FlushBlock();
+ }
+
+ std::shared_ptr<Buffer> bit_buffer;
+ PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+ sink_.Reset();
+
+ if (!bit_writer_.PutVlqInt(values_per_block_) ||
+ !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+ !bit_writer_.PutVlqInt(total_value_count_) ||
+ !bit_writer_.PutZigZagVlqInt(first_value_)) {
+ throw ParquetException("header writing error");
+ }
+ bit_writer_.Flush();
+
+ PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(),
bit_writer_.bytes_written()));
+ PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(),
bit_buffer->size()));
+ std::shared_ptr<Buffer> buffer;
+ PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+ return buffer;
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+ const auto& data = *values.data();
+ if (values.null_count() == 0) {
+ Put(data.GetValues<int32_t>(1), static_cast<int>(values.length()));
+ } else {
+ PutSpaced(data.GetValues<int32_t>(1), static_cast<int>(data.length),
+ data.GetValues<uint8_t>(0, 0), data.offset);
+ }
+}
+
+template <>
+void DeltaBitPackEncoder<Int64Type>::Put(const ::arrow::Array& values) {
+ const auto& data = *values.data();
Review Comment:
```suggestion
const ArrayData& data = *values.data();
```
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
}
};
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
+ public:
+ using T = typename DType::c_type;
+ using TypedEncoder<DType>::Put;
+
+ explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+ const uint32_t values_per_block =
kValuesPerBlock,
+ const uint32_t mini_blocks_per_block =
kMiniBlocksPerBlock)
+ : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+ values_per_block_(values_per_block),
+ mini_blocks_per_block_(mini_blocks_per_block),
+ values_per_mini_block_(values_per_block / mini_blocks_per_block),
+ deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+ bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+ sink_(pool),
+ bit_writer_(bits_buffer_->mutable_data(),
+ static_cast<int>(bits_buffer_->size())) {
+ if (values_per_mini_block_ % 32 != 0) {
+ throw ParquetException(
+ "the number of values in a miniblock must be multiple of 32, but
it's " +
+ std::to_string(values_per_mini_block_));
+ }
+ }
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override;
+
+ void FlushBlock();
+
+ private:
+ const uint32_t values_per_block_;
+ const uint32_t mini_blocks_per_block_;
+ const uint32_t values_per_mini_block_;
+ uint32_t values_current_block_{0};
+ uint32_t total_value_count_{0};
+ T first_value_{0};
+ T current_value_{0};
+ ArrowPoolVector<T> deltas_;
+ std::shared_ptr<ResizableBuffer> bits_buffer_;
+ ::arrow::BufferBuilder sink_;
+ ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+
+ int idx = 0;
+ if (total_value_count_ == 0) {
+ current_value_ = src[0];
+ first_value_ = current_value_;
+ idx = 1;
+ }
+ total_value_count_ += num_values;
+
+ while (idx < num_values) {
+ T value = src[idx];
+ deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+ current_value_ = value;
+ idx++;
+ values_current_block_++;
+ if (values_current_block_ == values_per_block_) {
+ FlushBlock();
+ }
+ }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+ if (values_current_block_ == 0) {
+ return;
+ }
+
+ const T min_delta =
+ *std::min_element(deltas_.begin(), deltas_.begin() +
values_current_block_);
+ bit_writer_.PutZigZagVlqInt(min_delta);
+
+ std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+ mini_blocks_per_block_, 0);
+ uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+ DCHECK(bit_width_data != nullptr);
+
+ uint32_t num_miniblocks = std::min(
+
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+
static_cast<float>(values_per_mini_block_))),
+ mini_blocks_per_block_);
+ for (uint32_t i = 0; i < num_miniblocks; i++) {
+ const uint32_t values_current_mini_block =
+ std::min(values_per_mini_block_, values_current_block_);
+
+ const uint32_t start = i * values_per_mini_block_;
+ const T max_delta = *std::max_element(
+ deltas_.begin() + start, deltas_.begin() + start +
values_current_mini_block);
+
+ const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+ int num_bits;
+ if constexpr (std::is_same<T, int64_t>::value) {
+ num_bits = bit_util::NumRequiredBits(max_delta_diff);
+ } else {
+ num_bits =
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+ }
+ const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+ bit_widths[i] = num_bits;
+
+ for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+ const T value = SafeSignedSubtract(deltas_[j], min_delta);
+ bit_writer_.PutAligned<T>(value, num_bytes);
+ }
+ // If there are not enough values to fill the last miniblock, we pad the
miniblock
+ // with zeroes so that its length is the number of values in a full
miniblock
+ // multiplied by the bit width.
+ for (uint32_t j = values_current_mini_block; j < values_per_mini_block_;
j++) {
+ bit_writer_.PutAligned<T>(0, num_bytes);
+ }
+ values_current_block_ -= values_current_mini_block;
+ }
+ DCHECK_EQ(values_current_block_, 0);
+
+ for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+ T val = bit_util::ToLittleEndian(bit_widths[i]);
+ memcpy(bit_width_data + i, &val, 1);
+ }
+
+ bit_writer_.Flush();
+ PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(),
bit_writer_.bytes_written()));
+ bit_writer_.Clear();
+ bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+ if (values_current_block_ > 0) {
+ FlushBlock();
+ }
+
+ std::shared_ptr<Buffer> bit_buffer;
+ PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+ sink_.Reset();
+
+ if (!bit_writer_.PutVlqInt(values_per_block_) ||
+ !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+ !bit_writer_.PutVlqInt(total_value_count_) ||
+ !bit_writer_.PutZigZagVlqInt(first_value_)) {
+ throw ParquetException("header writing error");
+ }
+ bit_writer_.Flush();
+
+ PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(),
bit_writer_.bytes_written()));
+ PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(),
bit_buffer->size()));
+ std::shared_ptr<Buffer> buffer;
+ PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+ return buffer;
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+ const auto& data = *values.data();
+ if (values.null_count() == 0) {
+ Put(data.GetValues<int32_t>(1), static_cast<int>(values.length()));
Review Comment:
To be more consistent with the line below (and possibly more efficient?):
```suggestion
Put(data.GetValues<int32_t>(1), static_cast<int>(data.length));
```
##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1276,5 +1276,64 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT),
ParquetException);
}
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+ using c_type = typename Type::c_type;
+ static constexpr int TYPE = Type::type_num;
+
+ virtual void CheckRoundtrip() {
+ auto encoder =
+ MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false,
descr_.get());
+ auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED,
descr_.get());
+
+ encoder->Put(draws_, num_values_);
+ encode_buffer_ = encoder->FlushValues();
+
+ decoder->SetData(num_values_, encode_buffer_->data(),
+ static_cast<int>(encode_buffer_->size()));
+ int values_decoded = decoder->Decode(decode_buf_, num_values_);
+ ASSERT_EQ(num_values_, values_decoded);
+ ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_,
num_values_));
+ }
+
+ void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t
valid_bits_offset) {
+ auto encoder =
+ MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false,
descr_.get());
+ auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED,
descr_.get());
+ int null_count = 0;
+ for (auto i = 0; i < num_values_; i++) {
+ if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+ null_count++;
+ }
+ }
+
+ encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+ encode_buffer_ = encoder->FlushValues();
+ decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+ static_cast<int>(encode_buffer_->size()));
+ auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_,
null_count,
+ valid_bits, valid_bits_offset);
+ ASSERT_EQ(num_values_, values_decoded);
+ ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_,
num_values_,
+ valid_bits,
valid_bits_offset));
+ }
+
+ protected:
+ USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<Int32Type, Int64Type> TestDeltaBitPackEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes);
+
+TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
+ ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200));
Review Comment:
Would it make sense to test the edge cases of an empty array or one that's
all repeats?
```suggestion
ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200));
ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 2000));
```
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
}
};
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
+ public:
+ using T = typename DType::c_type;
+ using TypedEncoder<DType>::Put;
+
+ explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+ const uint32_t values_per_block =
kValuesPerBlock,
+ const uint32_t mini_blocks_per_block =
kMiniBlocksPerBlock)
+ : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+ values_per_block_(values_per_block),
+ mini_blocks_per_block_(mini_blocks_per_block),
+ values_per_mini_block_(values_per_block / mini_blocks_per_block),
+ deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+ bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+ sink_(pool),
+ bit_writer_(bits_buffer_->mutable_data(),
+ static_cast<int>(bits_buffer_->size())) {
+ if (values_per_mini_block_ % 32 != 0) {
+ throw ParquetException(
+ "the number of values in a miniblock must be multiple of 32, but
it's " +
+ std::to_string(values_per_mini_block_));
+ }
+ }
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override;
+
+ void FlushBlock();
+
+ private:
+ const uint32_t values_per_block_;
+ const uint32_t mini_blocks_per_block_;
+ const uint32_t values_per_mini_block_;
+ uint32_t values_current_block_{0};
Review Comment:
Also, a nit: could we use the term `length` instead of `values`? When I see the
name `values`, I expect a pointer into an array of values, not the
number of values.
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
}
};
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
+ public:
+ using T = typename DType::c_type;
+ using TypedEncoder<DType>::Put;
+
+ explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+ const uint32_t values_per_block =
kValuesPerBlock,
+ const uint32_t mini_blocks_per_block =
kMiniBlocksPerBlock)
+ : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+ values_per_block_(values_per_block),
+ mini_blocks_per_block_(mini_blocks_per_block),
+ values_per_mini_block_(values_per_block / mini_blocks_per_block),
+ deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+ bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
Review Comment:
`(values_per_block + 3) * sizeof(T)` seems like some odd magic that could
use a comment justifying it.
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
}
};
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
+ public:
+ using T = typename DType::c_type;
+ using TypedEncoder<DType>::Put;
+
+ explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+ const uint32_t values_per_block =
kValuesPerBlock,
+ const uint32_t mini_blocks_per_block =
kMiniBlocksPerBlock)
+ : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+ values_per_block_(values_per_block),
+ mini_blocks_per_block_(mini_blocks_per_block),
+ values_per_mini_block_(values_per_block / mini_blocks_per_block),
+ deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+ bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+ sink_(pool),
+ bit_writer_(bits_buffer_->mutable_data(),
+ static_cast<int>(bits_buffer_->size())) {
+ if (values_per_mini_block_ % 32 != 0) {
+ throw ParquetException(
+ "the number of values in a miniblock must be multiple of 32, but
it's " +
+ std::to_string(values_per_mini_block_));
+ }
+ }
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override;
+
+ void FlushBlock();
+
+ private:
+ const uint32_t values_per_block_;
+ const uint32_t mini_blocks_per_block_;
+ const uint32_t values_per_mini_block_;
+ uint32_t values_current_block_{0};
+ uint32_t total_value_count_{0};
+ T first_value_{0};
+ T current_value_{0};
+ ArrowPoolVector<T> deltas_;
+ std::shared_ptr<ResizableBuffer> bits_buffer_;
+ ::arrow::BufferBuilder sink_;
+ ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+
+ int idx = 0;
+ if (total_value_count_ == 0) {
+ current_value_ = src[0];
+ first_value_ = current_value_;
+ idx = 1;
+ }
+ total_value_count_ += num_values;
+
+ while (idx < num_values) {
+ T value = src[idx];
+ deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+ current_value_ = value;
+ idx++;
+ values_current_block_++;
+ if (values_current_block_ == values_per_block_) {
+ FlushBlock();
+ }
+ }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+ if (values_current_block_ == 0) {
+ return;
+ }
+
+ const T min_delta =
+ *std::min_element(deltas_.begin(), deltas_.begin() +
values_current_block_);
+ bit_writer_.PutZigZagVlqInt(min_delta);
+
+ std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+ mini_blocks_per_block_, 0);
+ uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+ DCHECK(bit_width_data != nullptr);
+
+ uint32_t num_miniblocks = std::min(
+
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+
static_cast<float>(values_per_mini_block_))),
+ mini_blocks_per_block_);
+ for (uint32_t i = 0; i < num_miniblocks; i++) {
+ const uint32_t values_current_mini_block =
+ std::min(values_per_mini_block_, values_current_block_);
+
+ const uint32_t start = i * values_per_mini_block_;
+ const T max_delta = *std::max_element(
+ deltas_.begin() + start, deltas_.begin() + start +
values_current_mini_block);
+
+ const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+ int num_bits;
+ if constexpr (std::is_same<T, int64_t>::value) {
+ num_bits = bit_util::NumRequiredBits(max_delta_diff);
+ } else {
+ num_bits =
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+ }
+ const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+ bit_widths[i] = num_bits;
+
+ for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+ const T value = SafeSignedSubtract(deltas_[j], min_delta);
+ bit_writer_.PutAligned<T>(value, num_bytes);
+ }
+ // If there are not enough values to fill the last miniblock, we pad the
miniblock
+ // with zeroes so that its length is the number of values in a full
miniblock
+ // multiplied by the bit width.
+ for (uint32_t j = values_current_mini_block; j < values_per_mini_block_;
j++) {
+ bit_writer_.PutAligned<T>(0, num_bytes);
+ }
+ values_current_block_ -= values_current_mini_block;
+ }
+ DCHECK_EQ(values_current_block_, 0);
+
+ for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+ T val = bit_util::ToLittleEndian(bit_widths[i]);
+ memcpy(bit_width_data + i, &val, 1);
+ }
+
+ bit_writer_.Flush();
+ PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(),
bit_writer_.bytes_written()));
+ bit_writer_.Clear();
+ bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+ if (values_current_block_ > 0) {
+ FlushBlock();
+ }
+
+ std::shared_ptr<Buffer> bit_buffer;
+ PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+ sink_.Reset();
+
+ if (!bit_writer_.PutVlqInt(values_per_block_) ||
+ !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+ !bit_writer_.PutVlqInt(total_value_count_) ||
+ !bit_writer_.PutZigZagVlqInt(first_value_)) {
+ throw ParquetException("header writing error");
+ }
+ bit_writer_.Flush();
+
+ PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(),
bit_writer_.bytes_written()));
+ PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(),
bit_buffer->size()));
+ std::shared_ptr<Buffer> buffer;
+ PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+ return buffer;
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+ const auto& data = *values.data();
+ if (values.null_count() == 0) {
+ Put(data.GetValues<int32_t>(1), static_cast<int>(values.length()));
+ } else {
+ PutSpaced(data.GetValues<int32_t>(1), static_cast<int>(data.length),
+ data.GetValues<uint8_t>(0, 0), data.offset);
+ }
+}
+
+template <>
+void DeltaBitPackEncoder<Int64Type>::Put(const ::arrow::Array& values) {
+ const auto& data = *values.data();
+ if (values.null_count() == 0) {
+ Put(data.GetValues<int64_t>(1), static_cast<int>(values.length()));
Review Comment:
```suggestion
Put(data.GetValues<int64_t>(1), static_cast<int>(data.length));
```
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
}
};
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
+ public:
+ using T = typename DType::c_type;
+ using TypedEncoder<DType>::Put;
+
+ explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+ const uint32_t values_per_block =
kValuesPerBlock,
+ const uint32_t mini_blocks_per_block =
kMiniBlocksPerBlock)
+ : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+ values_per_block_(values_per_block),
+ mini_blocks_per_block_(mini_blocks_per_block),
+ values_per_mini_block_(values_per_block / mini_blocks_per_block),
+ deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+ bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+ sink_(pool),
+ bit_writer_(bits_buffer_->mutable_data(),
+ static_cast<int>(bits_buffer_->size())) {
+ if (values_per_mini_block_ % 32 != 0) {
+ throw ParquetException(
+ "the number of values in a miniblock must be multiple of 32, but
it's " +
+ std::to_string(values_per_mini_block_));
+ }
+ }
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override;
+
+ void FlushBlock();
+
+ private:
+ const uint32_t values_per_block_;
+ const uint32_t mini_blocks_per_block_;
+ const uint32_t values_per_mini_block_;
+ uint32_t values_current_block_{0};
+ uint32_t total_value_count_{0};
+ T first_value_{0};
+ T current_value_{0};
+ ArrowPoolVector<T> deltas_;
+ std::shared_ptr<ResizableBuffer> bits_buffer_;
+ ::arrow::BufferBuilder sink_;
+ ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+
+ int idx = 0;
+ if (total_value_count_ == 0) {
+ current_value_ = src[0];
+ first_value_ = current_value_;
+ idx = 1;
+ }
+ total_value_count_ += num_values;
+
+ while (idx < num_values) {
+ T value = src[idx];
+ deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+ current_value_ = value;
+ idx++;
+ values_current_block_++;
+ if (values_current_block_ == values_per_block_) {
+ FlushBlock();
+ }
+ }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+ if (values_current_block_ == 0) {
+ return;
+ }
+
+ const T min_delta =
+ *std::min_element(deltas_.begin(), deltas_.begin() +
values_current_block_);
+ bit_writer_.PutZigZagVlqInt(min_delta);
+
+ std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+ mini_blocks_per_block_, 0);
+ uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+ DCHECK(bit_width_data != nullptr);
+
+ uint32_t num_miniblocks = std::min(
+
static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+
static_cast<float>(values_per_mini_block_))),
+ mini_blocks_per_block_);
+ for (uint32_t i = 0; i < num_miniblocks; i++) {
+ const uint32_t values_current_mini_block =
+ std::min(values_per_mini_block_, values_current_block_);
+
+ const uint32_t start = i * values_per_mini_block_;
+ const T max_delta = *std::max_element(
+ deltas_.begin() + start, deltas_.begin() + start +
values_current_mini_block);
+
+ const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+ int num_bits;
+ if constexpr (std::is_same<T, int64_t>::value) {
+ num_bits = bit_util::NumRequiredBits(max_delta_diff);
+ } else {
+ num_bits =
bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+ }
+ const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+ bit_widths[i] = num_bits;
+
+ for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+ const T value = SafeSignedSubtract(deltas_[j], min_delta);
+ bit_writer_.PutAligned<T>(value, num_bytes);
+ }
+ // If there are not enough values to fill the last miniblock, we pad the
miniblock
+ // with zeroes so that its length is the number of values in a full
miniblock
+ // multiplied by the bit width.
+ for (uint32_t j = values_current_mini_block; j < values_per_mini_block_;
j++) {
+ bit_writer_.PutAligned<T>(0, num_bytes);
+ }
+ values_current_block_ -= values_current_mini_block;
+ }
+ DCHECK_EQ(values_current_block_, 0);
+
+ for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+ T val = bit_util::ToLittleEndian(bit_widths[i]);
+ memcpy(bit_width_data + i, &val, 1);
Review Comment:
Perhaps I'm missing something, but wouldn't a direct assignment work here?
```suggestion
bit_width_data[i] = bit_util::ToLittleEndian(bit_widths[i]);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]