wjones127 commented on code in PR #14293:
URL: https://github.com/apache/arrow/pull/14293#discussion_r1068658234


##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1408,5 +1408,67 @@ TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
   }
 }
 
+// ----------------------------------------------------------------------
+// DELTA_LENGTH_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaLengthByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, false, 
descr_.get());
+    auto decoder =
+        MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, 
descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, 
num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t 
valid_bits_offset) {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, false, 
descr_.get());
+    auto decoder =
+        MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, 
descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, 
null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, 
num_values_,
+                                                        valid_bits, 
valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<ByteArrayType> TestDeltaLengthByteArrayEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaLengthByteArrayEncoding, 
TestDeltaLengthByteArrayEncodingTypes);
+
+TYPED_TEST(TestDeltaLengthByteArrayEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
+      /*null_probability*/ 0.1));

Review Comment:
   It seems like it would also be worth adding a `CheckEncode()` function 
like this: 
https://github.com/apache/arrow/blob/219e7d74670717f3f89a4df703eb194ae400b15a/cpp/src/parquet/encoding_test.cc#L1147-L1149
   
   And then you can have simple test cases like: 
https://github.com/apache/arrow/blob/219e7d74670717f3f89a4df703eb194ae400b15a/cpp/src/parquet/encoding_test.cc#L1199-L1208



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2537,6 +2537,114 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual 
public TypedDecoder<DTyp
 // ----------------------------------------------------------------------
 // DELTA_LENGTH_BYTE_ARRAY
 
