This is an automated email from the ASF dual-hosted git repository.

maplefu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f07404dac GH-41321: [C++][Parquet] More strict Parquet level checking 
(#41346)
1f07404dac is described below

commit 1f07404dac920bf81f852f834622f2fc30f8dcfc
Author: mwish <[email protected]>
AuthorDate: Tue May 21 18:38:17 2024 +0800

    GH-41321: [C++][Parquet] More strict Parquet level checking (#41346)
    
    
    
    ### Rationale for this change
    
    In https://github.com/apache/arrow/issues/41321, a user reported a failure when reading a corrupt Parquet file. This happened because some level consistency checks were missing. The current code works when reading a well-formed Parquet file, but reading a corrupt file requires stricter checking.
    
    **Currently this patch only strengthens the checking of Parquet levels; the corresponding value checks will be added in later patches.**
    
    ### What changes are included in this PR?
    
    Stricter checking of Parquet repetition/definition levels.
    
    ### Are these changes tested?
    
    Covered by existing tests plus a new `DefRepLevelNotExpected` test case; a corrupt Parquet file may also be added as a test file later.
    
    ### Are there any user-facing changes?
    
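    Stricter checks: if the number of decoded repetition/definition levels in a page does not match `num_values` in the page header, reading now throws a `ParquetException`.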
    
    * GitHub Issue: #41321
    
    Lead-authored-by: mwish <[email protected]>
    Co-authored-by: mwish <[email protected]>
    Signed-off-by: mwish <[email protected]>
---
 cpp/src/parquet/column_reader.cc      | 109 +++++++++++++++++++++-------------
 cpp/src/parquet/column_reader.h       |   2 +-
 cpp/src/parquet/column_reader_test.cc |  76 +++++++++++++++++++++++-
 3 files changed, 143 insertions(+), 44 deletions(-)

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index a4794c5647..cfd2fea374 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -101,6 +101,10 @@ inline void CheckNumberDecoded(int64_t number_decoded, 
int64_t expected) {
                                    std::to_string(expected));
   }
 }
+
+constexpr std::string_view kErrorRepDefLevelNotMatchesNumValues =
+    "Number of decoded rep / def levels do not match num_values in page 
header";
+
 }  // namespace
 
 LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}
@@ -907,6 +911,8 @@ class ColumnReaderImplBase {
                               static_cast<int>(data_size));
   }
 
+  // Available values in the current data page, value includes repeated values
+  // and nulls.
   int64_t available_values_current_page() const {
     return num_buffered_values_ - num_decoded_values_;
   }
@@ -933,7 +939,7 @@ class ColumnReaderImplBase {
   int64_t num_buffered_values_;
 
   // The number of values from the current data page that have been decoded
-  // into memory
+  // into memory or skipped over.
   int64_t num_decoded_values_;
 
   ::arrow::MemoryPool* pool_;
@@ -1026,28 +1032,36 @@ class TypedColumnReaderImpl : public 
TypedColumnReader<DType>,
 
   // Read definition and repetition levels. Also return the number of 
definition levels
   // and number of values to read. This function is called before reading 
values.
+  //
+  // ReadLevels will throw exception when any num-levels read is not equal to 
the number
+  // of the levels can be read.
   void ReadLevels(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
-                  int64_t* num_def_levels, int64_t* values_to_read) {
-    batch_size =
-        std::min(batch_size, this->num_buffered_values_ - 
this->num_decoded_values_);
+                  int64_t* num_def_levels, int64_t* non_null_values_to_read) {
+    batch_size = std::min(batch_size, this->available_values_current_page());
 
     // If the field is required and non-repeated, there are no definition 
levels
     if (this->max_def_level_ > 0 && def_levels != nullptr) {
       *num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
+      if (ARROW_PREDICT_FALSE(*num_def_levels != batch_size)) {
+        throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
+      }
       // TODO(wesm): this tallying of values-to-decode can be performed with 
better
       // cache-efficiency if fused with the level decoding.
-      *values_to_read +=
+      *non_null_values_to_read +=
           std::count(def_levels, def_levels + *num_def_levels, 
this->max_def_level_);
     } else {
       // Required field, read all values
-      *values_to_read = batch_size;
+      if (num_def_levels != nullptr) {
+        *num_def_levels = 0;
+      }
+      *non_null_values_to_read = batch_size;
     }
 
     // Not present for non-repeated fields
     if (this->max_rep_level_ > 0 && rep_levels != nullptr) {
       int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, 
rep_levels);
-      if (def_levels != nullptr && *num_def_levels != num_rep_levels) {
-        throw ParquetException("Number of decoded rep / def levels did not 
match");
+      if (batch_size != num_rep_levels) {
+        throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
       }
     }
   }
@@ -1090,8 +1104,7 @@ int64_t 
TypedColumnReaderImpl<DType>::ReadBatchWithDictionary(
   *indices_read = ReadDictionaryIndices(indices_to_read, indices);
   int64_t total_indices = std::max<int64_t>(num_def_levels, *indices_read);
   // Some callers use a batch size of 0 just to get the dictionary.
-  int64_t expected_values =
-      std::min(batch_size, this->num_buffered_values_ - 
this->num_decoded_values_);
+  int64_t expected_values = std::min(batch_size, 
this->available_values_current_page());
   if (total_indices == 0 && expected_values > 0) {
     std::stringstream ss;
     ss << "Read 0 values, expected " << expected_values;
@@ -1106,7 +1119,8 @@ template <typename DType>
 int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* 
def_levels,
                                                 int16_t* rep_levels, T* values,
                                                 int64_t* values_read) {
-  // HasNext invokes ReadNewPage
+  // HasNext might invoke ReadNewPage until a data page with
+  // `available_values_current_page() > 0` is found.
   if (!HasNext()) {
     *values_read = 0;
     return 0;
@@ -1115,20 +1129,31 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t 
batch_size, int16_t* def
   // TODO(wesm): keep reading data pages until batch_size is reached, or the
   // row group is finished
   int64_t num_def_levels = 0;
-  int64_t values_to_read = 0;
-  ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, 
&values_to_read);
-
-  *values_read = this->ReadValues(values_to_read, values);
+  // Number of non-null values to read within `num_def_levels`.
+  int64_t non_null_values_to_read = 0;
+  ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels,
+             &non_null_values_to_read);
+  // Should not return more values than available in the current data page,
+  // since currently, ReadLevels would only consume level from current
+  // data page.
+  if (ARROW_PREDICT_FALSE(num_def_levels > 
this->available_values_current_page())) {
+    throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
+  }
+  if (non_null_values_to_read != 0) {
+    *values_read = this->ReadValues(non_null_values_to_read, values);
+  } else {
+    *values_read = 0;
+  }
+  // Adjust total_values, since if max_def_level_ == 0, num_def_levels would
+  // be 0 and `values_read` would adjust to `available_values_current_page()`.
   int64_t total_values = std::max<int64_t>(num_def_levels, *values_read);
-  int64_t expected_values =
-      std::min(batch_size, this->num_buffered_values_ - 
this->num_decoded_values_);
+  int64_t expected_values = std::min(batch_size, 
this->available_values_current_page());
   if (total_values == 0 && expected_values > 0) {
     std::stringstream ss;
     ss << "Read 0 values, expected " << expected_values;
     ParquetException::EofException(ss.str());
   }
   this->ConsumeBufferedValues(total_values);
-
   return total_values;
 }
 
@@ -1137,7 +1162,8 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
     int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
     uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
     int64_t* values_read, int64_t* null_count_out) {
-  // HasNext invokes ReadNewPage
+  // HasNext might invoke ReadNewPage until a data page with
+  // `available_values_current_page() > 0` is found.
   if (!HasNext()) {
     *levels_read = 0;
     *values_read = 0;
@@ -1145,21 +1171,24 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
     return 0;
   }
 
+  // Number of non-null values to read
   int64_t total_values;
   // TODO(wesm): keep reading data pages until batch_size is reached, or the
   // row group is finished
-  batch_size =
-      std::min(batch_size, this->num_buffered_values_ - 
this->num_decoded_values_);
+  batch_size = std::min(batch_size, this->available_values_current_page());
 
   // If the field is required and non-repeated, there are no definition levels
   if (this->max_def_level_ > 0) {
     int64_t num_def_levels = this->ReadDefinitionLevels(batch_size, 
def_levels);
+    if (ARROW_PREDICT_FALSE(num_def_levels != batch_size)) {
+      throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
+    }
 
     // Not present for non-repeated fields
     if (this->max_rep_level_ > 0) {
       int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, 
rep_levels);
-      if (num_def_levels != num_rep_levels) {
-        throw ParquetException("Number of decoded rep / def levels did not 
match");
+      if (ARROW_PREDICT_FALSE(num_def_levels != num_rep_levels)) {
+        throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
       }
     }
 
@@ -1401,26 +1430,21 @@ class TypedRecordReader : public 
TypedColumnReaderImpl<DType>,
         int16_t* def_levels = this->def_levels() + levels_written_;
         int16_t* rep_levels = this->rep_levels() + levels_written_;
 
-        // Not present for non-repeated fields
-        int64_t levels_read = 0;
+        if (ARROW_PREDICT_FALSE(this->ReadDefinitionLevels(batch_size, 
def_levels) !=
+                                batch_size)) {
+          throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
+        }
         if (this->max_rep_level_ > 0) {
-          levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
-          if (this->ReadRepetitionLevels(batch_size, rep_levels) != 
levels_read) {
-            throw ParquetException("Number of decoded rep / def levels did not 
match");
+          int64_t rep_levels_read = this->ReadRepetitionLevels(batch_size, 
rep_levels);
+          if (ARROW_PREDICT_FALSE(rep_levels_read != batch_size)) {
+            throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
           }
-        } else if (this->max_def_level_ > 0) {
-          levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
         }
 
-        // Exhausted column chunk
-        if (levels_read == 0) {
-          break;
-        }
-
-        levels_written_ += levels_read;
+        levels_written_ += batch_size;
         records_read += ReadRecordData(num_records - records_read);
       } else {
-        // No repetition or definition levels
+        // No repetition and definition levels, we can read values directly
         batch_size = std::min(num_records - records_read, batch_size);
         records_read += ReadRecordData(batch_size);
       }
@@ -1574,13 +1598,14 @@ class TypedRecordReader : public 
TypedColumnReaderImpl<DType>,
       int16_t* def_levels = this->def_levels() + levels_written_;
       int16_t* rep_levels = this->rep_levels() + levels_written_;
 
-      int64_t levels_read = 0;
-      levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
-      if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
-        throw ParquetException("Number of decoded rep / def levels did not 
match");
+      if (this->ReadDefinitionLevels(batch_size, def_levels) != batch_size) {
+        throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
+      }
+      if (this->ReadRepetitionLevels(batch_size, rep_levels) != batch_size) {
+        throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
       }
 
-      levels_written_ += levels_read;
+      levels_written_ += batch_size;
       int64_t remaining_records = num_records - skipped_records;
       // This updates at_record_start_.
       skipped_records += DelimitAndSkipRecordsInBuffer(remaining_records);
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 086f6c0e55..29e1b2a25e 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -197,7 +197,7 @@ class PARQUET_EXPORT ColumnReader {
 template <typename DType>
 class TypedColumnReader : public ColumnReader {
  public:
-  typedef typename DType::c_type T;
+  using T = typename DType::c_type;
 
   // Read a batch of repetition levels, definition levels, and values from the
   // column.
diff --git a/cpp/src/parquet/column_reader_test.cc 
b/cpp/src/parquet/column_reader_test.cc
index a48573966a..9096f19568 100644
--- a/cpp/src/parquet/column_reader_test.cc
+++ b/cpp/src/parquet/column_reader_test.cc
@@ -415,7 +415,7 @@ TEST_F(TestPrimitiveReader, TestReadValuesMissing) {
       &descr, values, /*num_values=*/2, Encoding::PLAIN, /*indices=*/{},
       /*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_,
       /*rep_levels=*/{},
-      /*max_rep_level=*/0);
+      /*max_rep_level=*/max_rep_level_);
   pages_.push_back(data_page);
   InitReader(&descr);
   auto reader = static_cast<BoolReader*>(reader_.get());
@@ -431,6 +431,80 @@ TEST_F(TestPrimitiveReader, TestReadValuesMissing) {
                ParquetException);
 }
 
+// GH-41321: When max_def_level > 0 or max_rep_level > 0, and
+// Page has more or less levels than the `num_values` in
+// PageHeader. We should detect and throw exception.
+TEST_F(TestPrimitiveReader, DefRepLevelNotExpected) {
+  auto do_check = [&](const NodePtr& type, const std::vector<int16_t>& 
input_def_levels,
+                      const std::vector<int16_t>& input_rep_levels, int 
num_values) {
+    std::vector<bool> values(num_values, false);
+    const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
+
+    // The data page falls back to plain encoding
+    std::shared_ptr<ResizableBuffer> dummy = AllocateBuffer();
+    std::shared_ptr<DataPageV1> data_page = MakeDataPage<BooleanType>(
+        &descr, values, /*num_values=*/num_values, Encoding::PLAIN, 
/*indices=*/{},
+        /*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_,
+        /*rep_levels=*/input_rep_levels,
+        /*max_rep_level=*/max_rep_level_);
+    pages_.push_back(data_page);
+    InitReader(&descr);
+    auto reader = static_cast<BoolReader*>(reader_.get());
+    ASSERT_TRUE(reader->HasNext());
+
+    constexpr int batch_size = 10;
+    std::vector<int16_t> def_levels(batch_size, 0);
+    std::vector<int16_t> rep_levels(batch_size, 0);
+    bool values_out[batch_size];
+    int64_t values_read;
+    EXPECT_THROW_THAT(
+        [&]() {
+          reader->ReadBatch(batch_size, def_levels.data(), rep_levels.data(), 
values_out,
+                            &values_read);
+        },
+        ParquetException,
+        ::testing::Property(&ParquetException::what,
+                            ::testing::HasSubstr("Number of decoded rep / def 
levels do "
+                                                 "not match num_values in page 
header")));
+  };
+  // storing def-levels less than value in page-header
+  {
+    max_def_level_ = 1;
+    max_rep_level_ = 0;
+    NodePtr type = schema::Boolean("a", Repetition::OPTIONAL);
+    std::vector<int16_t> input_def_levels(1, 1);
+    std::vector<int16_t> input_rep_levels{};
+    do_check(type, input_def_levels, input_rep_levels, /*num_values=*/3);
+  }
+  // storing def-levels more than value in page-header
+  {
+    max_def_level_ = 1;
+    max_rep_level_ = 0;
+    NodePtr type = schema::Boolean("a", Repetition::OPTIONAL);
+    std::vector<int16_t> input_def_levels(2, 1);
+    std::vector<int16_t> input_rep_levels{};
+    do_check(type, input_def_levels, input_rep_levels, /*num_values=*/1);
+  }
+  // storing rep-levels less than value in page-header
+  {
+    max_def_level_ = 0;
+    max_rep_level_ = 1;
+    NodePtr type = schema::Boolean("a", Repetition::REPEATED);
+    std::vector<int16_t> input_def_levels{};
+    std::vector<int16_t> input_rep_levels(3, 0);
+    do_check(type, input_def_levels, input_rep_levels, /*num_values=*/4);
+  }
+  // storing rep-levels more than value in page-header
+  {
+    max_def_level_ = 0;
+    max_rep_level_ = 1;
+    NodePtr type = schema::Boolean("a", Repetition::REPEATED);
+    std::vector<int16_t> input_def_levels{};
+    std::vector<int16_t> input_rep_levels(2, 1);
+    do_check(type, input_def_levels, input_rep_levels, /*num_values=*/1);
+  }
+}
+
 // Repetition level byte length reported in Page but Max Repetition level
 // is zero for the column.
 TEST_F(TestPrimitiveReader, TestRepetitionLvlBytesWithMaxRepetitionZero) {

Reply via email to