mapleFU commented on code in PR #14293:
URL: https://github.com/apache/arrow/pull/14293#discussion_r1089642851
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2572,6 +2572,139 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual
public TypedDecoder<DTyp
// ----------------------------------------------------------------------
// DELTA_LENGTH_BYTE_ARRAY
+// ----------------------------------------------------------------------
+// DeltaLengthByteArrayEncoder
+
+template <typename DType>
+class DeltaLengthByteArrayEncoder : public EncoderImpl,
+ virtual public TypedEncoder<ByteArrayType>
{
+ public:
+ explicit DeltaLengthByteArrayEncoder(const ColumnDescriptor* descr,
MemoryPool* pool)
+ : EncoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY,
+ pool = ::arrow::default_memory_pool()),
+ sink_(pool),
+ length_encoder_(nullptr, pool),
+ encoded_size_{0} {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return encoded_size_ + length_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<ByteArrayType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override;
+
+ void Put(const ByteArray& val) {
+ // Write the result to the output stream
+ const int64_t increment = static_cast<int64_t>(val.len + sizeof(uint32_t));
+ if (ARROW_PREDICT_FALSE(sink_.length() + increment > sink_.capacity())) {
+ PARQUET_THROW_NOT_OK(sink_.Reserve(increment));
+ }
+ DCHECK(val.len == 0 || val.ptr != nullptr) << "Value ptr cannot be NULL";
+ sink_.UnsafeAppend(val.ptr, val.len);
+ }
+
+ protected:
+ template <typename ArrayType>
+ void PutBinaryArray(const ArrayType& array) {
+ PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename
ArrayType::TypeClass>(
+ *array.data(),
+ [&](::std::string_view view) {
+ if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) {
+ return Status::Invalid("Parquet cannot store strings with size 2GB
or more");
+ }
+ length_encoder_.Put({static_cast<int32_t>(view.length())}, 1);
+ Put(view);
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+ }
+
+ ::arrow::BufferBuilder sink_;
+ DeltaBitPackEncoder<Int32Type> length_encoder_;
+ uint32_t encoded_size_;
+};
+
+template <typename DType>
+void DeltaLengthByteArrayEncoder<DType>::Put(const ::arrow::Array& values) {
+ AssertBaseBinary(values);
+ if (::arrow::is_binary_like(values.type_id())) {
+ PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
+ } else {
+ DCHECK(::arrow::is_large_binary_like(values.type_id()));
+ PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
+ }
+}
+
+template <typename DType>
+void DeltaLengthByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+
+ constexpr int kBatchSize = 256;
+ std::array<int32_t, kBatchSize> lengths;
+ for (int idx = 0; idx < num_values; idx += kBatchSize) {
+ const int batch_size = std::min(kBatchSize, num_values - idx);
+ for (int j = 0; j < batch_size; ++j) {
+ const int32_t len = src[idx + j].len;
+ if (AddWithOverflow(encoded_size_, len, &encoded_size_)) {
+ throw ParquetException("excess expansion in DELTA_LENGTH_BYTE_ARRAY");
+ }
+ lengths[j] = len;
+ }
+ length_encoder_.Put(lengths.data(), batch_size);
+ }
+
+ PARQUET_THROW_NOT_OK(sink_.Reserve(encoded_size_));
+ for (int idx = 0; idx < num_values; idx++) {
+ sink_.UnsafeAppend(src[idx].ptr, src[idx].len);
+ }
+}
+
+template <typename DType>
+void DeltaLengthByteArrayEncoder<DType>::PutSpaced(const T* src, int
num_values,
+ const uint8_t* valid_bits,
+ int64_t valid_bits_offset) {
+ if (valid_bits != NULLPTR) {
+ PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values *
sizeof(T),
+
this->memory_pool()));
+ T* data = reinterpret_cast<T*>(buffer->mutable_data());
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
+ }
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaLengthByteArrayEncoder<DType>::FlushValues() {
+ std::shared_ptr<Buffer> encoded_lengths = length_encoder_.FlushValues();
+
+ std::shared_ptr<Buffer> data;
+ PARQUET_THROW_NOT_OK(sink_.Finish(&data));
+ sink_.Reset();
+
+ PARQUET_THROW_NOT_OK(sink_.Append(encoded_lengths->data(),
encoded_lengths->size()));
+ PARQUET_THROW_NOT_OK(sink_.Append(data->mutable_data(), data->size()));
Review Comment:
Why `mutable_data` here?
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2537,6 +2537,116 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual
public TypedDecoder<DTyp
// ----------------------------------------------------------------------
// DELTA_LENGTH_BYTE_ARRAY
+// ----------------------------------------------------------------------
+// DeltaLengthByteArrayEncoder
+
+class DeltaLengthByteArrayEncoder : public EncoderImpl,
+ virtual public TypedEncoder<ByteArrayType>
{
+ public:
+ using T = typename ByteArrayType::c_type;
+
+ explicit DeltaLengthByteArrayEncoder(const ColumnDescriptor* descr,
MemoryPool* pool)
+ : EncoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY,
+ pool = ::arrow::default_memory_pool()),
+ sink_(pool),
+ length_encoder_(nullptr, pool),
+ encoded_size_{0} {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return encoded_size_ + length_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<ByteArrayType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override;
+
+ protected:
+ ::arrow::BufferBuilder sink_;
+ DeltaBitPackEncoder<Int32Type> length_encoder_;
+ uint32_t encoded_size_;
+};
+
+void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) {
+ const ::arrow::ArrayData& data = *values.data();
+ if (values.type_id() != ::arrow::Type::BINARY) {
+ throw ParquetException("Expected ByteArrayType, got ",
values.type()->ToString());
+ }
+ if (data.length > std::numeric_limits<int32_t>::max()) {
+ throw ParquetException("Array cannot be longer than ",
+ std::numeric_limits<int32_t>::max());
+ }
+ if (values.null_count() == 0) {
+ Put(data.GetValues<ByteArray>(1), static_cast<int>(data.length));
+ } else {
+ PutSpaced(data.GetValues<ByteArray>(1), static_cast<int>(data.length),
+ data.GetValues<uint8_t>(0, 0), data.offset);
+ }
+}
+
+void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+
+ constexpr int kBatchSize = 256;
+ std::array<int32_t, kBatchSize> lengths;
+ for (int idx = 0; idx < num_values; idx += kBatchSize) {
+ const int batch_size = std::min(kBatchSize, num_values - idx);
+ for (int j = 0; j < batch_size; ++j) {
+ const int32_t len = src[idx + j].len;
+ encoded_size_ += len;
+ lengths[j] = len;
+ }
+ length_encoder_.Put(lengths.data(), batch_size);
+ }
+
+ PARQUET_THROW_NOT_OK(sink_.Reserve(encoded_size_));
+ for (int idx = 0; idx < num_values; idx++) {
+ sink_.UnsafeAppend(src[idx].ptr, src[idx].len);
Review Comment:
A single `memcpy(*sink_.mutable_data(), array.raw_data(), encoded_size_)` would be great here;
I think we can leave a TODO for that performance improvement?
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3086,7 +3257,17 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type
type_num, Encoding::type encodin
default:
throw ParquetException(
"DELTA_BINARY_PACKED encoder only supports INT32 and INT64");
- break;
+ }
+ } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) {
+ switch (type_num) {
+ case Type::BYTE_ARRAY:
+ return std::unique_ptr<Encoder>(
+ new DeltaLengthByteArrayEncoder<ByteArray>(descr, pool));
+ case Type::FIXED_LEN_BYTE_ARRAY:
+ return std::unique_ptr<Encoder>(
+ new DeltaLengthByteArrayEncoder<FLBAType>(descr, pool));
+ default:
+ throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports
BYTE_ARRAY");
Review Comment:
Should the error message also mention `FIXED_LEN_BYTE_ARRAY`?
##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -429,12 +431,16 @@ TEST_F(TestValuesWriterInt64Type,
RequiredDeltaBinaryPacked) {
this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
}
-/*
-TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
+TEST_F(TestByteArrayValuesWriter, RequiredDeltaLengthByteArray) {
this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
}
-TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) {
+/*
+TYPED_TEST(TestByteArrayValuesWriter, RequiredDeltaByteArray) {
Review Comment:
Does this test have multiple
`this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);` calls?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]