This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 1f07404dac GH-41321: [C++][Parquet] More strict Parquet level checking
(#41346)
1f07404dac is described below
commit 1f07404dac920bf81f852f834622f2fc30f8dcfc
Author: mwish <[email protected]>
AuthorDate: Tue May 21 18:38:17 2024 +0800
GH-41321: [C++][Parquet] More strict Parquet level checking (#41346)
### Rationale for this change
In https://github.com/apache/arrow/issues/41321, a user reports a failure
when reading a corrupt Parquet file. This happens because some checks were
missing. The current code works when reading a well-formed Parquet file, but
reading a corrupt file requires stricter checking.
**Currently this patch only strengthens the checks on Parquet levels; the
corresponding value checks will be added in later patches.**
### What changes are included in this PR?
Stricter checks on Parquet repetition/definition levels.
### Are these changes tested?
Yes: existing tests, plus a new unit test (`DefRepLevelNotExpected`). We may also add a corrupt Parquet file as a test fixture later.
### Are there any user-facing changes?
Stricter checks: reading files whose level counts do not match the page header now raises an error.
* GitHub Issue: #41321
Lead-authored-by: mwish <[email protected]>
Co-authored-by: mwish <[email protected]>
Signed-off-by: mwish <[email protected]>
---
cpp/src/parquet/column_reader.cc | 109 +++++++++++++++++++++-------------
cpp/src/parquet/column_reader.h | 2 +-
cpp/src/parquet/column_reader_test.cc | 76 +++++++++++++++++++++++-
3 files changed, 143 insertions(+), 44 deletions(-)
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index a4794c5647..cfd2fea374 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -101,6 +101,10 @@ inline void CheckNumberDecoded(int64_t number_decoded,
int64_t expected) {
std::to_string(expected));
}
}
+
+constexpr std::string_view kErrorRepDefLevelNotMatchesNumValues =
+ "Number of decoded rep / def levels do not match num_values in page
header";
+
} // namespace
LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}
@@ -907,6 +911,8 @@ class ColumnReaderImplBase {
static_cast<int>(data_size));
}
+ // Available values in the current data page, value includes repeated values
+ // and nulls.
int64_t available_values_current_page() const {
return num_buffered_values_ - num_decoded_values_;
}
@@ -933,7 +939,7 @@ class ColumnReaderImplBase {
int64_t num_buffered_values_;
// The number of values from the current data page that have been decoded
- // into memory
+ // into memory or skipped over.
int64_t num_decoded_values_;
::arrow::MemoryPool* pool_;
@@ -1026,28 +1032,36 @@ class TypedColumnReaderImpl : public
TypedColumnReader<DType>,
// Read definition and repetition levels. Also return the number of
definition levels
// and number of values to read. This function is called before reading
values.
+ //
+ // ReadLevels will throw exception when any num-levels read is not equal to
the number
+ // of the levels can be read.
void ReadLevels(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
- int64_t* num_def_levels, int64_t* values_to_read) {
- batch_size =
- std::min(batch_size, this->num_buffered_values_ -
this->num_decoded_values_);
+ int64_t* num_def_levels, int64_t* non_null_values_to_read) {
+ batch_size = std::min(batch_size, this->available_values_current_page());
// If the field is required and non-repeated, there are no definition
levels
if (this->max_def_level_ > 0 && def_levels != nullptr) {
*num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
+ if (ARROW_PREDICT_FALSE(*num_def_levels != batch_size)) {
+ throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
+ }
// TODO(wesm): this tallying of values-to-decode can be performed with
better
// cache-efficiency if fused with the level decoding.
- *values_to_read +=
+ *non_null_values_to_read +=
std::count(def_levels, def_levels + *num_def_levels,
this->max_def_level_);
} else {
// Required field, read all values
- *values_to_read = batch_size;
+ if (num_def_levels != nullptr) {
+ *num_def_levels = 0;
+ }
+ *non_null_values_to_read = batch_size;
}
// Not present for non-repeated fields
if (this->max_rep_level_ > 0 && rep_levels != nullptr) {
int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size,
rep_levels);
- if (def_levels != nullptr && *num_def_levels != num_rep_levels) {
- throw ParquetException("Number of decoded rep / def levels did not
match");
+ if (batch_size != num_rep_levels) {
+ throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
}
}
@@ -1090,8 +1104,7 @@ int64_t
TypedColumnReaderImpl<DType>::ReadBatchWithDictionary(
*indices_read = ReadDictionaryIndices(indices_to_read, indices);
int64_t total_indices = std::max<int64_t>(num_def_levels, *indices_read);
// Some callers use a batch size of 0 just to get the dictionary.
- int64_t expected_values =
- std::min(batch_size, this->num_buffered_values_ -
this->num_decoded_values_);
+ int64_t expected_values = std::min(batch_size,
this->available_values_current_page());
if (total_indices == 0 && expected_values > 0) {
std::stringstream ss;
ss << "Read 0 values, expected " << expected_values;
@@ -1106,7 +1119,8 @@ template <typename DType>
int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t*
def_levels,
int16_t* rep_levels, T* values,
int64_t* values_read) {
- // HasNext invokes ReadNewPage
+ // HasNext might invoke ReadNewPage until a data page with
+ // `available_values_current_page() > 0` is found.
if (!HasNext()) {
*values_read = 0;
return 0;
@@ -1115,20 +1129,31 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t
batch_size, int16_t* def
// TODO(wesm): keep reading data pages until batch_size is reached, or the
// row group is finished
int64_t num_def_levels = 0;
- int64_t values_to_read = 0;
- ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels,
&values_to_read);
-
- *values_read = this->ReadValues(values_to_read, values);
+ // Number of non-null values to read within `num_def_levels`.
+ int64_t non_null_values_to_read = 0;
+ ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels,
+ &non_null_values_to_read);
+ // Should not return more values than available in the current data page,
+ // since currently, ReadLevels would only consume level from current
+ // data page.
+ if (ARROW_PREDICT_FALSE(num_def_levels >
this->available_values_current_page())) {
+ throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
+ }
+ if (non_null_values_to_read != 0) {
+ *values_read = this->ReadValues(non_null_values_to_read, values);
+ } else {
+ *values_read = 0;
+ }
+ // Adjust total_values, since if max_def_level_ == 0, num_def_levels would
+ // be 0 and `values_read` would adjust to `available_values_current_page()`.
int64_t total_values = std::max<int64_t>(num_def_levels, *values_read);
- int64_t expected_values =
- std::min(batch_size, this->num_buffered_values_ -
this->num_decoded_values_);
+ int64_t expected_values = std::min(batch_size,
this->available_values_current_page());
if (total_values == 0 && expected_values > 0) {
std::stringstream ss;
ss << "Read 0 values, expected " << expected_values;
ParquetException::EofException(ss.str());
}
this->ConsumeBufferedValues(total_values);
-
return total_values;
}
@@ -1137,7 +1162,8 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
int64_t* values_read, int64_t* null_count_out) {
- // HasNext invokes ReadNewPage
+ // HasNext might invoke ReadNewPage until a data page with
+ // `available_values_current_page() > 0` is found.
if (!HasNext()) {
*levels_read = 0;
*values_read = 0;
@@ -1145,21 +1171,24 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
return 0;
}
+ // Number of non-null values to read
int64_t total_values;
// TODO(wesm): keep reading data pages until batch_size is reached, or the
// row group is finished
- batch_size =
- std::min(batch_size, this->num_buffered_values_ -
this->num_decoded_values_);
+ batch_size = std::min(batch_size, this->available_values_current_page());
// If the field is required and non-repeated, there are no definition levels
if (this->max_def_level_ > 0) {
int64_t num_def_levels = this->ReadDefinitionLevels(batch_size,
def_levels);
+ if (ARROW_PREDICT_FALSE(num_def_levels != batch_size)) {
+ throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
+ }
// Not present for non-repeated fields
if (this->max_rep_level_ > 0) {
int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size,
rep_levels);
- if (num_def_levels != num_rep_levels) {
- throw ParquetException("Number of decoded rep / def levels did not
match");
+ if (ARROW_PREDICT_FALSE(num_def_levels != num_rep_levels)) {
+ throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
}
@@ -1401,26 +1430,21 @@ class TypedRecordReader : public
TypedColumnReaderImpl<DType>,
int16_t* def_levels = this->def_levels() + levels_written_;
int16_t* rep_levels = this->rep_levels() + levels_written_;
- // Not present for non-repeated fields
- int64_t levels_read = 0;
+ if (ARROW_PREDICT_FALSE(this->ReadDefinitionLevels(batch_size,
def_levels) !=
+ batch_size)) {
+ throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
+ }
if (this->max_rep_level_ > 0) {
- levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
- if (this->ReadRepetitionLevels(batch_size, rep_levels) !=
levels_read) {
- throw ParquetException("Number of decoded rep / def levels did not
match");
+ int64_t rep_levels_read = this->ReadRepetitionLevels(batch_size,
rep_levels);
+ if (ARROW_PREDICT_FALSE(rep_levels_read != batch_size)) {
+ throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
- } else if (this->max_def_level_ > 0) {
- levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
}
- // Exhausted column chunk
- if (levels_read == 0) {
- break;
- }
-
- levels_written_ += levels_read;
+ levels_written_ += batch_size;
records_read += ReadRecordData(num_records - records_read);
} else {
- // No repetition or definition levels
+ // No repetition and definition levels, we can read values directly
batch_size = std::min(num_records - records_read, batch_size);
records_read += ReadRecordData(batch_size);
}
@@ -1574,13 +1598,14 @@ class TypedRecordReader : public
TypedColumnReaderImpl<DType>,
int16_t* def_levels = this->def_levels() + levels_written_;
int16_t* rep_levels = this->rep_levels() + levels_written_;
- int64_t levels_read = 0;
- levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
- if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
- throw ParquetException("Number of decoded rep / def levels did not
match");
+ if (this->ReadDefinitionLevels(batch_size, def_levels) != batch_size) {
+ throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
+ }
+ if (this->ReadRepetitionLevels(batch_size, rep_levels) != batch_size) {
+ throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
- levels_written_ += levels_read;
+ levels_written_ += batch_size;
int64_t remaining_records = num_records - skipped_records;
// This updates at_record_start_.
skipped_records += DelimitAndSkipRecordsInBuffer(remaining_records);
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 086f6c0e55..29e1b2a25e 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -197,7 +197,7 @@ class PARQUET_EXPORT ColumnReader {
template <typename DType>
class TypedColumnReader : public ColumnReader {
public:
- typedef typename DType::c_type T;
+ using T = typename DType::c_type;
// Read a batch of repetition levels, definition levels, and values from the
// column.
diff --git a/cpp/src/parquet/column_reader_test.cc
b/cpp/src/parquet/column_reader_test.cc
index a48573966a..9096f19568 100644
--- a/cpp/src/parquet/column_reader_test.cc
+++ b/cpp/src/parquet/column_reader_test.cc
@@ -415,7 +415,7 @@ TEST_F(TestPrimitiveReader, TestReadValuesMissing) {
&descr, values, /*num_values=*/2, Encoding::PLAIN, /*indices=*/{},
/*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_,
/*rep_levels=*/{},
- /*max_rep_level=*/0);
+ /*max_rep_level=*/max_rep_level_);
pages_.push_back(data_page);
InitReader(&descr);
auto reader = static_cast<BoolReader*>(reader_.get());
@@ -431,6 +431,80 @@ TEST_F(TestPrimitiveReader, TestReadValuesMissing) {
ParquetException);
}
+// GH-41321: When max_def_level > 0 or max_rep_level > 0, and
+// Page has more or less levels than the `num_values` in
+// PageHeader. We should detect and throw exception.
+TEST_F(TestPrimitiveReader, DefRepLevelNotExpected) {
+ auto do_check = [&](const NodePtr& type, const std::vector<int16_t>&
input_def_levels,
+ const std::vector<int16_t>& input_rep_levels, int
num_values) {
+ std::vector<bool> values(num_values, false);
+ const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
+
+ // The data page falls back to plain encoding
+ std::shared_ptr<ResizableBuffer> dummy = AllocateBuffer();
+ std::shared_ptr<DataPageV1> data_page = MakeDataPage<BooleanType>(
+ &descr, values, /*num_values=*/num_values, Encoding::PLAIN,
/*indices=*/{},
+ /*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_,
+ /*rep_levels=*/input_rep_levels,
+ /*max_rep_level=*/max_rep_level_);
+ pages_.push_back(data_page);
+ InitReader(&descr);
+ auto reader = static_cast<BoolReader*>(reader_.get());
+ ASSERT_TRUE(reader->HasNext());
+
+ constexpr int batch_size = 10;
+ std::vector<int16_t> def_levels(batch_size, 0);
+ std::vector<int16_t> rep_levels(batch_size, 0);
+ bool values_out[batch_size];
+ int64_t values_read;
+ EXPECT_THROW_THAT(
+ [&]() {
+ reader->ReadBatch(batch_size, def_levels.data(), rep_levels.data(),
values_out,
+ &values_read);
+ },
+ ParquetException,
+ ::testing::Property(&ParquetException::what,
+ ::testing::HasSubstr("Number of decoded rep / def
levels do "
+ "not match num_values in page
header")));
+ };
+ // storing def-levels less than value in page-header
+ {
+ max_def_level_ = 1;
+ max_rep_level_ = 0;
+ NodePtr type = schema::Boolean("a", Repetition::OPTIONAL);
+ std::vector<int16_t> input_def_levels(1, 1);
+ std::vector<int16_t> input_rep_levels{};
+ do_check(type, input_def_levels, input_rep_levels, /*num_values=*/3);
+ }
+ // storing def-levels more than value in page-header
+ {
+ max_def_level_ = 1;
+ max_rep_level_ = 0;
+ NodePtr type = schema::Boolean("a", Repetition::OPTIONAL);
+ std::vector<int16_t> input_def_levels(2, 1);
+ std::vector<int16_t> input_rep_levels{};
+ do_check(type, input_def_levels, input_rep_levels, /*num_values=*/1);
+ }
+ // storing rep-levels less than value in page-header
+ {
+ max_def_level_ = 0;
+ max_rep_level_ = 1;
+ NodePtr type = schema::Boolean("a", Repetition::REPEATED);
+ std::vector<int16_t> input_def_levels{};
+ std::vector<int16_t> input_rep_levels(3, 0);
+ do_check(type, input_def_levels, input_rep_levels, /*num_values=*/4);
+ }
+ // storing rep-levels more than value in page-header
+ {
+ max_def_level_ = 0;
+ max_rep_level_ = 1;
+ NodePtr type = schema::Boolean("a", Repetition::REPEATED);
+ std::vector<int16_t> input_def_levels{};
+ std::vector<int16_t> input_rep_levels(2, 1);
+ do_check(type, input_def_levels, input_rep_levels, /*num_values=*/1);
+ }
+}
+
// Repetition level byte length reported in Page but Max Repetition level
// is zero for the column.
TEST_F(TestPrimitiveReader, TestRepetitionLvlBytesWithMaxRepetitionZero) {