rok commented on code in PR #14341: URL: https://github.com/apache/arrow/pull/14341#discussion_r1264212141
########## cpp/src/parquet/encoding_test.cc: ########## @@ -874,7 +874,7 @@ std::shared_ptr<::arrow::Array> EncodingAdHocTyped<FLBAType>::GetValues(int seed } using EncodingAdHocTypedCases = - ::testing::Types<BooleanType, Int32Type, Int64Type, FloatType, DoubleType, FLBAType>; + ::testing::Types<BooleanType, Int32Type, Int64Type, FloatType, DoubleType>; Review Comment: @wgtmac [proposed it](https://github.com/apache/arrow/pull/14341#discussion_r1256021740) to simplify `ArrowBinaryHelper` and there was no discussion yet. Would this be considered an external API change? ########## cpp/src/parquet/encoding.cc: ########## @@ -3037,12 +3058,235 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { // ---------------------------------------------------------------------- // DELTA_BYTE_ARRAY -class DeltaByteArrayDecoder : public DecoderImpl, - virtual public TypedDecoder<ByteArrayType> { +/// Delta Byte Array encoding also known as incremental encoding or front compression: +/// for each element in a sequence of strings, store the prefix length of the previous +/// entry plus the suffix. +/// +/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED), +/// followed by the suffixes encoded as delta length byte arrays +/// (DELTA_LENGTH_BYTE_ARRAY). + +// ---------------------------------------------------------------------- +// DeltaByteArrayEncoder + +template <typename DType> +class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> { + static constexpr std::string_view kEmpty = ""; + public: - explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr, + using T = typename DType::c_type; + + explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool = ::arrow::default_memory_pool()) + : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool), + sink_(pool), + prefix_length_encoder_(nullptr, pool), + suffix_encoder_(descr, pool), + last_value_(""), + empty_(static_cast<uint32_t>(kEmpty.size()), + reinterpret_cast<const uint8_t*>(kEmpty.data())) {} + + std::shared_ptr<Buffer> FlushValues() override; + + int64_t EstimatedDataEncodedSize() override { + return prefix_length_encoder_.EstimatedDataEncodedSize() + + suffix_encoder_.EstimatedDataEncodedSize(); + } + + using TypedEncoder<DType>::Put; + + void Put(const ::arrow::Array& values) override; + + void Put(const T* buffer, int num_values) override; + + void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset) override { + if (valid_bits != NULLPTR) { + PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), + this->memory_pool())); + T* data = reinterpret_cast<T*>(buffer->mutable_data()); + int num_valid_values = ::arrow::util::internal::SpacedCompress<T>( + src, num_values, valid_bits, valid_bits_offset, data); + Put(data, num_valid_values); + } else { + Put(src, num_values); + } + } + + protected: + template <typename VisitorType> + void PutInternal(const T* src, int num_values) { + if (num_values == 0) { + return; + } + uint32_t len = descr_->type_length(); + + std::string_view last_value_view = last_value_; + constexpr int kBatchSize = 256; + std::array<int32_t, kBatchSize> prefix_lengths; + std::array<ByteArray, kBatchSize> suffixes; + auto visitor = VisitorType{src, len}; + + for (int i = 0; i < num_values; i += kBatchSize) { + const int batch_size = std::min(kBatchSize, num_values - i); + + for (int j = 0; j < batch_size; ++j) { + const int idx = i + j; + auto view = visitor[idx]; + len = visitor.len(idx); + + uint32_t k = 0; + const uint32_t common_length = Review Comment: Changed. ########## cpp/src/parquet/encoding.cc: ########## @@ -3037,12 +3058,235 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { // ---------------------------------------------------------------------- // DELTA_BYTE_ARRAY -class DeltaByteArrayDecoder : public DecoderImpl, - virtual public TypedDecoder<ByteArrayType> { +/// Delta Byte Array encoding also known as incremental encoding or front compression: +/// for each element in a sequence of strings, store the prefix length of the previous +/// entry plus the suffix. +/// +/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED), +/// followed by the suffixes encoded as delta length byte arrays +/// (DELTA_LENGTH_BYTE_ARRAY). + +// ---------------------------------------------------------------------- +// DeltaByteArrayEncoder + +template <typename DType> +class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> { + static constexpr std::string_view kEmpty = ""; + public: - explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr, + using T = typename DType::c_type; + + explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool = ::arrow::default_memory_pool()) + : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool), + sink_(pool), + prefix_length_encoder_(nullptr, pool), + suffix_encoder_(descr, pool), + last_value_(""), + empty_(static_cast<uint32_t>(kEmpty.size()), + reinterpret_cast<const uint8_t*>(kEmpty.data())) {} + + std::shared_ptr<Buffer> FlushValues() override; + + int64_t EstimatedDataEncodedSize() override { + return prefix_length_encoder_.EstimatedDataEncodedSize() + + suffix_encoder_.EstimatedDataEncodedSize(); + } + + using TypedEncoder<DType>::Put; + + void Put(const ::arrow::Array& values) override; + + void Put(const T* buffer, int num_values) override; + + void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset) override { + if (valid_bits != NULLPTR) { + PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), + this->memory_pool())); + T* data = reinterpret_cast<T*>(buffer->mutable_data()); + int num_valid_values = ::arrow::util::internal::SpacedCompress<T>( + src, num_values, valid_bits, valid_bits_offset, data); + Put(data, num_valid_values); + } else { + Put(src, num_values); + } + } + + protected: + template <typename VisitorType> + void PutInternal(const T* src, int num_values) { + if (num_values == 0) { + return; + } + uint32_t len = descr_->type_length(); + + std::string_view last_value_view = last_value_; + constexpr int kBatchSize = 256; + std::array<int32_t, kBatchSize> prefix_lengths; + std::array<ByteArray, kBatchSize> suffixes; + auto visitor = VisitorType{src, len}; + + for (int i = 0; i < num_values; i += kBatchSize) { + const int batch_size = std::min(kBatchSize, num_values - i); + + for (int j = 0; j < batch_size; ++j) { + const int idx = i + j; + auto view = visitor[idx]; + len = visitor.len(idx); Review Comment: `const uint32_t len = view.length()` is an option. ########## cpp/src/parquet/encoding.cc: ########## @@ -3037,12 +3058,235 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { // ---------------------------------------------------------------------- // DELTA_BYTE_ARRAY -class DeltaByteArrayDecoder : public DecoderImpl, - virtual public TypedDecoder<ByteArrayType> { +/// Delta Byte Array encoding also known as incremental encoding or front compression: +/// for each element in a sequence of strings, store the prefix length of the previous +/// entry plus the suffix. +/// +/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED), +/// followed by the suffixes encoded as delta length byte arrays +/// (DELTA_LENGTH_BYTE_ARRAY). + +// ---------------------------------------------------------------------- +// DeltaByteArrayEncoder + +template <typename DType> +class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> { + static constexpr std::string_view kEmpty = ""; + public: - explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr, + using T = typename DType::c_type; + + explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool = ::arrow::default_memory_pool()) + : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool), + sink_(pool), + prefix_length_encoder_(nullptr, pool), + suffix_encoder_(descr, pool), + last_value_(""), + empty_(static_cast<uint32_t>(kEmpty.size()), + reinterpret_cast<const uint8_t*>(kEmpty.data())) {} + + std::shared_ptr<Buffer> FlushValues() override; + + int64_t EstimatedDataEncodedSize() override { + return prefix_length_encoder_.EstimatedDataEncodedSize() + + suffix_encoder_.EstimatedDataEncodedSize(); + } + + using TypedEncoder<DType>::Put; + + void Put(const ::arrow::Array& values) override; + + void Put(const T* buffer, int num_values) override; + + void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset) override { + if (valid_bits != NULLPTR) { + PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), + this->memory_pool())); + T* data = reinterpret_cast<T*>(buffer->mutable_data()); + int num_valid_values = ::arrow::util::internal::SpacedCompress<T>( + src, num_values, valid_bits, valid_bits_offset, data); + Put(data, num_valid_values); + } else { + Put(src, num_values); + } + } + + protected: + template <typename VisitorType> + void PutInternal(const T* src, int num_values) { + if (num_values == 0) { + return; + } + uint32_t len = descr_->type_length(); Review Comment: Changed. ########## cpp/src/parquet/encoding.cc: ########## @@ -1238,43 +1240,62 @@ int PlainBooleanDecoder::Decode(bool* buffer, int max_values) { return max_values; } -struct ArrowBinaryHelper { - explicit ArrowBinaryHelper(typename EncodingTraits<ByteArrayType>::Accumulator* out) { - this->out = out; - this->builder = out->builder.get(); - this->chunk_space_remaining = - ::arrow::kBinaryMemoryLimit - this->builder->value_data_length(); +template <typename DType, typename Enable = void> +struct ArrowBinaryHelper; + +template <typename DType> +struct ArrowBinaryHelper<DType, std::enable_if_t<std::is_same_v<DType, ByteArrayType> || + std::is_same_v<DType, FLBAType>, + void>> { + explicit ArrowBinaryHelper(typename EncodingTraits<DType>::Accumulator* acc) { + builder = acc->builder.get(); + chunks = acc->chunks; + if (ARROW_PREDICT_FALSE(SubtractWithOverflow(::arrow::kBinaryMemoryLimit, + builder->value_data_length(), + &chunk_space_remaining))) { + throw ParquetException("excess expansion in ArrowBinaryHelper<DType>"); + } } Status PushChunk() { std::shared_ptr<::arrow::Array> result; RETURN_NOT_OK(builder->Finish(&result)); - out->chunks.push_back(result); + chunks.push_back(std::move(result)); chunk_space_remaining = ::arrow::kBinaryMemoryLimit; return Status::OK(); } bool CanFit(int64_t length) const { return length <= chunk_space_remaining; } void UnsafeAppend(const uint8_t* data, int32_t length) { + DCHECK(CanFit(length)); chunk_space_remaining -= length; builder->UnsafeAppend(data, length); } void UnsafeAppendNull() { builder->UnsafeAppendNull(); } - Status Append(const uint8_t* data, int32_t length) { - chunk_space_remaining -= length; - return builder->Append(data, length); - } + virtual Status Append(const uint8_t* data, int32_t length); Status AppendNull() { return builder->AppendNull(); } - typename EncodingTraits<ByteArrayType>::Accumulator* out; - ::arrow::BinaryBuilder* builder; + typename EncodingTraits<DType>::BuilderType* builder; + std::vector<std::shared_ptr<::arrow::Array>> chunks; Review Comment: I prefer the second option. Changed. ########## cpp/src/parquet/encoding.cc: ########## @@ -3037,12 +3058,235 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { // ---------------------------------------------------------------------- // DELTA_BYTE_ARRAY -class DeltaByteArrayDecoder : public DecoderImpl, - virtual public TypedDecoder<ByteArrayType> { +/// Delta Byte Array encoding also known as incremental encoding or front compression: +/// for each element in a sequence of strings, store the prefix length of the previous +/// entry plus the suffix. +/// +/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED), +/// followed by the suffixes encoded as delta length byte arrays +/// (DELTA_LENGTH_BYTE_ARRAY). + +// ---------------------------------------------------------------------- +// DeltaByteArrayEncoder + +template <typename DType> +class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> { + static constexpr std::string_view kEmpty = ""; + public: - explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr, + using T = typename DType::c_type; + + explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool = ::arrow::default_memory_pool()) + : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool), + sink_(pool), + prefix_length_encoder_(nullptr, pool), Review Comment: I'm not sure what to comment here. `"// Prefix lengths are encoded using DeltaBitPackEncoder that can be left uninitialized."`? ########## cpp/src/parquet/encoding.cc: ########## @@ -3037,12 +3058,235 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder { // ---------------------------------------------------------------------- // DELTA_BYTE_ARRAY -class DeltaByteArrayDecoder : public DecoderImpl, - virtual public TypedDecoder<ByteArrayType> { +/// Delta Byte Array encoding also known as incremental encoding or front compression: +/// for each element in a sequence of strings, store the prefix length of the previous +/// entry plus the suffix. +/// +/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED), +/// followed by the suffixes encoded as delta length byte arrays +/// (DELTA_LENGTH_BYTE_ARRAY). + +// ---------------------------------------------------------------------- +// DeltaByteArrayEncoder + +template <typename DType> +class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> { + static constexpr std::string_view kEmpty = ""; + public: - explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr, + using T = typename DType::c_type; + + explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool = ::arrow::default_memory_pool()) + : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool), + sink_(pool), + prefix_length_encoder_(nullptr, pool), + suffix_encoder_(descr, pool), + last_value_(""), + empty_(static_cast<uint32_t>(kEmpty.size()), + reinterpret_cast<const uint8_t*>(kEmpty.data())) {} + + std::shared_ptr<Buffer> FlushValues() override; + + int64_t EstimatedDataEncodedSize() override { + return prefix_length_encoder_.EstimatedDataEncodedSize() + + suffix_encoder_.EstimatedDataEncodedSize(); + } + + using TypedEncoder<DType>::Put; + + void Put(const ::arrow::Array& values) override; + + void Put(const T* buffer, int num_values) override; + + void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset) override { + if (valid_bits != NULLPTR) { + PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), + this->memory_pool())); + T* data = reinterpret_cast<T*>(buffer->mutable_data()); + int num_valid_values = ::arrow::util::internal::SpacedCompress<T>( + src, num_values, valid_bits, valid_bits_offset, data); + Put(data, num_valid_values); + } else { + Put(src, num_values); + } + } + + protected: + template <typename VisitorType> + void PutInternal(const T* src, int num_values) { + if (num_values == 0) { + return; + } + uint32_t len = descr_->type_length(); + + std::string_view last_value_view = last_value_; + constexpr int kBatchSize = 256; + std::array<int32_t, kBatchSize> prefix_lengths; + std::array<ByteArray, kBatchSize> suffixes; + auto visitor = VisitorType{src, len}; + + for (int i = 0; i < num_values; i += kBatchSize) { + const int batch_size = std::min(kBatchSize, num_values - i); + + for (int j = 0; j < batch_size; ++j) { + const int idx = i + j; + auto view = visitor[idx]; + len = visitor.len(idx); + + uint32_t k = 0; + const uint32_t common_length = + std::min(len, static_cast<uint32_t>(last_value_view.length())); + while (k < common_length) { + if (last_value_view[k] != view[k]) { + break; + } + k++; + } + + last_value_view = view; + prefix_lengths[j] = k; + const uint32_t suffix_length = len - k; + const uint8_t* suffix_ptr = src[idx].ptr + k; + + // Convert to ByteArray, so it can be passed to the suffix_encoder_. + const ByteArray suffix(suffix_length, suffix_ptr); + suffixes[j] = suffix; + } + suffix_encoder_.Put(suffixes.data(), batch_size); + prefix_length_encoder_.Put(prefix_lengths.data(), batch_size); + } + last_value_ = last_value_view; + } + + template <typename ArrayType> + void PutBinaryArray(const ArrayType& array) { + auto previous_len = static_cast<uint32_t>(last_value_.length()); + std::string_view last_value_view = last_value_; + + PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>( + *array.data(), + [&](::std::string_view view) { + if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) { + return Status::Invalid("Parquet cannot store strings with size 2GB or more"); + } + // Convert to ByteArray, so it can be passed to the suffix_encoder_. + const ByteArray src{view}; + + uint32_t j = 0; + const uint32_t len = src.len; + const uint32_t common_length = std::min(previous_len, len); Review Comment: Changed `common_length` to `maximum_common_prefix_length`. Shall we change `len` to `length`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
