pitrou commented on a change in pull request #10978:
URL: https://github.com/apache/arrow/pull/10978#discussion_r703335755
##########
File path: cpp/src/arrow/util/bit_stream_utils.h
##########
@@ -423,6 +427,25 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) {
return true;
}
+inline bool BitReader::Advance(int64_t num_bits) {
+ int64_t bits_required = bit_offset_ + num_bits;
+ int64_t bytes_required = BitUtil::BytesForBits(bits_required);
+ if (ARROW_PREDICT_FALSE(bytes_required > max_bytes_ - byte_offset_)) {
+ return false;
+ }
+ byte_offset_ += static_cast<int>(bits_required >> 3);
+ bit_offset_ = static_cast<int>(bits_required & 7);
+ // Reset buffered_values_
Review comment:
A similar snippet already appears 3 times (see `GetValue_`,
`GetAligned`, `GetBatch`). Perhaps factor it out as a private method?
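For instance, a minimal sketch of such a helper (hypothetical name; the body mirrors the snippet repeated at the three call sites, including zeroing before a short read near the buffer end):
```cpp
// Hypothetical private helper: reload buffered_values_ from the current
// byte_offset_, reading fewer than 8 bytes when near the end of the buffer.
void BitReader::ReloadBufferedValues() {
  int bytes_remaining = max_bytes_ - byte_offset_;
  if (bytes_remaining >= 8) {
    memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
  } else {
    buffered_values_ = 0;  // clear stale high bytes before a partial read
    memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
  }
}
```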
##########
File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc
##########
@@ -4183,10 +4183,48 @@ TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) {
::arrow::AssertTablesEqual(*table, *expect_table);
}
+
+TEST(TestArrowReadDeltaEncoding, DeltaByteArray) {
+ auto file = test::get_data_file("delta_byte_array.parquet");
+ auto expect_file = test::get_data_file("delta_byte_array_expect.csv");
+ auto pool = ::arrow::default_memory_pool();
+ std::unique_ptr<FileReader> parquet_reader;
+ std::shared_ptr<::arrow::Table> parquet_table;
+ ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false),
+                            &parquet_reader));
+ ASSERT_OK(parquet_reader->ReadTable(&parquet_table));
+ ASSERT_OK_AND_ASSIGN(auto actural_table, parquet_table->CombineChunks());
Review comment:
Typo: "actural"
##########
File path: cpp/src/parquet/encoding.cc
##########
@@ -2226,30 +2248,69 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
MemoryPool* pool = ::arrow::default_memory_pool())
: DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
len_decoder_(nullptr, pool),
- pool_(pool) {}
+ buffered_length_(AllocateBuffer(pool, 0)),
+ buffered_data_(AllocateBuffer(pool, 0)) {}
void SetData(int num_values, const uint8_t* data, int len) override {
num_values_ = num_values;
if (len == 0) return;
- int total_lengths_len = ::arrow::util::SafeLoadAs<int32_t>(data);
- data += 4;
- this->len_decoder_.SetData(num_values, data, total_lengths_len);
- data_ = data + total_lengths_len;
- this->len_ = len - 4 - total_lengths_len;
+
+ decoder_ = std::make_shared<::arrow::BitUtil::BitReader>(data, len);
+ len_decoder_.SetDecoder(num_values, decoder_);
+
+ int num_length = len_decoder_.ValidValuesCount();
+ PARQUET_THROW_NOT_OK(buffered_length_->Resize(num_length * sizeof(int32_t)));
+
+ int ret = len_decoder_.Decode(
+     reinterpret_cast<int32_t*>(buffered_length_->mutable_data()), num_length);
+ DCHECK_EQ(ret, num_length);
+ length_idx_ = 0;
+ num_valid_values_ = num_length;
+ }
+
+ void SetData(int num_values, std::shared_ptr<::arrow::BitUtil::BitReader> decoder) {
+ num_values_ = num_values;
+ decoder_ = decoder;
+
+ len_decoder_.SetDecoder(num_values, decoder_);
+
+ int num_length = len_decoder_.ValidValuesCount();
+ PARQUET_THROW_NOT_OK(buffered_length_->Resize(num_length * sizeof(int32_t)));
+
+ int ret = len_decoder_.Decode(
+     reinterpret_cast<int32_t*>(buffered_length_->mutable_data()), num_length);
+ DCHECK_EQ(ret, num_length);
+ length_idx_ = 0;
+ num_valid_values_ = num_length;
}
int Decode(ByteArray* buffer, int max_values) override {
- using VectorT = ArrowPoolVector<int>;
- max_values = std::min(max_values, num_values_);
- VectorT lengths(max_values, 0, ::arrow::stl::allocator<int>(pool_));
- len_decoder_.Decode(lengths.data(), max_values);
+ max_values = std::min(max_values, num_valid_values_);
+
+ int64_t data_size = 0;
+ const int32_t* length_ptr =
+     reinterpret_cast<const int32_t*>(buffered_length_->data()) + length_idx_;
for (int i = 0; i < max_values; ++i) {
- buffer[i].len = lengths[i];
- buffer[i].ptr = data_;
- this->data_ += lengths[i];
- this->len_ -= lengths[i];
+ int32_t len = length_ptr[i];
+ buffer[i].len = len;
+ data_size += len;
+ }
+ length_idx_ += max_values;
+
+ PARQUET_THROW_NOT_OK(buffered_data_->Resize(data_size));
+ if (decoder_->GetBatch(8, buffered_data_->mutable_data(),
+                        static_cast<int>(data_size)) != static_cast<int>(data_size)) {
Review comment:
It seems a bit weird to call both `len_decoder_` and `decoder_` methods.
More fundamentally, what happens if the user calls `Decode` incrementally
(i.e. with `max_values < num_valid_values_`)? It seems you should compute the
position of the byte array data in `SetData` instead of assuming the `decoder_`
is at the end of the encoded length data here.
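A rough sketch of that idea (illustrative only; `bytes_left()` is an assumed `BitReader` accessor, and `data_` is reused to hold the payload start):
```cpp
// Sketch: decode all lengths eagerly in SetData (as above), then pin the
// start of the concatenated byte-array payload once, so that later
// incremental Decode() calls never depend on decoder_'s read position.
void SetData(int num_values, const uint8_t* data, int len) override {
  num_values_ = num_values;
  if (len == 0) return;
  decoder_ = std::make_shared<::arrow::BitUtil::BitReader>(data, len);
  len_decoder_.SetDecoder(num_values, decoder_);
  int num_length = len_decoder_.ValidValuesCount();
  PARQUET_THROW_NOT_OK(buffered_length_->Resize(num_length * sizeof(int32_t)));
  int ret = len_decoder_.Decode(
      reinterpret_cast<int32_t*>(buffered_length_->mutable_data()), num_length);
  DCHECK_EQ(ret, num_length);
  // All lengths are consumed, so the reader now sits at the start of the
  // byte-array data; record that position instead of rediscovering it later.
  data_ = data + (len - decoder_->bytes_left());  // bytes_left() is assumed
  length_idx_ = 0;
  num_valid_values_ = num_length;
}
```
`Decode` could then slice values from `data_` directly and advance it by each length, which makes partial `Decode` calls safe.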
##########
File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc
##########
@@ -4183,10 +4183,48 @@ TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) {
::arrow::AssertTablesEqual(*table, *expect_table);
}
+
+TEST(TestArrowReadDeltaEncoding, DeltaByteArray) {
+ auto file = test::get_data_file("delta_byte_array.parquet");
+ auto expect_file = test::get_data_file("delta_byte_array_expect.csv");
+ auto pool = ::arrow::default_memory_pool();
+ std::unique_ptr<FileReader> parquet_reader;
+ std::shared_ptr<::arrow::Table> parquet_table;
+ ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false),
+                            &parquet_reader));
+ ASSERT_OK(parquet_reader->ReadTable(&parquet_table));
+ ASSERT_OK_AND_ASSIGN(auto actural_table, parquet_table->CombineChunks());
+
Review comment:
Can you sanity-check the result?
`ASSERT_OK(actual_table->ValidateFull())`
##########
File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc
##########
@@ -4183,10 +4183,48 @@ TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) {
::arrow::AssertTablesEqual(*table, *expect_table);
}
+
+TEST(TestArrowReadDeltaEncoding, DeltaByteArray) {
+ auto file = test::get_data_file("delta_byte_array.parquet");
+ auto expect_file = test::get_data_file("delta_byte_array_expect.csv");
+ auto pool = ::arrow::default_memory_pool();
+ std::unique_ptr<FileReader> parquet_reader;
+ std::shared_ptr<::arrow::Table> parquet_table;
+ ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false),
+                            &parquet_reader));
+ ASSERT_OK(parquet_reader->ReadTable(&parquet_table));
+ ASSERT_OK_AND_ASSIGN(auto actural_table, parquet_table->CombineChunks());
+
+ ASSERT_OK_AND_ASSIGN(auto input_file, ::arrow::io::ReadableFile::Open(expect_file));
+ auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
+ std::array<std::string, 12> column_names = {
Review comment:
Why 12? You seem to have 9 of them :-)
More idiomatically, just use `std::vector<std::string>`?
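Something along these lines (placeholder names; the nine actual CSV column names and their types would come from the test data):
```cpp
// Sketch: a vector sized by its initializer list, so the count can't drift.
std::vector<std::string> column_names = {"c0", "c1", "c2", "c3", "c4",
                                         "c5", "c6", "c7", "c8"};
for (const auto& name : column_names) {
  convert_options.column_types[name] = ::arrow::utf8();  // assumed type
}
```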
##########
File path: cpp/src/parquet/encoding.cc
##########
@@ -2226,30 +2248,69 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
MemoryPool* pool = ::arrow::default_memory_pool())
: DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
len_decoder_(nullptr, pool),
- pool_(pool) {}
+ buffered_length_(AllocateBuffer(pool, 0)),
+ buffered_data_(AllocateBuffer(pool, 0)) {}
Review comment:
Are you planning to complete and enable DELTA_LENGTH_BYTE_ARRAY in
another PR?
##########
File path: cpp/src/parquet/encoding.cc
##########
@@ -2281,46 +2346,130 @@ class DeltaByteArrayDecoder : public DecoderImpl,
: DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY),
prefix_len_decoder_(nullptr, pool),
suffix_decoder_(nullptr, pool),
- last_value_(0, nullptr) {}
+ last_value_in_previous_page_(""),
+ buffered_prefix_length_(AllocateBuffer(pool, 0)),
+ buffered_data_(AllocateBuffer(pool, 0)) {}
- virtual void SetData(int num_values, const uint8_t* data, int len) {
+ void SetData(int num_values, const uint8_t* data, int len) override {
num_values_ = num_values;
- if (len == 0) return;
- int prefix_len_length = ::arrow::util::SafeLoadAs<int32_t>(data);
- data += 4;
- len -= 4;
- prefix_len_decoder_.SetData(num_values, data, prefix_len_length);
- data += prefix_len_length;
- len -= prefix_len_length;
- suffix_decoder_.SetData(num_values, data, len);
- }
-
- // TODO: this doesn't work and requires memory management. We need to allocate
- // new strings to store the results.
- virtual int Decode(ByteArray* buffer, int max_values) {
- max_values = std::min(max_values, this->num_values_);
- for (int i = 0; i < max_values; ++i) {
- int prefix_len = 0;
- prefix_len_decoder_.Decode(&prefix_len, 1);
- ByteArray suffix = {0, nullptr};
- suffix_decoder_.Decode(&suffix, 1);
- buffer[i].len = prefix_len + suffix.len;
+ decoder_ = std::make_shared<::arrow::BitUtil::BitReader>(data, len);
+ prefix_len_decoder_.SetDecoder(num_values, decoder_);
+
+ int num_prefix = prefix_len_decoder_.ValidValuesCount();
+ PARQUET_THROW_NOT_OK(buffered_prefix_length_->Resize(num_prefix * sizeof(int32_t)));
+ int ret = prefix_len_decoder_.Decode(
+     reinterpret_cast<int32_t*>(buffered_prefix_length_->mutable_data()), num_prefix);
+ DCHECK_EQ(ret, num_prefix);
+ prefix_len_offset_ = 0;
+ num_valid_values_ = num_prefix;
+
+ suffix_decoder_.SetData(num_values, decoder_);
Review comment:
Here as well, it's weird to have the same `BitReader` shared by the two
decoders.
The spec says:
> This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED), followed by the suffixes encoded as delta length byte arrays (DELTA_LENGTH_BYTE_ARRAY).
This means you should ideally compute the position of the encoded suffixes
and then call `suffix_decoder_.SetData` with the right `(data, len)` values.
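For example (a sketch; `bytes_left()` is an assumed `BitReader` accessor):
```cpp
// Sketch: once prefix_len_decoder_ has consumed the DELTA_BINARY_PACKED
// prefix lengths, compute where the DELTA_LENGTH_BYTE_ARRAY suffix block
// begins and hand suffix_decoder_ its own (data, len) slice.
int suffix_begin = len - decoder_->bytes_left();  // assumed accessor
suffix_decoder_.SetData(num_values, data + suffix_begin, len - suffix_begin);
```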