mapleFU commented on code in PR #14341:
URL: https://github.com/apache/arrow/pull/14341#discussion_r1160518706
##########
cpp/src/parquet/encoding.cc:
##########
@@ -1238,25 +1240,35 @@ int PlainBooleanDecoder::Decode(bool* buffer, int max_values) {
return max_values;
}
-struct ArrowBinaryHelper {
+template <typename DType>
+struct ArrowBinaryHelper;
+
+template <>
+struct ArrowBinaryHelper<ByteArrayType> {
 explicit ArrowBinaryHelper(typename EncodingTraits<ByteArrayType>::Accumulator* out) {
this->out = out;
this->builder = out->builder.get();
+ if (SubtractWithOverflow(::arrow::kBinaryMemoryLimit,
Review Comment:
Mind adding an `ARROW_PREDICT_FALSE`?
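A minimal sketch of what that could look like; the call is cut off in the hunk above, so the second operand and the output variable here are placeholders rather than the PR's actual names:

```cpp
// Sketch only: "bytes_used" and "bytes_left" stand in for the truncated
// operands of the SubtractWithOverflow call above; the message is illustrative.
if (ARROW_PREDICT_FALSE(SubtractWithOverflow(::arrow::kBinaryMemoryLimit,
                                             bytes_used, &bytes_left))) {
  throw ParquetException("excess expansion in ArrowBinaryHelper");
}
```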
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,11 +3083,240 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front compression:
+/// for each element in a sequence of strings, store the prefix length of the previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
MemoryPool* pool = ::arrow::default_memory_pool())
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+ sink_(pool),
+ prefix_length_encoder_(nullptr, pool),
+ suffix_encoder_(nullptr, pool),
+ last_value_(""),
+ kEmpty(ByteArray(0, reinterpret_cast<const uint8_t*>(""))) {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return prefix_length_encoder_.EstimatedDataEncodedSize() +
+ suffix_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
+ PARQUET_ASSIGN_OR_THROW(
+ buffer_, ::arrow::AllocateBuffer(num_values * sizeof(T), this->memory_pool()));
+ T* data = reinterpret_cast<T*>(buffer_->mutable_data());
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
+ }
+ }
+
+ protected:
+ template <typename ArrayType>
+ void PutBinaryArray(const ArrayType& array) {
+ auto previous_len = static_cast<uint32_t>(last_value_.size());
+ std::string_view last_value_view = last_value_;
+
+ PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
+ *array.data(),
+ [&](::std::string_view view) {
+ if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+ return Status::Invalid("Parquet cannot store strings with size 2GB or more");
+ }
+ // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+ const ByteArray src{view};
+
+ uint32_t j = 0;
+ const uint32_t common_length = std::min(previous_len, src.len);
+ while (j < common_length) {
+ if (last_value_view[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+ previous_len = src.len;
+ prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+ last_value_view = view;
+ const auto suffix_length = static_cast<uint32_t>(src.len - j);
+ if (suffix_length == 0) {
+ suffix_encoder_.Put(&kEmpty, 1);
+ return Status::OK();
+ }
+ const uint8_t* suffix_ptr = src.ptr + j;
+ // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffix_encoder_.Put(&suffix, 1);
+
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+ last_value_ = last_value_view;
+ }
+
+ ::arrow::BufferBuilder sink_;
+ DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+ DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+ std::string last_value_;
+ std::unique_ptr<::arrow::Buffer> buffer_;
+ const ByteArray kEmpty;
+};
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+ throw Status::Invalid("Put not implemented for " + this->descr_->ToString());
+}
+
+template <>
+void DeltaByteArrayEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+ ArrowPoolVector<int32_t> prefix_lengths(num_values,
+                                         ::arrow::stl::allocator<int32_t>(pool_));
+ std::string_view last_value_view = last_value_;
+
+ for (int i = 0; i < num_values; i++) {
+ // Convert to ByteArray, so we can pass to the suffix_encoder_.
+ const ByteArray value = src[i];
+ if (ARROW_PREDICT_FALSE(value.len >= static_cast<int32_t>(kMaxByteArraySize))) {
+ throw Status::Invalid("Parquet cannot store strings with size 2GB or more");
+ }
+
+ auto view = string_view{reinterpret_cast<const char*>(value.ptr),
+ static_cast<uint32_t>(value.len)};
+ uint32_t j = 0;
+ const uint32_t common_length =
+ std::min(value.len, static_cast<uint32_t>(last_value_view.length()));
+ while (j < common_length) {
+ if (last_value_view[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+
+ last_value_view = view;
+ prefix_lengths[i] = j;
+ const auto suffix_length = static_cast<uint32_t>(value.len - j);
+
+ if (suffix_length == 0) {
+ continue;
+ }
+ const uint8_t* suffix_ptr = value.ptr + j;
+ // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffix_encoder_.Put(&suffix, 1);
+ }
+ prefix_length_encoder_.Put(prefix_lengths.data(), num_values);
+ last_value_ = last_value_view;
+}
+
+template <>
+void DeltaByteArrayEncoder<FLBAType>::Put(const FLBA* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+ ArrowPoolVector<int32_t> prefix_lengths(num_values,
+                                         ::arrow::stl::allocator<int32_t>(pool_));
+ std::string_view last_value_view = last_value_;
+ const int32_t len = descr_->type_length();
+
+ if (ARROW_PREDICT_FALSE(len >= static_cast<int32_t>(kMaxByteArraySize))) {
+ throw Status::Invalid("Parquet cannot store strings with size 2GB or more");
+ }
+
+ for (int i = 0; i < num_values; i++) {
+ auto view = string_view{reinterpret_cast<const char*>(src[i].ptr),
+ static_cast<uint32_t>(len)};
+ int32_t j = 0;
+ const int32_t common_length =
+ std::min(len, static_cast<int32_t>(last_value_view.length()));
+ while (j < common_length) {
+ if (last_value_view[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+
+ last_value_view = view;
+ prefix_lengths[i] = j;
+ const auto suffix_length = static_cast<uint32_t>(len - j);
+
+ if (suffix_length == 0) {
+ continue;
Review Comment:
ditto
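Assuming the "ditto" refers to the missing empty-suffix write flagged in the `kEmpty` comment further down, a minimal sketch of the fix for this FLBA `Put` loop, mirroring what `PutBinaryArray` already does:

```cpp
if (suffix_length == 0) {
  // Every value still needs a suffix entry so the suffix stream stays
  // aligned with the prefix-length stream; emit the empty ByteArray.
  suffix_encoder_.Put(&kEmpty, 1);
  continue;
}
```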
##########
python/pyarrow/tests/parquet/test_basic.py:
##########
@@ -426,6 +430,21 @@ def test_column_encoding(use_legacy_dataset):
'c': "DELTA_LENGTH_BYTE_ARRAY"},
use_legacy_dataset=use_legacy_dataset)
+ # Check "DELTA_BYTE_ARRAY" for byte columns.
+ _check_roundtrip(mixed_table, expected=mixed_table,
+ use_dictionary=False,
+ column_encoding={'a': "PLAIN",
+ 'b': "DELTA_BINARY_PACKED",
+ 'c': "DELTA_BYTE_ARRAY",
+ 'd': "DELTA_BYTE_ARRAY"},
+ use_legacy_dataset=use_legacy_dataset)
+
+ # Check "RLE" for boolean columns.
Review Comment:
Thanks!
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,11 +3083,240 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front compression:
+/// for each element in a sequence of strings, store the prefix length of the previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
MemoryPool* pool = ::arrow::default_memory_pool())
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+ sink_(pool),
+ prefix_length_encoder_(nullptr, pool),
+ suffix_encoder_(nullptr, pool),
+ last_value_(""),
+ kEmpty(ByteArray(0, reinterpret_cast<const uint8_t*>(""))) {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return prefix_length_encoder_.EstimatedDataEncodedSize() +
+ suffix_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
+ PARQUET_ASSIGN_OR_THROW(
+ buffer_, ::arrow::AllocateBuffer(num_values * sizeof(T), this->memory_pool()));
+ T* data = reinterpret_cast<T*>(buffer_->mutable_data());
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
+ }
+ }
+
+ protected:
+ template <typename ArrayType>
+ void PutBinaryArray(const ArrayType& array) {
+ auto previous_len = static_cast<uint32_t>(last_value_.size());
+ std::string_view last_value_view = last_value_;
+
+ PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
+ *array.data(),
+ [&](::std::string_view view) {
+ if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+ return Status::Invalid("Parquet cannot store strings with size 2GB or more");
+ }
+ // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+ const ByteArray src{view};
+
+ uint32_t j = 0;
+ const uint32_t common_length = std::min(previous_len, src.len);
+ while (j < common_length) {
+ if (last_value_view[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+ previous_len = src.len;
+ prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+ last_value_view = view;
+ const auto suffix_length = static_cast<uint32_t>(src.len - j);
+ if (suffix_length == 0) {
+ suffix_encoder_.Put(&kEmpty, 1);
+ return Status::OK();
+ }
+ const uint8_t* suffix_ptr = src.ptr + j;
+ // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffix_encoder_.Put(&suffix, 1);
+
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+ last_value_ = last_value_view;
+ }
+
+ ::arrow::BufferBuilder sink_;
+ DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+ DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+ std::string last_value_;
+ std::unique_ptr<::arrow::Buffer> buffer_;
+ const ByteArray kEmpty;
+};
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+ throw Status::Invalid("Put not implemented for " + this->descr_->ToString());
+}
+
+template <>
+void DeltaByteArrayEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+ ArrowPoolVector<int32_t> prefix_lengths(num_values,
+                                         ::arrow::stl::allocator<int32_t>(pool_));
+ std::string_view last_value_view = last_value_;
+
+ for (int i = 0; i < num_values; i++) {
+ // Convert to ByteArray, so we can pass to the suffix_encoder_.
+ const ByteArray value = src[i];
+ if (ARROW_PREDICT_FALSE(value.len >= static_cast<int32_t>(kMaxByteArraySize))) {
+ throw Status::Invalid("Parquet cannot store strings with size 2GB or more");
+ }
+
+ auto view = string_view{reinterpret_cast<const char*>(value.ptr),
+ static_cast<uint32_t>(value.len)};
+ uint32_t j = 0;
+ const uint32_t common_length =
+ std::min(value.len, static_cast<uint32_t>(last_value_view.length()));
+ while (j < common_length) {
+ if (last_value_view[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+
+ last_value_view = view;
+ prefix_lengths[i] = j;
+ const auto suffix_length = static_cast<uint32_t>(value.len - j);
+
+ if (suffix_length == 0) {
+ continue;
Review Comment:
A `kEmpty` should be written here.
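A minimal sketch of the suggested change, following the pattern `PutBinaryArray` already uses for the empty-suffix case:

```cpp
if (suffix_length == 0) {
  // Emit an empty suffix instead of skipping the value, so the decoder
  // still reads exactly one suffix per prefix length.
  suffix_encoder_.Put(&kEmpty, 1);
  continue;
}
```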
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3196,6 +3467,43 @@ class DeltaByteArrayDecoder : public DecoderImpl,
std::shared_ptr<ResizableBuffer> buffered_data_;
};
+class DeltaByteArrayDecoder : public DeltaByteArrayDecoderImpl<ByteArrayType> {
+ public:
+ using Base = DeltaByteArrayDecoderImpl<ByteArrayType>;
+ using Base::DeltaByteArrayDecoderImpl;
+
+ int Decode(ByteArray* buffer, int max_values) override {
+ return GetInternal(buffer, max_values);
+ }
+};
+
+class DeltaByteArrayFLBADecoder : public DeltaByteArrayDecoderImpl<FLBAType>,
+ virtual public FLBADecoder {
+ public:
+ using Base = DeltaByteArrayDecoderImpl<FLBAType>;
+ using Base::DeltaByteArrayDecoderImpl;
+
+ int Decode(ByteArray* buffer, int max_values) {
+ return GetInternal(buffer, max_values);
+ }
+ int Decode(FixedLenByteArray* buffer, int max_values) override {
+ int decoded_values_size = max_values;
+ if (MultiplyWithOverflow(decoded_values_size,
Review Comment:
Add an `ARROW_PREDICT_FALSE` here?
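As with the `ArrowBinaryHelper` comment above, a sketch of the requested change; the multiplication's second operand is cut off in the hunk, so `descr_->type_length()` and the exception message are assumptions:

```cpp
// Sketch only: the second operand is assumed to be the fixed value width.
if (ARROW_PREDICT_FALSE(MultiplyWithOverflow(decoded_values_size,
                                             descr_->type_length(),
                                             &decoded_values_size))) {
  throw ParquetException("excess expansion in DeltaByteArrayFLBADecoder");
}
```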
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]