This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 94bd0d2732 GH-32863: [C++][Parquet] Add DELTA_BYTE_ARRAY encoder to
Parquet writer (#14341)
94bd0d2732 is described below
commit 94bd0d27326e8f3bfc0f612fcf604617402888ec
Author: Rok Mihevc <[email protected]>
AuthorDate: Mon Aug 21 19:02:55 2023 +0200
GH-32863: [C++][Parquet] Add DELTA_BYTE_ARRAY encoder to Parquet writer
(#14341)
This is to add DELTA_BYTE_ARRAY encoder.
* Closes: #32863
Lead-authored-by: Rok Mihevc <[email protected]>
Co-authored-by: Rok <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Gang Wu <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: Will Jones <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/parquet/encoding.cc | 495 ++++++++++++++++++++++++-----
cpp/src/parquet/encoding.h | 2 +-
cpp/src/parquet/encoding_test.cc | 298 ++++++++++++++++-
cpp/src/parquet/test_util.cc | 52 +++
cpp/src/parquet/test_util.h | 38 ++-
cpp/src/parquet/types.h | 5 +
docs/source/cpp/parquet.rst | 2 +-
python/pyarrow/tests/parquet/test_basic.py | 25 +-
8 files changed, 825 insertions(+), 92 deletions(-)
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index e972a86ccf..a3cef4b4ce 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -56,6 +56,8 @@ using arrow::Status;
using arrow::VisitNullBitmapInline;
using arrow::internal::AddWithOverflow;
using arrow::internal::checked_cast;
+using arrow::internal::MultiplyWithOverflow;
+using arrow::internal::SubtractWithOverflow;
using arrow::util::SafeLoad;
using arrow::util::SafeLoadAs;
using std::string_view;
@@ -1178,41 +1180,126 @@ int PlainBooleanDecoder::Decode(bool* buffer, int
max_values) {
return max_values;
}
-struct ArrowBinaryHelper {
- explicit ArrowBinaryHelper(typename
EncodingTraits<ByteArrayType>::Accumulator* out) {
- this->out = out;
- this->builder = out->builder.get();
- this->chunk_space_remaining =
- ::arrow::kBinaryMemoryLimit - this->builder->value_data_length();
+// A helper class to abstract away differences between
EncodingTraits<DType>::Accumulator
+// for ByteArrayType and FLBAType.
+template <typename DType>
+struct ArrowBinaryHelper;
+
+template <>
+struct ArrowBinaryHelper<ByteArrayType> {
+ using Accumulator = typename EncodingTraits<ByteArrayType>::Accumulator;
+
+ ArrowBinaryHelper(Accumulator* acc, int64_t length)
+ : acc_(acc),
+ entries_remaining_(length),
+ chunk_space_remaining_(::arrow::kBinaryMemoryLimit -
+ acc_->builder->value_data_length()) {}
+
+ Status Prepare(std::optional<int64_t> estimated_data_length = {}) {
+ RETURN_NOT_OK(acc_->builder->Reserve(entries_remaining_));
+ if (estimated_data_length.has_value()) {
+ RETURN_NOT_OK(acc_->builder->ReserveData(
+ std::min<int64_t>(*estimated_data_length,
::arrow::kBinaryMemoryLimit)));
+ }
+ return Status::OK();
+ }
+
+ Status PrepareNextInput(int64_t next_value_length,
+ std::optional<int64_t>
estimated_remaining_data_length = {}) {
+ if (ARROW_PREDICT_FALSE(!CanFit(next_value_length))) {
+ // This element would exceed the capacity of a chunk
+ RETURN_NOT_OK(PushChunk());
+ RETURN_NOT_OK(acc_->builder->Reserve(entries_remaining_));
+ if (estimated_remaining_data_length.has_value()) {
+ RETURN_NOT_OK(acc_->builder->ReserveData(
+ std::min<int64_t>(*estimated_remaining_data_length,
chunk_space_remaining_)));
+ }
+ }
+ return Status::OK();
+ }
+
+ void UnsafeAppend(const uint8_t* data, int32_t length) {
+ DCHECK(CanFit(length));
+ DCHECK_GT(entries_remaining_, 0);
+ chunk_space_remaining_ -= length;
+ --entries_remaining_;
+ acc_->builder->UnsafeAppend(data, length);
+ }
+
+ Status Append(const uint8_t* data, int32_t length) {
+ DCHECK(CanFit(length));
+ DCHECK_GT(entries_remaining_, 0);
+ chunk_space_remaining_ -= length;
+ --entries_remaining_;
+ return acc_->builder->Append(data, length);
+ }
+
+ void UnsafeAppendNull() {
+ --entries_remaining_;
+ acc_->builder->UnsafeAppendNull();
}
+ Status AppendNull() {
+ --entries_remaining_;
+ return acc_->builder->AppendNull();
+ }
+
+ private:
Status PushChunk() {
- std::shared_ptr<::arrow::Array> result;
- RETURN_NOT_OK(builder->Finish(&result));
- out->chunks.push_back(result);
- chunk_space_remaining = ::arrow::kBinaryMemoryLimit;
+ ARROW_ASSIGN_OR_RAISE(auto chunk, acc_->builder->Finish());
+ acc_->chunks.push_back(std::move(chunk));
+ chunk_space_remaining_ = ::arrow::kBinaryMemoryLimit;
return Status::OK();
}
- bool CanFit(int64_t length) const { return length <= chunk_space_remaining; }
+ bool CanFit(int64_t length) const { return length <= chunk_space_remaining_;
}
- void UnsafeAppend(const uint8_t* data, int32_t length) {
- chunk_space_remaining -= length;
- builder->UnsafeAppend(data, length);
+ Accumulator* acc_;
+ int64_t entries_remaining_;
+ int64_t chunk_space_remaining_;
+};
+
+template <>
+struct ArrowBinaryHelper<FLBAType> {
+ using Accumulator = typename EncodingTraits<FLBAType>::Accumulator;
+
+ ArrowBinaryHelper(Accumulator* acc, int64_t length)
+ : acc_(acc), entries_remaining_(length) {}
+
+ Status Prepare(std::optional<int64_t> estimated_data_length = {}) {
+ return acc_->Reserve(entries_remaining_);
}
- void UnsafeAppendNull() { builder->UnsafeAppendNull(); }
+ Status PrepareNextInput(int64_t next_value_length,
+ std::optional<int64_t>
estimated_remaining_data_length = {}) {
+ return Status::OK();
+ }
+
+ void UnsafeAppend(const uint8_t* data, int32_t length) {
+ DCHECK_GT(entries_remaining_, 0);
+ --entries_remaining_;
+ acc_->UnsafeAppend(data);
+ }
Status Append(const uint8_t* data, int32_t length) {
- chunk_space_remaining -= length;
- return builder->Append(data, length);
+ DCHECK_GT(entries_remaining_, 0);
+ --entries_remaining_;
+ return acc_->Append(data);
+ }
+
+ void UnsafeAppendNull() {
+ --entries_remaining_;
+ acc_->UnsafeAppendNull();
}
- Status AppendNull() { return builder->AppendNull(); }
+ Status AppendNull() {
+ --entries_remaining_;
+ return acc_->AppendNull();
+ }
- typename EncodingTraits<ByteArrayType>::Accumulator* out;
- ::arrow::BinaryBuilder* builder;
- int64_t chunk_space_remaining;
+ private:
+ Accumulator* acc_;
+ int64_t entries_remaining_;
};
template <>
@@ -1313,12 +1400,10 @@ class PlainByteArrayDecoder : public
PlainDecoder<ByteArrayType>,
int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::Accumulator*
out,
int* out_values_decoded) {
- ArrowBinaryHelper helper(out);
+ ArrowBinaryHelper<ByteArrayType> helper(out, num_values);
int values_decoded = 0;
- RETURN_NOT_OK(helper.builder->Reserve(num_values));
- RETURN_NOT_OK(helper.builder->ReserveData(
- std::min<int64_t>(len_, helper.chunk_space_remaining)));
+ RETURN_NOT_OK(helper.Prepare(len_));
int i = 0;
RETURN_NOT_OK(VisitNullBitmapInline(
@@ -1335,13 +1420,7 @@ class PlainByteArrayDecoder : public
PlainDecoder<ByteArrayType>,
if (ARROW_PREDICT_FALSE(len_ < increment)) {
ParquetException::EofException();
}
- if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) {
- // This element would exceed the capacity of a chunk
- RETURN_NOT_OK(helper.PushChunk());
- RETURN_NOT_OK(helper.builder->Reserve(num_values - i));
- RETURN_NOT_OK(helper.builder->ReserveData(
- std::min<int64_t>(len_, helper.chunk_space_remaining)));
- }
+ RETURN_NOT_OK(helper.PrepareNextInput(value_len, len_));
helper.UnsafeAppend(data_ + 4, value_len);
data_ += increment;
len_ -= increment;
@@ -1421,7 +1500,7 @@ class DictDecoderImpl : public DecoderImpl, virtual
public DictDecoder<Type> {
byte_array_offsets_(AllocateBuffer(pool, 0)),
indices_scratch_space_(AllocateBuffer(pool, 0)) {}
- // Perform type-specific initiatialization
+ // Perform type-specific initialization
void SetDict(TypedDecoder<Type>* dictionary) override;
void SetData(int num_values, const uint8_t* data, int len) override {
@@ -1834,7 +1913,8 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
constexpr int32_t kBufferSize = 1024;
int32_t indices[kBufferSize];
- ArrowBinaryHelper helper(out);
+ ArrowBinaryHelper<ByteArrayType> helper(out, num_values);
+ RETURN_NOT_OK(helper.Prepare());
auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
int values_decoded = 0;
@@ -1855,9 +1935,7 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
const auto index = indices[pos_indices++];
RETURN_NOT_OK(IndexInBounds(index));
const auto& val = dict_values[index];
- if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
- RETURN_NOT_OK(helper.PushChunk());
- }
+ RETURN_NOT_OK(helper.PrepareNextInput(val.len));
RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
++values_decoded;
return Status::OK();
@@ -1903,20 +1981,21 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
int32_t indices[kBufferSize];
int values_decoded = 0;
- ArrowBinaryHelper helper(out);
+ ArrowBinaryHelper<ByteArrayType> helper(out, num_values);
+ RETURN_NOT_OK(helper.Prepare(len_));
+
auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
while (values_decoded < num_values) {
- int32_t batch_size = std::min<int32_t>(kBufferSize, num_values -
values_decoded);
- int num_indices = idx_decoder_.GetBatch(indices, batch_size);
+ const int32_t batch_size =
+ std::min<int32_t>(kBufferSize, num_values - values_decoded);
+ const int num_indices = idx_decoder_.GetBatch(indices, batch_size);
if (num_indices == 0) ParquetException::EofException();
for (int i = 0; i < num_indices; ++i) {
auto idx = indices[i];
RETURN_NOT_OK(IndexInBounds(idx));
const auto& val = dict_values[idx];
- if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
- RETURN_NOT_OK(helper.PushChunk());
- }
+ RETURN_NOT_OK(helper.PrepareNextInput(val.len));
RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
}
values_decoded += num_indices;
@@ -2746,7 +2825,8 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
int64_t valid_bits_offset,
typename EncodingTraits<ByteArrayType>::Accumulator*
out,
int* out_num_values) {
- ArrowBinaryHelper helper(out);
+ ArrowBinaryHelper<ByteArrayType> helper(out, num_values);
+ RETURN_NOT_OK(helper.Prepare());
std::vector<ByteArray> values(num_values - null_count);
const int num_valid_values = Decode(values.data(), num_values -
null_count);
@@ -2762,9 +2842,7 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
const auto& val = values_ptr[value_idx];
- if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
- RETURN_NOT_OK(helper.PushChunk());
- }
+ RETURN_NOT_OK(helper.PrepareNextInput(val.len));
RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
++value_idx;
return Status::OK();
@@ -2782,8 +2860,8 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
std::shared_ptr<::arrow::bit_util::BitReader> decoder_;
DeltaBitPackDecoder<Int32Type> len_decoder_;
- int num_valid_values_;
- uint32_t length_idx_;
+ int num_valid_values_{0};
+ uint32_t length_idx_{0};
std::shared_ptr<ResizableBuffer> buffered_length_;
};
@@ -2977,12 +3055,238 @@ class RleBooleanDecoder : public DecoderImpl, virtual
public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front
compression:
+/// for each element in a sequence of strings, store the prefix length of the
previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths
(DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
+ static constexpr std::string_view kEmpty = "";
+
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
MemoryPool* pool =
::arrow::default_memory_pool())
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+ sink_(pool),
+ prefix_length_encoder_(/*descr=*/nullptr, pool),
+ suffix_encoder_(descr, pool),
+ last_value_(""),
+ empty_(static_cast<uint32_t>(kEmpty.size()),
+ reinterpret_cast<const uint8_t*>(kEmpty.data())) {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return prefix_length_encoder_.EstimatedDataEncodedSize() +
+ suffix_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != nullptr) {
+ if (buffer_ == nullptr) {
+ PARQUET_ASSIGN_OR_THROW(buffer_,
+ ::arrow::AllocateResizableBuffer(num_values *
sizeof(T),
+
this->memory_pool()));
+ } else {
+ PARQUET_THROW_NOT_OK(buffer_->Resize(num_values * sizeof(T), false));
+ }
+ T* data = reinterpret_cast<T*>(buffer_->mutable_data());
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
+ }
+ }
+
+ protected:
+ template <typename VisitorType>
+ void PutInternal(const T* src, int num_values, const VisitorType visitor) {
+ if (num_values == 0) {
+ return;
+ }
+
+ std::string_view last_value_view = last_value_;
+ constexpr int kBatchSize = 256;
+ std::array<int32_t, kBatchSize> prefix_lengths;
+ std::array<ByteArray, kBatchSize> suffixes;
+
+ for (int i = 0; i < num_values; i += kBatchSize) {
+ const int batch_size = std::min(kBatchSize, num_values - i);
+
+ for (int j = 0; j < batch_size; ++j) {
+ const int idx = i + j;
+ const auto view = visitor[idx];
+ const auto len = static_cast<const uint32_t>(view.length());
+
+ uint32_t common_prefix_length = 0;
+ const uint32_t maximum_common_prefix_length =
+ std::min(len, static_cast<uint32_t>(last_value_view.length()));
+ while (common_prefix_length < maximum_common_prefix_length) {
+ if (last_value_view[common_prefix_length] !=
view[common_prefix_length]) {
+ break;
+ }
+ common_prefix_length++;
+ }
+
+ last_value_view = view;
+ prefix_lengths[j] = common_prefix_length;
+ const uint32_t suffix_length = len - common_prefix_length;
+ const uint8_t* suffix_ptr = src[idx].ptr + common_prefix_length;
+
+ // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffixes[j] = suffix;
+ }
+ suffix_encoder_.Put(suffixes.data(), batch_size);
+ prefix_length_encoder_.Put(prefix_lengths.data(), batch_size);
+ }
+ last_value_ = last_value_view;
+ }
+
+ template <typename ArrayType>
+ void PutBinaryArray(const ArrayType& array) {
+ auto previous_len = static_cast<uint32_t>(last_value_.length());
+ std::string_view last_value_view = last_value_;
+
+ PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename
ArrayType::TypeClass>(
+ *array.data(),
+ [&](::std::string_view view) {
+ if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+ return Status::Invalid("Parquet cannot store strings with size 2GB
or more");
+ }
+ const ByteArray src{view};
+
+ uint32_t common_prefix_length = 0;
+ const uint32_t len = src.len;
+ const uint32_t maximum_common_prefix_length = std::min(previous_len,
len);
+ while (common_prefix_length < maximum_common_prefix_length) {
+ if (last_value_view[common_prefix_length] !=
view[common_prefix_length]) {
+ break;
+ }
+ common_prefix_length++;
+ }
+ previous_len = len;
+
prefix_length_encoder_.Put({static_cast<int32_t>(common_prefix_length)}, 1);
+
+ last_value_view = view;
+ const auto suffix_length = static_cast<uint32_t>(len -
common_prefix_length);
+ if (suffix_length == 0) {
+ suffix_encoder_.Put(&empty_, 1);
+ return Status::OK();
+ }
+ const uint8_t* suffix_ptr = src.ptr + common_prefix_length;
+ // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffix_encoder_.Put(&suffix, 1);
+
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+ last_value_ = last_value_view;
+ }
+
+ ::arrow::BufferBuilder sink_;
+ DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+ DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+ std::string last_value_;
+ const ByteArray empty_;
+ std::unique_ptr<ResizableBuffer> buffer_;
+};
+
+struct ByteArrayVisitor {
+ const ByteArray* src;
+
+ std::string_view operator[](int i) const {
+ if (ARROW_PREDICT_FALSE(src[i].len >= kMaxByteArraySize)) {
+ throw ParquetException("Parquet cannot store strings with size 2GB or
more");
+ }
+ return std::string_view{src[i]};
+ }
+
+ uint32_t len(int i) const { return src[i].len; }
+};
+
+struct FLBAVisitor {
+ const FLBA* src;
+ const uint32_t type_length;
+
+ std::string_view operator[](int i) const {
+ return std::string_view{reinterpret_cast<const char*>(src[i].ptr),
type_length};
+ }
+
+ uint32_t len(int i) const { return type_length; }
+};
+
+template <>
+void DeltaByteArrayEncoder<ByteArrayType>::Put(const ByteArray* src, int
num_values) {
+ auto visitor = ByteArrayVisitor{src};
+ PutInternal<ByteArrayVisitor>(src, num_values, visitor);
+}
+
+template <>
+void DeltaByteArrayEncoder<FLBAType>::Put(const FLBA* src, int num_values) {
+ auto visitor = FLBAVisitor{src,
static_cast<uint32_t>(descr_->type_length())};
+ PutInternal<FLBAVisitor>(src, num_values, visitor);
+}
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const ::arrow::Array& values) {
+ if (::arrow::is_binary_like(values.type_id())) {
+ PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
+ } else if (::arrow::is_large_binary_like(values.type_id())) {
+ PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
+ } else if (::arrow::is_fixed_size_binary(values.type_id())) {
+ PutBinaryArray(checked_cast<const ::arrow::FixedSizeBinaryArray&>(values));
+ } else {
+ throw ParquetException("Only BaseBinaryArray and subclasses supported");
+ }
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaByteArrayEncoder<DType>::FlushValues() {
+ PARQUET_THROW_NOT_OK(sink_.Resize(EstimatedDataEncodedSize(), false));
+
+ std::shared_ptr<Buffer> prefix_lengths =
prefix_length_encoder_.FlushValues();
+ PARQUET_THROW_NOT_OK(sink_.Append(prefix_lengths->data(),
prefix_lengths->size()));
+
+ std::shared_ptr<Buffer> suffixes = suffix_encoder_.FlushValues();
+ PARQUET_THROW_NOT_OK(sink_.Append(suffixes->data(), suffixes->size()));
+
+ std::shared_ptr<Buffer> buffer;
+ PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+ last_value_.clear();
+ return buffer;
+}
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayDecoder
+
+template <typename DType>
+class DeltaByteArrayDecoderImpl : public DecoderImpl, virtual public
TypedDecoder<DType> {
+ using T = typename DType::c_type;
+
+ public:
+ explicit DeltaByteArrayDecoderImpl(const ColumnDescriptor* descr,
+ MemoryPool* pool =
::arrow::default_memory_pool())
: DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY),
+ pool_(pool),
prefix_len_decoder_(nullptr, pool),
suffix_decoder_(nullptr, pool),
last_value_in_previous_page_(""),
@@ -3017,27 +3321,22 @@ class DeltaByteArrayDecoder : public DecoderImpl,
last_value_ = "";
}
- int Decode(ByteArray* buffer, int max_values) override {
- return GetInternal(buffer, max_values);
- }
-
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
- typename EncodingTraits<ByteArrayType>::Accumulator* out)
override {
+ typename EncodingTraits<DType>::Accumulator* out) override {
int result = 0;
PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
valid_bits_offset, out, &result));
return result;
}
- int DecodeArrow(
- int num_values, int null_count, const uint8_t* valid_bits,
- int64_t valid_bits_offset,
- typename EncodingTraits<ByteArrayType>::DictAccumulator* builder)
override {
+ int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+ int64_t valid_bits_offset,
+ typename EncodingTraits<DType>::DictAccumulator* builder)
override {
ParquetException::NYI("DecodeArrow of DictAccumulator for
DeltaByteArrayDecoder");
}
- private:
+ protected:
int GetInternal(ByteArray* buffer, int max_values) {
// Decode up to `max_values` strings into an internal buffer
// and reference them into `buffer`.
@@ -3080,7 +3379,7 @@ class DeltaByteArrayDecoder : public DecoderImpl,
buffer[i].ptr = data_ptr;
buffer[i].len += prefix_len_ptr[i];
data_ptr += buffer[i].len;
- prefix = string_view{reinterpret_cast<const char*>(buffer[i].ptr),
buffer[i].len};
+ prefix = std::string_view{buffer[i]};
}
prefix_len_offset_ += max_values;
this->num_values_ -= max_values;
@@ -3095,9 +3394,10 @@ class DeltaByteArrayDecoder : public DecoderImpl,
Status DecodeArrowDense(int num_values, int null_count, const uint8_t*
valid_bits,
int64_t valid_bits_offset,
- typename EncodingTraits<ByteArrayType>::Accumulator*
out,
+ typename EncodingTraits<DType>::Accumulator* out,
int* out_num_values) {
- ArrowBinaryHelper helper(out);
+ ArrowBinaryHelper<DType> helper(out, num_values);
+ RETURN_NOT_OK(helper.Prepare());
std::vector<ByteArray> values(num_values);
const int num_valid_values = GetInternal(values.data(), num_values -
null_count);
@@ -3110,9 +3410,7 @@ class DeltaByteArrayDecoder : public DecoderImpl,
valid_bits, valid_bits_offset, num_values, null_count,
[&]() {
const auto& val = values_ptr[value_idx];
- if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
- RETURN_NOT_OK(helper.PushChunk());
- }
+ RETURN_NOT_OK(helper.PrepareNextInput(val.len));
RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
++value_idx;
return Status::OK();
@@ -3128,18 +3426,54 @@ class DeltaByteArrayDecoder : public DecoderImpl,
return Status::OK();
}
+ MemoryPool* pool_;
+
+ private:
std::shared_ptr<::arrow::bit_util::BitReader> decoder_;
DeltaBitPackDecoder<Int32Type> prefix_len_decoder_;
DeltaLengthByteArrayDecoder suffix_decoder_;
std::string last_value_;
// string buffer for last value in previous page
std::string last_value_in_previous_page_;
- int num_valid_values_;
- uint32_t prefix_len_offset_;
+ int num_valid_values_{0};
+ uint32_t prefix_len_offset_{0};
std::shared_ptr<ResizableBuffer> buffered_prefix_length_;
std::shared_ptr<ResizableBuffer> buffered_data_;
};
+class DeltaByteArrayDecoder : public DeltaByteArrayDecoderImpl<ByteArrayType> {
+ public:
+ using Base = DeltaByteArrayDecoderImpl<ByteArrayType>;
+ using Base::DeltaByteArrayDecoderImpl;
+
+ int Decode(ByteArray* buffer, int max_values) override {
+ return GetInternal(buffer, max_values);
+ }
+};
+
+class DeltaByteArrayFLBADecoder : public DeltaByteArrayDecoderImpl<FLBAType>,
+ virtual public FLBADecoder {
+ public:
+ using Base = DeltaByteArrayDecoderImpl<FLBAType>;
+ using Base::DeltaByteArrayDecoderImpl;
+ using Base::pool_;
+
+ int Decode(FixedLenByteArray* buffer, int max_values) override {
+ // GetInternal currently only support ByteArray.
+ std::vector<ByteArray> decode_byte_array(max_values);
+ const int decoded_values_size = GetInternal(decode_byte_array.data(),
max_values);
+ const uint32_t type_length = descr_->type_length();
+
+ for (int i = 0; i < decoded_values_size; i++) {
+ if (ARROW_PREDICT_FALSE(decode_byte_array[i].len != type_length)) {
+ throw ParquetException("Fixed length byte array length mismatch");
+ }
+ buffer[i].ptr = decode_byte_array[i].ptr;
+ }
+ return decoded_values_size;
+ }
+};
+
// ----------------------------------------------------------------------
// BYTE_STREAM_SPLIT
@@ -3353,6 +3687,16 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type
type_num, Encoding::type encodin
default:
throw ParquetException("RLE only supports BOOLEAN");
}
+ } else if (encoding == Encoding::DELTA_BYTE_ARRAY) {
+ switch (type_num) {
+ case Type::BYTE_ARRAY:
+ return std::make_unique<DeltaByteArrayEncoder<ByteArrayType>>(descr,
pool);
+ case Type::FIXED_LEN_BYTE_ARRAY:
+ return std::make_unique<DeltaByteArrayEncoder<FLBAType>>(descr, pool);
+ default:
+ throw ParquetException(
+ "DELTA_BYTE_ARRAY only supports BYTE_ARRAY and
FIXED_LEN_BYTE_ARRAY");
+ }
} else {
ParquetException::NYI("Selected encoding is not supported");
}
@@ -3404,10 +3748,15 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type
type_num, Encoding::type encodin
"DELTA_BINARY_PACKED decoder only supports INT32 and INT64");
}
} else if (encoding == Encoding::DELTA_BYTE_ARRAY) {
- if (type_num == Type::BYTE_ARRAY) {
- return std::make_unique<DeltaByteArrayDecoder>(descr, pool);
+ switch (type_num) {
+ case Type::BYTE_ARRAY:
+ return std::make_unique<DeltaByteArrayDecoder>(descr, pool);
+ case Type::FIXED_LEN_BYTE_ARRAY:
+ return std::make_unique<DeltaByteArrayFLBADecoder>(descr, pool);
+ default:
+ throw ParquetException(
+ "DELTA_BYTE_ARRAY only supports BYTE_ARRAY and
FIXED_LEN_BYTE_ARRAY");
}
- throw ParquetException("DELTA_BYTE_ARRAY only supports BYTE_ARRAY");
} else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) {
if (type_num == Type::BYTE_ARRAY) {
return std::make_unique<DeltaLengthByteArrayDecoder>(descr, pool);
diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h
index 9f9b740ff3..6cdfe37920 100644
--- a/cpp/src/parquet/encoding.h
+++ b/cpp/src/parquet/encoding.h
@@ -141,13 +141,13 @@ struct EncodingTraits<ByteArrayType> {
using Encoder = ByteArrayEncoder;
using Decoder = ByteArrayDecoder;
+ using ArrowType = ::arrow::BinaryType;
/// \brief Internal helper class for decoding BYTE_ARRAY data where we can
/// overflow the capacity of a single arrow::BinaryArray
struct Accumulator {
std::unique_ptr<::arrow::BinaryBuilder> builder;
std::vector<std::shared_ptr<::arrow::Array>> chunks;
};
- using ArrowType = ::arrow::BinaryType;
using DictAccumulator = ::arrow::Dictionary32Builder<::arrow::BinaryType>;
};
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index 0ac5fd76e7..71dc40d33a 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -36,6 +36,7 @@
#include "arrow/util/bitmap_writer.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/endian.h"
+#include "arrow/util/span.h"
#include "arrow/util/string.h"
#include "parquet/encoding.h"
#include "parquet/platform.h"
@@ -181,7 +182,7 @@ class TestEncodingBase : public ::testing::Test {
void TearDown() {}
- void InitData(int nvalues, int repeats) {
+ virtual void InitData(int nvalues, int repeats) {
num_values_ = nvalues * repeats;
input_bytes_.resize(num_values_ * sizeof(c_type));
output_bytes_.resize(num_values_ * sizeof(c_type));
@@ -1705,11 +1706,13 @@ class TestDeltaLengthByteArrayEncoding : public
TestEncodingBase<Type> {
using c_type = typename Type::c_type;
static constexpr int TYPE = Type::type_num;
+ virtual Encoding::type GetEncoding() { return
Encoding::DELTA_LENGTH_BYTE_ARRAY; }
+
virtual void CheckRoundtrip() {
- auto encoder = MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
+ auto encoding = GetEncoding();
+ auto encoder = MakeTypedEncoder<Type>(encoding,
/*use_dictionary=*/false,
descr_.get());
- auto decoder =
- MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
descr_.get());
+ auto decoder = MakeTypedDecoder<Type>(encoding, descr_.get());
encoder->Put(draws_, num_values_);
encode_buffer_ = encoder->FlushValues();
@@ -1722,10 +1725,10 @@ class TestDeltaLengthByteArrayEncoding : public
TestEncodingBase<Type> {
}
void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t
valid_bits_offset) {
- auto encoder = MakeTypedEncoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
+ auto encoding = GetEncoding();
+ auto encoder = MakeTypedEncoder<Type>(encoding,
/*use_dictionary=*/false,
descr_.get());
- auto decoder =
- MakeTypedDecoder<Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
descr_.get());
+ auto decoder = MakeTypedDecoder<Type>(encoding, descr_.get());
int null_count = 0;
for (auto i = 0; i < num_values_; i++) {
if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
@@ -1771,6 +1774,19 @@ std::shared_ptr<Buffer> DeltaEncode(std::vector<int32_t>
lengths) {
return encoder->FlushValues();
}
+std::shared_ptr<Buffer> DeltaEncode(::arrow::util::span<const int32_t>
lengths) {
+ auto encoder = MakeTypedEncoder<Int32Type>(Encoding::DELTA_BINARY_PACKED);
+ encoder->Put(lengths.data(), static_cast<int>(lengths.size()));
+ return encoder->FlushValues();
+}
+
+std::shared_ptr<Buffer> DeltaEncode(std::shared_ptr<::arrow::Array>& lengths) {
+ auto data = ::arrow::internal::checked_pointer_cast<const
::arrow::Int32Array>(lengths);
+ auto span = ::arrow::util::span<const int32_t>{data->raw_values(),
+
static_cast<size_t>(lengths->length())};
+ return DeltaEncode(span);
+}
+
TEST(TestDeltaLengthByteArrayEncoding, AdHocRoundTrip) {
const std::shared_ptr<::arrow::Array> cases[] = {
::arrow::ArrayFromJSON(::arrow::utf8(), R"([])"),
@@ -1780,10 +1796,10 @@ TEST(TestDeltaLengthByteArrayEncoding, AdHocRoundTrip) {
};
std::string expected_encoded_vals[] = {
- DeltaEncode({})->ToString(),
- DeltaEncode({3, 2, 0})->ToString() + "abcde",
- DeltaEncode({0, 0, 0})->ToString(),
- DeltaEncode({0, 3})->ToString() + "xyz",
+ DeltaEncode(std::vector<int>({}))->ToString(),
+ DeltaEncode(std::vector<int>({3, 2, 0}))->ToString() + "abcde",
+ DeltaEncode(std::vector<int>({0, 0, 0}))->ToString(),
+ DeltaEncode(std::vector<int>({0, 3}))->ToString() + "xyz",
};
auto encoder =
MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY,
@@ -1894,7 +1910,6 @@ TEST(DeltaLengthByteArrayEncodingAdHoc,
ArrowBinaryDirectPut) {
ASSERT_EQ(values->length(), result->length());
ASSERT_OK(result->ValidateFull());
- auto upcast_result = CastBinaryTypesHelper(result, values->type());
::arrow::AssertArraysEqual(*values, *result);
};
@@ -1977,4 +1992,263 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut)
{
CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(),
values));
}
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY encode/decode tests.
+
+// Reuses the DELTA_LENGTH_BYTE_ARRAY fixture, but generates values that often
+// share a prefix with their predecessor, so DELTA_BYTE_ARRAY's prefix
+// compression is actually exercised.
+template <typename Type>
+class TestDeltaByteArrayEncoding : public TestDeltaLengthByteArrayEncoding<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+  static constexpr double prefixed_probability = 0.5;
+
+  // Generates `nvalues` prefixed draws and tiles them `repeats` times.
+  void InitData(int nvalues, int repeats) override {
+    num_values_ = nvalues * repeats;
+    const size_t num_bytes = num_values_ * sizeof(c_type);
+    input_bytes_.resize(num_bytes);
+    output_bytes_.resize(num_bytes);
+    draws_ = reinterpret_cast<c_type*>(input_bytes_.data());
+    decode_buf_ = reinterpret_cast<c_type*>(output_bytes_.data());
+    GeneratePrefixedData<c_type>(nvalues, draws_, &data_buffer_, prefixed_probability);
+
+    // Every slot past the first `nvalues` repeats the matching original draw.
+    for (int k = nvalues; k < num_values_; ++k) {
+      draws_[k] = draws_[k % nvalues];
+    }
+  }
+
+  Encoding::type GetEncoding() override { return Encoding::DELTA_BYTE_ARRAY; }
+
+ protected:
+  USING_BASE_MEMBERS();
+  // Backing storage for draws_ and decode_buf_.
+  std::vector<uint8_t> input_bytes_;
+  std::vector<uint8_t> output_bytes_;
+};
+
+using TestDeltaByteArrayEncodingTypes = ::testing::Types<ByteArrayType, FLBAType>;
+TYPED_TEST_SUITE(TestDeltaByteArrayEncoding, TestDeltaByteArrayEncodingTypes);
+
+// Round-trips prefixed data through DELTA_BYTE_ARRAY, dense and spaced.
+TYPED_TEST(TestDeltaByteArrayEncoding, BasicRoundTrip) {
+  // Empty input.
+  ASSERT_NO_FATAL_FAILURE(this->Execute(/*nvalues=*/0, /*repeats=*/0));
+  // Dense input, with and without repeated blocks.
+  ASSERT_NO_FATAL_FAILURE(this->Execute(/*nvalues=*/250, /*repeats=*/5));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(/*nvalues=*/2000, /*repeats=*/1));
+  // Spaced input: first without nulls, then with a 0.5 null probability.
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(/*nvalues=*/1234, /*repeats=*/1,
+                                              /*valid_bits_offset=*/64,
+                                              /*null_probability=*/0));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(/*nvalues=*/1234, /*repeats=*/10,
+                                              /*valid_bits_offset=*/64,
+                                              /*null_probability=*/0.5));
+}
+
+// Round-trips Arrow arrays through DELTA_BYTE_ARRAY via the direct Arrow paths:
+// TypedEncoder::Put(const ::arrow::Array&) and TypedDecoder::DecodeArrow().
+template <typename Type>
+class TestDeltaByteArrayEncodingDirectPut : public TestEncodingBase<Type> {
+  using ArrowType = typename EncodingTraits<Type>::ArrowType;
+  using Accumulator = typename EncodingTraits<Type>::Accumulator;
+  using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
+
+ public:
+  // One encoder/decoder pair is reused by every CheckDirectPut() call in a
+  // test; each call flushes the encoder and re-arms the decoder via SetData().
+  std::unique_ptr<TypedEncoder<Type>> encoder =
+      MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY);
+  std::unique_ptr<TypedDecoder<Type>> decoder =
+      MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY);
+
+  // Encodes `array`, decodes it back and asserts the result equals `array`.
+  void CheckDirectPut(std::shared_ptr<::arrow::Array> array);
+
+  // Runs CheckDirectPut() over a series of randomly generated arrays.
+  void CheckRoundtrip() override;
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+// Encodes a BYTE_ARRAY-compatible Arrow array with Put(const ::arrow::Array&),
+// decodes it back with DecodeArrow() into a builder of the same Arrow type, and
+// checks that the result equals the input, nulls included.
+template <>
+void TestDeltaByteArrayEncodingDirectPut<ByteArrayType>::CheckDirectPut(
+    std::shared_ptr<::arrow::Array> array) {
+  ASSERT_NO_THROW(encoder->Put(*array));
+  auto buf = encoder->FlushValues();
+
+  // Only non-null entries are counted as encoded values.
+  int num_values = static_cast<int>(array->length() - array->null_count());
+  decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+  Accumulator acc;
+  acc.builder = std::make_unique<BuilderType>(array->type(), default_memory_pool());
+
+  ASSERT_EQ(num_values,
+            decoder->DecodeArrow(static_cast<int>(array->length()),
+                                 static_cast<int>(array->null_count()),
+                                 array->null_bitmap_data(), array->offset(), &acc));
+
+  // Everything must land in the builder. empty() states that directly and
+  // avoids a signed/unsigned comparison of size_t against an int literal.
+  ASSERT_TRUE(acc.chunks.empty()) << "Accumulator shouldn't have overflowed chunks";
+  ASSERT_OK_AND_ASSIGN(auto result, acc.builder->Finish());
+  ASSERT_EQ(array->length(), result->length());
+  ASSERT_OK(result->ValidateFull());
+
+  ::arrow::AssertArraysEqual(*array, *result);
+}
+
+// FLBA flavour: encodes a fixed_size_binary array directly, decodes it back into
+// an FLBA accumulator of the same Arrow type and compares with the input.
+template <>
+void TestDeltaByteArrayEncodingDirectPut<FLBAType>::CheckDirectPut(
+    std::shared_ptr<::arrow::Array> array) {
+  ASSERT_NO_THROW(encoder->Put(*array));
+  auto encoded = encoder->FlushValues();
+
+  // Only non-null entries are counted as encoded values.
+  const int64_t null_count = array->null_count();
+  const int num_non_null = static_cast<int>(array->length() - null_count);
+  decoder->SetData(num_non_null, encoded->data(), static_cast<int>(encoded->size()));
+
+  Accumulator acc(array->type(), default_memory_pool());
+  const int decoded = decoder->DecodeArrow(
+      static_cast<int>(array->length()), static_cast<int>(null_count),
+      array->null_bitmap_data(), array->offset(), &acc);
+  ASSERT_EQ(num_non_null, decoded);
+
+  ASSERT_OK_AND_ASSIGN(auto roundtripped, acc.Finish());
+  ASSERT_EQ(array->length(), roundtripped->length());
+  ASSERT_OK(roundtripped->ValidateFull());
+  ::arrow::AssertArraysEqual(*array, *roundtripped);
+}
+
+// Direct-put round trips for BYTE_ARRAY: a single-value array, then ten batches
+// of kSize values drawn from kNumUnique distinct strings, with 25% nulls.
+template <>
+void TestDeltaByteArrayEncodingDirectPut<ByteArrayType>::CheckRoundtrip() {
+  constexpr int64_t kSize = 500;
+  constexpr int32_t kMinLength = 0;
+  constexpr int32_t kMaxLength = 10;
+  constexpr int32_t kNumUnique = 10;
+  constexpr double kNullProbability = 0.25;
+  constexpr int kSeed = 42;
+  ::arrow::random::RandomArrayGenerator rag{kSeed};
+  auto make_values = [&](int64_t size, int32_t unique) {
+    return rag.BinaryWithRepeats(size, unique, kMinLength, kMaxLength, kNullProbability);
+  };
+
+  CheckDirectPut(make_values(/*size=*/1, /*unique=*/1));
+  for (int iteration = 0; iteration < 10; ++iteration) {
+    CheckDirectPut(make_values(kSize, kNumUnique));
+  }
+}
+
+// Direct-put round trips for FLBA: an empty array, then arrays of
+// kSize, kSize + 1, ..., kSize + 9 values of kByteWidth bytes each.
+template <>
+void TestDeltaByteArrayEncodingDirectPut<FLBAType>::CheckRoundtrip() {
+  constexpr int64_t kSize = 50;
+  constexpr int kSeed = 42;
+  constexpr int kByteWidth = 4;
+  ::arrow::random::RandomArrayGenerator rag{kSeed};
+
+  CheckDirectPut(rag.FixedSizeBinary(/*size=*/0, /*byte_width=*/kByteWidth));
+  for (int extra = 0; extra < 10; ++extra) {
+    CheckDirectPut(rag.FixedSizeBinary(kSize + extra, kByteWidth));
+  }
+}
+
+TYPED_TEST_SUITE(TestDeltaByteArrayEncodingDirectPut, TestDeltaByteArrayEncodingTypes);
+
+// Randomly generated Arrow arrays, put directly, must decode back unchanged.
+TYPED_TEST(TestDeltaByteArrayEncodingDirectPut, DirectPut) {
+  ASSERT_NO_FATAL_FAILURE(this->CheckRoundtrip());
+}
+
+// Checks DELTA_BYTE_ARRAY against hand-computed encodings. Each expected buffer
+// is DELTA_BINARY_PACKED(prefix lengths), then DELTA_BINARY_PACKED(suffix
+// lengths), then the concatenated suffix bytes. Every case is checked in both
+// directions for utf8, large_utf8, binary and large_binary.
+TEST(DeltaByteArrayEncodingAdHoc, ArrowDirectPut) {
+  // Encodes `values` and checks that the output matches `encoded` exactly.
+  auto CheckEncode = [](const std::shared_ptr<::arrow::Array>& values,
+                        const std::shared_ptr<Buffer>& encoded) {
+    auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+    ASSERT_TRUE(encoded->Equals(*buf))
+        << "expected " << encoded->ToHexString() << ", got " << buf->ToHexString();
+  };
+
+  // Decodes `buf` and checks the result, converted to `values`'s type with
+  // CastBinaryTypesHelper, equals `values`.
+  auto CheckDecode = [](std::shared_ptr<Buffer> buf,
+                        std::shared_ptr<::arrow::Array> values) {
+    int num_values = static_cast<int>(values->length());
+    auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    typename EncodingTraits<ByteArrayType>::Accumulator acc;
+    if (::arrow::is_string(values->type()->id())) {
+      acc.builder = std::make_unique<::arrow::StringBuilder>();
+    } else {
+      acc.builder = std::make_unique<::arrow::BinaryBuilder>();
+    }
+
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.builder->Finish(&result));
+    ASSERT_EQ(num_values, result->length());
+    ASSERT_OK(result->ValidateFull());
+
+    auto upcast_result = CastBinaryTypesHelper(result, values->type());
+    ::arrow::AssertArraysEqual(*values, *upcast_result);
+  };
+
+  // Builds the expected encoding from its three parts and checks it against
+  // every Arrow binary-like type, in both directions.
+  auto CheckEncodeDecode = [&](std::string_view values,
+                               std::shared_ptr<::arrow::Array> prefix_lengths,
+                               std::shared_ptr<::arrow::Array> suffix_lengths,
+                               std::string_view suffix_data) {
+    // Report a concatenation error as a test failure rather than aborting the
+    // whole test binary, as ValueOrDie() would.
+    ASSERT_OK_AND_ASSIGN(auto encoded, ::arrow::ConcatenateBuffers(
+                                           {DeltaEncode(prefix_lengths),
+                                            DeltaEncode(suffix_lengths),
+                                            std::make_shared<Buffer>(suffix_data)}));
+
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::utf8(), values), encoded);
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::large_utf8(), values), encoded);
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::binary(), values), encoded);
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::large_binary(), values), encoded);
+
+    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::utf8(), values));
+    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_utf8(), values));
+    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::binary(), values));
+    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values));
+  };
+
+  {
+    auto values = R"(["axis", "axle", "babble", "babyhood"])";
+    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 2, 0, 3])");
+    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([4, 2, 6, 5])");
+
+    constexpr std::string_view suffix_data = "axislebabbleyhood";
+    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
+  }
+
+  {
+    auto values = R"(["axis", "axis", "axis", "axis"])";
+    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 4, 4, 4])");
+    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([4, 0, 0, 0])");
+
+    constexpr std::string_view suffix_data = "axis";
+    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
+  }
+
+  {
+    auto values = R"(["axisba", "axis", "axis", "axis"])";
+    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 4, 4, 4])");
+    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([6, 0, 0, 0])");
+
+    constexpr std::string_view suffix_data = "axisba";
+    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
+  }
+
+  {
+    auto values = R"(["baaxis", "axis", "axis", "axis"])";
+    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 0, 4, 4])");
+    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([6, 4, 0, 0])");
+
+    constexpr std::string_view suffix_data = "baaxisaxis";
+    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
+  }
+
+  {
+    // Multi-byte UTF-8: shared prefixes may end in the middle of a code point.
+    auto values = R"(["καλημέρα", "καμηλιέρη", "καμηλιέρη", "καλημέρα"])";
+    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 5, 18, 5])");
+    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([16, 13, 0, 11])");
+    const std::string suffix_data = "καλημέρα\xbcηλιέρη\xbbημέρα";
+    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
+  }
+}
} // namespace parquet::test
diff --git a/cpp/src/parquet/test_util.cc b/cpp/src/parquet/test_util.cc
index 9d104618bf..b65945cc73 100644
--- a/cpp/src/parquet/test_util.cc
+++ b/cpp/src/parquet/test_util.cc
@@ -30,6 +30,7 @@
#include <utility>
#include <vector>
+#include "arrow/testing/uniform_real.h"
#include "parquet/column_page.h"
#include "parquet/column_reader.h"
#include "parquet/column_writer.h"
@@ -132,5 +133,56 @@ void random_byte_array(int n, uint32_t seed, uint8_t* buf,
ByteArray* out, int m
random_byte_array(n, seed, buf, out, 0, max_size);
}
+// Fills out[0..n) with random byte arrays whose lengths are uniform in
+// [min_size, max_size]. The bytes are stored contiguously in `buf`, which must
+// hold at least n * max_size bytes. With probability `prefixed_probability`,
+// each value after the first copies a prefix of its predecessor. The prefix
+// length is random and at most the shorter of the two lengths. All remaining
+// bytes are uniformly random.
+void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out,
+                                int min_size, int max_size, double prefixed_probability) {
+  std::default_random_engine gen(seed);
+  std::uniform_int_distribution<int> dist_size(min_size, max_size);
+  std::uniform_int_distribution<int> dist_byte(0, 255);
+  // Use the distributions from arrow/testing/uniform_real.h (included above)
+  // rather than the std:: ones, whose algorithms the C++ standard leaves
+  // unspecified.
+  ::arrow::random::bernoulli_distribution dist_has_prefix(prefixed_probability);
+  ::arrow::random::uniform_real_distribution<double> dist_prefix_length(0, 1);
+
+  for (int i = 0; i < n; ++i) {
+    int len = dist_size(gen);
+    out[i].len = len;
+    out[i].ptr = buf;
+
+    bool do_prefix = dist_has_prefix(gen) && i > 0;
+    int prefix_len = 0;
+    if (do_prefix) {
+      int max_prefix_len = std::min(len, static_cast<int>(out[i - 1].len));
+      // ceil() of a value in [0, max_prefix_len) gives a length in [0, max_prefix_len].
+      prefix_len = static_cast<int>(std::ceil(max_prefix_len * dist_prefix_length(gen)));
+    }
+    for (int j = 0; j < prefix_len; ++j) {
+      buf[j] = out[i - 1].ptr[j];
+    }
+    for (int j = prefix_len; j < len; ++j) {
+      buf[j] = static_cast<uint8_t>(dist_byte(gen));
+    }
+    buf += len;
+  }
+}
+
+// Fills out[0..n) with fixed-length values of `len` bytes each, stored
+// contiguously in `buf` (at least n * len bytes). With probability
+// `prefixed_probability`, each value after the first copies a prefix of
+// uniformly random length in [0, len] from its predecessor. All remaining bytes
+// are uniformly random.
+void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, int len, FLBA* out,
+                                double prefixed_probability) {
+  std::default_random_engine gen(seed);
+  std::uniform_int_distribution<int> dist_byte(0, 255);
+  // Use the Bernoulli distribution from arrow/testing/uniform_real.h (included
+  // above) rather than std::bernoulli_distribution, whose algorithm the C++
+  // standard leaves unspecified.
+  ::arrow::random::bernoulli_distribution dist_has_prefix(prefixed_probability);
+  std::uniform_int_distribution<int> dist_size(0, len);
+
+  for (int i = 0; i < n; ++i) {
+    out[i].ptr = buf;
+
+    bool do_prefix = dist_has_prefix(gen) && i > 0;
+    int prefix_len = do_prefix ? dist_size(gen) : 0;
+    for (int j = 0; j < prefix_len; ++j) {
+      buf[j] = out[i - 1].ptr[j];
+    }
+    for (int j = prefix_len; j < len; ++j) {
+      buf[j] = static_cast<uint8_t>(dist_byte(gen));
+    }
+    buf += len;
+  }
+}
+
} // namespace test
} // namespace parquet
diff --git a/cpp/src/parquet/test_util.h b/cpp/src/parquet/test_util.h
index b0aafa037e..c8578609e9 100644
--- a/cpp/src/parquet/test_util.h
+++ b/cpp/src/parquet/test_util.h
@@ -155,6 +155,12 @@ void random_byte_array(int n, uint32_t seed, uint8_t* buf,
ByteArray* out, int m
void random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, int
max_size);
+// Fills out[0..n) with random byte arrays whose lengths lie in
+// [min_size, max_size], storing their bytes contiguously in `buf`. With
+// probability `prefixed_probability`, each value after the first may reuse a
+// randomly sized prefix of its predecessor.
+void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray*
out,
+                                int min_size, int max_size, double
prefixed_probability);
+
+// Fixed-length variant: every value is `len` bytes long.
+void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, int len,
FLBA* out,
+                                double prefixed_probability);
+
template <typename Type, typename Sequence>
std::shared_ptr<Buffer> EncodeValues(Encoding::type encoding, bool
use_dictionary,
const Sequence& values, int length,
@@ -777,18 +783,46 @@ inline void GenerateData<Int96>(int num_values, Int96*
out, std::vector<uint8_t>
+// Random ByteArray values backed by `heap`. Lengths presumably range from 2 to
+// max_byte_array_len bytes (the min/max size arguments of random_byte_array).
template <>
inline void GenerateData<ByteArray>(int num_values, ByteArray* out,
std::vector<uint8_t>* heap) {
-  // seed the prng so failure is deterministic
int max_byte_array_len = 12;
heap->resize(num_values * max_byte_array_len);
+  // seed the prng so failure is deterministic
random_byte_array(num_values, 0, heap->data(), out, 2, max_byte_array_len);
}
+// Generate ByteArray or FLBA data where there is a given probability
+// for each value to share a common prefix with its predecessor.
+// This is useful to exercise prefix-based encodings such as DELTA_BYTE_ARRAY.
+// The generated values point into `heap`, which is resized here.
+template <typename T>
+inline void GeneratePrefixedData(int num_values, T* out, std::vector<uint8_t>* heap,
+                                 double prefixed_probability);
+
+// Variable-length values of 2 to kMaxByteArrayLen bytes.
+template <>
+inline void GeneratePrefixedData<ByteArray>(int num_values, ByteArray* out,
+                                            std::vector<uint8_t>* heap,
+                                            double prefixed_probability) {
+  constexpr int kMaxByteArrayLen = 12;
+  heap->resize(num_values * kMaxByteArrayLen);
+  // seed the prng so failure is deterministic
+  prefixed_random_byte_array(num_values, /*seed=*/0, heap->data(), out, /*min_size=*/2,
+                             /*max_size=*/kMaxByteArrayLen, prefixed_probability);
+}
+
+// Byte width of every FLBA value produced by GenerateData and GeneratePrefixedData.
static constexpr int kGenerateDataFLBALength = 8;
template <>
-inline void GenerateData<FLBA>(int num_values, FLBA* out,
std::vector<uint8_t>* heap) {
+inline void GeneratePrefixedData<FLBA>(int num_values, FLBA* out,
+                                       std::vector<uint8_t>* heap,
+                                       double prefixed_probability) {
+  // Each value occupies exactly kGenerateDataFLBALength bytes of `heap`.
+  heap->resize(num_values * kGenerateDataFLBALength);
// seed the prng so failure is deterministic
+  prefixed_random_byte_array(num_values, /*seed=*/0, heap->data(),
+                             kGenerateDataFLBALength, out,
prefixed_probability);
+}
+
+// FLBA data without deliberately shared prefixes, via random_fixed_byte_array.
+template <>
+inline void GenerateData<FLBA>(int num_values, FLBA* out,
std::vector<uint8_t>* heap) {
heap->resize(num_values * kGenerateDataFLBALength);
+  // seed the prng so failure is deterministic
random_fixed_byte_array(num_values, 0, heap->data(),
kGenerateDataFLBALength, out);
}
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index e81e9de0a1..0315376a88 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -577,6 +577,11 @@ struct ByteArray {
ByteArray(::std::string_view view) // NOLINT implicit conversion
: ByteArray(static_cast<uint32_t>(view.size()),
reinterpret_cast<const uint8_t*>(view.data())) {}
+
+  explicit operator std::string_view() const {
+    // Non-owning view of the `len` bytes starting at `ptr`; no copy is made.
+    const char* chars = reinterpret_cast<const char*>(ptr);
+    return std::string_view(chars, len);
+  }
+
uint32_t len;
const uint8_t* ptr;
};
diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst
index 95f2d8d98d..23fca8fd73 100644
--- a/docs/source/cpp/parquet.rst
+++ b/docs/source/cpp/parquet.rst
@@ -401,7 +401,7 @@ Encodings
+--------------------------+----------+----------+---------+
| DELTA_BINARY_PACKED | ✓ | ✓ | |
+--------------------------+----------+----------+---------+
-| DELTA_BYTE_ARRAY | ✓ | | |
+| DELTA_BYTE_ARRAY | ✓ | ✓ | |
+--------------------------+----------+----------+---------+
| DELTA_LENGTH_BYTE_ARRAY | ✓ | ✓ | |
+--------------------------+----------+----------+---------+
diff --git a/python/pyarrow/tests/parquet/test_basic.py
b/python/pyarrow/tests/parquet/test_basic.py
index 9bc59cbcf9..dd12a26616 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -392,9 +392,13 @@ def test_byte_stream_split(use_legacy_dataset):
def test_column_encoding(use_legacy_dataset):
arr_float = pa.array(list(map(float, range(100))))
arr_int = pa.array(list(map(int, range(100))))
- arr_bin = pa.array([str(x) for x in range(100)])
- mixed_table = pa.Table.from_arrays([arr_float, arr_int, arr_bin],
- names=['a', 'b', 'c'])
+ arr_bin = pa.array([str(x) for x in range(100)], type=pa.binary())
+ arr_flba = pa.array(
+ [str(x).zfill(10) for x in range(100)], type=pa.binary(10))
+ arr_bool = pa.array([False, True, False, False] * 25)
+ mixed_table = pa.Table.from_arrays(
+ [arr_float, arr_int, arr_bin, arr_flba, arr_bool],
+ names=['a', 'b', 'c', 'd', 'e'])
# Check "BYTE_STREAM_SPLIT" for column 'a' and "PLAIN" column_encoding for
# column 'b' and 'c'.
@@ -426,6 +430,21 @@ def test_column_encoding(use_legacy_dataset):
'c': "DELTA_LENGTH_BYTE_ARRAY"},
use_legacy_dataset=use_legacy_dataset)
+ # Check "DELTA_BYTE_ARRAY" for byte columns.
+ _check_roundtrip(mixed_table, expected=mixed_table,
+ use_dictionary=False,
+ column_encoding={'a': "PLAIN",
+ 'b': "DELTA_BINARY_PACKED",
+ 'c': "DELTA_BYTE_ARRAY",
+ 'd': "DELTA_BYTE_ARRAY"},
+ use_legacy_dataset=use_legacy_dataset)
+
+ # Check "RLE" for boolean columns.
+ _check_roundtrip(mixed_table, expected=mixed_table,
+ use_dictionary=False,
+ column_encoding={'e': "RLE"},
+ use_legacy_dataset=use_legacy_dataset)
+
# Try to pass "BYTE_STREAM_SPLIT" column encoding for integer column 'b'.
# This should throw an error as it is only supports FLOAT and DOUBLE.
with pytest.raises(IOError,