winval commented on code in PR #14191:
URL: https://github.com/apache/arrow/pull/14191#discussion_r977412990
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2062,6 +2062,152 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
}
};
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
+ public:
+ using T = typename DType::c_type;
+
+ explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+ : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+ bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
+ sink_(pool),
+ bit_writer_(bits_buffer_->mutable_data(),
static_cast<int>(bits_buffer_->size())),
+ header_writer_(bits_buffer_->mutable_data(),
+ static_cast<int>(bits_buffer_->size())),
+ values_per_block_(256),
+ mini_blocks_per_block_(8),
+ values_per_mini_block_(values_per_block_ / mini_blocks_per_block_),
+ total_value_count_(0),
+ first_value_(0),
+ current_value_(0),
+ deltas_(std::vector<T>(values_per_mini_block_)) {}
+
+ int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+ std::shared_ptr<Buffer> FlushValues() override {
+ std::shared_ptr<Buffer> buffer;
+ // TODO: write total_value_count_
+ PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
+ return buffer;
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
+ PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values
* sizeof(T),
+
this->memory_pool()));
+ T* data = reinterpret_cast<T*>(buffer->mutable_data());
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
+ }
+ }
+
+ void UnsafePutByteArray(const void* data, uint32_t length) {
+ DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL";
+ sink_.UnsafeAppend(&length, sizeof(uint32_t));
+ sink_.UnsafeAppend(data, static_cast<int64_t>(length));
+ }
+
+ protected:
+ std::shared_ptr<ResizableBuffer> bits_buffer_;
+ ::arrow::BufferBuilder sink_;
+ ::arrow::bit_util::BitWriter bit_writer_;
+ ::arrow::bit_util::BitWriter header_writer_;
+ uint32_t values_per_block_;
+ uint32_t mini_blocks_per_block_;
+ uint32_t values_per_mini_block_;
+ uint32_t total_value_count_;
+ T first_value_;
+ T current_value_;
+ uint64_t mini_block_bit_width_;
+ std::vector<T> deltas_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* buffer, int num_values) {
+ first_value_ = current_value_ = buffer[0];
+
+ // <block size in values> <number of miniblocks in a block> <total value
count> <first
+ // value>
+ if (!header_writer_.PutVlqInt(values_per_block_) ||
Review Comment:
Does Put generate a block? The block should be generated by FlushValues.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]