wgtmac commented on code in PR #14341:
URL: https://github.com/apache/arrow/pull/14341#discussion_r1155415558


##########
cpp/src/parquet/encoding.cc:
##########
@@ -1275,6 +1280,37 @@ struct ArrowBinaryHelper {
   int64_t chunk_space_remaining;
 };
 
+template <>
+struct ArrowBinaryHelper<FLBAType> {
+  explicit ArrowBinaryHelper(EncodingTraits<FLBAType>::Accumulator* builder) {
+    this->builder = builder;
+    this->chunk_space_remaining =

Review Comment:
   Is overflow check a good idea here and line 1250 above?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,11 +3073,179 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_("") {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values 
* sizeof(T),

Review Comment:
   nit: `buffer` can be reused for better performance.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,11 +3073,179 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_("") {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values 
* sizeof(T),
+                                                                   
this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    uint32_t previous_len = static_cast<uint32_t>(last_value_.size());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename 
ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB 
or more");

Review Comment:
   Do we have any evidence where does the `2GB` come from? Would be better to 
add a comment for the reason here.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,11 +3073,179 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_("") {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values 
* sizeof(T),
+                                                                   
this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    uint32_t previous_len = static_cast<uint32_t>(last_value_.size());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename 
ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB 
or more");
+          }
+          // Convert view to ByteArray so it can be passed to the 
suffix_encoder_.
+          const ByteArray src{view};
+
+          uint32_t j = 0;
+          while (j < std::min(previous_len, src.len)) {
+            if (last_value_view[j] != view[j]) {
+              break;
+            }
+            j++;
+          }
+          previous_len = src.len;
+          prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+          const uint8_t* suffix_ptr = src.ptr + j;
+          const uint32_t suffix_length = static_cast<uint32_t>(src.len - j);
+          last_value_view = view;
+          // Convert suffix to ByteArray so it can be passed to the 
suffix_encoder_.
+          const ByteArray suffix(suffix_length, suffix_ptr);

Review Comment:
   Is it an UB if `suffix_length == 0`? Should we pass a static empty string 
for safety?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,11 +3073,179 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_("") {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values 
* sizeof(T),
+                                                                   
this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    uint32_t previous_len = static_cast<uint32_t>(last_value_.size());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename 
ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB 
or more");
+          }
+          // Convert view to ByteArray so it can be passed to the 
suffix_encoder_.
+          const ByteArray src{view};
+
+          uint32_t j = 0;
+          while (j < std::min(previous_len, src.len)) {
+            if (last_value_view[j] != view[j]) {
+              break;
+            }
+            j++;
+          }
+          previous_len = src.len;
+          prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+          const uint8_t* suffix_ptr = src.ptr + j;
+          const uint32_t suffix_length = static_cast<uint32_t>(src.len - j);
+          last_value_view = view;
+          // Convert suffix to ByteArray so it can be passed to the 
suffix_encoder_.
+          const ByteArray suffix(suffix_length, suffix_ptr);
+          suffix_encoder_.Put(&suffix, 1);
+
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+    last_value_ = last_value_view;
+  }
+
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+  DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+  std::string last_value_;
+};
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+  ArrowPoolVector<int32_t> prefix_lengths(num_values,
+                                          
::arrow::stl::allocator<int32_t>(pool_));
+  std::string_view last_value_view = last_value_;
+
+  int i = 0;
+  while (i < num_values) {
+    // Convert to ByteArray so we can pass to the suffix_encoder_.
+    auto value = reinterpret_cast<const ByteArray*>(&src[i]);

Review Comment:
   In this case, the length should be derived from the type.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,11 +3073,182 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_("") {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values 
* sizeof(T),
+                                                                   
this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    uint32_t previous_len = static_cast<uint32_t>(last_value_.size());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename 
ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB 
or more");
+          }
+          // Convert view to ByteArray so it can be passed to the 
suffix_encoder_.
+          const ByteArray src{view};
+
+          uint32_t j = 0;
+          const uint32_t common_length = std::min(previous_len, src.len);
+          while (j < common_length) {
+            if (last_value_view[j] != view[j]) {

Review Comment:
   I guess it may be caused by the cast at line 3183. FLBA type is not covered 
by any test yet. @wjones127 



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,11 +3073,179 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_("") {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values 
* sizeof(T),
+                                                                   
this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    uint32_t previous_len = static_cast<uint32_t>(last_value_.size());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename 
ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB 
or more");
+          }
+          // Convert view to ByteArray so it can be passed to the 
suffix_encoder_.
+          const ByteArray src{view};
+
+          uint32_t j = 0;
+          while (j < std::min(previous_len, src.len)) {
+            if (last_value_view[j] != view[j]) {
+              break;
+            }
+            j++;
+          }
+          previous_len = src.len;
+          prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+          const uint8_t* suffix_ptr = src.ptr + j;
+          const uint32_t suffix_length = static_cast<uint32_t>(src.len - j);
+          last_value_view = view;
+          // Convert suffix to ByteArray so it can be passed to the 
suffix_encoder_.
+          const ByteArray suffix(suffix_length, suffix_ptr);
+          suffix_encoder_.Put(&suffix, 1);
+
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+    last_value_ = last_value_view;
+  }
+
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+  DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+  std::string last_value_;
+};
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+  ArrowPoolVector<int32_t> prefix_lengths(num_values,
+                                          
::arrow::stl::allocator<int32_t>(pool_));
+  std::string_view last_value_view = last_value_;
+
+  int i = 0;
+  while (i < num_values) {
+    // Convert to ByteArray so we can pass to the suffix_encoder_.
+    auto value = reinterpret_cast<const ByteArray*>(&src[i]);
+    if (ARROW_PREDICT_FALSE(value->len >= kMaxByteArraySize)) {
+      throw Status::Invalid("Parquet cannot store strings with size 2GB or 
more");
+    }
+
+    auto view = string_view{reinterpret_cast<const char*>(value->ptr), 
value->len};
+    uint32_t j = 0;
+    while (j < std::min(value->len, 
static_cast<uint32_t>(last_value_view.length()))) {

Review Comment:
   last_value_view is not mutated in the current version, should we do what 
suggested above for better readability and performance?



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1955,5 +1955,285 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) 
{
   CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), 
values));
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, 
descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, 
num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t 
valid_bits_offset) {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, 
descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, 
null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, 
num_values_,
+                                                        valid_bits, 
valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<ByteArrayType> TestDeltaByteArrayEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaByteArrayEncoding, TestDeltaByteArrayEncodingTypes);
+
+// TODO: add FLBAType and Decimal type tests
+
+TYPED_TEST(TestDeltaByteArrayEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(250, 2));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 
0));
+
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 10, /*valid_bits_offset*/ 64,
+      /*null_probability*/ 0.1));
+}
+
+TEST(DeltaByteArrayEncodingAdHoc, ArrowBinaryDirectPut) {
+  const int64_t size = 50;
+  const int32_t min_length = 0;
+  const int32_t max_length = 10;
+  const int32_t num_unique = 10;
+  const double null_probability = 0.25;
+  auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+  auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+
+  auto CheckSeed = [&](std::shared_ptr<::arrow::Array> values) {
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+
+    int num_values = static_cast<int>(values->length() - values->null_count());
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    typename EncodingTraits<ByteArrayType>::Accumulator acc;
+    if (::arrow::is_string(values->type()->id())) {
+      acc.builder = std::make_unique<::arrow::StringBuilder>();
+    } else {
+      acc.builder = std::make_unique<::arrow::BinaryBuilder>();
+    }
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), 
values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.builder->Finish(&result));
+    ASSERT_EQ(values->length(), result->length());
+    ASSERT_OK(result->ValidateFull());
+
+    auto upcast_result = CastBinaryTypesHelper(result, values->type());
+    ::arrow::AssertArraysEqual(*values, *result);
+  };
+
+  ::arrow::random::RandomArrayGenerator rag(42);
+  auto values = rag.String(0, min_length, max_length, null_probability);
+  CheckSeed(values);
+  for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
+    rag = ::arrow::random::RandomArrayGenerator(seed);
+
+    values = rag.String(size, min_length, max_length, null_probability);
+    CheckSeed(values);
+
+    values =
+        rag.BinaryWithRepeats(size, num_unique, min_length, max_length, 
null_probability);
+    CheckSeed(values);
+  }
+}
+
+TEST(DeltaByteArrayEncodingAdHoc, ArrowBinaryDirectPutFixedLength) {
+  const int64_t size = 50;
+  const double null_probability = 0.25;
+  ::arrow::random::RandomArrayGenerator rag(0);
+  auto encoder = MakeTypedEncoder<FLBAType>(Encoding::DELTA_BYTE_ARRAY);
+  auto decoder = MakeTypedDecoder<FLBAType>(Encoding::DELTA_BYTE_ARRAY);
+
+  auto CheckSeed = [&](std::shared_ptr<::arrow::Array> values) {
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+
+    int num_values = static_cast<int>(values->length() - values->null_count());
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    typename EncodingTraits<FLBAType>::Accumulator acc(values->type());
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), 
values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.Finish(&result));
+    ASSERT_EQ(values->length(), result->length());
+    ASSERT_OK(result->ValidateFull());
+    ::arrow::AssertArraysEqual(*values, *result);
+  };
+
+  for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
+    for (auto length : {0, 10, 100, 1000}) {
+      rag = ::arrow::random::RandomArrayGenerator(seed);
+      auto values = rag.FixedSizeBinary(size, length, null_probability);
+      CheckSeed(values);
+    }
+  }
+}
+
+TEST(DeltaByteArrayEncodingAdHoc, ArrowDirectPut) {
+  auto CheckEncode = [](std::shared_ptr<::arrow::Array> values,
+                        std::shared_ptr<::arrow::Array> prefix_lengths,
+                        std::shared_ptr<::arrow::Array> suffix_lengths,
+                        std::string_view value) {
+    auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+
+    auto prefix_lengths_encoder =
+        MakeTypedEncoder<Int32Type>(Encoding::DELTA_BINARY_PACKED);
+    ASSERT_NO_THROW(prefix_lengths_encoder->Put(*prefix_lengths));
+    auto prefix_lengths_buf = prefix_lengths_encoder->FlushValues();
+
+    auto encoded_prefix_lengths_buf = SliceBuffer(buf, 0, 
prefix_lengths_buf->size());
+
+    auto suffix_lengths_encoder =
+        MakeTypedEncoder<Int32Type>(Encoding::DELTA_BINARY_PACKED);
+    ASSERT_NO_THROW(suffix_lengths_encoder->Put(*suffix_lengths));
+    auto suffix_lengths_buf = suffix_lengths_encoder->FlushValues();
+    auto encoded_values_buf =
+        SliceBuffer(buf, prefix_lengths_buf->size() + 
suffix_lengths_buf->size());
+
+    auto encoded_prefix_length_buf = SliceBuffer(buf, 0, 
prefix_lengths_buf->size());
+    EXPECT_TRUE(prefix_lengths_buf->Equals(*encoded_prefix_length_buf));
+    auto encoded_suffix_length_buf =
+        SliceBuffer(buf, prefix_lengths_buf->size(), 
suffix_lengths_buf->size());
+    EXPECT_TRUE(suffix_lengths_buf->Equals(*encoded_suffix_length_buf));
+    EXPECT_EQ(value, encoded_values_buf->ToString());
+  };
+
+  auto arrayToI32 = [](const std::shared_ptr<::arrow::Array>& lengths) {
+    std::vector<int32_t> arrays;
+    auto data_ptr = checked_cast<::arrow::Int32Array*>(lengths.get());
+    for (int i = 0; i < lengths->length(); ++i) {
+      arrays.push_back(data_ptr->GetView(i));
+    }
+    return arrays;
+  };
+
+  auto CheckDecode = [](std::shared_ptr<Buffer> buf,
+                        std::shared_ptr<::arrow::Array> values) {
+    int num_values = static_cast<int>(values->length());
+    auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    typename EncodingTraits<ByteArrayType>::Accumulator acc;
+    if (::arrow::is_string(values->type()->id())) {
+      acc.builder = std::make_unique<::arrow::StringBuilder>();
+    } else {
+      acc.builder = std::make_unique<::arrow::BinaryBuilder>();
+    }
+
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), 
values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.builder->Finish(&result));
+    ASSERT_EQ(num_values, result->length());
+    ASSERT_OK(result->ValidateFull());
+
+    auto upcast_result = CastBinaryTypesHelper(result, values->type());
+    ::arrow::AssertArraysEqual(*values, *upcast_result);
+  };
+
+  auto checkEncodeDecode = [&](std::string_view values,
+                               std::shared_ptr<::arrow::Array> prefix_lengths,
+                               std::shared_ptr<::arrow::Array> suffix_lengths,
+                               std::string_view suffix_data) {
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::utf8(), values), 
prefix_lengths,
+                suffix_lengths, suffix_data);
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::large_utf8(), values), 
prefix_lengths,
+                suffix_lengths, suffix_data);
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::binary(), values), 
prefix_lengths,
+                suffix_lengths, suffix_data);
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::large_binary(), values), 
prefix_lengths,
+                suffix_lengths, suffix_data);
+
+    auto encoded = 
::arrow::ConcatenateBuffers({DeltaEncode(arrayToI32(prefix_lengths)),
+                                                
DeltaEncode(arrayToI32(suffix_lengths)),
+                                                
std::make_shared<Buffer>(suffix_data)})
+                       .ValueOrDie();
+
+    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::utf8(), values));
+    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_utf8(), 
values));
+    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::binary(), values));
+    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), 
values));
+  };
+
+  {
+    auto values = R"(["axis", "axle", "babble", "babyhood"])";

Review Comment:
   Should we add a test for non-ASCII but UTF8-conforming characters?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,11 +3073,179 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_("") {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values 
* sizeof(T),
+                                                                   
this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    uint32_t previous_len = static_cast<uint32_t>(last_value_.size());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename 
ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB 
or more");
+          }
+          // Convert view to ByteArray so it can be passed to the 
suffix_encoder_.
+          const ByteArray src{view};
+
+          uint32_t j = 0;
+          while (j < std::min(previous_len, src.len)) {
+            if (last_value_view[j] != view[j]) {
+              break;
+            }
+            j++;
+          }
+          previous_len = src.len;
+          prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+          const uint8_t* suffix_ptr = src.ptr + j;
+          const uint32_t suffix_length = static_cast<uint32_t>(src.len - j);
+          last_value_view = view;
+          // Convert suffix to ByteArray so it can be passed to the 
suffix_encoder_.
+          const ByteArray suffix(suffix_length, suffix_ptr);
+          suffix_encoder_.Put(&suffix, 1);
+
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+    last_value_ = last_value_view;
+  }
+
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+  DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+  std::string last_value_;
+};
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+  ArrowPoolVector<int32_t> prefix_lengths(num_values,
+                                          
::arrow::stl::allocator<int32_t>(pool_));
+  std::string_view last_value_view = last_value_;
+
+  int i = 0;
+  while (i < num_values) {
+    // Convert to ByteArray so we can pass to the suffix_encoder_.
+    auto value = reinterpret_cast<const ByteArray*>(&src[i]);

Review Comment:
   T is `FLBA` when DType is FLBAType, we cannot blindly cast it to `ByteArray`.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,11 +3073,179 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_("") {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values 
* sizeof(T),
+                                                                   
this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    uint32_t previous_len = static_cast<uint32_t>(last_value_.size());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename 
ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB 
or more");
+          }
+          // Convert view to ByteArray so it can be passed to the 
suffix_encoder_.
+          const ByteArray src{view};
+
+          uint32_t j = 0;
+          while (j < std::min(previous_len, src.len)) {
+            if (last_value_view[j] != view[j]) {
+              break;
+            }
+            j++;
+          }
+          previous_len = src.len;
+          prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+          const uint8_t* suffix_ptr = src.ptr + j;
+          const uint32_t suffix_length = static_cast<uint32_t>(src.len - j);
+          last_value_view = view;
+          // Convert suffix to ByteArray so it can be passed to the 
suffix_encoder_.
+          const ByteArray suffix(suffix_length, suffix_ptr);
+          suffix_encoder_.Put(&suffix, 1);
+
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+    last_value_ = last_value_view;
+  }
+
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+  DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+  std::string last_value_;
+};
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+  ArrowPoolVector<int32_t> prefix_lengths(num_values,
+                                          
::arrow::stl::allocator<int32_t>(pool_));
+  std::string_view last_value_view = last_value_;
+
+  int i = 0;
+  while (i < num_values) {

Review Comment:
   We can use for loop instead of while loop since `i` is not used outside.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,11 +3073,179 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_("") {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values 
* sizeof(T),
+                                                                   
this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    uint32_t previous_len = static_cast<uint32_t>(last_value_.size());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename 
ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB 
or more");
+          }
+          // Convert view to ByteArray so it can be passed to the 
suffix_encoder_.
+          const ByteArray src{view};
+
+          uint32_t j = 0;
+          while (j < std::min(previous_len, src.len)) {

Review Comment:
   `std::min(previous_len, src.len)` can be put out of the while loop as it is 
constant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to