This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 94bd0d2732 GH-32863: [C++][Parquet] Add DELTA_BYTE_ARRAY encoder to 
Parquet writer (#14341)
94bd0d2732 is described below

commit 94bd0d27326e8f3bfc0f612fcf604617402888ec
Author: Rok Mihevc <[email protected]>
AuthorDate: Mon Aug 21 19:02:55 2023 +0200

    GH-32863: [C++][Parquet] Add DELTA_BYTE_ARRAY encoder to Parquet writer 
(#14341)
    
    This is to add DELTA_BYTE_ARRAY encoder.
    * Closes: #32863
    
    Lead-authored-by: Rok Mihevc <[email protected]>
    Co-authored-by: Rok <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Co-authored-by: Gang Wu <[email protected]>
    Co-authored-by: mwish <[email protected]>
    Co-authored-by: Will Jones <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/parquet/encoding.cc                | 495 ++++++++++++++++++++++++-----
 cpp/src/parquet/encoding.h                 |   2 +-
 cpp/src/parquet/encoding_test.cc           | 298 ++++++++++++++++-
 cpp/src/parquet/test_util.cc               |  52 +++
 cpp/src/parquet/test_util.h                |  38 ++-
 cpp/src/parquet/types.h                    |   5 +
 docs/source/cpp/parquet.rst                |   2 +-
 python/pyarrow/tests/parquet/test_basic.py |  25 +-
 8 files changed, 825 insertions(+), 92 deletions(-)

diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index e972a86ccf..a3cef4b4ce 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -56,6 +56,8 @@ using arrow::Status;
 using arrow::VisitNullBitmapInline;
 using arrow::internal::AddWithOverflow;
 using arrow::internal::checked_cast;
+using arrow::internal::MultiplyWithOverflow;
+using arrow::internal::SubtractWithOverflow;
 using arrow::util::SafeLoad;
 using arrow::util::SafeLoadAs;
 using std::string_view;
@@ -1178,41 +1180,126 @@ int PlainBooleanDecoder::Decode(bool* buffer, int 
max_values) {
   return max_values;
 }
 
-struct ArrowBinaryHelper {
-  explicit ArrowBinaryHelper(typename 
EncodingTraits<ByteArrayType>::Accumulator* out) {
-    this->out = out;
-    this->builder = out->builder.get();
-    this->chunk_space_remaining =
-        ::arrow::kBinaryMemoryLimit - this->builder->value_data_length();
+// A helper class to abstract away differences between 
EncodingTraits<DType>::Accumulator
+// for ByteArrayType and FLBAType.
+template <typename DType>
+struct ArrowBinaryHelper;
+
+template <>
+struct ArrowBinaryHelper<ByteArrayType> {
+  using Accumulator = typename EncodingTraits<ByteArrayType>::Accumulator;
+
+  ArrowBinaryHelper(Accumulator* acc, int64_t length)
+      : acc_(acc),
+        entries_remaining_(length),
+        chunk_space_remaining_(::arrow::kBinaryMemoryLimit -
+                               acc_->builder->value_data_length()) {}
+
+  Status Prepare(std::optional<int64_t> estimated_data_length = {}) {
+    RETURN_NOT_OK(acc_->builder->Reserve(entries_remaining_));
+    if (estimated_data_length.has_value()) {
+      RETURN_NOT_OK(acc_->builder->ReserveData(
+          std::min<int64_t>(*estimated_data_length, 
::arrow::kBinaryMemoryLimit)));
+    }
+    return Status::OK();
+  }
+
+  Status PrepareNextInput(int64_t next_value_length,
+                          std::optional<int64_t> 
estimated_remaining_data_length = {}) {
+    if (ARROW_PREDICT_FALSE(!CanFit(next_value_length))) {
+      // This element would exceed the capacity of a chunk
+      RETURN_NOT_OK(PushChunk());
+      RETURN_NOT_OK(acc_->builder->Reserve(entries_remaining_));
+      if (estimated_remaining_data_length.has_value()) {
+        RETURN_NOT_OK(acc_->builder->ReserveData(
+            std::min<int64_t>(*estimated_remaining_data_length, 
chunk_space_remaining_)));
+      }
+    }
+    return Status::OK();
+  }
+
+  void UnsafeAppend(const uint8_t* data, int32_t length) {
+    DCHECK(CanFit(length));
+    DCHECK_GT(entries_remaining_, 0);
+    chunk_space_remaining_ -= length;
+    --entries_remaining_;
+    acc_->builder->UnsafeAppend(data, length);
+  }
+
+  Status Append(const uint8_t* data, int32_t length) {
+    DCHECK(CanFit(length));
+    DCHECK_GT(entries_remaining_, 0);
+    chunk_space_remaining_ -= length;
+    --entries_remaining_;
+    return acc_->builder->Append(data, length);
+  }
+
+  void UnsafeAppendNull() {
+    --entries_remaining_;
+    acc_->builder->UnsafeAppendNull();
   }
 
+  Status AppendNull() {
+    --entries_remaining_;
+    return acc_->builder->AppendNull();
+  }
+
+ private:
   Status PushChunk() {
-    std::shared_ptr<::arrow::Array> result;
-    RETURN_NOT_OK(builder->Finish(&result));
-    out->chunks.push_back(result);
-    chunk_space_remaining = ::arrow::kBinaryMemoryLimit;
+    ARROW_ASSIGN_OR_RAISE(auto chunk, acc_->builder->Finish());
+    acc_->chunks.push_back(std::move(chunk));
+    chunk_space_remaining_ = ::arrow::kBinaryMemoryLimit;
     return Status::OK();
   }
 
-  bool CanFit(int64_t length) const { return length <= chunk_space_remaining; }
+  bool CanFit(int64_t length) const { return length <= chunk_space_remaining_; 
}
 
-  void UnsafeAppend(const uint8_t* data, int32_t length) {
-    chunk_space_remaining -= length;
-    builder->UnsafeAppend(data, length);
+  Accumulator* acc_;
+  int64_t entries_remaining_;
+  int64_t chunk_space_remaining_;
+};
+
+template <>
+struct ArrowBinaryHelper<FLBAType> {
+  using Accumulator = typename EncodingTraits<FLBAType>::Accumulator;
+
+  ArrowBinaryHelper(Accumulator* acc, int64_t length)
+      : acc_(acc), entries_remaining_(length) {}
+
+  Status Prepare(std::optional<int64_t> estimated_data_length = {}) {
+    return acc_->Reserve(entries_remaining_);
   }
 
-  void UnsafeAppendNull() { builder->UnsafeAppendNull(); }
+  Status PrepareNextInput(int64_t next_value_length,
+                          std::optional<int64_t> 
estimated_remaining_data_length = {}) {
+    return Status::OK();
+  }
+
+  void UnsafeAppend(const uint8_t* data, int32_t length) {
+    DCHECK_GT(entries_remaining_, 0);
+    --entries_remaining_;
+    acc_->UnsafeAppend(data);
+  }
 
   Status Append(const uint8_t* data, int32_t length) {
-    chunk_space_remaining -= length;
-    return builder->Append(data, length);
+    DCHECK_GT(entries_remaining_, 0);
+    --entries_remaining_;
+    return acc_->Append(data);
+  }
+
+  void UnsafeAppendNull() {
+    --entries_remaining_;
+    acc_->UnsafeAppendNull();
   }
 
-  Status AppendNull() { return builder->AppendNull(); }
+  Status AppendNull() {
+    --entries_remaining_;
+    return acc_->AppendNull();
+  }
 
-  typename EncodingTraits<ByteArrayType>::Accumulator* out;
-  ::arrow::BinaryBuilder* builder;
-  int64_t chunk_space_remaining;
+ private:
+  Accumulator* acc_;
+  int64_t entries_remaining_;
 };
 
 template <>
@@ -1313,12 +1400,10 @@ class PlainByteArrayDecoder : public 
PlainDecoder<ByteArrayType>,
                           int64_t valid_bits_offset,
                           typename EncodingTraits<ByteArrayType>::Accumulator* 
out,
                           int* out_values_decoded) {
-    ArrowBinaryHelper helper(out);
+    ArrowBinaryHelper<ByteArrayType> helper(out, num_values);
     int values_decoded = 0;
 
-    RETURN_NOT_OK(helper.builder->Reserve(num_values));
-    RETURN_NOT_OK(helper.builder->ReserveData(
-        std::min<int64_t>(len_, helper.chunk_space_remaining)));
+    RETURN_NOT_OK(helper.Prepare(len_));
 
     int i = 0;
     RETURN_NOT_OK(VisitNullBitmapInline(
@@ -1335,13 +1420,7 @@ class PlainByteArrayDecoder : public 
PlainDecoder<ByteArrayType>,
           if (ARROW_PREDICT_FALSE(len_ < increment)) {
             ParquetException::EofException();
           }
-          if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) {
-            // This element would exceed the capacity of a chunk
-            RETURN_NOT_OK(helper.PushChunk());
-            RETURN_NOT_OK(helper.builder->Reserve(num_values - i));
-            RETURN_NOT_OK(helper.builder->ReserveData(
-                std::min<int64_t>(len_, helper.chunk_space_remaining)));
-          }
+          RETURN_NOT_OK(helper.PrepareNextInput(value_len, len_));
           helper.UnsafeAppend(data_ + 4, value_len);
           data_ += increment;
           len_ -= increment;
@@ -1421,7 +1500,7 @@ class DictDecoderImpl : public DecoderImpl, virtual 
public DictDecoder<Type> {
         byte_array_offsets_(AllocateBuffer(pool, 0)),
         indices_scratch_space_(AllocateBuffer(pool, 0)) {}
 
-  // Perform type-specific initiatialization
+  // Perform type-specific initialization
   void SetDict(TypedDecoder<Type>* dictionary) override;
 
   void SetData(int num_values, const uint8_t* data, int len) override {
@@ -1834,7 +1913,8 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
     constexpr int32_t kBufferSize = 1024;
     int32_t indices[kBufferSize];
 
-    ArrowBinaryHelper helper(out);
+    ArrowBinaryHelper<ByteArrayType> helper(out, num_values);
+    RETURN_NOT_OK(helper.Prepare());
 
     auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
     int values_decoded = 0;
@@ -1855,9 +1935,7 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
       const auto index = indices[pos_indices++];
       RETURN_NOT_OK(IndexInBounds(index));
       const auto& val = dict_values[index];
-      if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
-        RETURN_NOT_OK(helper.PushChunk());
-      }
+      RETURN_NOT_OK(helper.PrepareNextInput(val.len));
       RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
       ++values_decoded;
       return Status::OK();
@@ -1903,20 +1981,21 @@ class DictByteArrayDecoderImpl : public 
DictDecoderImpl<ByteArrayType>,
     int32_t indices[kBufferSize];
     int values_decoded = 0;
 
-    ArrowBinaryHelper helper(out);
+    ArrowBinaryHelper<ByteArrayType> helper(out, num_values);
+    RETURN_NOT_OK(helper.Prepare(len_));
+
     auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
 
     while (values_decoded < num_values) {
-      int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - 
values_decoded);
-      int num_indices = idx_decoder_.GetBatch(indices, batch_size);
+      const int32_t batch_size =
+          std::min<int32_t>(kBufferSize, num_values - values_decoded);
+      const int num_indices = idx_decoder_.GetBatch(indices, batch_size);
       if (num_indices == 0) ParquetException::EofException();
       for (int i = 0; i < num_indices; ++i) {
         auto idx = indices[i];
         RETURN_NOT_OK(IndexInBounds(idx));
         const auto& val = dict_values[idx];
-        if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
-          RETURN_NOT_OK(helper.PushChunk());
-        }
+        RETURN_NOT_OK(helper.PrepareNextInput(val.len));
         RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
       }
       values_decoded += num_indices;
@@ -2746,7 +2825,8 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
                           int64_t valid_bits_offset,
                           typename EncodingTraits<ByteArrayType>::Accumulator* 
out,
                           int* out_num_values) {
-    ArrowBinaryHelper helper(out);
+    ArrowBinaryHelper<ByteArrayType> helper(out, num_values);
+    RETURN_NOT_OK(helper.Prepare());
 
     std::vector<ByteArray> values(num_values - null_count);
     const int num_valid_values = Decode(values.data(), num_values - 
null_count);
@@ -2762,9 +2842,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
         valid_bits, valid_bits_offset, num_values, null_count,
         [&]() {
           const auto& val = values_ptr[value_idx];
-          if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
-            RETURN_NOT_OK(helper.PushChunk());
-          }
+          RETURN_NOT_OK(helper.PrepareNextInput(val.len));
           RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
           ++value_idx;
           return Status::OK();
@@ -2782,8 +2860,8 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
 
   std::shared_ptr<::arrow::bit_util::BitReader> decoder_;
   DeltaBitPackDecoder<Int32Type> len_decoder_;
-  int num_valid_values_;
-  uint32_t length_idx_;
+  int num_valid_values_{0};
+  uint32_t length_idx_{0};
   std::shared_ptr<ResizableBuffer> buffered_length_;
 };
 
@@ -2977,12 +3055,238 @@ class RleBooleanDecoder : public DecoderImpl, virtual 
public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front 
compression:
+/// for each element in a sequence of strings, store the prefix length of the 
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths 
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public 
TypedEncoder<DType> {
+  static constexpr std::string_view kEmpty = "";
+
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = 
::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(/*descr=*/nullptr, pool),
+        suffix_encoder_(descr, pool),
+        last_value_(""),
+        empty_(static_cast<uint32_t>(kEmpty.size()),
+               reinterpret_cast<const uint8_t*>(kEmpty.data())) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != nullptr) {
+      if (buffer_ == nullptr) {
+        PARQUET_ASSIGN_OR_THROW(buffer_,
+                                ::arrow::AllocateResizableBuffer(num_values * 
sizeof(T),
+                                                                 
this->memory_pool()));
+      } else {
+        PARQUET_THROW_NOT_OK(buffer_->Resize(num_values * sizeof(T), false));
+      }
+      T* data = reinterpret_cast<T*>(buffer_->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename VisitorType>
+  void PutInternal(const T* src, int num_values, const VisitorType visitor) {
+    if (num_values == 0) {
+      return;
+    }
+
+    std::string_view last_value_view = last_value_;
+    constexpr int kBatchSize = 256;
+    std::array<int32_t, kBatchSize> prefix_lengths;
+    std::array<ByteArray, kBatchSize> suffixes;
+
+    for (int i = 0; i < num_values; i += kBatchSize) {
+      const int batch_size = std::min(kBatchSize, num_values - i);
+
+      for (int j = 0; j < batch_size; ++j) {
+        const int idx = i + j;
+        const auto view = visitor[idx];
+        const auto len = static_cast<const uint32_t>(view.length());
+
+        uint32_t common_prefix_length = 0;
+        const uint32_t maximum_common_prefix_length =
+            std::min(len, static_cast<uint32_t>(last_value_view.length()));
+        while (common_prefix_length < maximum_common_prefix_length) {
+          if (last_value_view[common_prefix_length] != 
view[common_prefix_length]) {
+            break;
+          }
+          common_prefix_length++;
+        }
+
+        last_value_view = view;
+        prefix_lengths[j] = common_prefix_length;
+        const uint32_t suffix_length = len - common_prefix_length;
+        const uint8_t* suffix_ptr = src[idx].ptr + common_prefix_length;
+
+        // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+        const ByteArray suffix(suffix_length, suffix_ptr);
+        suffixes[j] = suffix;
+      }
+      suffix_encoder_.Put(suffixes.data(), batch_size);
+      prefix_length_encoder_.Put(prefix_lengths.data(), batch_size);
+    }
+    last_value_ = last_value_view;
+  }
+
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    auto previous_len = static_cast<uint32_t>(last_value_.length());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename 
ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB 
or more");
+          }
+          const ByteArray src{view};
+
+          uint32_t common_prefix_length = 0;
+          const uint32_t len = src.len;
+          const uint32_t maximum_common_prefix_length = std::min(previous_len, 
len);
+          while (common_prefix_length < maximum_common_prefix_length) {
+            if (last_value_view[common_prefix_length] != 
view[common_prefix_length]) {
+              break;
+            }
+            common_prefix_length++;
+          }
+          previous_len = len;
+          
prefix_length_encoder_.Put({static_cast<int32_t>(common_prefix_length)}, 1);
+
+          last_value_view = view;
+          const auto suffix_length = static_cast<uint32_t>(len - 
common_prefix_length);
+          if (suffix_length == 0) {
+            suffix_encoder_.Put(&empty_, 1);
+            return Status::OK();
+          }
+          const uint8_t* suffix_ptr = src.ptr + common_prefix_length;
+          // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+          const ByteArray suffix(suffix_length, suffix_ptr);
+          suffix_encoder_.Put(&suffix, 1);
+
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+    last_value_ = last_value_view;
+  }
+
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+  DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+  std::string last_value_;
+  const ByteArray empty_;
+  std::unique_ptr<ResizableBuffer> buffer_;
+};
+
+struct ByteArrayVisitor {
+  const ByteArray* src;
+
+  std::string_view operator[](int i) const {
+    if (ARROW_PREDICT_FALSE(src[i].len >= kMaxByteArraySize)) {
+      throw ParquetException("Parquet cannot store strings with size 2GB or 
more");
+    }
+    return std::string_view{src[i]};
+  }
+
+  uint32_t len(int i) const { return src[i].len; }
+};
+
+struct FLBAVisitor {
+  const FLBA* src;
+  const uint32_t type_length;
+
+  std::string_view operator[](int i) const {
+    return std::string_view{reinterpret_cast<const char*>(src[i].ptr), 
type_length};
+  }
+
+  uint32_t len(int i) const { return type_length; }
+};
+
+template <>
+void DeltaByteArrayEncoder<ByteArrayType>::Put(const ByteArray* src, int 
num_values) {
+  auto visitor = ByteArrayVisitor{src};
+  PutInternal<ByteArrayVisitor>(src, num_values, visitor);
+}
+
+template <>
+void DeltaByteArrayEncoder<FLBAType>::Put(const FLBA* src, int num_values) {
+  auto visitor = FLBAVisitor{src, 
static_cast<uint32_t>(descr_->type_length())};
+  PutInternal<FLBAVisitor>(src, num_values, visitor);
+}
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const ::arrow::Array& values) {
+  if (::arrow::is_binary_like(values.type_id())) {
+    PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
+  } else if (::arrow::is_large_binary_like(values.type_id())) {
+    PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
+  } else if (::arrow::is_fixed_size_binary(values.type_id())) {
+    PutBinaryArray(checked_cast<const ::arrow::FixedSizeBinaryArray&>(values));
+  } else {
+    throw ParquetException("Only BaseBinaryArray and subclasses supported");
+  }
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaByteArrayEncoder<DType>::FlushValues() {
+  PARQUET_THROW_NOT_OK(sink_.Resize(EstimatedDataEncodedSize(), false));
+
+  std::shared_ptr<Buffer> prefix_lengths = 
prefix_length_encoder_.FlushValues();
+  PARQUET_THROW_NOT_OK(sink_.Append(prefix_lengths->data(), 
prefix_lengths->size()));
+
+  std::shared_ptr<Buffer> suffixes = suffix_encoder_.FlushValues();
+  PARQUET_THROW_NOT_OK(sink_.Append(suffixes->data(), suffixes->size()));
+
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+  last_value_.clear();
+  return buffer;
+}
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayDecoder
+
+template <typename DType>
+class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public 
TypedDecoder<DType> {
+  using T = typename DType::c_type;
+
+ public:
+  explicit DeltaByteArrayDecoderImpl(const ColumnDescriptor* descr,
+                                     MemoryPool* pool = 
::arrow::default_memory_pool())
       : DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY),
+        pool_(pool),
         prefix_len_decoder_(nullptr, pool),
         suffix_decoder_(nullptr, pool),
         last_value_in_previous_page_(""),
@@ -3017,27 +3321,22 @@ class DeltaByteArrayDecoder : public DecoderImpl,
     last_value_ = "";
   }
 
-  int Decode(ByteArray* buffer, int max_values) override {
-    return GetInternal(buffer, max_values);
-  }
-
   int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset,
-                  typename EncodingTraits<ByteArrayType>::Accumulator* out) 
override {
+                  typename EncodingTraits<DType>::Accumulator* out) override {
     int result = 0;
     PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
                                           valid_bits_offset, out, &result));
     return result;
   }
 
-  int DecodeArrow(
-      int num_values, int null_count, const uint8_t* valid_bits,
-      int64_t valid_bits_offset,
-      typename EncodingTraits<ByteArrayType>::DictAccumulator* builder) 
override {
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<DType>::DictAccumulator* builder) 
override {
     ParquetException::NYI("DecodeArrow of DictAccumulator for 
DeltaByteArrayDecoder");
   }
 
- private:
+ protected:
   int GetInternal(ByteArray* buffer, int max_values) {
     // Decode up to `max_values` strings into an internal buffer
     // and reference them into `buffer`.
@@ -3080,7 +3379,7 @@ class DeltaByteArrayDecoder : public DecoderImpl,
       buffer[i].ptr = data_ptr;
       buffer[i].len += prefix_len_ptr[i];
       data_ptr += buffer[i].len;
-      prefix = string_view{reinterpret_cast<const char*>(buffer[i].ptr), 
buffer[i].len};
+      prefix = std::string_view{buffer[i]};
     }
     prefix_len_offset_ += max_values;
     this->num_values_ -= max_values;
@@ -3095,9 +3394,10 @@ class DeltaByteArrayDecoder : public DecoderImpl,
 
   Status DecodeArrowDense(int num_values, int null_count, const uint8_t* 
valid_bits,
                           int64_t valid_bits_offset,
-                          typename EncodingTraits<ByteArrayType>::Accumulator* 
out,
+                          typename EncodingTraits<DType>::Accumulator* out,
                           int* out_num_values) {
-    ArrowBinaryHelper helper(out);
+    ArrowBinaryHelper<DType> helper(out, num_values);
+    RETURN_NOT_OK(helper.Prepare());
 
     std::vector<ByteArray> values(num_values);
     const int num_valid_values = GetInternal(values.data(), num_values - 
null_count);
@@ -3110,9 +3410,7 @@ class DeltaByteArrayDecoder : public DecoderImpl,
         valid_bits, valid_bits_offset, num_values, null_count,
         [&]() {
           const auto& val = values_ptr[value_idx];
-          if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
-            RETURN_NOT_OK(helper.PushChunk());
-          }
+          RETURN_NOT_OK(helper.PrepareNextInput(val.len));
           RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
           ++value_idx;
           return Status::OK();
@@ -3128,18 +3426,54 @@ class DeltaByteArrayDecoder : public DecoderImpl,
     return Status::OK();
   }
 
+  MemoryPool* pool_;
+
+ private:
   std::shared_ptr<::arrow::bit_util::BitReader> decoder_;
   DeltaBitPackDecoder<Int32Type> prefix_len_decoder_;
   DeltaLengthByteArrayDecoder suffix_decoder_;
   std::string last_value_;
   // string buffer for last value in previous page
   std::string last_value_in_previous_page_;
-  int num_valid_values_;
-  uint32_t prefix_len_offset_;
+  int num_valid_values_{0};
+  uint32_t prefix_len_offset_{0};
   std::shared_ptr<ResizableBuffer> buffered_prefix_length_;
   std::shared_ptr<ResizableBuffer> buffered_data_;
 };
 
+class DeltaByteArrayDecoder : public DeltaByteArrayDecoderImpl<ByteArrayType> {
+ public:
+  using Base = DeltaByteArrayDecoderImpl<ByteArrayType>;
+  using Base::DeltaByteArrayDecoderImpl;
+
+  int Decode(ByteArray* buffer, int max_values) override {
+    return GetInternal(buffer, max_values);
+  }
+};
+
+class DeltaByteArrayFLBADecoder : public DeltaByteArrayDecoderImpl<FLBAType>,
+                                  virtual public FLBADecoder {
+ public:
+  using Base = DeltaByteArrayDecoderImpl<FLBAType>;
+  using Base::DeltaByteArrayDecoderImpl;
+  using Base::pool_;
+
+  int Decode(FixedLenByteArray* buffer, int max_values) override {
+    // GetInternal currently only support ByteArray.
+    std::vector<ByteArray> decode_byte_array(max_values);
+    const int decoded_values_size = GetInternal(decode_byte_array.data(), 
max_values);
+    const uint32_t type_length = descr_->type_length();
+
+    for (int i = 0; i < decoded_values_size; i++) {
+      if (ARROW_PREDICT_FALSE(decode_byte_array[i].len != type_length)) {
+        throw ParquetException("Fixed length byte array length mismatch");
+      }
+      buffer[i].ptr = decode_byte_array[i].ptr;
+    }
+    return decoded_values_size;
+  }
+};
+
 // ----------------------------------------------------------------------
 // BYTE_STREAM_SPLIT
 
@@ -3353,6 +3687,16 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type 
type_num, Encoding::type encodin
       default:
         throw ParquetException("RLE only supports BOOLEAN");
     }
