This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 9801801df3f GH-31992: [C++][Parquet] Handling the special case when
DataPageV2 values buffer is empty (#45252)
9801801df3f is described below
commit 9801801df3f75339f705264252c0eac189f5f2a3
Author: mwish <[email protected]>
AuthorDate: Mon Jan 20 19:02:11 2025 +0800
GH-31992: [C++][Parquet] Handling the special case when DataPageV2 values
buffer is empty (#45252)
### Rationale for this change
In DataPageV2, the levels and data are not compressed together. As a result, we
might get an "empty" data page values buffer.
When encountering this, Snappy C++ will fail to decompress the `(input_len ==
0, output_len == 0)` data.
### What changes are included in this PR?
Handling the case in `column_reader.cc`
### Are these changes tested?
* [x] Will add
### Are there any user-facing changes?
Minor fix
* GitHub Issue: #31992
Lead-authored-by: mwish <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: mwish <[email protected]>
---
cpp/src/parquet/column_reader.cc | 22 +++++++++++++++-------
cpp/src/parquet/column_reader_test.cc | 2 +-
cpp/src/parquet/column_writer_test.cc | 33 +++++++++++++++++++++++++++++++++
3 files changed, 49 insertions(+), 8 deletions(-)
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 2a3bbf76d1c..8d4169b034a 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -580,13 +580,21 @@ std::shared_ptr<Buffer>
SerializedPageReader::DecompressIfNeeded(
memcpy(decompressed, page_buffer->data(), levels_byte_len);
}
- // Decompress the values
- PARQUET_ASSIGN_OR_THROW(
- auto decompressed_len,
- decompressor_->Decompress(compressed_len - levels_byte_len,
- page_buffer->data() + levels_byte_len,
- uncompressed_len - levels_byte_len,
- decompression_buffer_->mutable_data() +
levels_byte_len));
+ // GH-31992: DataPageV2 may store only levels and no values when all
+ // values are null. In this case, Parquet java is known to produce a
+ // 0-len compressed area (which is invalid compressed input).
+ // See https://github.com/apache/parquet-java/issues/3122
+ int64_t decompressed_len = 0;
+ if (uncompressed_len - levels_byte_len != 0) {
+ // Decompress the values
+ PARQUET_ASSIGN_OR_THROW(
+ decompressed_len,
+ decompressor_->Decompress(
+ compressed_len - levels_byte_len, page_buffer->data() +
levels_byte_len,
+ uncompressed_len - levels_byte_len,
+ decompression_buffer_->mutable_data() + levels_byte_len));
+ }
+
if (decompressed_len != uncompressed_len - levels_byte_len) {
throw ParquetException("Page didn't decompress to expected size, expected:
" +
std::to_string(uncompressed_len - levels_byte_len) +
diff --git a/cpp/src/parquet/column_reader_test.cc
b/cpp/src/parquet/column_reader_test.cc
index f3d580ab5d3..87514d87db6 100644
--- a/cpp/src/parquet/column_reader_test.cc
+++ b/cpp/src/parquet/column_reader_test.cc
@@ -465,7 +465,7 @@ TEST_F(TestPrimitiveReader,
TestRepetitionLvlBytesWithMaxRepetitionZero) {
int32_t values[batch_size];
int64_t values_read;
ASSERT_TRUE(reader->HasNext());
- EXPECT_EQ(4, reader->ReadBatch(batch_size, def_levels_out,
/*replevels=*/nullptr,
+ EXPECT_EQ(4, reader->ReadBatch(batch_size, def_levels_out,
/*rep_levels=*/nullptr,
values, &values_read));
EXPECT_EQ(3, values_read);
}
diff --git a/cpp/src/parquet/column_writer_test.cc
b/cpp/src/parquet/column_writer_test.cc
index 25446aefd68..744859cf0f0 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -1774,5 +1774,38 @@ TEST_F(TestInt32Writer, WriteKeyValueMetadataEndToEnd) {
ASSERT_EQ("bar", value);
}
+TEST_F(TestValuesWriterInt32Type, AllNullsCompressionInPageV2) {
+ // GH-31992: In DataPageV2, the levels and data will not be compressed
together,
+ // so, when all values are null, the compressed values should be empty. And
+ // we should handle this case correctly.
+ std::vector<Compression::type> compressions = {Compression::SNAPPY,
Compression::GZIP,
+ Compression::ZSTD,
Compression::BROTLI,
+ Compression::LZ4};
+ for (auto compression : compressions) {
+ if (!Codec::IsAvailable(compression)) {
+ continue;
+ }
+ ARROW_SCOPED_TRACE("compression = ", Codec::GetCodecAsString(compression));
+ // Optional and non-repeated, with definition levels
+ // but no repetition levels
+ this->SetUpSchema(Repetition::OPTIONAL);
+ this->GenerateData(SMALL_SIZE);
+ std::fill(this->def_levels_.begin(), this->def_levels_.end(), 0);
+ ColumnProperties column_properties;
+ column_properties.set_compression(compression);
+
+ auto writer =
+ this->BuildWriter(SMALL_SIZE, column_properties,
ParquetVersion::PARQUET_2_LATEST,
+ ParquetDataPageVersion::V2);
+ writer->WriteBatch(this->values_.size(), this->def_levels_.data(), nullptr,
+ this->values_ptr_);
+ writer->Close();
+
+ ASSERT_EQ(100, this->metadata_num_values());
+ this->ReadColumn(compression);
+ ASSERT_EQ(0, this->values_read_);
+ }
+}
+
} // namespace test
} // namespace parquet