pitrou commented on code in PR #14293:
URL: https://github.com/apache/arrow/pull/14293#discussion_r1094792239
##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -879,6 +907,13 @@ TYPED_TEST(EncodingAdHocTyped,
ByteStreamSplitArrowDirectPut) {
}
}
+TYPED_TEST(EncodingAdHocTyped, DeltaBitPackArrowDirectPut) {
+ this->null_probability_ = 0;
Review Comment:
Hmm, why is the null probability forced to 0 here? If this wouldn't work with a
non-zero null count, then I think we want these additional tests to be in a
separate PR (together with a fix).
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2572,6 +2572,128 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual
public TypedDecoder<DTyp
// ----------------------------------------------------------------------
// DELTA_LENGTH_BYTE_ARRAY
+// ----------------------------------------------------------------------
+// DeltaLengthByteArrayEncoder
+
+template <typename DType>
+class DeltaLengthByteArrayEncoder : public EncoderImpl,
+ virtual public TypedEncoder<ByteArrayType>
{
+ public:
+ explicit DeltaLengthByteArrayEncoder(const ColumnDescriptor* descr,
MemoryPool* pool)
+ : EncoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY,
+ pool = ::arrow::default_memory_pool()),
+ sink_(pool),
+ length_encoder_(nullptr, pool),
+ encoded_size_{0} {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return encoded_size_ + length_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<ByteArrayType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override;
+
+ protected:
+ template <typename ArrayType>
+ void PutBinaryArray(const ArrayType& array) {
+ PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename
ArrayType::TypeClass>(
+ *array.data(),
+ [&](::std::string_view view) {
+ if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) {
+ return Status::Invalid("Parquet cannot store strings with size 2GB
or more");
+ }
+ length_encoder_.Put({static_cast<int32_t>(view.length())}, 1);
+ PARQUET_THROW_NOT_OK(sink_.Append(view.data(), view.length()));
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+ }
+
+ ::arrow::BufferBuilder sink_;
+ DeltaBitPackEncoder<Int32Type> length_encoder_;
+ uint32_t encoded_size_;
+};
+
+template <typename DType>
+void DeltaLengthByteArrayEncoder<DType>::Put(const ::arrow::Array& values) {
+ AssertBaseBinary(values);
+ if (::arrow::is_binary_like(values.type_id())) {
+ PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
+ } else {
+ PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
+ }
+}
+
+template <typename DType>
+void DeltaLengthByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+
+ constexpr int kBatchSize = 256;
+ std::array<int32_t, kBatchSize> lengths;
+ for (int idx = 0; idx < num_values; idx += kBatchSize) {
+ const int batch_size = std::min(kBatchSize, num_values - idx);
+ for (int j = 0; j < batch_size; ++j) {
+ const int32_t len = src[idx + j].len;
+ if (AddWithOverflow(encoded_size_, len, &encoded_size_)) {
+ throw ParquetException("excess expansion in DELTA_LENGTH_BYTE_ARRAY");
+ }
+ lengths[j] = len;
+ }
+ length_encoder_.Put(lengths.data(), batch_size);
+ }
+
+ PARQUET_THROW_NOT_OK(sink_.Reserve(encoded_size_));
+ for (int idx = 0; idx < num_values; idx++) {
+ sink_.UnsafeAppend(src[idx].ptr, src[idx].len);
+ }
+}
+
+template <typename DType>
+void DeltaLengthByteArrayEncoder<DType>::PutSpaced(const T* src, int
num_values,
+ const uint8_t* valid_bits,
+ int64_t valid_bits_offset) {
+ if (valid_bits != NULLPTR) {
+ PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values *
sizeof(T),
+
this->memory_pool()));
+ T* data = reinterpret_cast<T*>(buffer->mutable_data());
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
+ }
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaLengthByteArrayEncoder<DType>::FlushValues() {
+ std::shared_ptr<Buffer> encoded_lengths = length_encoder_.FlushValues();
+
+ std::shared_ptr<Buffer> data;
+ PARQUET_THROW_NOT_OK(sink_.Finish(&data));
+ sink_.Reset();
+
+ PARQUET_THROW_NOT_OK(sink_.Append(encoded_lengths->data(),
encoded_lengths->size()));
Review Comment:
We can probably presize the sink to avoid resizes between appends:
```suggestion
PARQUET_THROW_NOT_OK(sink_.Resize(encoded_lengths->size() + data->size()));
PARQUET_THROW_NOT_OK(sink_.Append(encoded_lengths->data(),
encoded_lengths->size()));
```
##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1493,5 +1528,105 @@ TYPED_TEST(TestDeltaBitPackEncoding,
NonZeroPaddedMiniblockBitWidth) {
}
}
+// ----------------------------------------------------------------------
+// DELTA_LENGTH_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaLengthByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+ using c_type = typename Type::c_type;
+ static constexpr int TYPE = Type::type_num;
+
+ virtual void CheckRoundtrip() {
+ auto encoder =
+ MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, false,
descr_.get());
+ auto decoder =
+ MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
descr_.get());
+
+ encoder->Put(draws_, num_values_);
+ encode_buffer_ = encoder->FlushValues();
+
+ decoder->SetData(num_values_, encode_buffer_->data(),
+ static_cast<int>(encode_buffer_->size()));
+ int values_decoded = decoder->Decode(decode_buf_, num_values_);
+ ASSERT_EQ(num_values_, values_decoded);
+ ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_,
num_values_));
+ }
+
+ void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t
valid_bits_offset) {
+ auto encoder =
+ MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, false,
descr_.get());
+ auto decoder =
+ MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
descr_.get());
+ int null_count = 0;
+ for (auto i = 0; i < num_values_; i++) {
+ if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+ null_count++;
+ }
+ }
+
+ encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+ encode_buffer_ = encoder->FlushValues();
+ decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+ static_cast<int>(encode_buffer_->size()));
+ auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_,
null_count,
+ valid_bits, valid_bits_offset);
+ ASSERT_EQ(num_values_, values_decoded);
+ ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_,
num_values_,
+ valid_bits,
valid_bits_offset));
+ }
+
+ protected:
+ USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<ByteArrayType> TestDeltaLengthByteArrayEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaLengthByteArrayEncoding,
TestDeltaLengthByteArrayEncodingTypes);
+
+TYPED_TEST(TestDeltaLengthByteArrayEncoding, BasicRoundTrip) {
+ ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+ ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 200));
+ ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+ /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
+ /*null_probability*/ 0.1));
+}
+
+TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryDirectPut) {
+ const int64_t size = 50;
+ const int32_t min_length = 0;
+ const int32_t max_length = 10;
+ const double null_probability = 0.25;
+ auto encoder =
MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY);
+ auto decoder =
MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY);
+
+ auto CheckSeed = [&](int seed, int64_t size) {
+ ::arrow::random::RandomArrayGenerator rag(seed);
+ auto values = rag.String(size, min_length, max_length, null_probability);
+
+ ASSERT_NO_THROW(encoder->Put(*values));
+ auto buf = encoder->FlushValues();
+
+ int num_values = static_cast<int>(values->length() - values->null_count());
+ decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+ typename EncodingTraits<ByteArrayType>::Accumulator acc;
+ acc.builder.reset(new ::arrow::StringBuilder);
+ ASSERT_EQ(num_values,
+ decoder->DecodeArrow(static_cast<int>(values->length()),
+ static_cast<int>(values->null_count()),
+ values->null_bitmap_data(),
values->offset(), &acc));
+
+ std::shared_ptr<::arrow::Array> result;
+ ASSERT_OK(acc.builder->Finish(&result));
+ ASSERT_EQ(size, result->length());
+ ::arrow::AssertArraysEqual(*values, *result);
Review Comment:
Let's also validate the result:
```suggestion
ASSERT_OK(result->ValidateFull());
::arrow::AssertArraysEqual(*values, *result);
```
##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1493,5 +1528,105 @@ TYPED_TEST(TestDeltaBitPackEncoding,
NonZeroPaddedMiniblockBitWidth) {
}
}
+// ----------------------------------------------------------------------
+// DELTA_LENGTH_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaLengthByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+ using c_type = typename Type::c_type;
+ static constexpr int TYPE = Type::type_num;
+
+ virtual void CheckRoundtrip() {
+ auto encoder =
+ MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, false,
descr_.get());
Review Comment:
What does `false` mean here? Please add the parameter name for clarity:
```suggestion
MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
/*xxx=*/false, descr_.get());
```
##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1493,5 +1528,105 @@ TYPED_TEST(TestDeltaBitPackEncoding,
NonZeroPaddedMiniblockBitWidth) {
}
}
+// ----------------------------------------------------------------------
+// DELTA_LENGTH_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaLengthByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+ using c_type = typename Type::c_type;
+ static constexpr int TYPE = Type::type_num;
+
+ virtual void CheckRoundtrip() {
+ auto encoder =
+ MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, false,
descr_.get());
+ auto decoder =
+ MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
descr_.get());
+
+ encoder->Put(draws_, num_values_);
+ encode_buffer_ = encoder->FlushValues();
+
+ decoder->SetData(num_values_, encode_buffer_->data(),
+ static_cast<int>(encode_buffer_->size()));
+ int values_decoded = decoder->Decode(decode_buf_, num_values_);
+ ASSERT_EQ(num_values_, values_decoded);
+ ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_,
num_values_));
+ }
+
+ void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t
valid_bits_offset) {
+ auto encoder =
+ MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, false,
descr_.get());
+ auto decoder =
+ MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
descr_.get());
+ int null_count = 0;
+ for (auto i = 0; i < num_values_; i++) {
+ if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+ null_count++;
+ }
+ }
+
+ encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+ encode_buffer_ = encoder->FlushValues();
+ decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+ static_cast<int>(encode_buffer_->size()));
+ auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_,
null_count,
+ valid_bits, valid_bits_offset);
+ ASSERT_EQ(num_values_, values_decoded);
+ ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_,
num_values_,
+ valid_bits,
valid_bits_offset));
+ }
+
+ protected:
+ USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<ByteArrayType> TestDeltaLengthByteArrayEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaLengthByteArrayEncoding,
TestDeltaLengthByteArrayEncodingTypes);
+
+TYPED_TEST(TestDeltaLengthByteArrayEncoding, BasicRoundTrip) {
+ ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+ ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 200));
+ ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+ /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
+ /*null_probability*/ 0.1));
+}
+
+TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryDirectPut) {
+ const int64_t size = 50;
+ const int32_t min_length = 0;
+ const int32_t max_length = 10;
+ const double null_probability = 0.25;
+ auto encoder =
MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY);
+ auto decoder =
MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY);
+
+ auto CheckSeed = [&](int seed, int64_t size) {
+ ::arrow::random::RandomArrayGenerator rag(seed);
+ auto values = rag.String(size, min_length, max_length, null_probability);
Review Comment:
We should be a bit more thorough and check that it works for all four binary
types: Binary, String, LargeBinary and LargeString.
##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1408,5 +1408,67 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
}
}
+// ----------------------------------------------------------------------
+// DELTA_LENGTH_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaLengthByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+ using c_type = typename Type::c_type;
+ static constexpr int TYPE = Type::type_num;
+
+ virtual void CheckRoundtrip() {
+ auto encoder =
+ MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, false,
descr_.get());
+ auto decoder =
+ MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
descr_.get());
+
+ encoder->Put(draws_, num_values_);
+ encode_buffer_ = encoder->FlushValues();
+
+ decoder->SetData(num_values_, encode_buffer_->data(),
+ static_cast<int>(encode_buffer_->size()));
+ int values_decoded = decoder->Decode(decode_buf_, num_values_);
+ ASSERT_EQ(num_values_, values_decoded);
+ ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_,
num_values_));
+ }
+
+ void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t
valid_bits_offset) {
+ auto encoder =
+ MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, false,
descr_.get());
+ auto decoder =
+ MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
descr_.get());
+ int null_count = 0;
+ for (auto i = 0; i < num_values_; i++) {
+ if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+ null_count++;
+ }
+ }
+
+ encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+ encode_buffer_ = encoder->FlushValues();
+ decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+ static_cast<int>(encode_buffer_->size()));
+ auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_,
null_count,
+ valid_bits, valid_bits_offset);
+ ASSERT_EQ(num_values_, values_decoded);
+ ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_,
num_values_,
+ valid_bits,
valid_bits_offset));
+ }
+
+ protected:
+ USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<ByteArrayType> TestDeltaLengthByteArrayEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaLengthByteArrayEncoding,
TestDeltaLengthByteArrayEncodingTypes);
+
+TYPED_TEST(TestDeltaLengthByteArrayEncoding, BasicRoundTrip) {
+ ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 200));
+ ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+ /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
+ /*null_probability*/ 0.1));
Review Comment:
I like @wjones127's idea, though it can be a separate issue+PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]