mapleFU commented on code in PR #14293:
URL: https://github.com/apache/arrow/pull/14293#discussion_r1050922988


##########
cpp/src/parquet/encoding.cc:
##########
@@ -2537,6 +2537,102 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual 
public TypedDecoder<DTyp
 // ----------------------------------------------------------------------
 // DELTA_LENGTH_BYTE_ARRAY
 
+// ----------------------------------------------------------------------
+// DeltaLengthByteArrayEncoder
+
+template <typename DType>
+class DeltaLengthByteArrayEncoder : public EncoderImpl,
+                                    virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaLengthByteArrayEncoder(const ColumnDescriptor* descr, 
MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY,
+                    pool = ::arrow::default_memory_pool()),
+        sink_(pool),
+        length_encoder_(nullptr, pool),
+        encoded_size_{0} {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return encoded_size_; }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+ protected:
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> length_encoder_;
+  uint32_t encoded_size_;
+};
+
+template <>
+void DeltaLengthByteArrayEncoder<ByteArrayType>::Put(const ::arrow::Array& 
values) {
+  auto src = values.data()->GetValues<ByteArray>(1);
+  Put(src, static_cast<int>(values.length()));
+}
+
+template <typename DType>
+void DeltaLengthByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  std::vector<int32_t> lengths(num_values);

Review Comment:
   Would constructing a vector on every user `Put` call be expensive? Can we 
cache a vector as a member and just `resize` it?
   
   https://en.cppreference.com/w/cpp/container/vector/resize . It seems that we 
don't need to allocate a new array every time we need one, since `resize` can 
reuse the existing capacity.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2537,6 +2537,102 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual 
public TypedDecoder<DTyp
 // ----------------------------------------------------------------------
 // DELTA_LENGTH_BYTE_ARRAY
 
+// ----------------------------------------------------------------------
+// DeltaLengthByteArrayEncoder
+
+template <typename DType>

Review Comment:
   It seems the format specification says DELTA_LENGTH_BYTE_ARRAY only supports 
BYTE_ARRAY. Can we use a fixed type or restrict the `DType` here?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2537,6 +2537,102 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual 
public TypedDecoder<DTyp
 // ----------------------------------------------------------------------
 // DELTA_LENGTH_BYTE_ARRAY
 
+// ----------------------------------------------------------------------
+// DeltaLengthByteArrayEncoder
+
+template <typename DType>
+class DeltaLengthByteArrayEncoder : public EncoderImpl,
+                                    virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaLengthByteArrayEncoder(const ColumnDescriptor* descr, 
MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY,
+                    pool = ::arrow::default_memory_pool()),
+        sink_(pool),
+        length_encoder_(nullptr, pool),
+        encoded_size_{0} {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return encoded_size_; }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+ protected:
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> length_encoder_;
+  uint32_t encoded_size_;
+};
+
+template <>
+void DeltaLengthByteArrayEncoder<ByteArrayType>::Put(const ::arrow::Array& 
values) {
+  auto src = values.data()->GetValues<ByteArray>(1);
+  Put(src, static_cast<int>(values.length()));
+}
+
+template <typename DType>
+void DeltaLengthByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  std::vector<int32_t> lengths(num_values);
+  for (int idx = 0; idx < num_values; idx++) {
+    auto len = static_cast<const int32_t>(src[idx].len);
+    lengths[idx] = len;
+    encoded_size_ += len;
+  }
+  length_encoder_.Put(lengths.data(), num_values);
+  PARQUET_THROW_NOT_OK(sink_.Reserve(encoded_size_));
+
+  for (int idx = 0; idx < num_values; idx++) {
+    sink_.UnsafeAppend(src[idx].ptr, lengths[idx]);
+  }
+}
+
+template <typename DType>
+void DeltaLengthByteArrayEncoder<DType>::PutSpaced(const T* src, int 
num_values,
+                                                   const uint8_t* valid_bits,
+                                                   int64_t valid_bits_offset) {
+  if (valid_bits != NULLPTR) {
+    PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * 
sizeof(T),
+                                                                 
this->memory_pool()));
+    T* data = reinterpret_cast<T*>(buffer->mutable_data());
+    int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+        src, num_values, valid_bits, valid_bits_offset, data);
+    Put(data, num_valid_values);
+  } else {
+    Put(src, num_values);
+  }
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaLengthByteArrayEncoder<DType>::FlushValues() {
+  std::shared_ptr<Buffer> encoded_lengths = length_encoder_.FlushValues();
+
+  std::shared_ptr<Buffer> data;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&data));
+  sink_.Reset();
+
+  PARQUET_THROW_NOT_OK(sink_.Append(encoded_lengths->data(), 
encoded_lengths->size()));
+  PARQUET_THROW_NOT_OK(sink_.Append(data->mutable_data(), data->size()));
+
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));

Review Comment:
   Remember to reset `encoded_size_` to 0 here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to