rok commented on code in PR #14191: URL: https://github.com/apache/arrow/pull/14191#discussion_r1020433779
########## cpp/src/parquet/encoding.cc: ########## @@ -2060,6 +2062,209 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>, } }; +// ---------------------------------------------------------------------- +// DeltaBitPackEncoder + +constexpr uint32_t kValuesPerBlock = 128; +constexpr uint32_t kMiniBlocksPerBlock = 4; + +template <typename DType> +class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> { + public: + using T = typename DType::c_type; + using TypedEncoder<DType>::Put; + + explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool, + const uint32_t values_per_block = kValuesPerBlock, + const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock) + : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool), + values_per_block_(values_per_block), + mini_blocks_per_block_(mini_blocks_per_block), + values_per_mini_block_(values_per_block / mini_blocks_per_block), + deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)), + bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))), + sink_(pool), + bit_writer_(bits_buffer_->mutable_data(), + static_cast<int>(bits_buffer_->size())) { + // TODO: this does not evaluate in release build + DCHECK_EQ(values_per_mini_block_ % 32, 0); + } + + std::shared_ptr<Buffer> FlushValues() override; + + int64_t EstimatedDataEncodedSize() override { return sink_.length(); } + + void Put(const ::arrow::Array& values) override; + + void Put(const T* buffer, int num_values) override; + + void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset) override; + + void FlushBlock(); + + private: + const uint32_t values_per_block_; + const uint32_t mini_blocks_per_block_; + const uint32_t values_per_mini_block_; + uint32_t values_current_block_{0}; + uint32_t total_value_count_{0}; + T first_value_{0}; + T current_value_{0}; + ArrowPoolVector<T> deltas_; + std::shared_ptr<ResizableBuffer> bits_buffer_; + ::arrow::BufferBuilder sink_; + ::arrow::bit_util::BitWriter bit_writer_; +}; + +template <typename DType> +void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) { + if (num_values == 0) { + return; + } + + int idx = 0; + if (total_value_count_ == 0) { + current_value_ = src[0]; + first_value_ = current_value_; + idx = 1; + } + total_value_count_ += num_values; + + while (idx < num_values) { + T value = src[idx]; + deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_); + current_value_ = value; + idx++; + values_current_block_++; + if (values_current_block_ == values_per_block_) { + FlushBlock(); + } + } +} + +template <typename DType> +void DeltaBitPackEncoder<DType>::FlushBlock() { + if (values_current_block_ == 0) { + return; + } + + const T min_delta = + *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_); + bit_writer_.PutZigZagVlqInt(min_delta); + + std::vector<uint8_t> bit_widths(mini_blocks_per_block_); + uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_); + DCHECK(bit_width_data != nullptr); + + for (uint32_t i = 0; i < mini_blocks_per_block_; i++) { + const uint32_t n = std::min(values_per_mini_block_, values_current_block_); + + if (ARROW_PREDICT_FALSE(n == 0)) { + bit_widths[i] = 1; + continue; + } + + const uint32_t start = i * values_per_mini_block_; + const T max_delta = + *std::max_element(deltas_.begin() + start, deltas_.begin() + start + n); + + const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta); + int num_bits; + if constexpr (std::is_same<T, int64_t>::value) { + num_bits = bit_util::NumRequiredBits(max_delta_diff); + } else { + num_bits = bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff)); + } + const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits)); + bit_widths[i] = num_bits; + + for (uint32_t j = start; j < start + n; j++) { + const T value = SafeSignedSubtract(deltas_[j], min_delta); + bit_writer_.PutAligned<T>(value, num_bytes); + } + for (uint32_t j = n; j < values_per_mini_block_; j++) { Review Comment: This way of padding didn't pass UBSAN checks, reverting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org