pitrou commented on a change in pull request #10978:
URL: https://github.com/apache/arrow/pull/10978#discussion_r712182707



##########
File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc
##########
@@ -4183,10 +4183,95 @@ TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) {
 
   ::arrow::AssertTablesEqual(*table, *expect_table);
 }
+
+TEST(TestArrowReadDeltaEncoding, DeltaByteArray) {
+  auto file = test::get_data_file("delta_byte_array.parquet");
+  auto expect_file = test::get_data_file("delta_byte_array_expect.csv");
+  auto pool = ::arrow::default_memory_pool();
+  std::unique_ptr<FileReader> parquet_reader;
+  std::shared_ptr<::arrow::Table> parquet_table;
+  ASSERT_OK(
+      FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), 
&parquet_reader));
+  ASSERT_OK(parquet_reader->ReadTable(&parquet_table));
+  ASSERT_OK(parquet_table->ValidateFull());
+
+  ASSERT_OK_AND_ASSIGN(auto input_file, 
::arrow::io::ReadableFile::Open(expect_file));
+  auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
+  std::vector<std::string> column_names = {
+      "c_customer_id", "c_salutation",          "c_first_name",
+      "c_last_name",   "c_preferred_cust_flag", "c_birth_country",
+      "c_login",       "c_email_address",       "c_last_review_date"};
+  for (auto name : column_names) {
+    convert_options.column_types[name] = ::arrow::utf8();
+  }
+  convert_options.strings_can_be_null = true;
+  ASSERT_OK_AND_ASSIGN(auto csv_reader,
+                       ::arrow::csv::TableReader::Make(
+                           ::arrow::io::default_io_context(), input_file,
+                           ::arrow::csv::ReadOptions::Defaults(),
+                           ::arrow::csv::ParseOptions::Defaults(), 
convert_options));
+  ASSERT_OK_AND_ASSIGN(auto csv_table, csv_reader->Read());
+
+  ::arrow::AssertTablesEqual(*parquet_table, *csv_table, false);
+}
+
+TEST(TestArrowReadDeltaEncoding, IncrementalDecodeDeltaByteArray) {
+  auto file = test::get_data_file("delta_byte_array.parquet");
+  auto expect_file = test::get_data_file("delta_byte_array_expect.csv");
+  auto pool = ::arrow::default_memory_pool();
+  const int64_t batch_size = 100;
+  ArrowReaderProperties properties = default_arrow_reader_properties();
+  properties.set_batch_size(batch_size);
+
+  std::unique_ptr<FileReader> parquet_reader;
+  std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
+  ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), 
properties,
+                             &parquet_reader));
+  
ASSERT_OK(parquet_reader->GetRecordBatchReader(Iota(parquet_reader->num_row_groups()),
+                                                 &rb_reader));
+
+  ASSERT_OK_AND_ASSIGN(auto input_file, 
::arrow::io::ReadableFile::Open(expect_file));
+  auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
+  std::vector<std::string> column_names = {
+      "c_customer_id", "c_salutation",          "c_first_name",
+      "c_last_name",   "c_preferred_cust_flag", "c_birth_country",
+      "c_login",       "c_email_address",       "c_last_review_date"};
+  for (auto name : column_names) {
+    convert_options.column_types[name] = ::arrow::utf8();
+  }
+  convert_options.strings_can_be_null = true;
+  ASSERT_OK_AND_ASSIGN(auto csv_reader,
+                       ::arrow::csv::TableReader::Make(
+                           ::arrow::io::default_io_context(), input_file,
+                           ::arrow::csv::ReadOptions::Defaults(),
+                           ::arrow::csv::ParseOptions::Defaults(), 
convert_options));
+  ASSERT_OK_AND_ASSIGN(auto csv_table, csv_reader->Read());
+  ::arrow::TableBatchReader table_reader(*csv_table);
+  table_reader.set_chunksize(batch_size);
+
+  std::shared_ptr<::arrow::RecordBatch> actual_batch, expected_batch;
+  for (int i = 0; i < csv_table->num_rows() / batch_size; ++i) {
+    ASSERT_OK(rb_reader->ReadNext(&actual_batch));
+    ASSERT_OK(table_reader.ReadNext(&expected_batch));

Review comment:
       Can you call `ASSERT_OK(actual_batch->ValidateFull())`?

##########
File path: cpp/src/parquet/encoding.cc
##########
@@ -2281,46 +2345,134 @@ class DeltaByteArrayDecoder : public DecoderImpl,
       : DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY),
         prefix_len_decoder_(nullptr, pool),
         suffix_decoder_(nullptr, pool),
