mapleFU commented on code in PR #14341:
URL: https://github.com/apache/arrow/pull/14341#discussion_r1145634331
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3033,11 +3062,226 @@ class RleBooleanDecoder : public DecoderImpl, virtual
public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+// This is also known as incremental encoding or front compression: for each
element in a
+// sequence of strings, store the prefix length of the previous entry plus the
suffix.
+//
+// This is stored as a sequence of delta-encoded prefix lengths
(DELTA_BINARY_PACKED),
+// followed by the suffixes encoded as delta length byte arrays
(DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
MemoryPool* pool =
::arrow::default_memory_pool())
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+ sink_(pool),
+ prefix_length_encoder_(nullptr, pool),
+ suffix_encoder_(nullptr, pool) {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return prefix_length_encoder_.EstimatedDataEncodedSize() +
+ suffix_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
+ PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values
* sizeof(T),
+
this->memory_pool()));
+ T* data = reinterpret_cast<T*>(buffer->mutable_data());
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
+ }
+ }
+
+ protected:
+ void PutFixedLenByteArray(const ::arrow::FixedSizeBinaryArray& array) {
+ const uint32_t byte_width = array.byte_width();
+ uint32_t previous_len = byte_width;
+
+
PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<::arrow::FixedSizeBinaryType>(
+ *array.data(),
+ [&](::std::string_view view) {
+ const ByteArray src{view};
+ if (ARROW_PREDICT_TRUE(last_value_.empty())) {
Review Comment:
I suggest not using a branch-prediction hint here, or else using `ARROW_PREDICT_FALSE`.
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3033,11 +3062,226 @@ class RleBooleanDecoder : public DecoderImpl, virtual
public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+// This is also known as incremental encoding or front compression: for each
element in a
+// sequence of strings, store the prefix length of the previous entry plus the
suffix.
+//
+// This is stored as a sequence of delta-encoded prefix lengths
(DELTA_BINARY_PACKED),
+// followed by the suffixes encoded as delta length byte arrays
(DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
MemoryPool* pool =
::arrow::default_memory_pool())
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+ sink_(pool),
+ prefix_length_encoder_(nullptr, pool),
+ suffix_encoder_(nullptr, pool) {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return prefix_length_encoder_.EstimatedDataEncodedSize() +
+ suffix_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
+ PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values
* sizeof(T),
+
this->memory_pool()));
+ T* data = reinterpret_cast<T*>(buffer->mutable_data());
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
+ }
+ }
+
+ protected:
+ void PutFixedLenByteArray(const ::arrow::FixedSizeBinaryArray& array) {
+ const uint32_t byte_width = array.byte_width();
+ uint32_t previous_len = byte_width;
+
+
PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<::arrow::FixedSizeBinaryType>(
+ *array.data(),
+ [&](::std::string_view view) {
+ const ByteArray src{view};
+ if (ARROW_PREDICT_TRUE(last_value_.empty())) {
+ last_value_ = view;
+ suffix_encoder_.Put(&src, 1);
+ prefix_length_encoder_.Put({static_cast<int32_t>(0)}, 1);
+ previous_len = byte_width;
+ } else {
+ uint32_t j = 0;
+ while (j < previous_len) {
+ if (last_value_[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+ previous_len = j;
+ prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+ const uint8_t* suffix_ptr = src.ptr + j;
+ const uint32_t suffix_length = static_cast<uint32_t>(byte_width -
j);
+ last_value_ =
+ string_view{reinterpret_cast<const char*>(suffix_ptr),
suffix_length};
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffix_encoder_.Put(&suffix, 1);
+ }
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+ }
+
+ template <typename ArrayType>
+ void PutBinaryArray(const ArrayType& array) {
+ uint32_t previous_len = 0;
+
+ PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename
ArrayType::TypeClass>(
+ *array.data(),
+ [&](::std::string_view view) {
+ if (ARROW_PREDICT_TRUE(view.size() > kMaxByteArraySize)) {
+ return Status::Invalid("Parquet cannot store strings with size 2GB
or more");
+ }
+ const ByteArray src{view};
+ if (ARROW_PREDICT_TRUE(last_value_.empty())) {
+ last_value_ = view;
+ suffix_encoder_.Put(&src, 1);
+ prefix_length_encoder_.Put({static_cast<int32_t>(0)}, 1);
+ previous_len = src.len;
+ } else {
+ uint32_t j = 0;
+ while (j < std::min(previous_len, src.len)) {
+ if (last_value_[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+ previous_len = j;
+ prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+ const uint8_t* suffix_ptr = src.ptr + j;
+ const uint32_t suffix_length = static_cast<uint32_t>(src.len - j);
+ last_value_ =
Review Comment:
ditto
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3033,11 +3062,226 @@ class RleBooleanDecoder : public DecoderImpl, virtual
public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+// This is also known as incremental encoding or front compression: for each
element in a
+// sequence of strings, store the prefix length of the previous entry plus the
suffix.
+//
+// This is stored as a sequence of delta-encoded prefix lengths
(DELTA_BINARY_PACKED),
+// followed by the suffixes encoded as delta length byte arrays
(DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
MemoryPool* pool =
::arrow::default_memory_pool())
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+ sink_(pool),
+ prefix_length_encoder_(nullptr, pool),
+ suffix_encoder_(nullptr, pool) {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return prefix_length_encoder_.EstimatedDataEncodedSize() +
+ suffix_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
+ PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values
* sizeof(T),
+
this->memory_pool()));
+ T* data = reinterpret_cast<T*>(buffer->mutable_data());
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
+ }
+ }
+
+ protected:
+ void PutFixedLenByteArray(const ::arrow::FixedSizeBinaryArray& array) {
+ const uint32_t byte_width = array.byte_width();
+ uint32_t previous_len = byte_width;
+
+
PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<::arrow::FixedSizeBinaryType>(
+ *array.data(),
+ [&](::std::string_view view) {
+ const ByteArray src{view};
+ if (ARROW_PREDICT_TRUE(last_value_.empty())) {
+ last_value_ = view;
+ suffix_encoder_.Put(&src, 1);
+ prefix_length_encoder_.Put({static_cast<int32_t>(0)}, 1);
+ previous_len = byte_width;
+ } else {
+ uint32_t j = 0;
+ while (j < previous_len) {
+ if (last_value_[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+ previous_len = j;
+ prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+ const uint8_t* suffix_ptr = src.ptr + j;
+ const uint32_t suffix_length = static_cast<uint32_t>(byte_width -
j);
+ last_value_ =
+ string_view{reinterpret_cast<const char*>(suffix_ptr),
suffix_length};
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffix_encoder_.Put(&suffix, 1);
+ }
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+ }
+
+ template <typename ArrayType>
+ void PutBinaryArray(const ArrayType& array) {
+ uint32_t previous_len = 0;
+
+ PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename
ArrayType::TypeClass>(
+ *array.data(),
+ [&](::std::string_view view) {
+ if (ARROW_PREDICT_TRUE(view.size() > kMaxByteArraySize)) {
Review Comment:
The code is ok, but the message "with size 2GB or more" corresponds to `>=`,
while the check uses `>`. Also, should we do the same check for FLBA?
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3033,11 +3062,226 @@ class RleBooleanDecoder : public DecoderImpl, virtual
public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+// This is also known as incremental encoding or front compression: for each
element in a
+// sequence of strings, store the prefix length of the previous entry plus the
suffix.
+//
+// This is stored as a sequence of delta-encoded prefix lengths
(DELTA_BINARY_PACKED),
+// followed by the suffixes encoded as delta length byte arrays
(DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
MemoryPool* pool =
::arrow::default_memory_pool())
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+ sink_(pool),
+ prefix_length_encoder_(nullptr, pool),
+ suffix_encoder_(nullptr, pool) {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return prefix_length_encoder_.EstimatedDataEncodedSize() +
+ suffix_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
+ PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values
* sizeof(T),
+
this->memory_pool()));
+ T* data = reinterpret_cast<T*>(buffer->mutable_data());
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
+ }
+ }
+
+ protected:
+ void PutFixedLenByteArray(const ::arrow::FixedSizeBinaryArray& array) {
+ const uint32_t byte_width = array.byte_width();
+ uint32_t previous_len = byte_width;
+
+
PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<::arrow::FixedSizeBinaryType>(
+ *array.data(),
+ [&](::std::string_view view) {
+ const ByteArray src{view};
+ if (ARROW_PREDICT_TRUE(last_value_.empty())) {
+ last_value_ = view;
+ suffix_encoder_.Put(&src, 1);
+ prefix_length_encoder_.Put({static_cast<int32_t>(0)}, 1);
+ previous_len = byte_width;
+ } else {
+ uint32_t j = 0;
+ while (j < previous_len) {
+ if (last_value_[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+ previous_len = j;
+ prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+ const uint8_t* suffix_ptr = src.ptr + j;
+ const uint32_t suffix_length = static_cast<uint32_t>(byte_width -
j);
+ last_value_ =
Review Comment:
Hi rok, while the program is inside `PutFixedLenByteArray`, `last_value_` works
well. However, once `Put` has finished and the Arrow array has been destroyed,
`last_value_` would point to an invalid address, causing invalid memory access.
I think we can use:
```
std::string last_value_;
```
and in the function, we have:
```
std::string_view last_value_view = last_value_;
// ... change inner last_value_ to last_value_view
// put it back in the end of function
last_value_ = std::string(last_value_view);
```
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3033,11 +3062,226 @@ class RleBooleanDecoder : public DecoderImpl, virtual
public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+// This is also known as incremental encoding or front compression: for each
element in a
+// sequence of strings, store the prefix length of the previous entry plus the
suffix.
+//
+// This is stored as a sequence of delta-encoded prefix lengths
(DELTA_BINARY_PACKED),
+// followed by the suffixes encoded as delta length byte arrays
(DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
MemoryPool* pool =
::arrow::default_memory_pool())
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+ sink_(pool),
+ prefix_length_encoder_(nullptr, pool),
+ suffix_encoder_(nullptr, pool) {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return prefix_length_encoder_.EstimatedDataEncodedSize() +
+ suffix_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
+ PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values
* sizeof(T),
+
this->memory_pool()));
+ T* data = reinterpret_cast<T*>(buffer->mutable_data());
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
+ }
+ }
+
+ protected:
+ void PutFixedLenByteArray(const ::arrow::FixedSizeBinaryArray& array) {
+ const uint32_t byte_width = array.byte_width();
+ uint32_t previous_len = byte_width;
+
+
PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<::arrow::FixedSizeBinaryType>(
+ *array.data(),
+ [&](::std::string_view view) {
+ const ByteArray src{view};
+ if (ARROW_PREDICT_TRUE(last_value_.empty())) {
+ last_value_ = view;
+ suffix_encoder_.Put(&src, 1);
+ prefix_length_encoder_.Put({static_cast<int32_t>(0)}, 1);
+ previous_len = byte_width;
+ } else {
+ uint32_t j = 0;
+ while (j < previous_len) {
+ if (last_value_[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+ previous_len = j;
+ prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+ const uint8_t* suffix_ptr = src.ptr + j;
+ const uint32_t suffix_length = static_cast<uint32_t>(byte_width -
j);
+ last_value_ =
+ string_view{reinterpret_cast<const char*>(suffix_ptr),
suffix_length};
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffix_encoder_.Put(&suffix, 1);
+ }
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+ }
+
+ template <typename ArrayType>
+ void PutBinaryArray(const ArrayType& array) {
+ uint32_t previous_len = 0;
+
+ PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename
ArrayType::TypeClass>(
+ *array.data(),
+ [&](::std::string_view view) {
+ if (ARROW_PREDICT_TRUE(view.size() > kMaxByteArraySize)) {
+ return Status::Invalid("Parquet cannot store strings with size 2GB
or more");
+ }
+ const ByteArray src{view};
+ if (ARROW_PREDICT_TRUE(last_value_.empty())) {
+ last_value_ = view;
+ suffix_encoder_.Put(&src, 1);
+ prefix_length_encoder_.Put({static_cast<int32_t>(0)}, 1);
+ previous_len = src.len;
+ } else {
+ uint32_t j = 0;
+ while (j < std::min(previous_len, src.len)) {
+ if (last_value_[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+ previous_len = j;
+ prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+ const uint8_t* suffix_ptr = src.ptr + j;
+ const uint32_t suffix_length = static_cast<uint32_t>(src.len - j);
+ last_value_ =
+ string_view{reinterpret_cast<const char*>(suffix_ptr),
suffix_length};
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffix_encoder_.Put(&suffix, 1);
+ }
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+ }
+
+ ::arrow::BufferBuilder sink_;
+ DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+ DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+ string_view last_value_;
+};
+
+template <>
+inline void DeltaByteArrayEncoder<FLBAType>::Put(const FixedLenByteArray* src,
+ int num_values) {
+ if (descr_->type_length() == 0) {
+ return;
+ }
+ for (int i = 0; i < num_values; ++i) {
+ // Write the result to the output stream
+ DCHECK(src[i].ptr != nullptr) << "Value ptr cannot be NULL";
+ PARQUET_THROW_NOT_OK(sink_.Append(src[i].ptr, descr_->type_length()));
+ }
+}
+
+template <>
+void DeltaByteArrayEncoder<FLBAType>::Put(const ::arrow::Array& values) {
+ if (!::arrow::is_fixed_size_binary(values.type_id())) {
+ throw ParquetException("Only FixedSizeBinaryArray and subclasses
supported");
+ }
+ PutFixedLenByteArray(checked_cast<const
::arrow::FixedSizeBinaryArray&>(values));
+}
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const ::arrow::Array& values) {
+ AssertBaseBinary(values);
+ if (::arrow::is_binary_like(values.type_id())) {
+ PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
+ } else if (::arrow::is_large_binary_like(values.type_id())) {
+ PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
+ }
+}
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+ ArrowPoolVector<int32_t> prefix_lengths(num_values);
+
+ if (ARROW_PREDICT_TRUE(last_value_.empty())) {
Review Comment:
Do not use `ARROW_PREDICT_TRUE` here.
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3033,11 +3062,226 @@ class RleBooleanDecoder : public DecoderImpl, virtual
public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+// This is also known as incremental encoding or front compression: for each
element in a
+// sequence of strings, store the prefix length of the previous entry plus the
suffix.
+//
+// This is stored as a sequence of delta-encoded prefix lengths
(DELTA_BINARY_PACKED),
+// followed by the suffixes encoded as delta length byte arrays
(DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
MemoryPool* pool =
::arrow::default_memory_pool())
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+ sink_(pool),
+ prefix_length_encoder_(nullptr, pool),
+ suffix_encoder_(nullptr, pool) {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return prefix_length_encoder_.EstimatedDataEncodedSize() +
+ suffix_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
+ PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values
* sizeof(T),
+
this->memory_pool()));
+ T* data = reinterpret_cast<T*>(buffer->mutable_data());
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
+ }
+ }
+
+ protected:
+ void PutFixedLenByteArray(const ::arrow::FixedSizeBinaryArray& array) {
+ const uint32_t byte_width = array.byte_width();
+ uint32_t previous_len = byte_width;
+
+
PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<::arrow::FixedSizeBinaryType>(
+ *array.data(),
+ [&](::std::string_view view) {
+ const ByteArray src{view};
+ if (ARROW_PREDICT_TRUE(last_value_.empty())) {
+ last_value_ = view;
+ suffix_encoder_.Put(&src, 1);
+ prefix_length_encoder_.Put({static_cast<int32_t>(0)}, 1);
+ previous_len = byte_width;
+ } else {
+ uint32_t j = 0;
+ while (j < previous_len) {
+ if (last_value_[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+ previous_len = j;
+ prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+ const uint8_t* suffix_ptr = src.ptr + j;
+ const uint32_t suffix_length = static_cast<uint32_t>(byte_width -
j);
+ last_value_ =
+ string_view{reinterpret_cast<const char*>(suffix_ptr),
suffix_length};
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffix_encoder_.Put(&suffix, 1);
+ }
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+ }
+
+ template <typename ArrayType>
+ void PutBinaryArray(const ArrayType& array) {
+ uint32_t previous_len = 0;
+
+ PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename
ArrayType::TypeClass>(
+ *array.data(),
+ [&](::std::string_view view) {
+ if (ARROW_PREDICT_TRUE(view.size() > kMaxByteArraySize)) {
+ return Status::Invalid("Parquet cannot store strings with size 2GB
or more");
+ }
+ const ByteArray src{view};
+ if (ARROW_PREDICT_TRUE(last_value_.empty())) {
+ last_value_ = view;
+ suffix_encoder_.Put(&src, 1);
+ prefix_length_encoder_.Put({static_cast<int32_t>(0)}, 1);
+ previous_len = src.len;
+ } else {
+ uint32_t j = 0;
+ while (j < std::min(previous_len, src.len)) {
+ if (last_value_[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+ previous_len = j;
+ prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+ const uint8_t* suffix_ptr = src.ptr + j;
+ const uint32_t suffix_length = static_cast<uint32_t>(src.len - j);
+ last_value_ =
+ string_view{reinterpret_cast<const char*>(suffix_ptr),
suffix_length};
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffix_encoder_.Put(&suffix, 1);
+ }
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+ }
+
+ ::arrow::BufferBuilder sink_;
+ DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+ DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+ string_view last_value_;
+};
+
+template <>
+inline void DeltaByteArrayEncoder<FLBAType>::Put(const FixedLenByteArray* src,
+ int num_values) {
+ if (descr_->type_length() == 0) {
+ return;
+ }
+ for (int i = 0; i < num_values; ++i) {
+ // Write the result to the output stream
+ DCHECK(src[i].ptr != nullptr) << "Value ptr cannot be NULL";
+ PARQUET_THROW_NOT_OK(sink_.Append(src[i].ptr, descr_->type_length()));
+ }
+}
+
+template <>
+void DeltaByteArrayEncoder<FLBAType>::Put(const ::arrow::Array& values) {
+ if (!::arrow::is_fixed_size_binary(values.type_id())) {
+ throw ParquetException("Only FixedSizeBinaryArray and subclasses
supported");
+ }
+ PutFixedLenByteArray(checked_cast<const
::arrow::FixedSizeBinaryArray&>(values));
+}
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const ::arrow::Array& values) {
+ AssertBaseBinary(values);
+ if (::arrow::is_binary_like(values.type_id())) {
+ PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
+ } else if (::arrow::is_large_binary_like(values.type_id())) {
+ PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
+ }
+}
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+ ArrowPoolVector<int32_t> prefix_lengths(num_values);
+
+ if (ARROW_PREDICT_TRUE(last_value_.empty())) {
+ last_value_ = string_view{reinterpret_cast<const char*>(src[0].ptr),
src[0].len};
+ suffix_encoder_.Put(&src[0], 1);
+ prefix_lengths[0] = 0;
+ }
+
+ for (int32_t i = 1; i < num_values; i++) {
Review Comment:
This is a bit confusing: if `last_value_` is not empty, the first value would
be ignored.
##########
cpp/src/parquet/encoding.cc:
##########
@@ -1275,6 +1275,35 @@ struct ArrowBinaryHelper {
int64_t chunk_space_remaining;
};
+struct ArrowFLBAHelper {
+ explicit ArrowFLBAHelper(::arrow::FixedSizeBinaryBuilder* builder) {
+ this->builder = builder;
+ this->chunk_space_remaining =
+ ::arrow::kBinaryMemoryLimit - this->builder->value_data_length();
+ }
+
+ Status PushChunk() {
+ std::shared_ptr<::arrow::Array> result;
+ RETURN_NOT_OK(builder->Finish(&result));
+ chunks.push_back(result);
Review Comment:
`chunks.push_back(std::move(result));`
##########
cpp/src/parquet/encoding.cc:
##########
@@ -2900,8 +2929,222 @@ class RleBooleanDecoder : public DecoderImpl, virtual
public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+// This is also known as incremental encoding or front compression: for each
element in a
+// sequence of strings, store the prefix length of the previous entry plus the
suffix.
+//
+// This is stored as a sequence of delta-encoded prefix lengths
(DELTA_BINARY_PACKED),
+// followed by the suffixes encoded as delta length byte arrays
(DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
+ public:
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool*
pool)
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY,
+ pool = ::arrow::default_memory_pool()),
+ sink_(pool),
+ prefix_length_encoder_(nullptr, pool),
+ suffix_encoder_(nullptr, pool) {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return prefix_length_encoder_.EstimatedDataEncodedSize() +
+ suffix_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
+ PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values
* sizeof(T),
+
this->memory_pool()));
+ T* data = reinterpret_cast<T*>(buffer->mutable_data());
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
+ }
+ }
+
+ protected:
+ void PutFixedLenByteArray(const ::arrow::FixedSizeBinaryArray& array) {
+ const uint32_t byte_width = array.byte_width();
+
+ // TODO(rok): optimize using ArrowPoolVector<int32_t>
prefix_lengths(num_values);
+
PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<::arrow::FixedSizeBinaryType>(
+ *array.data(),
+ [&](::std::string_view view) {
+ uint32_t previous_len = 0;
+ const ByteArray src{view};
Review Comment:
Okay, you can leave a comment for it. I think it's fine, but it is a bit
confusing for readers of the code, because the FLBA value is changed to another
type here.
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3033,11 +3062,226 @@ class RleBooleanDecoder : public DecoderImpl, virtual
public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+// This is also known as incremental encoding or front compression: for each
element in a
+// sequence of strings, store the prefix length of the previous entry plus the
suffix.
+//
+// This is stored as a sequence of delta-encoded prefix lengths
(DELTA_BINARY_PACKED),
+// followed by the suffixes encoded as delta length byte arrays
(DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
MemoryPool* pool =
::arrow::default_memory_pool())
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+ sink_(pool),
+ prefix_length_encoder_(nullptr, pool),
+ suffix_encoder_(nullptr, pool) {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return prefix_length_encoder_.EstimatedDataEncodedSize() +
+ suffix_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
+ PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values
* sizeof(T),
+
this->memory_pool()));
+ T* data = reinterpret_cast<T*>(buffer->mutable_data());
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
+ }
+ }
+
+ protected:
+ void PutFixedLenByteArray(const ::arrow::FixedSizeBinaryArray& array) {
+ const uint32_t byte_width = array.byte_width();
+ uint32_t previous_len = byte_width;
+
+
PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<::arrow::FixedSizeBinaryType>(
+ *array.data(),
+ [&](::std::string_view view) {
+ const ByteArray src{view};
+ if (ARROW_PREDICT_TRUE(last_value_.empty())) {
+ last_value_ = view;
+ suffix_encoder_.Put(&src, 1);
+ prefix_length_encoder_.Put({static_cast<int32_t>(0)}, 1);
+ previous_len = byte_width;
+ } else {
+ uint32_t j = 0;
+ while (j < previous_len) {
+ if (last_value_[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+ previous_len = j;
+ prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+ const uint8_t* suffix_ptr = src.ptr + j;
+ const uint32_t suffix_length = static_cast<uint32_t>(byte_width -
j);
+ last_value_ =
+ string_view{reinterpret_cast<const char*>(suffix_ptr),
suffix_length};
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffix_encoder_.Put(&suffix, 1);
+ }
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+ }
+
+ template <typename ArrayType>
+ void PutBinaryArray(const ArrayType& array) {
+ uint32_t previous_len = 0;
+
+ PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename
ArrayType::TypeClass>(
+ *array.data(),
+ [&](::std::string_view view) {
+ if (ARROW_PREDICT_TRUE(view.size() > kMaxByteArraySize)) {
+ return Status::Invalid("Parquet cannot store strings with size 2GB
or more");
+ }
+ const ByteArray src{view};
+ if (ARROW_PREDICT_TRUE(last_value_.empty())) {
+ last_value_ = view;
+ suffix_encoder_.Put(&src, 1);
+ prefix_length_encoder_.Put({static_cast<int32_t>(0)}, 1);
+ previous_len = src.len;
+ } else {
+ uint32_t j = 0;
+ while (j < std::min(previous_len, src.len)) {
+ if (last_value_[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+ previous_len = j;
+ prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+ const uint8_t* suffix_ptr = src.ptr + j;
+ const uint32_t suffix_length = static_cast<uint32_t>(src.len - j);
+ last_value_ =
+ string_view{reinterpret_cast<const char*>(suffix_ptr),
suffix_length};
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffix_encoder_.Put(&suffix, 1);
+ }
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+ }
+
+ ::arrow::BufferBuilder sink_;
+ DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+ DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+ string_view last_value_;
+};
+
+template <>
+inline void DeltaByteArrayEncoder<FLBAType>::Put(const FixedLenByteArray* src,
+ int num_values) {
+ if (descr_->type_length() == 0) {
+ return;
+ }
+ for (int i = 0; i < num_values; ++i) {
+ // Write the result to the output stream
+ DCHECK(src[i].ptr != nullptr) << "Value ptr cannot be NULL";
+ PARQUET_THROW_NOT_OK(sink_.Append(src[i].ptr, descr_->type_length()));
+ }
+}
+
+template <>
+void DeltaByteArrayEncoder<FLBAType>::Put(const ::arrow::Array& values) {
+ if (!::arrow::is_fixed_size_binary(values.type_id())) {
+ throw ParquetException("Only FixedSizeBinaryArray and subclasses
supported");
+ }
+ PutFixedLenByteArray(checked_cast<const
::arrow::FixedSizeBinaryArray&>(values));
+}
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const ::arrow::Array& values) {
+ AssertBaseBinary(values);
+ if (::arrow::is_binary_like(values.type_id())) {
+ PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
+ } else if (::arrow::is_large_binary_like(values.type_id())) {
+ PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
+ }
+}
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+ ArrowPoolVector<int32_t> prefix_lengths(num_values);
+
+ if (ARROW_PREDICT_TRUE(last_value_.empty())) {
+ last_value_ = string_view{reinterpret_cast<const char*>(src[0].ptr),
src[0].len};
+ suffix_encoder_.Put(&src[0], 1);
+ prefix_lengths[0] = 0;
+ }
+
+ for (int32_t i = 1; i < num_values; i++) {
+ auto prefix = string_view{reinterpret_cast<const char*>(src[i].ptr),
src[i].len};
+
+ uint32_t j = 0;
+ while (j < std::min(src[i - 1].len, src[i].len)) {
+ if (last_value_[j] != prefix[j]) {
+ break;
+ }
+ j++;
+ }
+
+ prefix_lengths[i] = j;
+ const uint8_t* suffix_ptr = src[i].ptr + j;
+ const uint32_t suffix_length = static_cast<uint32_t>(src[i].len - j);
+ last_value_ = string_view{reinterpret_cast<const char*>(suffix_ptr),
suffix_length};
Review Comment:
ditto
##########
cpp/src/parquet/encoding.cc:
##########
@@ -1268,6 +1268,35 @@ struct ArrowBinaryHelper {
int64_t chunk_space_remaining;
};
+struct ArrowFLBAHelper {
+ explicit ArrowFLBAHelper(::arrow::FixedSizeBinaryBuilder* builder) {
+ this->builder = builder;
+ this->chunk_space_remaining =
+ ::arrow::kBinaryMemoryLimit - this->builder->value_data_length();
+ }
+
+ Status PushChunk() {
+ std::shared_ptr<::arrow::Array> result;
+ RETURN_NOT_OK(builder->Finish(&result));
+ chunks.push_back(result);
+ chunk_space_remaining = ::arrow::kBinaryMemoryLimit;
+ return Status::OK();
+ }
+
+ bool CanFit(int64_t length) const { return length <= chunk_space_remaining; }
+
+ Status Append(const uint8_t* data, int32_t length) {
+ chunk_space_remaining -= length;
Review Comment:
No, I just want a debug-time check here. Having no check is also OK with me.
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3033,11 +3062,226 @@ class RleBooleanDecoder : public DecoderImpl, virtual
public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+// This is also known as incremental encoding or front compression: for each
element in a
+// sequence of strings, store the prefix length of the previous entry plus the
suffix.
+//
+// This is stored as a sequence of delta-encoded prefix lengths
(DELTA_BINARY_PACKED),
+// followed by the suffixes encoded as delta length byte arrays
(DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public
TypedEncoder<DType> {
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
MemoryPool* pool =
::arrow::default_memory_pool())
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+ sink_(pool),
+ prefix_length_encoder_(nullptr, pool),
+ suffix_encoder_(nullptr, pool) {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return prefix_length_encoder_.EstimatedDataEncodedSize() +
+ suffix_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
+ PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values
* sizeof(T),
+
this->memory_pool()));
+ T* data = reinterpret_cast<T*>(buffer->mutable_data());
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
+ }
+ }
+
+ protected:
+ void PutFixedLenByteArray(const ::arrow::FixedSizeBinaryArray& array) {
+ const uint32_t byte_width = array.byte_width();
+ uint32_t previous_len = byte_width;
+
+
PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<::arrow::FixedSizeBinaryType>(
+ *array.data(),
+ [&](::std::string_view view) {
+ const ByteArray src{view};
+ if (ARROW_PREDICT_TRUE(last_value_.empty())) {
+ last_value_ = view;
+ suffix_encoder_.Put(&src, 1);
+ prefix_length_encoder_.Put({static_cast<int32_t>(0)}, 1);
+ previous_len = byte_width;
+ } else {
+ uint32_t j = 0;
+ while (j < previous_len) {
+ if (last_value_[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+ previous_len = j;
+ prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+ const uint8_t* suffix_ptr = src.ptr + j;
+ const uint32_t suffix_length = static_cast<uint32_t>(byte_width -
j);
+ last_value_ =
+ string_view{reinterpret_cast<const char*>(suffix_ptr),
suffix_length};
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffix_encoder_.Put(&suffix, 1);
+ }
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+ }
+
+ template <typename ArrayType>
+ void PutBinaryArray(const ArrayType& array) {
+ uint32_t previous_len = 0;
+
+ PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename
ArrayType::TypeClass>(
+ *array.data(),
+ [&](::std::string_view view) {
+ if (ARROW_PREDICT_TRUE(view.size() > kMaxByteArraySize)) {
+ return Status::Invalid("Parquet cannot store strings with size 2GB
or more");
+ }
+ const ByteArray src{view};
+ if (ARROW_PREDICT_TRUE(last_value_.empty())) {
+ last_value_ = view;
+ suffix_encoder_.Put(&src, 1);
+ prefix_length_encoder_.Put({static_cast<int32_t>(0)}, 1);
+ previous_len = src.len;
+ } else {
+ uint32_t j = 0;
+ while (j < std::min(previous_len, src.len)) {
+ if (last_value_[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+ previous_len = j;
+ prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+ const uint8_t* suffix_ptr = src.ptr + j;
+ const uint32_t suffix_length = static_cast<uint32_t>(src.len - j);
+ last_value_ =
+ string_view{reinterpret_cast<const char*>(suffix_ptr),
suffix_length};
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffix_encoder_.Put(&suffix, 1);
+ }
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+ }
+
+ ::arrow::BufferBuilder sink_;
+ DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+ DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+ string_view last_value_;
+};
+
+template <>
+inline void DeltaByteArrayEncoder<FLBAType>::Put(const FixedLenByteArray* src,
+ int num_values) {
+ if (descr_->type_length() == 0) {
+ return;
+ }
+ for (int i = 0; i < num_values; ++i) {
+ // Write the result to the output stream
+ DCHECK(src[i].ptr != nullptr) << "Value ptr cannot be NULL";
+ PARQUET_THROW_NOT_OK(sink_.Append(src[i].ptr, descr_->type_length()));
+ }
+}
+
+template <>
+void DeltaByteArrayEncoder<FLBAType>::Put(const ::arrow::Array& values) {
+ if (!::arrow::is_fixed_size_binary(values.type_id())) {
+ throw ParquetException("Only FixedSizeBinaryArray and subclasses
supported");
+ }
+ PutFixedLenByteArray(checked_cast<const
::arrow::FixedSizeBinaryArray&>(values));
+}
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const ::arrow::Array& values) {
+ AssertBaseBinary(values);
+ if (::arrow::is_binary_like(values.type_id())) {
+ PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
+ } else if (::arrow::is_large_binary_like(values.type_id())) {
+ PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
+ }
+}
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+ ArrowPoolVector<int32_t> prefix_lengths(num_values);
Review Comment:
```
ArrowPoolVector<int32_t> prefix_lengths(num_values, pool_);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]