shanhuuang commented on a change in pull request #10978:
URL: https://github.com/apache/arrow/pull/10978#discussion_r704104367



##########
File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc
##########
@@ -4183,10 +4183,48 @@ TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) {
 
   ::arrow::AssertTablesEqual(*table, *expect_table);
 }
+
+TEST(TestArrowReadDeltaEncoding, DeltaByteArray) {
+  auto file = test::get_data_file("delta_byte_array.parquet");
+  auto expect_file = test::get_data_file("delta_byte_array_expect.csv");
+  auto pool = ::arrow::default_memory_pool();
+  std::unique_ptr<FileReader> parquet_reader;
+  std::shared_ptr<::arrow::Table> parquet_table;
+  ASSERT_OK(
+      FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), 
&parquet_reader));
+  ASSERT_OK(parquet_reader->ReadTable(&parquet_table));
+  ASSERT_OK_AND_ASSIGN(auto actural_table, parquet_table->CombineChunks());
+
+  ASSERT_OK_AND_ASSIGN(auto input_file, 
::arrow::io::ReadableFile::Open(expect_file));
+  auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
+  std::array<std::string, 12> column_names = {

Review comment:
       Sorry. I may have tested with another file and forgotten to update it. 
It should be 9 here. I will use `std::vector` instead :)

##########
File path: cpp/src/parquet/encoding.cc
##########
@@ -2226,30 +2248,69 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
                                        MemoryPool* pool = 
::arrow::default_memory_pool())
       : DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
         len_decoder_(nullptr, pool),
-        pool_(pool) {}
+        buffered_length_(AllocateBuffer(pool, 0)),
+        buffered_data_(AllocateBuffer(pool, 0)) {}

Review comment:
       DeltaLengthByteArrayDecoder is almost complete in this PR (except for 
the DecodeArrow method) because it is an essential part of 
DeltaByteArrayDecoder, i.e. it is used as DeltaByteArrayDecoder::suffix_decoder_.
   
   Actually, I don't know how to generate data with the DELTA_LENGTH_BYTE_ARRAY 
encoding, so I did not enable DELTA_LENGTH_BYTE_ARRAY or add a test case for 
it in this PR.

##########
File path: cpp/src/arrow/util/bit_stream_utils.h
##########
@@ -423,6 +427,25 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) {
   return true;
 }
 
+inline bool BitReader::Advance(int64_t num_bits) {
+  int64_t bits_required = bit_offset_ + num_bits;
+  int64_t bytes_required = BitUtil::BytesForBits(bits_required);
+  if (ARROW_PREDICT_FALSE(bytes_required > max_bytes_ - byte_offset_)) {
+    return false;
+  }
+  byte_offset_ += static_cast<int>(bits_required >> 3);
+  bit_offset_ = static_cast<int>(bits_required & 7);
+  // Reset buffered_values_

Review comment:
       OK. I will add a method named "ResetBufferedValues".

##########
File path: cpp/src/parquet/encoding.cc
##########
@@ -2281,46 +2346,130 @@ class DeltaByteArrayDecoder : public DecoderImpl,
       : DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY),
         prefix_len_decoder_(nullptr, pool),
         suffix_decoder_(nullptr, pool),
-        last_value_(0, nullptr) {}
+        last_value_in_previous_page_(""),
+        buffered_prefix_length_(AllocateBuffer(pool, 0)),
+        buffered_data_(AllocateBuffer(pool, 0)) {}
 
-  virtual void SetData(int num_values, const uint8_t* data, int len) {
+  void SetData(int num_values, const uint8_t* data, int len) override {
     num_values_ = num_values;
-    if (len == 0) return;
-    int prefix_len_length = ::arrow::util::SafeLoadAs<int32_t>(data);
-    data += 4;
-    len -= 4;
-    prefix_len_decoder_.SetData(num_values, data, prefix_len_length);
-    data += prefix_len_length;
-    len -= prefix_len_length;
-    suffix_decoder_.SetData(num_values, data, len);
-  }
-
-  // TODO: this doesn't work and requires memory management. We need to 
allocate
-  // new strings to store the results.
-  virtual int Decode(ByteArray* buffer, int max_values) {
-    max_values = std::min(max_values, this->num_values_);
-    for (int i = 0; i < max_values; ++i) {
-      int prefix_len = 0;
-      prefix_len_decoder_.Decode(&prefix_len, 1);
-      ByteArray suffix = {0, nullptr};
-      suffix_decoder_.Decode(&suffix, 1);
-      buffer[i].len = prefix_len + suffix.len;
+    decoder_ = std::make_shared<::arrow::BitUtil::BitReader>(data, len);
+    prefix_len_decoder_.SetDecoder(num_values, decoder_);
+
+    int num_prefix = prefix_len_decoder_.ValidValuesCount();
+    PARQUET_THROW_NOT_OK(buffered_prefix_length_->Resize(num_prefix * 
sizeof(int32_t)));
+    int ret = prefix_len_decoder_.Decode(
+        reinterpret_cast<int32_t*>(buffered_prefix_length_->mutable_data()), 
num_prefix);
+    DCHECK_EQ(ret, num_prefix);
+    prefix_len_offset_ = 0;
+    num_valid_values_ = num_prefix;
+
+    suffix_decoder_.SetData(num_values, decoder_);

Review comment:
       I think that we cannot get the end position of the DELTA_BINARY_PACKED 
data unless we have read and initialized all the blocks that are distributed 
in this data space, because in the DELTA_BINARY_PACKED encoding the bit width 
of each miniblock is stored in the blocks rather than in the header.
   
   Do you mean that a method like `DeltaBitPackDecoder::InitAllBlocks` should 
be added? Such a method could initialize all the blocks in the 
DELTA_BINARY_PACKED data, compute and return the total number of bytes, and 
finally reset `decoder_` for the follow-up `DeltaBitPackDecoder::Decode`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to