-        last_value_(0, nullptr) {}
+        last_value_in_previous_page_(""),
+        buffered_prefix_length_(AllocateBuffer(pool, 0)),
+        buffered_data_(AllocateBuffer(pool, 0)) {}
 
-  virtual void SetData(int num_values, const uint8_t* data, int len) {
+  void SetData(int num_values, const uint8_t* data, int len) override {
     num_values_ = num_values;
-    if (len == 0) return;
-    int prefix_len_length = ::arrow::util::SafeLoadAs<int32_t>(data);
-    data += 4;
-    len -= 4;
-    prefix_len_decoder_.SetData(num_values, data, prefix_len_length);
-    data += prefix_len_length;
-    len -= prefix_len_length;
-    suffix_decoder_.SetData(num_values, data, len);
-  }
-
-  // TODO: this doesn't work and requires memory management. We need to 
allocate
-  // new strings to store the results.
-  virtual int Decode(ByteArray* buffer, int max_values) {
-    max_values = std::min(max_values, this->num_values_);
-    for (int i = 0; i < max_values; ++i) {
-      int prefix_len = 0;
-      prefix_len_decoder_.Decode(&prefix_len, 1);
-      ByteArray suffix = {0, nullptr};
-      suffix_decoder_.Decode(&suffix, 1);
-      buffer[i].len = prefix_len + suffix.len;
+    decoder_ = std::make_shared<::arrow::BitUtil::BitReader>(data, len);
+    prefix_len_decoder_.SetDecoder(num_values, decoder_);
+
+    // get the number of encoded prefix lengths
+    int num_prefix = prefix_len_decoder_.ValidValuesCount();
+    // call prefix_len_decoder_.Decode to decode all the prefix lengths.
+    // all the prefix lengths are buffered in buffered_prefix_length_.
+    PARQUET_THROW_NOT_OK(buffered_prefix_length_->Resize(num_prefix * 
sizeof(int32_t)));
+    int ret = prefix_len_decoder_.Decode(
+        reinterpret_cast<int32_t*>(buffered_prefix_length_->mutable_data()), 
num_prefix);
+    DCHECK_EQ(ret, num_prefix);
+    prefix_len_offset_ = 0;
+    num_valid_values_ = num_prefix;
+
+    // at this time, the decoder_ will be at the start of the encoded suffix 
data.
+    suffix_decoder_.SetDecoder(num_values, decoder_);
+
+    // TODO: read corrupted files written with bug(PARQUET-246). last_value_ 
should be set
+    // to last_value_in_previous_page_ when decoding a new page(except the 
first page)

Review comment:
       Do you intend to work on this? If so, can you open a new JIRA for it?

##########
File path: cpp/src/parquet/encoding.cc
##########
@@ -2226,30 +2248,68 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
                                        MemoryPool* pool = 
::arrow::default_memory_pool())
       : DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
         len_decoder_(nullptr, pool),
-        pool_(pool) {}
+        buffered_length_(AllocateBuffer(pool, 0)),
+        buffered_data_(AllocateBuffer(pool, 0)) {}
 
   void SetData(int num_values, const uint8_t* data, int len) override {
     num_values_ = num_values;
     if (len == 0) return;
-    int total_lengths_len = ::arrow::util::SafeLoadAs<int32_t>(data);
-    data += 4;
-    this->len_decoder_.SetData(num_values, data, total_lengths_len);
-    data_ = data + total_lengths_len;
-    this->len_ = len - 4 - total_lengths_len;
+
+    decoder_ = std::make_shared<::arrow::BitUtil::BitReader>(data, len);
+    len_decoder_.SetDecoder(num_values, decoder_);
+
+    int num_length = len_decoder_.ValidValuesCount();
+    PARQUET_THROW_NOT_OK(buffered_length_->Resize(num_length * 
sizeof(int32_t)));
+
+    int ret = len_decoder_.Decode(
+        reinterpret_cast<int32_t*>(buffered_length_->mutable_data()), 
num_length);
+    DCHECK_EQ(ret, num_length);
+    length_idx_ = 0;
+    num_valid_values_ = num_length;
+  }
+
+  void SetDecoder(int num_values, std::shared_ptr<::arrow::BitUtil::BitReader> 
decoder) {
+    num_values_ = num_values;
+    decoder_ = decoder;
+
+    len_decoder_.SetDecoder(num_values, decoder_);
+
+    int num_length = len_decoder_.ValidValuesCount();
+    PARQUET_THROW_NOT_OK(buffered_length_->Resize(num_length * 
sizeof(int32_t)));
+
+    int ret = len_decoder_.Decode(
+        reinterpret_cast<int32_t*>(buffered_length_->mutable_data()), 
num_length);
+    DCHECK_EQ(ret, num_length);
+    length_idx_ = 0;
+    num_valid_values_ = num_length;

Review comment:
       You could factor out the repeated code between `SetData` and `SetDecoder` 
into a helper function (perhaps `DecodeLengths`?).

##########
File path: cpp/src/arrow/util/bit_stream_utils.h
##########
@@ -255,6 +259,16 @@ inline bool BitWriter::PutAligned(T val, int num_bytes) {
 
 namespace detail {
 
+inline void ResetBufferdValues_(const uint8_t* buffer, const int& byte_offset,

Review comment:
       Nit: "ResetBufferedValues_"?

##########
File path: cpp/src/parquet/arrow/arrow_reader_writer_test.cc
##########
@@ -4183,10 +4183,95 @@ TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) {
 
   ::arrow::AssertTablesEqual(*table, *expect_table);
 }
+
+TEST(TestArrowReadDeltaEncoding, DeltaByteArray) {
+  auto file = test::get_data_file("delta_byte_array.parquet");
+  auto expect_file = test::get_data_file("delta_byte_array_expect.csv");
+  auto pool = ::arrow::default_memory_pool();
+  std::unique_ptr<FileReader> parquet_reader;
+  std::shared_ptr<::arrow::Table> parquet_table;
+  ASSERT_OK(
+      FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), 
&parquet_reader));
+  ASSERT_OK(parquet_reader->ReadTable(&parquet_table));
+  ASSERT_OK(parquet_table->ValidateFull());
+
+  ASSERT_OK_AND_ASSIGN(auto input_file, 
::arrow::io::ReadableFile::Open(expect_file));
+  auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
+  std::vector<std::string> column_names = {
+      "c_customer_id", "c_salutation",          "c_first_name",
+      "c_last_name",   "c_preferred_cust_flag", "c_birth_country",
+      "c_login",       "c_email_address",       "c_last_review_date"};
+  for (auto name : column_names) {
+    convert_options.column_types[name] = ::arrow::utf8();
+  }
+  convert_options.strings_can_be_null = true;
+  ASSERT_OK_AND_ASSIGN(auto csv_reader,
+                       ::arrow::csv::TableReader::Make(
+                           ::arrow::io::default_io_context(), input_file,
+                           ::arrow::csv::ReadOptions::Defaults(),
+                           ::arrow::csv::ParseOptions::Defaults(), 
convert_options));
+  ASSERT_OK_AND_ASSIGN(auto csv_table, csv_reader->Read());
+
+  ::arrow::AssertTablesEqual(*parquet_table, *csv_table, false);
+}
+
+TEST(TestArrowReadDeltaEncoding, IncrementalDecodeDeltaByteArray) {
+  auto file = test::get_data_file("delta_byte_array.parquet");
+  auto expect_file = test::get_data_file("delta_byte_array_expect.csv");
+  auto pool = ::arrow::default_memory_pool();
+  const int64_t batch_size = 100;
+  ArrowReaderProperties properties = default_arrow_reader_properties();
+  properties.set_batch_size(batch_size);
+
+  std::unique_ptr<FileReader> parquet_reader;
+  std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
+  ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), 
properties,
+                             &parquet_reader));
+  
ASSERT_OK(parquet_reader->GetRecordBatchReader(Iota(parquet_reader->num_row_groups()),
+                                                 &rb_reader));
+
+  ASSERT_OK_AND_ASSIGN(auto input_file, 
::arrow::io::ReadableFile::Open(expect_file));
+  auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
+  std::vector<std::string> column_names = {
+      "c_customer_id", "c_salutation",          "c_first_name",
+      "c_last_name",   "c_preferred_cust_flag", "c_birth_country",
+      "c_login",       "c_email_address",       "c_last_review_date"};
+  for (auto name : column_names) {
+    convert_options.column_types[name] = ::arrow::utf8();
+  }
+  convert_options.strings_can_be_null = true;
+  ASSERT_OK_AND_ASSIGN(auto csv_reader,
+                       ::arrow::csv::TableReader::Make(
+                           ::arrow::io::default_io_context(), input_file,
+                           ::arrow::csv::ReadOptions::Defaults(),
+                           ::arrow::csv::ParseOptions::Defaults(), 
convert_options));
+  ASSERT_OK_AND_ASSIGN(auto csv_table, csv_reader->Read());

Review comment:
       There's a lot of code in common between the two tests. Can you factor it 
out (you could for example use a test fixture)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to