shanhuuang commented on a change in pull request #10978:
URL: https://github.com/apache/arrow/pull/10978#discussion_r704104367
##########
File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc
##########
@@ -4183,10 +4183,48 @@ TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) {
::arrow::AssertTablesEqual(*table, *expect_table);
}
+
+TEST(TestArrowReadDeltaEncoding, DeltaByteArray) {
+ auto file = test::get_data_file("delta_byte_array.parquet");
+ auto expect_file = test::get_data_file("delta_byte_array_expect.csv");
+ auto pool = ::arrow::default_memory_pool();
+ std::unique_ptr<FileReader> parquet_reader;
+ std::shared_ptr<::arrow::Table> parquet_table;
+ ASSERT_OK(
+ FileReader::Make(pool, ParquetFileReader::OpenFile(file, false),
&parquet_reader));
+ ASSERT_OK(parquet_reader->ReadTable(&parquet_table));
+ ASSERT_OK_AND_ASSIGN(auto actural_table, parquet_table->CombineChunks());
+
+ ASSERT_OK_AND_ASSIGN(auto input_file,
::arrow::io::ReadableFile::Open(expect_file));
+ auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
+ std::array<std::string, 12> column_names = {
Review comment:
Sorry, I probably tested with another file and forgot to update this. It
should be 9 here. I will use `std::vector` instead :)
##########
File path: cpp/src/parquet/encoding.cc
##########
@@ -2226,30 +2248,69 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
MemoryPool* pool =
::arrow::default_memory_pool())
: DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
len_decoder_(nullptr, pool),
- pool_(pool) {}
+ buffered_length_(AllocateBuffer(pool, 0)),
+ buffered_data_(AllocateBuffer(pool, 0)) {}
Review comment:
DeltaLengthByteArrayDecoder is almost complete in this PR (except for the
DecodeArrow method) because it is an essential part of DeltaByteArrayDecoder,
i.e. DeltaByteArrayDecoder::suffix_decoder_.
However, I don't know how to generate data with the DELTA_LENGTH_BYTE_ARRAY
encoding, so I neither enabled DELTA_LENGTH_BYTE_ARRAY nor added a test case
for it in this PR.
##########
File path: cpp/src/arrow/util/bit_stream_utils.h
##########
@@ -423,6 +427,25 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) {
return true;
}
+inline bool BitReader::Advance(int64_t num_bits) {
+ int64_t bits_required = bit_offset_ + num_bits;
+ int64_t bytes_required = BitUtil::BytesForBits(bits_required);
+ if (ARROW_PREDICT_FALSE(bytes_required > max_bytes_ - byte_offset_)) {
+ return false;
+ }
+ byte_offset_ += static_cast<int>(bits_required >> 3);
+ bit_offset_ = static_cast<int>(bits_required & 7);
+ // Reset buffered_values_
Review comment:
OK. I will add a method named "ResetBufferedValues".
##########
File path: cpp/src/parquet/encoding.cc
##########
@@ -2281,46 +2346,130 @@ class DeltaByteArrayDecoder : public DecoderImpl,
: DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY),
prefix_len_decoder_(nullptr, pool),
suffix_decoder_(nullptr, pool),
- last_value_(0, nullptr) {}
+ last_value_in_previous_page_(""),
+ buffered_prefix_length_(AllocateBuffer(pool, 0)),
+ buffered_data_(AllocateBuffer(pool, 0)) {}
- virtual void SetData(int num_values, const uint8_t* data, int len) {
+ void SetData(int num_values, const uint8_t* data, int len) override {
num_values_ = num_values;
- if (len == 0) return;
- int prefix_len_length = ::arrow::util::SafeLoadAs<int32_t>(data);
- data += 4;
- len -= 4;
- prefix_len_decoder_.SetData(num_values, data, prefix_len_length);
- data += prefix_len_length;
- len -= prefix_len_length;
- suffix_decoder_.SetData(num_values, data, len);
- }
-
- // TODO: this doesn't work and requires memory management. We need to
allocate
- // new strings to store the results.
- virtual int Decode(ByteArray* buffer, int max_values) {
- max_values = std::min(max_values, this->num_values_);
- for (int i = 0; i < max_values; ++i) {
- int prefix_len = 0;
- prefix_len_decoder_.Decode(&prefix_len, 1);
- ByteArray suffix = {0, nullptr};
- suffix_decoder_.Decode(&suffix, 1);
- buffer[i].len = prefix_len + suffix.len;
+ decoder_ = std::make_shared<::arrow::BitUtil::BitReader>(data, len);
+ prefix_len_decoder_.SetDecoder(num_values, decoder_);
+
+ int num_prefix = prefix_len_decoder_.ValidValuesCount();
+ PARQUET_THROW_NOT_OK(buffered_prefix_length_->Resize(num_prefix *
sizeof(int32_t)));
+ int ret = prefix_len_decoder_.Decode(
+ reinterpret_cast<int32_t*>(buffered_prefix_length_->mutable_data()),
num_prefix);
+ DCHECK_EQ(ret, num_prefix);
+ prefix_len_offset_ = 0;
+ num_valid_values_ = num_prefix;
+
+ suffix_decoder_.SetData(num_values, decoder_);
Review comment:
I think we cannot get the end position of the DELTA_BINARY_PACKED data
unless we have read and initialized all the blocks distributed in this data
space, because in the DELTA_BINARY_PACKED encoding the bit width of each
miniblock is stored in the blocks rather than in the header.
Do you mean that a method like `DeltaBitPackDecoder::InitAllBlocks` should
be added? In this method we could initialize all the blocks in the
DELTA_BINARY_PACKED data, compute the total number of bytes as the return
value, and finally reset `decoder_` for the follow-up
`DeltaBitPackDecoder::Decode`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]