wgtmac commented on code in PR #14341:
URL: https://github.com/apache/arrow/pull/14341#discussion_r1256043091
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2842,7 +2888,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
   std::shared_ptr<::arrow::bit_util::BitReader> decoder_;
   DeltaBitPackDecoder<Int32Type> len_decoder_;
-  int num_valid_values_;
+  int num_valid_values_{0};

Review Comment:
   This is initialized via `SetData`, but it doesn't hurt to provide a default value here. Would you want to do the same for `length_idx_`?

##########
cpp/src/parquet/encoding.cc:
##########
@@ -1238,25 +1240,35 @@ int PlainBooleanDecoder::Decode(bool* buffer, int max_values) {
   return max_values;
 }
 
-struct ArrowBinaryHelper {
+template <typename DType>
+struct ArrowBinaryHelper;

Review Comment:
   Just a reminder that this may conflict with https://github.com/apache/arrow/pull/35825/files once it gets merged.

##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,230 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front compression:
+/// for each element in a sequence of strings, store the prefix length of the previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = ::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),

Review Comment:
   Why pass `nullptr` instead of `descr`? Is it safe to do so?

##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,230 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    auto previous_len = static_cast<uint32_t>(last_value_.length());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB or more");
+          }
+          // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+          const ByteArray src{view};
+
+          uint32_t j = 0;
+          const uint32_t len = src.len;
+          const uint32_t common_length = std::min(previous_len, len);
+          while (j < common_length) {
+            if (last_value_view[j] != view[j]) {
+              break;
+            }
+            j++;
+          }
+          previous_len = len;
+          prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+          last_value_view = view;
+          const auto suffix_length = static_cast<uint32_t>(len - j);
+          if (suffix_length == 0) {
+            suffix_encoder_.Put(&kEmpty, 1);
+            return Status::OK();
+          }
+          const uint8_t* suffix_ptr = src.ptr + j;
+          // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+          const ByteArray suffix(suffix_length, suffix_ptr);
+          suffix_encoder_.Put(&suffix, 1);
+
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+    last_value_ = last_value_view;
+  }
+
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+  DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+  std::string last_value_;
+  const ByteArray kEmpty;
+};
+
+struct ByteArrayVisitor {
+  const ByteArray* src;
+  const uint32_t type_length;

Review Comment:
   Add a comment to say `type_length` is not used for `ByteArrayType`, or simply add `[[maybe_unused]]`?
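For illustration, a minimal self-contained sketch of the `[[maybe_unused]]` option. The `ByteArray` struct below is a simplified stand-in, and the `operator[]`/`len()` members are assumed from how `PutInternal` uses the visitor; they are not copied from the diff.

```cpp
#include <cstdint>
#include <string_view>

// Simplified stand-in for parquet::ByteArray, only to keep this sketch self-contained.
struct ByteArray {
  uint32_t len;
  const uint8_t* ptr;
};

struct ByteArrayVisitor {
  const ByteArray* src;
  // Not used for ByteArrayType values (each ByteArray carries its own length);
  // the attribute documents that while keeping the same shape as the FLBA visitor.
  [[maybe_unused]] const uint32_t type_length;

  std::string_view operator[](int i) const {
    return {reinterpret_cast<const char*>(src[i].ptr), src[i].len};
  }
  uint32_t len(int i) const { return src[i].len; }
};
```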
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,230 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
+ protected:
+  template <typename VisitorType>
+  void PutInternal(const T* src, int num_values) {
+    if (num_values == 0) {
+      return;
+    }
+    uint32_t len = descr_->type_length();
+
+    std::string_view last_value_view = last_value_;
+    constexpr int kBatchSize = 256;
+    std::array<int32_t, kBatchSize> prefix_lengths;
+    std::array<ByteArray, kBatchSize> suffixes;
+    auto visitor = VisitorType{src, len};
+
+    for (int i = 0; i < num_values; i += kBatchSize) {
+      const int batch_size = std::min(kBatchSize, num_values - i);
+
+      for (int j = 0; j < batch_size; ++j) {
+        auto view = visitor[i + j];

Review Comment:
   `i + j` is computed three times; probably worth a local variable?

##########
cpp/src/parquet/encoding.cc:
##########
@@ -3188,18 +3447,54 @@ class DeltaByteArrayDecoder : public DecoderImpl,
     return Status::OK();
   }
 
+  MemoryPool* pool_;
+
+ private:
   std::shared_ptr<::arrow::bit_util::BitReader> decoder_;
   DeltaBitPackDecoder<Int32Type> prefix_len_decoder_;
   DeltaLengthByteArrayDecoder suffix_decoder_;
   std::string last_value_;
   // string buffer for last value in previous page
   std::string last_value_in_previous_page_;
-  int num_valid_values_;
+  int num_valid_values_{0};

Review Comment:
   Ditto, is this necessary?
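Relating to the `i + j` remark above, a self-contained sketch of the batched prefix/suffix split with the repeated index hoisted into a local variable. Names here are illustrative only and are not taken from the PR.

```cpp
#include <algorithm>
#include <cstdint>
#include <string_view>
#include <vector>

// Length of the common prefix of two views.
int32_t CommonPrefixLength(std::string_view a, std::string_view b) {
  const size_t n = std::min(a.size(), b.size());
  size_t k = 0;
  while (k < n && a[k] == b[k]) ++k;
  return static_cast<int32_t>(k);
}

// Split each value into (prefix length, suffix) relative to the previous value,
// processing in batches as the encoder does.
void SplitIntoPrefixesAndSuffixes(const std::vector<std::string_view>& values,
                                  std::vector<int32_t>* prefix_lengths,
                                  std::vector<std::string_view>* suffixes) {
  constexpr int kBatchSize = 256;
  std::string_view last;
  const int num_values = static_cast<int>(values.size());
  for (int i = 0; i < num_values; i += kBatchSize) {
    const int batch_size = std::min(kBatchSize, num_values - i);
    for (int j = 0; j < batch_size; ++j) {
      const int idx = i + j;  // computed once instead of three times
      const std::string_view view = values[idx];
      const int32_t prefix = CommonPrefixLength(last, view);
      prefix_lengths->push_back(prefix);
      suffixes->push_back(view.substr(prefix));
      last = view;
    }
  }
}
```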
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,230 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>

Review Comment:
   ```suggestion
   // DeltaByteArrayEncoder

   constexpr std::string_view kEmpty = "";

   template <typename DType>
   ```
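A small self-contained sketch of how the suggested file-scope constant could back an empty `ByteArray`. The `ByteArray` struct is a simplified stand-in and `kEmptyByteArray` is an illustrative name, not from the PR.

```cpp
#include <cstdint>
#include <string_view>

// Simplified stand-in for parquet::ByteArray, just for this sketch.
struct ByteArray {
  uint32_t len;
  const uint8_t* ptr;
};

// File-scope constant, as in the suggestion above; the string literal has
// static storage duration, so the pointer below stays valid.
constexpr std::string_view kEmpty = "";

// An empty ByteArray built from the constant; an encoder member could be
// initialized the same way instead of casting "" directly in the initializer list.
const ByteArray kEmptyByteArray{static_cast<uint32_t>(kEmpty.size()),
                                reinterpret_cast<const uint8_t*>(kEmpty.data())};
```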
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,230 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = ::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_(""),
+        kEmpty(ByteArray(0, reinterpret_cast<const uint8_t*>(""))) {}

Review Comment:
   I am afraid this may be dangerous. What about declaring the constant `kEmpty` outside the class, as above, and then using it here:
   ```cpp
   empty_(kEmpty.size(), reinterpret_cast<const uint8_t*>(kEmpty.data())) {}
   ```

##########
cpp/src/parquet/encoding.cc:
##########
@@ -1238,25 +1240,35 @@ int PlainBooleanDecoder::Decode(bool* buffer, int max_values) {
   return max_values;
 }
 
-struct ArrowBinaryHelper {
+template <typename DType>
+struct ArrowBinaryHelper;
+
+template <>
+struct ArrowBinaryHelper<ByteArrayType> {
   explicit ArrowBinaryHelper(typename EncodingTraits<ByteArrayType>::Accumulator* out) {
     this->out = out;
     this->builder = out->builder.get();
+    if (ARROW_PREDICT_FALSE(SubtractWithOverflow(::arrow::kBinaryMemoryLimit,
+                                                 this->builder->value_data_length(),
+                                                 &this->chunk_space_remaining))) {
+      throw ParquetException("excess expansion in ArrowBinaryHelper<ByteArrayType>");
+    }
     this->chunk_space_remaining =

Review Comment:
   This line is redundant; `chunk_space_remaining` has already been set via line 1251.

##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,230 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
+ protected:
+  template <typename VisitorType>
+  void PutInternal(const T* src, int num_values) {
+    if (num_values == 0) {
+      return;
+    }
+    uint32_t len = descr_->type_length();

Review Comment:
   IIRC, `descr_->type_length()` will be 0 for `ByteArrayType`, so it is only used for `FLBAType`, right?
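To illustrate why `type_length` only matters in the fixed-length case, a hypothetical FLBA-side visitor. The `FLBA` struct and the member names below are stand-ins for this sketch, not the PR's definitions.

```cpp
#include <cstdint>
#include <string_view>

// Simplified stand-in for parquet::FixedLenByteArray.
struct FLBA {
  const uint8_t* ptr;
};

// For FIXED_LEN_BYTE_ARRAY every value has exactly the column's type_length bytes,
// so the visitor needs it; ByteArray values carry their own length instead.
struct FLBAVisitor {
  const FLBA* src;
  const uint32_t type_length;

  std::string_view operator[](int i) const {
    return {reinterpret_cast<const char*>(src[i].ptr), type_length};
  }
  uint32_t len(int) const { return type_length; }
};
```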
##########
cpp/src/parquet/encoding.cc:
##########
@@ -1275,6 +1287,40 @@ struct ArrowBinaryHelper {
+template <>
+struct ArrowBinaryHelper<FLBAType> {
+  explicit ArrowBinaryHelper(EncodingTraits<FLBAType>::Accumulator* builder) {
+    this->builder = builder;
+    if (ARROW_PREDICT_FALSE(SubtractWithOverflow(::arrow::kBinaryMemoryLimit,
+                                                 this->builder->value_data_length(),
+                                                 &this->chunk_space_remaining))) {
+      throw ParquetException("excess expansion in ArrowBinaryHelper<FLBAType>");
+    }
+  }
+
+  Status PushChunk() {
+    std::shared_ptr<::arrow::Array> result;
+    RETURN_NOT_OK(builder->Finish(&result));
+    chunks.push_back(std::move(result));
+    chunk_space_remaining = ::arrow::kBinaryMemoryLimit;
+    return Status::OK();
+  }
+
+  bool CanFit(int64_t length) const { return length <= chunk_space_remaining; }
+
+  Status Append(const uint8_t* data, int32_t length) {
+    DCHECK(CanFit(length));
+    chunk_space_remaining -= length;
+    return builder->Append(data);
+  }
+
+  Status AppendNull() { return builder->AppendNull(); }
+
+  ::arrow::FixedSizeBinaryBuilder* builder;
+  std::vector<std::shared_ptr<::arrow::Array>> chunks;

Review Comment:
   I'm not sure if this has been discussed already. Would it help to change `struct EncodingTraits<FLBAType>` to be similar to `struct EncodingTraits<ByteArrayType>`?
   Something like:
   ```cpp
   template <>
   struct EncodingTraits<FLBAType> {
     using Encoder = FLBAEncoder;
     using Decoder = FLBADecoder;

     struct Accumulator {
       std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder;
       std::vector<std::shared_ptr<::arrow::Array>> chunks;
     };

     using ArrowType = ::arrow::FixedSizeBinaryType;
     using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::FixedSizeBinaryType>;
   };
   ```
   This may help eliminate the duplication introduced by template specialization. You may simply define this:
   ```cpp
   template <typename DType, typename Enable = void>
   struct ArrowBinaryHelper;

   template <typename DType>
   struct ArrowBinaryHelper<DType,
                            std::enable_if_t<std::is_same_v<DType, ByteArrayType> ||
                                                 std::is_same_v<DType, FLBAType>,
                                             void>> {
     ...
   };
   ```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org