This is an automated email from the ASF dual-hosted git repository.

maplefu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9801801df3f GH-31992: [C++][Parquet] Handling the special case when 
DataPageV2 values buffer is empty (#45252)
9801801df3f is described below

commit 9801801df3f75339f705264252c0eac189f5f2a3
Author: mwish <[email protected]>
AuthorDate: Mon Jan 20 19:02:11 2025 +0800

    GH-31992: [C++][Parquet] Handling the special case when DataPageV2 values 
buffer is empty (#45252)
    
    
    
    ### Rationale for this change
    
    In DataPageV2, the levels and data will not be compressed together. So, we 
might get the "empty" data page buffer.
    
    When meeting this, Snappy C++ will failed to decompress the `(input_len == 
0, output_len == 0)` data.
    
    ### What changes are included in this PR?
    
    Handling the case in `column_reader.cc`
    
    ### Are these changes tested?
    
    * [x] Will add
    
    ### Are there any user-facing changes?
    
    Minor fix
    
    * GitHub Issue: #31992
    
    Lead-authored-by: mwish <[email protected]>
    Co-authored-by: mwish <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: mwish <[email protected]>
---
 cpp/src/parquet/column_reader.cc      | 22 +++++++++++++++-------
 cpp/src/parquet/column_reader_test.cc |  2 +-
 cpp/src/parquet/column_writer_test.cc | 33 +++++++++++++++++++++++++++++++++
 3 files changed, 49 insertions(+), 8 deletions(-)

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 2a3bbf76d1c..8d4169b034a 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -580,13 +580,21 @@ std::shared_ptr<Buffer> 
SerializedPageReader::DecompressIfNeeded(
     memcpy(decompressed, page_buffer->data(), levels_byte_len);
   }
 
-  // Decompress the values
-  PARQUET_ASSIGN_OR_THROW(
-      auto decompressed_len,
-      decompressor_->Decompress(compressed_len - levels_byte_len,
-                                page_buffer->data() + levels_byte_len,
-                                uncompressed_len - levels_byte_len,
-                                decompression_buffer_->mutable_data() + 
levels_byte_len));
+  // GH-31992: DataPageV2 may store only levels and no values when all
+  // values are null. In this case, Parquet java is known to produce a
+  // 0-len compressed area (which is invalid compressed input).
+  // See https://github.com/apache/parquet-java/issues/3122
+  int64_t decompressed_len = 0;
+  if (uncompressed_len - levels_byte_len != 0) {
+    // Decompress the values
+    PARQUET_ASSIGN_OR_THROW(
+        decompressed_len,
+        decompressor_->Decompress(
+            compressed_len - levels_byte_len, page_buffer->data() + 
levels_byte_len,
+            uncompressed_len - levels_byte_len,
+            decompression_buffer_->mutable_data() + levels_byte_len));
+  }
+
   if (decompressed_len != uncompressed_len - levels_byte_len) {
     throw ParquetException("Page didn't decompress to expected size, expected: 
" +
                            std::to_string(uncompressed_len - levels_byte_len) +
diff --git a/cpp/src/parquet/column_reader_test.cc 
b/cpp/src/parquet/column_reader_test.cc
index f3d580ab5d3..87514d87db6 100644
--- a/cpp/src/parquet/column_reader_test.cc
+++ b/cpp/src/parquet/column_reader_test.cc
@@ -465,7 +465,7 @@ TEST_F(TestPrimitiveReader, 
TestRepetitionLvlBytesWithMaxRepetitionZero) {
   int32_t values[batch_size];
   int64_t values_read;
   ASSERT_TRUE(reader->HasNext());
-  EXPECT_EQ(4, reader->ReadBatch(batch_size, def_levels_out, 
/*replevels=*/nullptr,
+  EXPECT_EQ(4, reader->ReadBatch(batch_size, def_levels_out, 
/*rep_levels=*/nullptr,
                                  values, &values_read));
   EXPECT_EQ(3, values_read);
 }
diff --git a/cpp/src/parquet/column_writer_test.cc 
b/cpp/src/parquet/column_writer_test.cc
index 25446aefd68..744859cf0f0 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -1774,5 +1774,38 @@ TEST_F(TestInt32Writer, WriteKeyValueMetadataEndToEnd) {
   ASSERT_EQ("bar", value);
 }
 
+TEST_F(TestValuesWriterInt32Type, AllNullsCompressionInPageV2) {
+  // GH-31992: In DataPageV2, the levels and data will not be compressed 
together,
+  // so, when all values are null, the compressed values should be empty. And
+  // we should handle this case correctly.
+  std::vector<Compression::type> compressions = {Compression::SNAPPY, 
Compression::GZIP,
+                                                 Compression::ZSTD, 
Compression::BROTLI,
+                                                 Compression::LZ4};
+  for (auto compression : compressions) {
+    if (!Codec::IsAvailable(compression)) {
+      continue;
+    }
+    ARROW_SCOPED_TRACE("compression = ", Codec::GetCodecAsString(compression));
+    // Optional and non-repeated, with definition levels
+    // but no repetition levels
+    this->SetUpSchema(Repetition::OPTIONAL);
+    this->GenerateData(SMALL_SIZE);
+    std::fill(this->def_levels_.begin(), this->def_levels_.end(), 0);
+    ColumnProperties column_properties;
+    column_properties.set_compression(compression);
+
+    auto writer =
+        this->BuildWriter(SMALL_SIZE, column_properties, 
ParquetVersion::PARQUET_2_LATEST,
+                          ParquetDataPageVersion::V2);
+    writer->WriteBatch(this->values_.size(), this->def_levels_.data(), nullptr,
+                       this->values_ptr_);
+    writer->Close();
+
+    ASSERT_EQ(100, this->metadata_num_values());
+    this->ReadColumn(compression);
+    ASSERT_EQ(0, this->values_read_);
+  }
+}
+
 }  // namespace test
 }  // namespace parquet

Reply via email to