+  } else if (encoding == Encoding::DELTA_BYTE_ARRAY) {
+    switch (type_num) {
+      case Type::BYTE_ARRAY:
+        return std::make_unique<DeltaByteArrayEncoder<ByteArrayType>>(descr, 
pool);
+      case Type::FIXED_LEN_BYTE_ARRAY:
+        return std::make_unique<DeltaByteArrayEncoder<FLBAType>>(descr, pool);
+      default:
+        throw ParquetException(
+            "DELTA_BYTE_ARRAY only supports BYTE_ARRAY and 
FIXED_LEN_BYTE_ARRAY");
+    }
   } else {
     ParquetException::NYI("Selected encoding is not supported");
   }
@@ -3404,10 +3748,15 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type 
type_num, Encoding::type encodin
             "DELTA_BINARY_PACKED decoder only supports INT32 and INT64");
     }
   } else if (encoding == Encoding::DELTA_BYTE_ARRAY) {
-    if (type_num == Type::BYTE_ARRAY) {
-      return std::make_unique<DeltaByteArrayDecoder>(descr, pool);
+    switch (type_num) {
+      case Type::BYTE_ARRAY:
+        return std::make_unique<DeltaByteArrayDecoder>(descr, pool);
+      case Type::FIXED_LEN_BYTE_ARRAY:
+        return std::make_unique<DeltaByteArrayFLBADecoder>(descr, pool);
+      default:
+        throw ParquetException(
+            "DELTA_BYTE_ARRAY only supports BYTE_ARRAY and 
FIXED_LEN_BYTE_ARRAY");
     }
-    throw ParquetException("DELTA_BYTE_ARRAY only supports BYTE_ARRAY");
   } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) {
     if (type_num == Type::BYTE_ARRAY) {
       return std::make_unique<DeltaLengthByteArrayDecoder>(descr, pool);
diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h
index 9f9b740ff3..6cdfe37920 100644
--- a/cpp/src/parquet/encoding.h
+++ b/cpp/src/parquet/encoding.h
@@ -141,13 +141,13 @@ struct EncodingTraits<ByteArrayType> {
   using Encoder = ByteArrayEncoder;
   using Decoder = ByteArrayDecoder;
 
+  using ArrowType = ::arrow::BinaryType;
   /// \brief Internal helper class for decoding BYTE_ARRAY data where we can
   /// overflow the capacity of a single arrow::BinaryArray
   struct Accumulator {
     std::unique_ptr<::arrow::BinaryBuilder> builder;
     std::vector<std::shared_ptr<::arrow::Array>> chunks;
   };
-  using ArrowType = ::arrow::BinaryType;
   using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::BinaryType>;
 };
 
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index 0ac5fd76e7..71dc40d33a 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -36,6 +36,7 @@
 #include "arrow/util/bitmap_writer.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/endian.h"
+#include "arrow/util/span.h"
 #include "arrow/util/string.h"
 #include "parquet/encoding.h"
 #include "parquet/platform.h"
@@ -181,7 +182,7 @@ class TestEncodingBase : public ::testing::Test {
 
   void TearDown() {}
 
-  void InitData(int nvalues, int repeats) {
+  virtual void InitData(int nvalues, int repeats) {
     num_values_ = nvalues * repeats;
     input_bytes_.resize(num_values_ * sizeof(c_type));
     output_bytes_.resize(num_values_ * sizeof(c_type));
@@ -1705,11 +1706,13 @@ class TestDeltaLengthByteArrayEncoding : public 
TestEncodingBase<Type> {
   using c_type = typename Type::c_type;
   static constexpr int TYPE = Type::type_num;
 
+  virtual Encoding::type GetEncoding() { return 
Encoding::DELTA_LENGTH_BYTE_ARRAY; }
+
   virtual void CheckRoundtrip() {
-    auto encoder = MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
+    auto encoding = GetEncoding();
+    auto encoder = MakeTypedEncoder<Type>(encoding,
                                           /*use_dictionary=*/false, 
descr_.get());
-    auto decoder =
-        MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(encoding, descr_.get());
 
     encoder->Put(draws_, num_values_);
     encode_buffer_ = encoder->FlushValues();
@@ -1722,10 +1725,10 @@ class TestDeltaLengthByteArrayEncoding : public 
TestEncodingBase<Type> {
   }
 
   void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t 
valid_bits_offset) {
-    auto encoder = MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
+    auto encoding = GetEncoding();
+    auto encoder = MakeTypedEncoder<Type>(encoding,
                                           /*use_dictionary=*/false, 
descr_.get());
-    auto decoder =
-        MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, 
descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(encoding, descr_.get());
     int null_count = 0;
     for (auto i = 0; i < num_values_; i++) {
       if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
@@ -1771,6 +1774,19 @@ std::shared_ptr<Buffer> DeltaEncode(std::vector<int32_t> 
lengths) {
   return encoder->FlushValues();
 }
 
+std::shared_ptr<Buffer> DeltaEncode(::arrow::util::span<const int32_t> 
lengths) {
+  auto encoder = MakeTypedEncoder<Int32Type>(Encoding::DELTA_BINARY_PACKED);
+  encoder->Put(lengths.data(), static_cast<int>(lengths.size()));
+  return encoder->FlushValues();
+}
+
+std::shared_ptr<Buffer> DeltaEncode(std::shared_ptr<::arrow::Array>& lengths) {
+  auto data = ::arrow::internal::checked_pointer_cast<const 
::arrow::Int32Array>(lengths);
+  auto span = ::arrow::util::span<const int32_t>{data->raw_values(),
+                                                 
static_cast<size_t>(lengths->length())};
+  return DeltaEncode(span);
+}
+
 TEST(TestDeltaLengthByteArrayEncoding, AdHocRoundTrip) {
   const std::shared_ptr<::arrow::Array> cases[] = {
       ::arrow::ArrayFromJSON(::arrow::utf8(), R"([])"),
@@ -1780,10 +1796,10 @@ TEST(TestDeltaLengthByteArrayEncoding, AdHocRoundTrip) {
   };
 
   std::string expected_encoded_vals[] = {
-      DeltaEncode({})->ToString(),
-      DeltaEncode({3, 2, 0})->ToString() + "abcde",
-      DeltaEncode({0, 0, 0})->ToString(),
-      DeltaEncode({0, 3})->ToString() + "xyz",
+      DeltaEncode(std::vector<int>({}))->ToString(),
+      DeltaEncode(std::vector<int>({3, 2, 0}))->ToString() + "abcde",
+      DeltaEncode(std::vector<int>({0, 0, 0}))->ToString(),
+      DeltaEncode(std::vector<int>({0, 3}))->ToString() + "xyz",
   };
 
   auto encoder = 
MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
@@ -1894,7 +1910,6 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, 
ArrowBinaryDirectPut) {
     ASSERT_EQ(values->length(), result->length());
     ASSERT_OK(result->ValidateFull());
 
-    auto upcast_result = CastBinaryTypesHelper(result, values->type());
     ::arrow::AssertArraysEqual(*values, *result);
   };
 
@@ -1977,4 +1992,263 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) 
{
   CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), 
values));
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaByteArrayEncoding : public 
TestDeltaLengthByteArrayEncoding<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+  static constexpr double prefixed_probability = 0.5;
+
+  void InitData(int nvalues, int repeats) override {
+    num_values_ = nvalues * repeats;
+    input_bytes_.resize(num_values_ * sizeof(c_type));
+    output_bytes_.resize(num_values_ * sizeof(c_type));
+    draws_ = reinterpret_cast<c_type*>(input_bytes_.data());
+    decode_buf_ = reinterpret_cast<c_type*>(output_bytes_.data());
+    GeneratePrefixedData<c_type>(nvalues, draws_, &data_buffer_, 
prefixed_probability);
+
+    // add some repeated values
+    for (int j = 1; j < repeats; ++j) {
+      for (int i = 0; i < nvalues; ++i) {
+        draws_[nvalues * j + i] = draws_[i];
+      }
+    }
+  }
+
+  Encoding::type GetEncoding() override { return Encoding::DELTA_BYTE_ARRAY; }
+
+ protected:
+  USING_BASE_MEMBERS();
+  std::vector<uint8_t> input_bytes_;
+  std::vector<uint8_t> output_bytes_;
+};
+
+using TestDeltaByteArrayEncodingTypes = ::testing::Types<ByteArrayType, 
FLBAType>;
+TYPED_TEST_SUITE(TestDeltaByteArrayEncoding, TestDeltaByteArrayEncodingTypes);
+
+TYPED_TEST(TestDeltaByteArrayEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, /*repeats=*/0));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(250, 5));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 1));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, 
/*null_probability*/
+      0));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 10, /*valid_bits_offset*/ 64,
+      /*null_probability*/ 0.5));
+}
+
+template <typename Type>
+class TestDeltaByteArrayEncodingDirectPut : public TestEncodingBase<Type> {
+  using ArrowType = typename EncodingTraits<Type>::ArrowType;
+  using Accumulator = typename EncodingTraits<Type>::Accumulator;
+  using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
+
+ public:
+  std::unique_ptr<TypedEncoder<Type>> encoder =
+      MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY);
+  std::unique_ptr<TypedDecoder<Type>> decoder =
+      MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY);
+
+  void CheckDirectPut(std::shared_ptr<::arrow::Array> array);
+
+  void CheckRoundtrip() override;
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+template <>
+void TestDeltaByteArrayEncodingDirectPut<ByteArrayType>::CheckDirectPut(
+    std::shared_ptr<::arrow::Array> array) {
+  ASSERT_NO_THROW(encoder->Put(*array));
+  auto buf = encoder->FlushValues();
+
+  int num_values = static_cast<int>(array->length() - array->null_count());
+  decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+  Accumulator acc;
+  acc.builder = std::make_unique<BuilderType>(array->type(), 
default_memory_pool());
+
+  ASSERT_EQ(num_values,
+            decoder->DecodeArrow(static_cast<int>(array->length()),
+                                 static_cast<int>(array->null_count()),
+                                 array->null_bitmap_data(), array->offset(), 
&acc));
+
+  ASSERT_EQ(acc.chunks.size(), 0) << "Accumulator shouldn't have overflowed 
chunks";
+  ASSERT_OK_AND_ASSIGN(auto result, acc.builder->Finish());
+  ASSERT_EQ(array->length(), result->length());
+  ASSERT_OK(result->ValidateFull());
+
+  ::arrow::AssertArraysEqual(*array, *result);
+}
+
+template <>
+void TestDeltaByteArrayEncodingDirectPut<FLBAType>::CheckDirectPut(
+    std::shared_ptr<::arrow::Array> array) {
+  ASSERT_NO_THROW(encoder->Put(*array));
+  auto buf = encoder->FlushValues();
+
+  int num_values = static_cast<int>(array->length() - array->null_count());
+  decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+  Accumulator acc(array->type(), default_memory_pool());
+
+  ASSERT_EQ(num_values,
+            decoder->DecodeArrow(static_cast<int>(array->length()),
+                                 static_cast<int>(array->null_count()),
+                                 array->null_bitmap_data(), array->offset(), 
&acc));
+
+  ASSERT_OK_AND_ASSIGN(auto result, acc.Finish());
+  ASSERT_EQ(array->length(), result->length());
+  ASSERT_OK(result->ValidateFull());
+
+  ::arrow::AssertArraysEqual(*array, *result);
+}
+
+template <>
+void TestDeltaByteArrayEncodingDirectPut<ByteArrayType>::CheckRoundtrip() {
+  constexpr int64_t kSize = 500;
+  constexpr int32_t kMinLength = 0;
+  constexpr int32_t kMaxLength = 10;
+  constexpr int32_t kNumUnique = 10;
+  constexpr double kNullProbability = 0.25;
+  constexpr int kSeed = 42;
+  ::arrow::random::RandomArrayGenerator rag{kSeed};
+  std::shared_ptr<::arrow::Array> values = rag.BinaryWithRepeats(
+      /*size=*/1, /*unique=*/1, kMinLength, kMaxLength, kNullProbability);
+  CheckDirectPut(values);
+
+  for (int i = 0; i < 10; ++i) {
+    values = rag.BinaryWithRepeats(kSize, kNumUnique, kMinLength, kMaxLength,
+                                   kNullProbability);
+    CheckDirectPut(values);
+  }
+}
+
+template <>
+void TestDeltaByteArrayEncodingDirectPut<FLBAType>::CheckRoundtrip() {
+  constexpr int64_t kSize = 50;
+  constexpr int kSeed = 42;
+  constexpr int kByteWidth = 4;
+  ::arrow::random::RandomArrayGenerator rag{kSeed};
+  std::shared_ptr<::arrow::Array> values =
+      rag.FixedSizeBinary(/*size=*/0, /*byte_width=*/kByteWidth);
+  CheckDirectPut(values);
+
+  for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
+    values = rag.FixedSizeBinary(kSize + seed, kByteWidth);
+    CheckDirectPut(values);
+  }
+}
+
+TYPED_TEST_SUITE(TestDeltaByteArrayEncodingDirectPut, 
TestDeltaByteArrayEncodingTypes);
+
+TYPED_TEST(TestDeltaByteArrayEncodingDirectPut, DirectPut) {
+  ASSERT_NO_FATAL_FAILURE(this->CheckRoundtrip());
+}
+
+TEST(DeltaByteArrayEncodingAdHoc, ArrowDirectPut) {
+  auto CheckEncode = [](const std::shared_ptr<::arrow::Array>& values,
+                        const std::shared_ptr<Buffer>& encoded) {
+    auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+    ASSERT_TRUE(encoded->Equals(*buf));
+  };
+
+  auto CheckDecode = [](std::shared_ptr<Buffer> buf,
+                        std::shared_ptr<::arrow::Array> values) {
+    int num_values = static_cast<int>(values->length());
+    auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    typename EncodingTraits<ByteArrayType>::Accumulator acc;
+    if (::arrow::is_string(values->type()->id())) {
+      acc.builder = std::make_unique<::arrow::StringBuilder>();
+    } else {
+      acc.builder = std::make_unique<::arrow::BinaryBuilder>();
+    }
+
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), 
values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.builder->Finish(&result));
+    ASSERT_EQ(num_values, result->length());
+    ASSERT_OK(result->ValidateFull());
+
+    auto upcast_result = CastBinaryTypesHelper(result, values->type());
+    ::arrow::AssertArraysEqual(*values, *upcast_result);
+  };
+
+  auto CheckEncodeDecode = [&](std::string_view values,
+                               std::shared_ptr<::arrow::Array> prefix_lengths,
+                               std::shared_ptr<::arrow::Array> suffix_lengths,
+                               std::string_view suffix_data) {
+    auto encoded = ::arrow::ConcatenateBuffers({DeltaEncode(prefix_lengths),
+                                                DeltaEncode(suffix_lengths),
+                                                
std::make_shared<Buffer>(suffix_data)})
+                       .ValueOrDie();
+
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::utf8(), values), encoded);
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::large_utf8(), values), 
encoded);
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::binary(), values), encoded);
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::large_binary(), values), 
encoded);
+
+    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::utf8(), values));
+    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_utf8(), 
values));
+    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::binary(), values));
+    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), 
values));
+  };
+
+  {
+    auto values = R"(["axis", "axle", "babble", "babyhood"])";
+    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 2, 
0, 3])");
+    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([4, 2, 
6, 5])");
+
+    constexpr std::string_view suffix_data = "axislebabbleyhood";
+    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
+  }
+
+  {
+    auto values = R"(["axis", "axis", "axis", "axis"])";
+    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 4, 
4, 4])");
+    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([4, 0, 
0, 0])");
+
+    constexpr std::string_view suffix_data = "axis";
+    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
+  }
+
+  {
+    auto values = R"(["axisba", "axis", "axis", "axis"])";
+    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 4, 
4, 4])");
+    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([6, 0, 
0, 0])");
+
+    constexpr std::string_view suffix_data = "axisba";
+    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
+  }
+
+  {
+    auto values = R"(["baaxis", "axis", "axis", "axis"])";
+    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 0, 
4, 4])");
+    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([6, 4, 
0, 0])");
+
+    constexpr std::string_view suffix_data = "baaxisaxis";
+    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
+  }
+
+  {
+    auto values = R"(["καλημέρα", "καμηλιέρη", "καμηλιέρη", "καλημέρα"])";
+    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 5, 
18, 5])");
+    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([16, 13, 
0, 11])");
+    const std::string suffix_data = "καλημέρα\xbcηλιέρη\xbbημέρα";
+    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
+  }
+}
 }  // namespace parquet::test
