wgtmac commented on code in PR #14341:
URL: https://github.com/apache/arrow/pull/14341#discussion_r1159204933


##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,11 +3083,242 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_(""),
+        kEmpty(ByteArray(0, nullptr)) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(
+          buffer_, ::arrow::AllocateBuffer(num_values * sizeof(T), 
this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer_->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    auto previous_len = static_cast<uint32_t>(last_value_.size());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename 
ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB 
or more");
+          }
+          // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+          const ByteArray src{view};
+
+          uint32_t j = 0;
+          const uint32_t common_length = std::min(previous_len, src.len);
+          while (j < common_length) {
+            if (last_value_view[j] != view[j]) {
+              break;
+            }
+            j++;
+          }
+          previous_len = src.len;
+          prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+          last_value_view = view;
+          const auto suffix_length = static_cast<uint32_t>(src.len - j);
+          if (suffix_length == 0) {
+            suffix_encoder_.Put(&kEmpty, 1);

Review Comment:
   ```
   [----------] 3 tests from DeltaByteArrayEncodingAdHoc
   [ RUN      ] DeltaByteArrayEncodingAdHoc.ArrowBinaryDirectPut
   /arrow/cpp/src/arrow/buffer_builder.h:138:27: runtime error: null pointer 
passed as argument 2, which is declared to never be null
   /usr/include/string.h:44:28: note: nonnull attribute specified here
       #0 0x55a380a6b658 in arrow::BufferBuilder::UnsafeAppend(void const*, 
long) /arrow/cpp/src/arrow/buffer_builder.h:138:5
       #1 0x7f976e1ca580 in parquet::(anonymous 
namespace)::DeltaLengthByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6>
 >::Put(parquet::ByteArray const*, int) 
/arrow/cpp/src/parquet/encoding.cc:2719:11
       #2 0x7f976e1f7f7f in void parquet::(anonymous 
namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> 
>::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray 
const&)::'lambda'(std::basic_string_view<char, std::char_traits<char> 
>)::operator()(std::basic_string_view<char, std::char_traits<char> >) const 
/arrow/cpp/src/parquet/encoding.cc:3150:5
   ```
   
   The ASAN & UBSAN check fails here, though the reported line number does not 
appear to be precise. If `suffix_length` is 0, we can probably skip calling 
`suffix_encoder_.Put`, as there is nothing to concatenate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to