mapleFU commented on code in PR #46532: URL: https://github.com/apache/arrow/pull/46532#discussion_r2104518713
########## cpp/src/parquet/arrow/schema.cc: ########## @@ -1033,11 +1035,11 @@ Result<bool> ApplyOriginalStorageMetadata(const Field& origin_field, modified = true; } - if ((origin_type->id() == ::arrow::Type::LARGE_BINARY && - inferred_type->id() == ::arrow::Type::BINARY) || - (origin_type->id() == ::arrow::Type::LARGE_STRING && - inferred_type->id() == ::arrow::Type::STRING)) { - // Read back binary-like arrays with the intended offset width. + if ((::arrow::is_binary_or_binary_view(origin_type->id()) && + ::arrow::is_binary_or_binary_view(inferred_type->id())) || + (::arrow::is_string_or_string_view(origin_type->id()) && + ::arrow::is_string_or_string_view(inferred_type->id()))) { + // Read back binary-like arrays with the intended layout (narrow, large, view). Review Comment: (so the binary/string is called "narrow" ? ) ########## cpp/src/parquet/decoder.cc: ########## @@ -654,43 +701,41 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>, int64_t valid_bits_offset, typename EncodingTraits<ByteArrayType>::Accumulator* out, int* out_values_decoded) { - ArrowBinaryHelper<ByteArrayType> helper(out, num_values); - int values_decoded = 0; - - RETURN_NOT_OK(helper.Prepare(len_)); - - int i = 0; - RETURN_NOT_OK(VisitNullBitmapInline( - valid_bits, valid_bits_offset, num_values, null_count, - [&]() { - if (ARROW_PREDICT_FALSE(len_ < 4)) { - ParquetException::EofException(); - } - auto value_len = SafeLoadAs<int32_t>(data_); - if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) { - return Status::Invalid("Invalid or corrupted value_len '", value_len, "'"); - } - auto increment = value_len + 4; - if (ARROW_PREDICT_FALSE(len_ < increment)) { - ParquetException::EofException(); - } - RETURN_NOT_OK(helper.PrepareNextInput(value_len, len_)); - helper.UnsafeAppend(data_ + 4, value_len); - data_ += increment; - len_ -= increment; - ++values_decoded; - ++i; - return Status::OK(); - }, - [&]() { - helper.UnsafeAppendNull(); - ++i; - return 
Status::OK(); - })); + auto visit_binary_helper = [&](auto* helper) { + int values_decoded = 0; + + RETURN_NOT_OK(VisitNullBitmapInline( + valid_bits, valid_bits_offset, num_values, null_count, + [&]() { + if (ARROW_PREDICT_FALSE(len_ < 4)) { + return Status::Invalid( + "Invalid or truncated PLAIN-encoded BYTE_ARRAY data"); + } + auto value_len = SafeLoadAs<int32_t>(data_); + if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > len_ - 4)) { + return Status::Invalid( + "Invalid or truncated PLAIN-encoded BYTE_ARRAY data"); + } + RETURN_NOT_OK(helper->AppendValue(data_ + 4, value_len, + /*estimated_remaining_data_length=*/len_)); Review Comment: This is so confusing to me. Wouldn't calling `AppendValue` continuously with `len_` (nearing the "whole" page size) be heavy? ########## cpp/src/parquet/arrow/schema_internal.cc: ########## @@ -117,34 +118,67 @@ Result<std::shared_ptr<ArrowType>> MakeArrowTimestamp(const LogicalType& logical Result<std::shared_ptr<ArrowType>> FromByteArray( const LogicalType& logical_type, const ArrowReaderProperties& reader_properties, const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) { + auto binary_type = [&]() -> Result<std::shared_ptr<ArrowType>> { + const auto configured_binary_type = reader_properties.binary_type(); + switch (configured_binary_type) { + case ::arrow::Type::BINARY: + return ::arrow::binary(); + case ::arrow::Type::LARGE_BINARY: + return ::arrow::large_binary(); + case ::arrow::Type::BINARY_VIEW: + return ::arrow::binary_view(); + default: + return Status::TypeError("Invalid Arrow type for BYTE_ARRAY columns: ", + ::arrow::internal::ToString(configured_binary_type)); + } + }; + + auto utf8_type = [&]() -> Result<std::shared_ptr<ArrowType>> { Review Comment: (The table LGTM, but the binary-like column is BINARY rather than STRING, lol) ########## cpp/src/parquet/column_reader.cc: ########## @@ -2069,11 +2070,35 @@ class ByteArrayChunkedRecordReader final : public TypedRecordReader<ByteArrayTyp virtual public 
BinaryRecordReader { public: ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, - ::arrow::MemoryPool* pool, bool read_dense_for_nullable) + ::arrow::MemoryPool* pool, bool read_dense_for_nullable, + std::shared_ptr<::arrow::DataType> arrow_type) Review Comment: Passing const ref or just type_id? `type->ToString()` doesn't need a shared_ptr copying? ########## cpp/src/parquet/encoder.cc: ########## @@ -1304,11 +1326,15 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl, }; void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) { - AssertBaseBinary(values); + AssertVarLengthBinary(values); if (::arrow::is_binary_like(values.type_id())) { PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values)); - } else { + } else if (::arrow::is_large_binary_like(values.type_id())) { PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values)); + } else if (::arrow::is_binary_view_like(values.type_id())) { + PutBinaryArray(checked_cast<const ::arrow::BinaryViewArray&>(values)); + } else { + throw ParquetException("Only binary-like data supported"); Review Comment: Should we also throw type name here? ########## cpp/src/parquet/decoder.cc: ########## @@ -63,6 +65,191 @@ using arrow::util::SafeLoadAs; namespace parquet { namespace { +// A helper class to abstract away differences between EncodingTraits<DType>::Accumulator +// for ByteArrayType and FLBAType. + +template <typename DType, typename ArrowType> +struct ArrowBinaryHelper; + +template <> +struct ArrowBinaryHelper<ByteArrayType, ::arrow::BinaryType> { + using Accumulator = typename EncodingTraits<ByteArrayType>::Accumulator; + + explicit ArrowBinaryHelper(Accumulator* acc) + : acc_(acc), + builder_(checked_cast<::arrow::BinaryBuilder*>(acc->builder.get())), + chunk_space_remaining_(::arrow::kBinaryMemoryLimit - + builder_->value_data_length()) {} + + // Prepare will reserve the number of entries in the current chunk. 
+ // If estimated_data_length is provided, it will also reserve the estimated data length. + Status Prepare(int64_t length, std::optional<int64_t> estimated_data_length = {}) { + entries_remaining_ = length; + RETURN_NOT_OK(builder_->Reserve(entries_remaining_)); + if (estimated_data_length.has_value()) { + RETURN_NOT_OK(builder_->ReserveData( + std::min<int64_t>(*estimated_data_length, this->chunk_space_remaining_))); + } + return Status::OK(); + } + + // If a new chunk is created and estimated_remaining_data_length is provided, + // it will also reserve the estimated data length for this chunk. + Status AppendValue(const uint8_t* data, int32_t length, + std::optional<int64_t> estimated_remaining_data_length = {}) { + DCHECK_GT(entries_remaining_, 0); + + if (ARROW_PREDICT_FALSE(!CanFit(length))) { + // This element would exceed the capacity of a chunk + RETURN_NOT_OK(PushChunk()); + // Reserve entries and data in new chunk + RETURN_NOT_OK(builder_->Reserve(entries_remaining_)); + if (estimated_remaining_data_length.has_value()) { + RETURN_NOT_OK(builder_->ReserveData( + std::min<int64_t>(*estimated_remaining_data_length, chunk_space_remaining_))); + } + } + chunk_space_remaining_ -= length; + --entries_remaining_; + if (estimated_remaining_data_length.has_value()) { + // Assume Prepare() was already called with an estimated_data_length + builder_->UnsafeAppend(data, length); + return Status::OK(); + } else { + return builder_->Append(data, length); + } + } + + void UnsafeAppendNull() { + DCHECK_GT(entries_remaining_, 0); + --entries_remaining_; + builder_->UnsafeAppendNull(); + } + + private: + Status PushChunk() { + ARROW_ASSIGN_OR_RAISE(auto chunk, acc_->builder->Finish()); + acc_->chunks.push_back(std::move(chunk)); + chunk_space_remaining_ = ::arrow::kBinaryMemoryLimit; + return Status::OK(); + } + + bool CanFit(int64_t length) const { return length <= chunk_space_remaining_; } + + Accumulator* acc_; + ::arrow::BinaryBuilder* builder_; + int64_t 
entries_remaining_; + int64_t chunk_space_remaining_; +}; + +template <typename ArrowBinaryType> +struct ArrowBinaryHelper<ByteArrayType, ArrowBinaryType> { + using Accumulator = typename EncodingTraits<ByteArrayType>::Accumulator; + using BuilderType = typename ::arrow::TypeTraits<ArrowBinaryType>::BuilderType; + + static constexpr bool kIsBinaryView = + ::arrow::is_binary_view_like_type<ArrowBinaryType>::value; + + explicit ArrowBinaryHelper(Accumulator* acc) + : builder_(checked_cast<BuilderType*>(acc->builder.get())) {} + + // Prepare will reserve the number of entries in the current chunk. + // If estimated_data_length is provided, it will also reserve the estimated data length, + // and the caller should better call `UnsafeAppend` instead of `Append` to avoid + // double-checking the data length. + Status Prepare(int64_t length, std::optional<int64_t> estimated_data_length = {}) { + RETURN_NOT_OK(builder_->Reserve(length)); + // Avoid reserving data when reading into a binary-view array, because many + // values may be very short and not require any heap storage, which would make Review Comment: 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org