diff --git a/cpp/src/parquet/test_util.cc b/cpp/src/parquet/test_util.cc
index 9d104618bf..b65945cc73 100644
--- a/cpp/src/parquet/test_util.cc
+++ b/cpp/src/parquet/test_util.cc
@@ -30,6 +30,7 @@
 #include <utility>
 #include <vector>
 
+#include "arrow/testing/uniform_real.h"
 #include "parquet/column_page.h"
 #include "parquet/column_reader.h"
 #include "parquet/column_writer.h"
@@ -132,5 +133,56 @@ void random_byte_array(int n, uint32_t seed, uint8_t* buf, 
ByteArray* out, int m
   random_byte_array(n, seed, buf, out, 0, max_size);
 }
 
+void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* 
out,
+                                int min_size, int max_size, double 
prefixed_probability) {
+  std::default_random_engine gen(seed);
+  std::uniform_int_distribution<int> dist_size(min_size, max_size);
+  std::uniform_int_distribution<int> dist_byte(0, 255);
+  std::bernoulli_distribution dist_has_prefix(prefixed_probability);
+  std::uniform_real_distribution<double> dist_prefix_length(0, 1);
+
+  for (int i = 0; i < n; ++i) {
+    int len = dist_size(gen);
+    out[i].len = len;
+    out[i].ptr = buf;
+
+    bool do_prefix = dist_has_prefix(gen) && i > 0;
+    int prefix_len = 0;
+    if (do_prefix) {
+      int max_prefix_len = std::min(len, static_cast<int>(out[i - 1].len));
+      prefix_len = static_cast<int>(std::ceil(max_prefix_len * 
dist_prefix_length(gen)));
+    }
+    for (int j = 0; j < prefix_len; ++j) {
+      buf[j] = out[i - 1].ptr[j];
+    }
+    for (int j = prefix_len; j < len; ++j) {
+      buf[j] = static_cast<uint8_t>(dist_byte(gen));
+    }
+    buf += len;
+  }
+}
+
+void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, int len, 
FLBA* out,
+                                double prefixed_probability) {
+  std::default_random_engine gen(seed);
+  std::uniform_int_distribution<int> dist_byte(0, 255);
+  std::bernoulli_distribution dist_has_prefix(prefixed_probability);
+  std::uniform_int_distribution<int> dist_size(0, len);
+
+  for (int i = 0; i < n; ++i) {
+    out[i].ptr = buf;
+
+    bool do_prefix = dist_has_prefix(gen) && i > 0;
+    int prefix_len = do_prefix ? dist_size(gen) : 0;
+    for (int j = 0; j < prefix_len; ++j) {
+      buf[j] = out[i - 1].ptr[j];
+    }
+    for (int j = prefix_len; j < len; ++j) {
+      buf[j] = static_cast<uint8_t>(dist_byte(gen));
+    }
+    buf += len;
+  }
+}
+
 }  // namespace test
 }  // namespace parquet
