wgtmac commented on code in PR #14341:
URL: https://github.com/apache/arrow/pull/14341#discussion_r1256043091


##########
cpp/src/parquet/encoding.cc:
##########
@@ -2842,7 +2888,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
 
   std::shared_ptr<::arrow::bit_util::BitReader> decoder_;
   DeltaBitPackDecoder<Int32Type> len_decoder_;
-  int num_valid_values_;
+  int num_valid_values_{0};

Review Comment:
   This is initialized via `SetData`. But it doesn't hurt to provide a default 
value here. Would you want to do the same thing for `length_idx_`?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -1238,25 +1240,35 @@ int PlainBooleanDecoder::Decode(bool* buffer, int 
max_values) {
   return max_values;
 }
 
-struct ArrowBinaryHelper {
+template <typename DType>
+struct ArrowBinaryHelper;

Review Comment:
   Just a reminder that this may conflict with 
https://github.com/apache/arrow/pull/35825/files once it gets merged.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,230 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),

Review Comment:
   Why pass nullptr instead of descr? Is it safe to do so?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,230 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_(""),
+        kEmpty(ByteArray(0, reinterpret_cast<const uint8_t*>(""))) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values 
* sizeof(T),
+                                                                   
this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename VisitorType>
+  void PutInternal(const T* src, int num_values) {
+    if (num_values == 0) {
+      return;
+    }
+    uint32_t len = descr_->type_length();
+
+    std::string_view last_value_view = last_value_;
+    constexpr int kBatchSize = 256;
+    std::array<int32_t, kBatchSize> prefix_lengths;
+    std::array<ByteArray, kBatchSize> suffixes;
+    auto visitor = VisitorType{src, len};
+
+    for (int i = 0; i < num_values; i += kBatchSize) {
+      const int batch_size = std::min(kBatchSize, num_values - i);
+
+      for (int j = 0; j < batch_size; ++j) {
+        auto view = visitor[i + j];
+        len = visitor.len(i + j);
+
+        uint32_t k = 0;
+        const uint32_t common_length =
+            std::min(len, static_cast<uint32_t>(last_value_view.length()));
+        while (k < common_length) {
+          if (last_value_view[k] != view[k]) {
+            break;
+          }
+          k++;
+        }
+
+        last_value_view = view;
+        prefix_lengths[j] = k;
+        const uint32_t suffix_length = len - k;
+        const uint8_t* suffix_ptr = src[i + j].ptr + k;
+
+        // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+        const ByteArray suffix(suffix_length, suffix_ptr);
+        suffixes[j] = suffix;
+      }
+      suffix_encoder_.Put(suffixes.data(), batch_size);
+      prefix_length_encoder_.Put(prefix_lengths.data(), batch_size);
+    }
+    last_value_ = last_value_view;
+  }
+
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    auto previous_len = static_cast<uint32_t>(last_value_.length());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename 
ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB 
or more");
+          }
+          // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+          const ByteArray src{view};
+
+          uint32_t j = 0;
+          const uint32_t len = src.len;
+          const uint32_t common_length = std::min(previous_len, len);
+          while (j < common_length) {
+            if (last_value_view[j] != view[j]) {
+              break;
+            }
+            j++;
+          }
+          previous_len = len;
+          prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+          last_value_view = view;
+          const auto suffix_length = static_cast<uint32_t>(len - j);
+          if (suffix_length == 0) {
+            suffix_encoder_.Put(&kEmpty, 1);
+            return Status::OK();
+          }
+          const uint8_t* suffix_ptr = src.ptr + j;
+          // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+          const ByteArray suffix(suffix_length, suffix_ptr);
+          suffix_encoder_.Put(&suffix, 1);
+
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+    last_value_ = last_value_view;
+  }
+
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+  DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+  std::string last_value_;
+  const ByteArray kEmpty;
+};
+
+struct ByteArrayVisitor {
+  const ByteArray* src;
+  const uint32_t type_length;

Review Comment:
   Add a comment saying that type_length is not used for ByteArrayType, or simply add
`[[maybe_unused]]`?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,230 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_(""),
+        kEmpty(ByteArray(0, reinterpret_cast<const uint8_t*>(""))) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values 
* sizeof(T),
+                                                                   
this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename VisitorType>
+  void PutInternal(const T* src, int num_values) {
+    if (num_values == 0) {
+      return;
+    }
+    uint32_t len = descr_->type_length();
+
+    std::string_view last_value_view = last_value_;
+    constexpr int kBatchSize = 256;
+    std::array<int32_t, kBatchSize> prefix_lengths;
+    std::array<ByteArray, kBatchSize> suffixes;
+    auto visitor = VisitorType{src, len};
+
+    for (int i = 0; i < num_values; i += kBatchSize) {
+      const int batch_size = std::min(kBatchSize, num_values - i);
+
+      for (int j = 0; j < batch_size; ++j) {
+        auto view = visitor[i + j];

Review Comment:
   `i + j` is computed three times; is it worth a local variable?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3188,18 +3447,54 @@ class DeltaByteArrayDecoder : public DecoderImpl,
     return Status::OK();
   }
 
+  MemoryPool* pool_;
+
+ private:
   std::shared_ptr<::arrow::bit_util::BitReader> decoder_;
   DeltaBitPackDecoder<Int32Type> prefix_len_decoder_;
   DeltaLengthByteArrayDecoder suffix_decoder_;
   std::string last_value_;
   // string buffer for last value in previous page
   std::string last_value_in_previous_page_;
-  int num_valid_values_;
+  int num_valid_values_{0};

Review Comment:
   ditto, is this necessary?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,230 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>

Review Comment:
   ```suggestion
   // DeltaByteArrayEncoder
   
   constexpr std::string_view kEmpty = "";
   
   template <typename DType>
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,230 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_(""),
+        kEmpty(ByteArray(0, reinterpret_cast<const uint8_t*>(""))) {}

Review Comment:
   I am afraid this may be dangerous. What about declaring the constant kEmpty
outside, as above, and then using it here:
   ```cpp
           empty_(kEmpty.size(), reinterpret_cast<const 
uint8_t*>(kEmpty.data())) {}
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -1238,25 +1240,35 @@ int PlainBooleanDecoder::Decode(bool* buffer, int 
max_values) {
   return max_values;
 }
 
-struct ArrowBinaryHelper {
+template <typename DType>
+struct ArrowBinaryHelper;
+
+template <>
+struct ArrowBinaryHelper<ByteArrayType> {
   explicit ArrowBinaryHelper(typename 
EncodingTraits<ByteArrayType>::Accumulator* out) {
     this->out = out;
     this->builder = out->builder.get();
+    if (ARROW_PREDICT_FALSE(SubtractWithOverflow(::arrow::kBinaryMemoryLimit,
+                                                 
this->builder->value_data_length(),
+                                                 
&this->chunk_space_remaining))) {
+      throw ParquetException("excess expansion in 
ArrowBinaryHelper<ByteArrayType>");
+    }
     this->chunk_space_remaining =

Review Comment:
   This line is redundant; it has already been set via line 1251.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,230 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_(""),
+        kEmpty(ByteArray(0, reinterpret_cast<const uint8_t*>(""))) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values 
* sizeof(T),
+                                                                   
this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename VisitorType>
+  void PutInternal(const T* src, int num_values) {
+    if (num_values == 0) {
+      return;
+    }
+    uint32_t len = descr_->type_length();

Review Comment:
   IIRC, `descr_->type_length()` will be 0 for ByteArrayType, so it is only 
used for FLBAType, right?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -1275,6 +1287,40 @@ struct ArrowBinaryHelper {
   int64_t chunk_space_remaining;
 };
 
+template <>
+struct ArrowBinaryHelper<FLBAType> {
+  explicit ArrowBinaryHelper(EncodingTraits<FLBAType>::Accumulator* builder) {
+    this->builder = builder;
+    if (ARROW_PREDICT_FALSE(SubtractWithOverflow(::arrow::kBinaryMemoryLimit,
+                                                 
this->builder->value_data_length(),
+                                                 
&this->chunk_space_remaining))) {
+      throw ParquetException("excess expansion in 
ArrowBinaryHelper<FLBAType>");
+    }
+  }
+
+  Status PushChunk() {
+    std::shared_ptr<::arrow::Array> result;
+    RETURN_NOT_OK(builder->Finish(&result));
+    chunks.push_back(std::move(result));
+    chunk_space_remaining = ::arrow::kBinaryMemoryLimit;
+    return Status::OK();
+  }
+
+  bool CanFit(int64_t length) const { return length <= chunk_space_remaining; }
+
+  Status Append(const uint8_t* data, int32_t length) {
+    DCHECK(CanFit(length));
+    chunk_space_remaining -= length;
+    return builder->Append(data);
+  }
+
+  Status AppendNull() { return builder->AppendNull(); }
+
+  ::arrow::FixedSizeBinaryBuilder* builder;
+  std::vector<std::shared_ptr<::arrow::Array>> chunks;

Review Comment:
   I'm not sure if this has been discussed already. Would it help to change
`struct EncodingTraits<FLBAType>` to be similar to `struct
EncodingTraits<ByteArrayType>`? Something like:
   ```cpp
   template <>
   struct EncodingTraits<FLBAType> {
     using Encoder = FLBAEncoder;
     using Decoder = FLBADecoder;
   
     struct Accumulator {
       std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder;
       std::vector<std::shared_ptr<::arrow::Array>> chunks;
     };
   
     using ArrowType = ::arrow::FixedSizeBinaryType;
     using DictAccumulator = 
::arrow::Dictionary32Builder<::arrow::FixedSizeBinaryType>;
   };
   ```
   
   This may help eliminate the duplication introduced by template 
specialization. You may simply define this:
   ```cpp
   template <typename DType, typename Enable = void>
   struct ArrowBinaryHelper;
   
   template <typename DType>
   struct ArrowBinaryHelper<DType, std::enable_if_t<std::is_same_v<DType, 
ByteArrayType> || std::is_same_v<DType, FLBAType>, void>)> {
     ...
   };
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to