pitrou commented on code in PR #14341:
URL: https://github.com/apache/arrow/pull/14341#discussion_r1268152431
##########
cpp/src/parquet/test_util.cc:
##########
@@ -132,5 +132,52 @@ void random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, int m
random_byte_array(n, seed, buf, out, 0, max_size);
}
+void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out,
+                                int min_size, int max_size, double prefixed_probability) {
+ std::default_random_engine gen(seed);
+ std::uniform_int_distribution<int> d1(min_size, max_size);
+ std::uniform_int_distribution<int> d2(0, 255);
+ std::uniform_real_distribution<double> d3(0, 1);
Review Comment:
Can you give those meaningful names?
```suggestion
std::uniform_int_distribution<int> dist_size(min_size, max_size);
std::uniform_int_distribution<int> dist_byte(0, 255);
std::uniform_real_distribution<double> dist_has_prefix(0, 1);
```
##########
cpp/src/parquet/test_util.h:
##########
@@ -783,8 +789,39 @@ inline void GenerateData<ByteArray>(int num_values, ByteArray* out,
random_byte_array(num_values, 0, heap->data(), out, 2, max_byte_array_len);
}
+template <typename T>
+inline void GeneratePrefixedData(int num_values, T* out, std::vector<uint8_t>* heap,
+ double prefixed_probability) {
+ // seed the prng so failure is deterministic
Review Comment:
You don't have to define the generic version since there's a specialized one
below. Just declare it:
```c++
template <typename T>
void GeneratePrefixedData(int num_values, T* out, std::vector<uint8_t>* heap,
double prefixed_probability);
```
##########
cpp/src/parquet/test_util.cc:
##########
@@ -132,5 +132,52 @@ void random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, int m
random_byte_array(n, seed, buf, out, 0, max_size);
}
+void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out,
+                                int min_size, int max_size, double prefixed_probability) {
+ std::default_random_engine gen(seed);
+ std::uniform_int_distribution<int> d1(min_size, max_size);
+ std::uniform_int_distribution<int> d2(0, 255);
+ std::uniform_real_distribution<double> d3(0, 1);
+
+ for (int i = 0; i < n; ++i) {
+ int len = d1(gen);
+ out[i].len = len;
+ out[i].ptr = buf;
+
+ bool do_prefix = d3(gen) < prefixed_probability && i > 0;
+ std::uniform_int_distribution<int> d4(min_size, len);
Review Comment:
Instead of creating a new distribution in each pass, you could generate a
fraction of the length:
```c++
std::uniform_real_distribution<double> dist_prefix_length(0, 1);
...
bool do_prefix = dist_has_prefix(gen) && i > 0;
    int prefix_len = do_prefix ? static_cast<int>(std::ceil(len * dist_prefix_length(gen))) : 0;
```
##########
cpp/src/parquet/encoding.cc:
##########
@@ -1238,43 +1240,62 @@ int PlainBooleanDecoder::Decode(bool* buffer, int max_values) {
return max_values;
}
-struct ArrowBinaryHelper {
-  explicit ArrowBinaryHelper(typename EncodingTraits<ByteArrayType>::Accumulator* out) {
- this->out = out;
- this->builder = out->builder.get();
- this->chunk_space_remaining =
- ::arrow::kBinaryMemoryLimit - this->builder->value_data_length();
+template <typename DType, typename Enable = void>
+struct ArrowBinaryHelper;
+
+template <typename DType>
+struct ArrowBinaryHelper<DType, std::enable_if_t<std::is_same_v<DType, ByteArrayType> ||
+                                                 std::is_same_v<DType, FLBAType>,
+ void>> {
+  explicit ArrowBinaryHelper(typename EncodingTraits<DType>::Accumulator* acc) {
+ builder = acc->builder.get();
+ chunks = &acc->chunks;
+ if (ARROW_PREDICT_FALSE(SubtractWithOverflow(::arrow::kBinaryMemoryLimit,
+ builder->value_data_length(),
+ &chunk_space_remaining))) {
+ throw ParquetException("excess expansion in ArrowBinaryHelper<DType>");
+ }
}
Status PushChunk() {
std::shared_ptr<::arrow::Array> result;
RETURN_NOT_OK(builder->Finish(&result));
- out->chunks.push_back(result);
+ chunks->push_back(std::move(result));
chunk_space_remaining = ::arrow::kBinaryMemoryLimit;
return Status::OK();
}
bool CanFit(int64_t length) const { return length <= chunk_space_remaining; }
void UnsafeAppend(const uint8_t* data, int32_t length) {
+ DCHECK(CanFit(length));
chunk_space_remaining -= length;
builder->UnsafeAppend(data, length);
}
void UnsafeAppendNull() { builder->UnsafeAppendNull(); }
- Status Append(const uint8_t* data, int32_t length) {
- chunk_space_remaining -= length;
- return builder->Append(data, length);
- }
+ virtual Status Append(const uint8_t* data, int32_t length);
Review Comment:
This does not need to be `virtual` since you're not subclassing this class
anywhere, right?
##########
cpp/src/parquet/encoding.cc:
##########
@@ -1238,43 +1240,62 @@ int PlainBooleanDecoder::Decode(bool* buffer, int max_values) {
return max_values;
}
-struct ArrowBinaryHelper {
-  explicit ArrowBinaryHelper(typename EncodingTraits<ByteArrayType>::Accumulator* out) {
- this->out = out;
- this->builder = out->builder.get();
- this->chunk_space_remaining =
- ::arrow::kBinaryMemoryLimit - this->builder->value_data_length();
+template <typename DType, typename Enable = void>
+struct ArrowBinaryHelper;
+
+template <typename DType>
+struct ArrowBinaryHelper<DType, std::enable_if_t<std::is_same_v<DType, ByteArrayType> ||
+                                                 std::is_same_v<DType, FLBAType>,
+ void>> {
+  explicit ArrowBinaryHelper(typename EncodingTraits<DType>::Accumulator* acc) {
+ builder = acc->builder.get();
+ chunks = &acc->chunks;
+ if (ARROW_PREDICT_FALSE(SubtractWithOverflow(::arrow::kBinaryMemoryLimit,
+ builder->value_data_length(),
+ &chunk_space_remaining))) {
+ throw ParquetException("excess expansion in ArrowBinaryHelper<DType>");
+ }
}
Status PushChunk() {
std::shared_ptr<::arrow::Array> result;
RETURN_NOT_OK(builder->Finish(&result));
- out->chunks.push_back(result);
+ chunks->push_back(std::move(result));
chunk_space_remaining = ::arrow::kBinaryMemoryLimit;
return Status::OK();
}
bool CanFit(int64_t length) const { return length <= chunk_space_remaining; }
void UnsafeAppend(const uint8_t* data, int32_t length) {
+ DCHECK(CanFit(length));
Review Comment:
Why is `UnsafeAppend` non-templated but `Append` is templated?
Also, why does `UnsafeAppend` have a `DCHECK(CanFit(length))` but not
`Append`?
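   For comparison, the `Append` body removed in this diff, with the same check added, would read roughly as follows; this is only a sketch of the symmetric form, and whether `Append` should instead push a new chunk when full is left open:
   ```c++
   Status Append(const uint8_t* data, int32_t length) {
     // Mirror UnsafeAppend's bookkeeping and capacity check.
     DCHECK(CanFit(length));
     chunk_space_remaining -= length;
     return builder->Append(data, length);
   }
   ```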
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,11 +3073,179 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front compression:
+/// for each element in a sequence of strings, store the prefix length of the previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                 MemoryPool* pool = ::arrow::default_memory_pool())
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+ sink_(pool),
+ prefix_length_encoder_(nullptr, pool),
+ suffix_encoder_(nullptr, pool),
+ last_value_("") {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return prefix_length_encoder_.EstimatedDataEncodedSize() +
+ suffix_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
Review Comment:
It could be deferred to another PR, though, assuming you open an issue about
it. Also, it applies to many encoders, not just this one.
##########
cpp/src/parquet/test_util.cc:
##########
@@ -132,5 +132,52 @@ void random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, int m
random_byte_array(n, seed, buf, out, 0, max_size);
}
+void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out,
+                                int min_size, int max_size, double prefixed_probability) {
+ std::default_random_engine gen(seed);
+ std::uniform_int_distribution<int> d1(min_size, max_size);
+ std::uniform_int_distribution<int> d2(0, 255);
+ std::uniform_real_distribution<double> d3(0, 1);
Review Comment:
   Also use `std::bernoulli_distribution` (https://en.cppreference.com/w/cpp/numeric/random/bernoulli_distribution) for `dist_has_prefix`.
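   A minimal sketch of that suggestion, reusing the `dist_has_prefix` name proposed above (the free-standing helper is only for illustration):
   ```c++
   #include <random>

   // std::bernoulli_distribution returns a bool directly, so the manual
   // comparison of a uniform [0, 1) draw against prefixed_probability goes away.
   bool DrawHasPrefix(std::default_random_engine& gen, double prefixed_probability) {
     std::bernoulli_distribution dist_has_prefix(prefixed_probability);
     return dist_has_prefix(gen);
   }
   ```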
##########
cpp/src/parquet/test_util.cc:
##########
@@ -132,5 +132,52 @@ void random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, int m
random_byte_array(n, seed, buf, out, 0, max_size);
}
+void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out,
+                                int min_size, int max_size, double prefixed_probability) {
+ std::default_random_engine gen(seed);
+ std::uniform_int_distribution<int> d1(min_size, max_size);
+ std::uniform_int_distribution<int> d2(0, 255);
+ std::uniform_real_distribution<double> d3(0, 1);
Review Comment:
Oh, and we should probably use the compatibility versions from
`arrow/testing/uniform_real.h`.
##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1908,4 +1907,304 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) {
   CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values));
}
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+ using c_type = typename Type::c_type;
+ static constexpr int TYPE = Type::type_num;
+
+ void InitData(int nvalues, int repeats, double prefixed_probability) {
+ num_values_ = nvalues * repeats;
+ input_bytes_.resize(num_values_ * sizeof(c_type));
+ output_bytes_.resize(num_values_ * sizeof(c_type));
+ draws_ = reinterpret_cast<c_type*>(input_bytes_.data());
+ decode_buf_ = reinterpret_cast<c_type*>(output_bytes_.data());
+    GeneratePrefixedData<c_type>(nvalues, draws_, &data_buffer_, prefixed_probability);
+
+ // add some repeated values
+ for (int j = 1; j < repeats; ++j) {
+ for (int i = 0; i < nvalues; ++i) {
+ draws_[nvalues * j + i] = draws_[i];
+ }
+ }
+ }
+
+ void Execute(int nvalues, int repeats, double prefixed_probability) {
+ InitData(nvalues, repeats, prefixed_probability);
+ CheckRoundtrip();
+ }
+
+ void ExecuteSpaced(int nvalues, int repeats, int64_t valid_bits_offset,
Review Comment:
It's looking like you're mostly copy-pasting code from another test class
here. You could just hard-code `prefixed_probability` to a reasonable value, or
arrange to vary it implicitly between calls to `InitData`.
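   As an illustration of the second option, a hypothetical helper (name and schedule are not part of the PR) could rotate the probability between calls:
   ```c++
   // Hypothetical: cycle through a few prefix probabilities so that successive
   // InitData() calls exercise different prefix ratios without an extra
   // parameter on every test helper.
   struct PrefixProbabilitySchedule {
     int calls = 0;
     double Next() {
       static constexpr double kProbabilities[] = {0.1, 0.25, 0.5, 0.9};
       return kProbabilities[calls++ % 4];
     }
   };
   ```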
##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1908,4 +1907,304 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) {
   CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values));
}
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+ using c_type = typename Type::c_type;
+ static constexpr int TYPE = Type::type_num;
+
+ void InitData(int nvalues, int repeats, double prefixed_probability) {
+ num_values_ = nvalues * repeats;
+ input_bytes_.resize(num_values_ * sizeof(c_type));
+ output_bytes_.resize(num_values_ * sizeof(c_type));
+ draws_ = reinterpret_cast<c_type*>(input_bytes_.data());
+ decode_buf_ = reinterpret_cast<c_type*>(output_bytes_.data());
+    GeneratePrefixedData<c_type>(nvalues, draws_, &data_buffer_, prefixed_probability);
+
+ // add some repeated values
+ for (int j = 1; j < repeats; ++j) {
+ for (int i = 0; i < nvalues; ++i) {
+ draws_[nvalues * j + i] = draws_[i];
+ }
+ }
+ }
+
+ void Execute(int nvalues, int repeats, double prefixed_probability) {
+ InitData(nvalues, repeats, prefixed_probability);
+ CheckRoundtrip();
+ }
+
+ void ExecuteSpaced(int nvalues, int repeats, int64_t valid_bits_offset,
+ double null_probability, double prefixed_probability) {
+ InitData(nvalues, repeats, prefixed_probability);
+
+ int64_t size = num_values_ + valid_bits_offset;
+ auto rand = ::arrow::random::RandomArrayGenerator(1923);
+    const auto array = rand.UInt8(size, /*min=*/0, /*max=*/100, null_probability);
+ const auto valid_bits = array->null_bitmap_data();
+ if (valid_bits) {
+ CheckRoundtripSpaced(valid_bits, valid_bits_offset);
+ }
+ }
+
+ void CheckRoundtrip() override {
Review Comment:
Similarly, it looks like this is exactly the same code as the one for
testing `DELTA_LENGTH_BYTE_ARRAY`.
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,11 +3073,179 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front compression:
+/// for each element in a sequence of strings, store the prefix length of the previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                 MemoryPool* pool = ::arrow::default_memory_pool())
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+ sink_(pool),
+ prefix_length_encoder_(nullptr, pool),
+ suffix_encoder_(nullptr, pool),
+ last_value_("") {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return prefix_length_encoder_.EstimatedDataEncodedSize() +
+ suffix_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
Review Comment:
Did you actually address this comment @rok?
##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1908,4 +1907,304 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) {
   CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values));
}
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+ using c_type = typename Type::c_type;
+ static constexpr int TYPE = Type::type_num;
+
+ void InitData(int nvalues, int repeats, double prefixed_probability) {
+ num_values_ = nvalues * repeats;
+ input_bytes_.resize(num_values_ * sizeof(c_type));
+ output_bytes_.resize(num_values_ * sizeof(c_type));
+ draws_ = reinterpret_cast<c_type*>(input_bytes_.data());
+ decode_buf_ = reinterpret_cast<c_type*>(output_bytes_.data());
+    GeneratePrefixedData<c_type>(nvalues, draws_, &data_buffer_, prefixed_probability);
+
+ // add some repeated values
+ for (int j = 1; j < repeats; ++j) {
+ for (int i = 0; i < nvalues; ++i) {
+ draws_[nvalues * j + i] = draws_[i];
+ }
+ }
+ }
+
+ void Execute(int nvalues, int repeats, double prefixed_probability) {
+ InitData(nvalues, repeats, prefixed_probability);
+ CheckRoundtrip();
+ }
+
+ void ExecuteSpaced(int nvalues, int repeats, int64_t valid_bits_offset,
+ double null_probability, double prefixed_probability) {
+ InitData(nvalues, repeats, prefixed_probability);
+
+ int64_t size = num_values_ + valid_bits_offset;
+ auto rand = ::arrow::random::RandomArrayGenerator(1923);
+    const auto array = rand.UInt8(size, /*min=*/0, /*max=*/100, null_probability);
+ const auto valid_bits = array->null_bitmap_data();
+ if (valid_bits) {
+ CheckRoundtripSpaced(valid_bits, valid_bits_offset);
+ }
+ }
+
+ void CheckRoundtrip() override {
+    auto encoder = MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY,
+                                          /*use_dictionary=*/false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+
+ encoder->Put(draws_, num_values_);
+ encode_buffer_ = encoder->FlushValues();
+
+ decoder->SetData(num_values_, encode_buffer_->data(),
+ static_cast<int>(encode_buffer_->size()));
+ int values_decoded = decoder->Decode(decode_buf_, num_values_);
+ ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+ }
+
+ void CheckRoundtripSpaced(const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+    auto encoder = MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY,
+                                          /*use_dictionary=*/false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+ int null_count = 0;
+ for (auto i = 0; i < num_values_; i++) {
+ if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+ null_count++;
+ }
+ }
+
+ encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+ encode_buffer_ = encoder->FlushValues();
+ decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+ static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+ valid_bits, valid_bits_offset);
+ ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
+                                                        valid_bits, valid_bits_offset));
+ }
+
+ protected:
+ USING_BASE_MEMBERS();
+ std::vector<uint8_t> input_bytes_;
+ std::vector<uint8_t> output_bytes_;
+};
+
+using TestDeltaByteArrayEncodingTypes = ::testing::Types<ByteArrayType, FLBAType>;
+TYPED_TEST_SUITE(TestDeltaByteArrayEncoding, TestDeltaByteArrayEncodingTypes);
+
+TYPED_TEST(TestDeltaByteArrayEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, /*repeats=*/0, /*prefixed_probability=*/0.1));
+ ASSERT_NO_FATAL_FAILURE(this->Execute(250, 5, 0.2));
+ ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 1, 0.3));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_probability*/
+      0, 0.4));
+ ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+ /*nvalues*/ 1234, /*repeats*/ 10, /*valid_bits_offset*/ 64,
+ /*null_probability*/ 0.5, 0.5));
+}
+
+template <typename Type>
+class DeltaByteArrayEncodingDirectPut : public TestEncodingBase<Type> {
+ public:
+ std::unique_ptr<TypedEncoder<Type>> encoder =
+ MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY);
+ std::unique_ptr<TypedDecoder<Type>> decoder =
+ MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY);
+
+ void CheckDirectPut(std::shared_ptr<::arrow::Array> array) {
+ ASSERT_NO_THROW(encoder->Put(*array));
+ auto buf = encoder->FlushValues();
+
+ int num_values = static_cast<int>(array->length() - array->null_count());
+ decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+ typename EncodingTraits<Type>::Accumulator acc;
+ using BuilderType = typename EncodingTraits<Type>::BuilderType;
+    acc.builder = std::make_unique<BuilderType>(array->type(), default_memory_pool());
+
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(array->length()),
+                                   static_cast<int>(array->null_count()),
+                                   array->null_bitmap_data(), array->offset(), &acc));
+
+ std::shared_ptr<::arrow::Array> result;
+ ASSERT_OK(acc.builder->Finish(&result));
+ ASSERT_EQ(array->length(), result->length());
+ ASSERT_OK(result->ValidateFull());
+
+ ::arrow::AssertArraysEqual(*array, *result);
+ }
+
+ void CheckRoundtripFLBA() {
+ constexpr int64_t kSize = 50;
+ constexpr int kSeed = 42;
+ constexpr int kByteWidth = 4;
+ ::arrow::random::RandomArrayGenerator rag{kSeed};
+ std::shared_ptr<::arrow::Array> values =
+ rag.FixedSizeBinary(/*size=*/0, /*byte_width=*/kByteWidth);
+ CheckDirectPut(values);
+
+ for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
+ rag = ::arrow::random::RandomArrayGenerator(seed);
+ values = rag.FixedSizeBinary(kSize + seed, kByteWidth);
+ CheckDirectPut(values);
+ }
+ }
+
+ void CheckRoundtripByteArray() {
+ constexpr int64_t kSize = 500;
+ constexpr int32_t kMinLength = 0;
+ constexpr int32_t kMaxLength = 10;
+ constexpr int32_t kNumUnique = 10;
+ constexpr double kNullProbability = 0.25;
+ constexpr int kSeed = 42;
+ ::arrow::random::RandomArrayGenerator rag{kSeed};
+ std::shared_ptr<::arrow::Array> values = rag.BinaryWithRepeats(
+ /*size=*/1, /*unique=*/1, kMinLength, kMaxLength, kNullProbability);
+ CheckDirectPut(values);
+
+ for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
+ rag = ::arrow::random::RandomArrayGenerator(seed);
+ values = rag.BinaryWithRepeats(kSize, kNumUnique, kMinLength, kMaxLength,
+ kNullProbability);
+ CheckDirectPut(values);
+ }
+ }
+
+ void CheckRoundtrip() override {
+ using ArrowType = typename EncodingTraits<Type>::ArrowType;
+ using IsFixedSizeBinary = ::arrow::is_fixed_size_binary_type<ArrowType>;
+
+ if constexpr (IsFixedSizeBinary::value) {
+ CheckRoundtripFLBA();
+ } else {
+ CheckRoundtripByteArray();
+ }
+ }
+
+ protected:
+ USING_BASE_MEMBERS();
+};
+
+TYPED_TEST_SUITE(DeltaByteArrayEncodingDirectPut, TestDeltaByteArrayEncodingTypes);
+
+TYPED_TEST(DeltaByteArrayEncodingDirectPut, DirectPut) {
+ ASSERT_NO_FATAL_FAILURE(this->CheckRoundtrip());
+}
+
+TEST(DeltaByteArrayEncodingAdHoc, ArrowDirectPut) {
+ auto CheckEncode = [](const std::shared_ptr<::arrow::Array>& values,
+ const std::shared_ptr<Buffer>& encoded) {
+ auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+ ASSERT_NO_THROW(encoder->Put(*values));
+ auto buf = encoder->FlushValues();
+ ASSERT_TRUE(encoded->Equals(*buf));
+ };
+
+  auto ArrayToInt32Vector = [](const std::shared_ptr<::arrow::Array>& lengths) {
+ std::vector<int32_t> arrays;
Review Comment:
Why the name?
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3033,11 +3064,269 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+// This is also known as incremental encoding or front compression: for each element in a
+// sequence of strings, store the prefix length of the previous entry plus the suffix.
+//
+// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED),
+// followed by the suffixes encoded as delta length byte arrays (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                 MemoryPool* pool = ::arrow::default_memory_pool())
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+ sink_(pool),
+ prefix_length_encoder_(nullptr, pool),
+ suffix_encoder_(nullptr, pool),
+ last_value_("") {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return prefix_length_encoder_.EstimatedDataEncodedSize() +
+ suffix_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
Review Comment:
   Apparently the change has not been made.
##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1908,4 +1907,304 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) {
   CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values));
}
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+ using c_type = typename Type::c_type;
+ static constexpr int TYPE = Type::type_num;
+
+ void InitData(int nvalues, int repeats, double prefixed_probability) {
+ num_values_ = nvalues * repeats;
+ input_bytes_.resize(num_values_ * sizeof(c_type));
+ output_bytes_.resize(num_values_ * sizeof(c_type));
+ draws_ = reinterpret_cast<c_type*>(input_bytes_.data());
+ decode_buf_ = reinterpret_cast<c_type*>(output_bytes_.data());
+    GeneratePrefixedData<c_type>(nvalues, draws_, &data_buffer_, prefixed_probability);
+
+ // add some repeated values
+ for (int j = 1; j < repeats; ++j) {
+ for (int i = 0; i < nvalues; ++i) {
+ draws_[nvalues * j + i] = draws_[i];
+ }
+ }
+ }
+
+ void Execute(int nvalues, int repeats, double prefixed_probability) {
+ InitData(nvalues, repeats, prefixed_probability);
+ CheckRoundtrip();
+ }
+
+ void ExecuteSpaced(int nvalues, int repeats, int64_t valid_bits_offset,
+ double null_probability, double prefixed_probability) {
+ InitData(nvalues, repeats, prefixed_probability);
+
+ int64_t size = num_values_ + valid_bits_offset;
+ auto rand = ::arrow::random::RandomArrayGenerator(1923);
+    const auto array = rand.UInt8(size, /*min=*/0, /*max=*/100, null_probability);
+ const auto valid_bits = array->null_bitmap_data();
+ if (valid_bits) {
+ CheckRoundtripSpaced(valid_bits, valid_bits_offset);
+ }
+ }
+
+ void CheckRoundtrip() override {
+    auto encoder = MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY,
+                                          /*use_dictionary=*/false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+
+ encoder->Put(draws_, num_values_);
+ encode_buffer_ = encoder->FlushValues();
+
+ decoder->SetData(num_values_, encode_buffer_->data(),
+ static_cast<int>(encode_buffer_->size()));
+ int values_decoded = decoder->Decode(decode_buf_, num_values_);
+ ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+ }
+
+ void CheckRoundtripSpaced(const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+    auto encoder = MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY,
+                                          /*use_dictionary=*/false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+ int null_count = 0;
+ for (auto i = 0; i < num_values_; i++) {
+ if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+ null_count++;
+ }
+ }
+
+ encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+ encode_buffer_ = encoder->FlushValues();
+ decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+ static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+ valid_bits, valid_bits_offset);
+ ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
+                                                        valid_bits, valid_bits_offset));
+ }
+
+ protected:
+ USING_BASE_MEMBERS();
+ std::vector<uint8_t> input_bytes_;
+ std::vector<uint8_t> output_bytes_;
+};
+
+using TestDeltaByteArrayEncodingTypes = ::testing::Types<ByteArrayType, FLBAType>;
+TYPED_TEST_SUITE(TestDeltaByteArrayEncoding, TestDeltaByteArrayEncodingTypes);
+
+TYPED_TEST(TestDeltaByteArrayEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, /*repeats=*/0, /*prefixed_probability=*/0.1));
+ ASSERT_NO_FATAL_FAILURE(this->Execute(250, 5, 0.2));
+ ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 1, 0.3));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_probability*/
+      0, 0.4));
+ ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+ /*nvalues*/ 1234, /*repeats*/ 10, /*valid_bits_offset*/ 64,
+ /*null_probability*/ 0.5, 0.5));
+}
+
+template <typename Type>
+class DeltaByteArrayEncodingDirectPut : public TestEncodingBase<Type> {
+ public:
+ std::unique_ptr<TypedEncoder<Type>> encoder =
+ MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY);
+ std::unique_ptr<TypedDecoder<Type>> decoder =
+ MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY);
+
+ void CheckDirectPut(std::shared_ptr<::arrow::Array> array) {
+ ASSERT_NO_THROW(encoder->Put(*array));
+ auto buf = encoder->FlushValues();
+
+ int num_values = static_cast<int>(array->length() - array->null_count());
+ decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+ typename EncodingTraits<Type>::Accumulator acc;
+ using BuilderType = typename EncodingTraits<Type>::BuilderType;
+    acc.builder = std::make_unique<BuilderType>(array->type(), default_memory_pool());
+
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(array->length()),
+                                   static_cast<int>(array->null_count()),
+                                   array->null_bitmap_data(), array->offset(), &acc));
+
+ std::shared_ptr<::arrow::Array> result;
+ ASSERT_OK(acc.builder->Finish(&result));
+ ASSERT_EQ(array->length(), result->length());
+ ASSERT_OK(result->ValidateFull());
+
+ ::arrow::AssertArraysEqual(*array, *result);
+ }
+
+ void CheckRoundtripFLBA() {
+ constexpr int64_t kSize = 50;
+ constexpr int kSeed = 42;
+ constexpr int kByteWidth = 4;
+ ::arrow::random::RandomArrayGenerator rag{kSeed};
+ std::shared_ptr<::arrow::Array> values =
+ rag.FixedSizeBinary(/*size=*/0, /*byte_width=*/kByteWidth);
+ CheckDirectPut(values);
+
+ for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
+ rag = ::arrow::random::RandomArrayGenerator(seed);
+ values = rag.FixedSizeBinary(kSize + seed, kByteWidth);
+ CheckDirectPut(values);
+ }
Review Comment:
   You don't have to create a new `RandomArrayGenerator` every time. Just reuse the one created above; it will produce different results each time.
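   A sketch of that reuse against the loop above (`kSeed`, `kSize`, `kByteWidth`, and `CheckDirectPut` come from the surrounding test):
   ```c++
   // One generator, seeded once; each FixedSizeBinary call advances its
   // internal state, so every iteration still sees a different array.
   ::arrow::random::RandomArrayGenerator rag{kSeed};
   CheckDirectPut(rag.FixedSizeBinary(/*size=*/0, /*byte_width=*/kByteWidth));
   for (int i = 0; i < 10; ++i) {
     CheckDirectPut(rag.FixedSizeBinary(kSize + i, kByteWidth));
   }
   ```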
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3058,237 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front compression:
+/// for each element in a sequence of strings, store the prefix length of the previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ static constexpr std::string_view kEmpty = "";
+
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                 MemoryPool* pool = ::arrow::default_memory_pool())
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+ sink_(pool),
+        // Prefix lengths are encoded using DeltaBitPackEncoder that can be left
+ // uninitialized.
+ prefix_length_encoder_(nullptr, pool),
+ suffix_encoder_(descr, pool),
+ last_value_(""),
+ empty_(static_cast<uint32_t>(kEmpty.size()),
Review Comment:
Idea: if you make the `ByteArray` constructors `constexpr`, you might be
able to write:
```c++
static constexpr ByteArray kEmpty = std::string_view("");
```
##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,230 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY
-class DeltaByteArrayDecoder : public DecoderImpl,
- virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front compression:
+/// for each element in a sequence of strings, store the prefix length of the previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ using T = typename DType::c_type;
+
+ explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                 MemoryPool* pool = ::arrow::default_memory_pool())
+ : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+ sink_(pool),
+ prefix_length_encoder_(nullptr, pool),
+ suffix_encoder_(nullptr, pool),
+ last_value_(""),
+ kEmpty(ByteArray(0, reinterpret_cast<const uint8_t*>(""))) {}
+
+ std::shared_ptr<Buffer> FlushValues() override;
+
+ int64_t EstimatedDataEncodedSize() override {
+ return prefix_length_encoder_.EstimatedDataEncodedSize() +
+ suffix_encoder_.EstimatedDataEncodedSize();
+ }
+
+ using TypedEncoder<DType>::Put;
+
+ void Put(const ::arrow::Array& values) override;
+
+ void Put(const T* buffer, int num_values) override;
+
+ void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset) override {
+ if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
+                                                                   this->memory_pool()));
+ T* data = reinterpret_cast<T*>(buffer->mutable_data());
+ int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+ src, num_values, valid_bits, valid_bits_offset, data);
+ Put(data, num_valid_values);
+ } else {
+ Put(src, num_values);
+ }
+ }
+
+ protected:
+ template <typename VisitorType>
+ void PutInternal(const T* src, int num_values) {
+ if (num_values == 0) {
+ return;
+ }
+ uint32_t len = descr_->type_length();
+
+ std::string_view last_value_view = last_value_;
+ constexpr int kBatchSize = 256;
+ std::array<int32_t, kBatchSize> prefix_lengths;
+ std::array<ByteArray, kBatchSize> suffixes;
+ auto visitor = VisitorType{src, len};
+
+ for (int i = 0; i < num_values; i += kBatchSize) {
+ const int batch_size = std::min(kBatchSize, num_values - i);
+
+ for (int j = 0; j < batch_size; ++j) {
+ auto view = visitor[i + j];
+ len = visitor.len(i + j);
+
+ uint32_t k = 0;
+ const uint32_t common_length =
+ std::min(len, static_cast<uint32_t>(last_value_view.length()));
+ while (k < common_length) {
+ if (last_value_view[k] != view[k]) {
+ break;
+ }
+ k++;
+ }
+
+ last_value_view = view;
+ prefix_lengths[j] = k;
+ const uint32_t suffix_length = len - k;
+ const uint8_t* suffix_ptr = src[i + j].ptr + k;
+
+ // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffixes[j] = suffix;
+ }
+ suffix_encoder_.Put(suffixes.data(), batch_size);
+ prefix_length_encoder_.Put(prefix_lengths.data(), batch_size);
+ }
+ last_value_ = last_value_view;
+ }
+
+ template <typename ArrayType>
+ void PutBinaryArray(const ArrayType& array) {
+ auto previous_len = static_cast<uint32_t>(last_value_.length());
+ std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
+ *array.data(),
+ [&](::std::string_view view) {
+ if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB or more");
+ }
+ // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+ const ByteArray src{view};
+
+ uint32_t j = 0;
+ const uint32_t len = src.len;
+ const uint32_t common_length = std::min(previous_len, len);
+ while (j < common_length) {
+ if (last_value_view[j] != view[j]) {
+ break;
+ }
+ j++;
+ }
+ previous_len = len;
+ prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+ last_value_view = view;
+ const auto suffix_length = static_cast<uint32_t>(len - j);
+ if (suffix_length == 0) {
+ suffix_encoder_.Put(&kEmpty, 1);
+ return Status::OK();
+ }
+ const uint8_t* suffix_ptr = src.ptr + j;
+ // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+ const ByteArray suffix(suffix_length, suffix_ptr);
+ suffix_encoder_.Put(&suffix, 1);
+
+ return Status::OK();
+ },
+ []() { return Status::OK(); }));
+ last_value_ = last_value_view;
+ }
+
+ ::arrow::BufferBuilder sink_;
+ DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+ DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+ std::string last_value_;
+ const ByteArray kEmpty;
+};
+
+struct ByteArrayVisitor {
+ const ByteArray* src;
+ const uint32_t type_length;
Review Comment:
Or, you know, instead of having:
```c++
template <typename VisitorType>
void PutInternal(const T* src, int num_values) {
...
auto visitor = VisitorType{src, flba_len};
```
just pass the visitor instance so that you don't have to match the
constructor signatures:
```c++
template <typename VisitorType>
void PutInternal(const T* src, int num_values, VisitorType* visitor) {
```
##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -874,7 +874,7 @@ std::shared_ptr<::arrow::Array> EncodingAdHocTyped<FLBAType>::GetValues(int seed
}
using EncodingAdHocTypedCases =
-    ::testing::Types<BooleanType, Int32Type, Int64Type, FloatType, DoubleType, FLBAType>;
+ ::testing::Types<BooleanType, Int32Type, Int64Type, FloatType, DoubleType>;
Review Comment:
Hmm... does this mean FLBA handling is broken for some existing encodings?
It's not clear what the consequences are here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]