diff --git a/cpp/src/parquet/test_util.h b/cpp/src/parquet/test_util.h
index b0aafa037e..c8578609e9 100644
--- a/cpp/src/parquet/test_util.h
+++ b/cpp/src/parquet/test_util.h
@@ -155,6 +155,12 @@ void random_byte_array(int n, uint32_t seed, uint8_t* buf, 
ByteArray* out, int m
 
 void random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, int 
max_size);
 
+void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* 
out,
+                                int min_size, int max_size, double 
prefixed_probability);
+
+void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, int len, 
FLBA* out,
+                                double prefixed_probability);
+
 template <typename Type, typename Sequence>
 std::shared_ptr<Buffer> EncodeValues(Encoding::type encoding, bool 
use_dictionary,
                                      const Sequence& values, int length,
@@ -777,18 +783,46 @@ inline void GenerateData<Int96>(int num_values, Int96* 
out, std::vector<uint8_t>
 template <>
 inline void GenerateData<ByteArray>(int num_values, ByteArray* out,
                                     std::vector<uint8_t>* heap) {
-  // seed the prng so failure is deterministic
   int max_byte_array_len = 12;
   heap->resize(num_values * max_byte_array_len);
+  // seed the prng so failure is deterministic
   random_byte_array(num_values, 0, heap->data(), out, 2, max_byte_array_len);
 }
 
+// Generate ByteArray or FLBA data where there is a given probability
+// for each value to share a common prefix with its predecessor.
+// This is useful to exercise prefix-based encodings such as DELTA_BYTE_ARRAY.
+template <typename T>
+inline void GeneratePrefixedData(int num_values, T* out, std::vector<uint8_t>* 
heap,
+                                 double prefixed_probability);
+
+template <>
+inline void GeneratePrefixedData(int num_values, ByteArray* out,
+                                 std::vector<uint8_t>* heap,
+                                 double prefixed_probability) {
+  int max_byte_array_len = 12;
+  heap->resize(num_values * max_byte_array_len);
+  // seed the prng so failure is deterministic
+  prefixed_random_byte_array(num_values, /*seed=*/0, heap->data(), out, 
/*min_size=*/2,
+                             /*max_size=*/max_byte_array_len, 
prefixed_probability);
+}
+
 static constexpr int kGenerateDataFLBALength = 8;
 
 template <>
-inline void GenerateData<FLBA>(int num_values, FLBA* out, 
std::vector<uint8_t>* heap) {
+inline void GeneratePrefixedData<FLBA>(int num_values, FLBA* out,
+                                       std::vector<uint8_t>* heap,
+                                       double prefixed_probability) {
+  heap->resize(num_values * kGenerateDataFLBALength);
   // seed the prng so failure is deterministic
+  prefixed_random_byte_array(num_values, /*seed=*/0, heap->data(),
+                             kGenerateDataFLBALength, out, 
prefixed_probability);
+}
+
+template <>
+inline void GenerateData<FLBA>(int num_values, FLBA* out, 
std::vector<uint8_t>* heap) {
   heap->resize(num_values * kGenerateDataFLBALength);
+  // seed the prng so failure is deterministic
   random_fixed_byte_array(num_values, 0, heap->data(), 
kGenerateDataFLBALength, out);
 }
 
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index e81e9de0a1..0315376a88 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -577,6 +577,11 @@ struct ByteArray {
   ByteArray(::std::string_view view)  // NOLINT implicit conversion
       : ByteArray(static_cast<uint32_t>(view.size()),
                   reinterpret_cast<const uint8_t*>(view.data())) {}
+
+  explicit operator std::string_view() const {
+    return std::string_view{reinterpret_cast<const char*>(ptr), len};
+  }
+
   uint32_t len;
   const uint8_t* ptr;
 };
diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst
index 95f2d8d98d..23fca8fd73 100644
--- a/docs/source/cpp/parquet.rst
+++ b/docs/source/cpp/parquet.rst
@@ -401,7 +401,7 @@ Encodings
 +--------------------------+----------+----------+---------+
 | DELTA_BINARY_PACKED      | ✓        | ✓        |         |
 +--------------------------+----------+----------+---------+
-| DELTA_BYTE_ARRAY         | ✓        |          |         |
+| DELTA_BYTE_ARRAY         | ✓        | ✓        |         |
 +--------------------------+----------+----------+---------+
 | DELTA_LENGTH_BYTE_ARRAY  | ✓        | ✓        |         |
 +--------------------------+----------+----------+---------+
diff --git a/python/pyarrow/tests/parquet/test_basic.py 
b/python/pyarrow/tests/parquet/test_basic.py
index 9bc59cbcf9..dd12a26616 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -392,9 +392,13 @@ def test_byte_stream_split(use_legacy_dataset):
 def test_column_encoding(use_legacy_dataset):
     arr_float = pa.array(list(map(float, range(100))))
     arr_int = pa.array(list(map(int, range(100))))
-    arr_bin = pa.array([str(x) for x in range(100)])
-    mixed_table = pa.Table.from_arrays([arr_float, arr_int, arr_bin],
-                                       names=['a', 'b', 'c'])
+    arr_bin = pa.array([str(x) for x in range(100)], type=pa.binary())
+    arr_flba = pa.array(
+        [str(x).zfill(10) for x in range(100)], type=pa.binary(10))
+    arr_bool = pa.array([False, True, False, False] * 25)
+    mixed_table = pa.Table.from_arrays(
+        [arr_float, arr_int, arr_bin, arr_flba, arr_bool],
+        names=['a', 'b', 'c', 'd', 'e'])
 
     # Check "BYTE_STREAM_SPLIT" for column 'a' and "PLAIN" column_encoding for
     # column 'b' and 'c'.
@@ -426,6 +430,21 @@ def test_column_encoding(use_legacy_dataset):
                                       'c': "DELTA_LENGTH_BYTE_ARRAY"},
                      use_legacy_dataset=use_legacy_dataset)
 
+    # Check "DELTA_BYTE_ARRAY" for byte columns.
+    _check_roundtrip(mixed_table, expected=mixed_table,
+                     use_dictionary=False,
+                     column_encoding={'a': "PLAIN",
+                                      'b': "DELTA_BINARY_PACKED",
+                                      'c': "DELTA_BYTE_ARRAY",
+                                      'd': "DELTA_BYTE_ARRAY"},
+                     use_legacy_dataset=use_legacy_dataset)
+
+    # Check "RLE" for boolean columns.
+    _check_roundtrip(mixed_table, expected=mixed_table,
+                     use_dictionary=False,
+                     column_encoding={'e': "RLE"},
+                     use_legacy_dataset=use_legacy_dataset)
+
     # Try to pass "BYTE_STREAM_SPLIT" column encoding for integer column 'b'.
     # This should throw an error as it is only supports FLOAT and DOUBLE.
     with pytest.raises(IOError,

Reply via email to