+// ----------------------------------------------------------------------
+// DeltaLengthByteArrayEncoder
+
+class DeltaLengthByteArrayEncoder : public EncoderImpl,
+                                    virtual public TypedEncoder<ByteArrayType> 
{
+ public:
+  using T = typename ByteArrayType::c_type;
+
+  explicit DeltaLengthByteArrayEncoder(const ColumnDescriptor* descr, 
MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY,
+                    pool = ::arrow::default_memory_pool()),
+        sink_(pool),
+        length_encoder_(nullptr, pool),
+        encoded_size_{0},
+        lengths_(0, ::arrow::stl::allocator<int32_t>(pool)) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return encoded_size_ + length_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<ByteArrayType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+ protected:
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> length_encoder_;
+  uint32_t encoded_size_;
+  ArrowPoolVector<int32_t> lengths_;
+};
+
+void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) {
+  const ::arrow::ArrayData& data = *values.data();
+  if (values.type_id() != ::arrow::Type::BINARY) {
+    throw ParquetException("Expected ByteArrayType, got ", 
values.type()->ToString());
+  }
+  if (data.length > std::numeric_limits<int32_t>::max()) {
+    throw ParquetException("Array cannot be longer than ",
+                           std::numeric_limits<int32_t>::max());
+  }
+  if (values.null_count() == 0) {
+    Put(data.GetValues<ByteArray>(1), static_cast<int>(data.length));

Review Comment:
   FYI this function isn't hit by the tests in `parquet-internals-test`. One 
thing worth doing is attaching a debugger and setting a breakpoint in each new 
branch added in your changes, then verifying each breakpoint is hit. It's not 
perfect, but it gives you a sort of basic code coverage. :)



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2537,6 +2537,116 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual 
public TypedDecoder<DTyp
 // ----------------------------------------------------------------------
 // DELTA_LENGTH_BYTE_ARRAY
 
+// ----------------------------------------------------------------------
+// DeltaLengthByteArrayEncoder
+
+class DeltaLengthByteArrayEncoder : public EncoderImpl,
+                                    virtual public TypedEncoder<ByteArrayType> 
{
+ public:
+  using T = typename ByteArrayType::c_type;
+
+  explicit DeltaLengthByteArrayEncoder(const ColumnDescriptor* descr, 
MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY,
+                    pool = ::arrow::default_memory_pool()),
+        sink_(pool),
+        length_encoder_(nullptr, pool),
+        encoded_size_{0} {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return encoded_size_ + length_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<ByteArrayType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+ protected:
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> length_encoder_;
+  uint32_t encoded_size_;
+};
+
+void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) {
+  const ::arrow::ArrayData& data = *values.data();
+  if (values.type_id() != ::arrow::Type::BINARY) {
+    throw ParquetException("Expected ByteArrayType, got ", 
values.type()->ToString());
+  }
+  if (data.length > std::numeric_limits<int32_t>::max()) {
+    throw ParquetException("Array cannot be longer than ",
+                           std::numeric_limits<int32_t>::max());
+  }
+  if (values.null_count() == 0) {
+    Put(data.GetValues<ByteArray>(1), static_cast<int>(data.length));
+  } else {
+    PutSpaced(data.GetValues<ByteArray>(1), static_cast<int>(data.length),
+              data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+}
+
+void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  constexpr int kBatchSize = 256;
+  std::array<int32_t, kBatchSize> lengths;
+  for (int idx = 0; idx < num_values; idx += kBatchSize) {
+    const int batch_size = std::min(kBatchSize, num_values - idx);
+    for (int j = 0; j < batch_size; ++j) {
+      const int32_t len = src[idx + j].len;
+      encoded_size_ += len;
+      lengths[j] = len;
+    }
+    length_encoder_.Put(lengths.data(), batch_size);
+  }
+
+  PARQUET_THROW_NOT_OK(sink_.Reserve(encoded_size_));
+  for (int idx = 0; idx < num_values; idx++) {
+    sink_.UnsafeAppend(src[idx].ptr, src[idx].len);
+  }
+}
+
+void DeltaLengthByteArrayEncoder::PutSpaced(const T* src, int num_values,
+                                            const uint8_t* valid_bits,
+                                            int64_t valid_bits_offset) {
+  if (valid_bits != NULLPTR) {
+    PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * 
sizeof(T),
+                                                                 
this->memory_pool()));
+    T* data = reinterpret_cast<T*>(buffer->mutable_data());
+    int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+        src, num_values, valid_bits, valid_bits_offset, data);
+    Put(data, num_valid_values);
+  } else {
+    Put(src, num_values);

Review Comment:
   This branch also isn't hit by any test.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2537,6 +2537,116 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual 
public TypedDecoder<DTyp
 // ----------------------------------------------------------------------
 // DELTA_LENGTH_BYTE_ARRAY
 
+// ----------------------------------------------------------------------
+// DeltaLengthByteArrayEncoder
+
+class DeltaLengthByteArrayEncoder : public EncoderImpl,
+                                    virtual public TypedEncoder<ByteArrayType> 
{
+ public:
+  using T = typename ByteArrayType::c_type;
+
+  explicit DeltaLengthByteArrayEncoder(const ColumnDescriptor* descr, 
MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY,
+                    pool = ::arrow::default_memory_pool()),
+        sink_(pool),
+        length_encoder_(nullptr, pool),
+        encoded_size_{0} {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return encoded_size_ + length_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<ByteArrayType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+ protected:
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> length_encoder_;
+  uint32_t encoded_size_;
+};
+
+void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) {
+  const ::arrow::ArrayData& data = *values.data();
+  if (values.type_id() != ::arrow::Type::BINARY) {
+    throw ParquetException("Expected ByteArrayType, got ", 
values.type()->ToString());
+  }
+  if (data.length > std::numeric_limits<int32_t>::max()) {
+    throw ParquetException("Array cannot be longer than ",
+                           std::numeric_limits<int32_t>::max());
+  }
+  if (values.null_count() == 0) {
+    Put(data.GetValues<ByteArray>(1), static_cast<int>(data.length));
+  } else {
+    PutSpaced(data.GetValues<ByteArray>(1), static_cast<int>(data.length),
+              data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+}
+
+void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;

Review Comment:
   This branch isn't hit by any test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to