This is an automated email from the ASF dual-hosted git repository.

maplefu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d904d64ba GH-38271: [C++][Parquet] Support reading parquet files with 
multiple gzip members (#38272)
1d904d64ba is described below

commit 1d904d64ba89835fa6587ed08121a06df51f92b4
Author: Atheel Massalha <[email protected]>
AuthorDate: Wed Nov 29 07:07:42 2023 +0200

    GH-38271: [C++][Parquet] Support reading parquet files with multiple gzip 
members (#38272)
    
    ### What changes are included in this PR?
    Adding support in GZipCodec to decompress concatenated gzip members
    
    ### Are these changes tested?
    test is attached
    
    ### Are there any user-facing changes?
    no
    
    * Closes: #38271
    
    Lead-authored-by: amassalha <[email protected]>
    Co-authored-by: Atheel Massalha <[email protected]>
    Signed-off-by: mwish <[email protected]>
---
 cpp/src/arrow/util/compression_test.cc | 41 +++++++++++++++++++++++
 cpp/src/arrow/util/compression_zlib.cc | 59 ++++++++++++++++++++--------------
 cpp/src/parquet/reader_test.cc         | 23 +++++++++++++
 cpp/submodules/parquet-testing         |  2 +-
 4 files changed, 99 insertions(+), 26 deletions(-)

diff --git a/cpp/src/arrow/util/compression_test.cc 
b/cpp/src/arrow/util/compression_test.cc
index 8f2a7f052c..5289320e7c 100644
--- a/cpp/src/arrow/util/compression_test.cc
+++ b/cpp/src/arrow/util/compression_test.cc
@@ -368,6 +368,47 @@ TEST_P(CodecTest, CodecRoundtrip) {
   }
 }
 
+TEST(CodecTest, CodecRoundtripGzipMembers) {
+  std::unique_ptr<Codec> gzip_codec;
+  ASSERT_OK_AND_ASSIGN(gzip_codec, Codec::Create(Compression::GZIP));
+
+  for (int data_size : {0, 10000, 100000}) {
+    int64_t compressed_size_p1, compressed_size_p2;
+    uint32_t p1_size = data_size / 4;
+    uint32_t p2_size = data_size - p1_size;
+    std::vector<uint8_t> data_full = MakeRandomData(data_size);
+    std::vector<uint8_t> data_p1(data_full.begin(), data_full.begin() + 
p1_size);
+    std::vector<uint8_t> data_p2(data_full.begin() + p1_size, data_full.end());
+
+    int max_compressed_len_p1 =
+        static_cast<int>(gzip_codec->MaxCompressedLen(p1_size, 
data_p1.data()));
+    int max_compressed_len_p2 =
+        static_cast<int>(gzip_codec->MaxCompressedLen(p2_size, 
data_p2.data()));
+    std::vector<uint8_t> compressed(max_compressed_len_p1 + 
max_compressed_len_p2);
+
+    // Compress in 2 parts separately
+    ASSERT_OK_AND_ASSIGN(compressed_size_p1,
+                         gzip_codec->Compress(p1_size, data_p1.data(),
+                                              max_compressed_len_p1, 
compressed.data()));
+    ASSERT_OK_AND_ASSIGN(
+        compressed_size_p2,
+        gzip_codec->Compress(p2_size, data_p2.data(), max_compressed_len_p2,
+                             compressed.data() + compressed_size_p1));
+    compressed.resize(compressed_size_p1 + compressed_size_p2);
+
+    // Decompress the concatenated compressed gzip members
+    std::vector<uint8_t> decompressed(data_size);
+    int64_t actual_decompressed_size;
+    ASSERT_OK_AND_ASSIGN(
+        actual_decompressed_size,
+        gzip_codec->Decompress(compressed.size(), compressed.data(), 
decompressed.size(),
+                               decompressed.data()));
+
+    ASSERT_EQ(data_size, actual_decompressed_size);
+    ASSERT_EQ(data_full, decompressed);
+  }
+}
+
 TEST(TestCodecMisc, SpecifyCompressionLevel) {
   struct CombinationOption {
     Compression::type codec;
diff --git a/cpp/src/arrow/util/compression_zlib.cc 
b/cpp/src/arrow/util/compression_zlib.cc
index 2b38bdceab..a51f783be3 100644
--- a/cpp/src/arrow/util/compression_zlib.cc
+++ b/cpp/src/arrow/util/compression_zlib.cc
@@ -381,6 +381,9 @@ class GZipCodec : public Codec {
 
   Result<int64_t> Decompress(int64_t input_length, const uint8_t* input,
                              int64_t output_buffer_length, uint8_t* output) 
override {
+    int64_t read_input_bytes = 0;
+    int64_t decompressed_bytes = 0;
+
     if (!decompressor_initialized_) {
       RETURN_NOT_OK(InitDecompressor());
     }
@@ -392,40 +395,46 @@ class GZipCodec : public Codec {
       return 0;
     }
 
-    // Reset the stream for this block
-    if (inflateReset(&stream_) != Z_OK) {
-      return ZlibErrorPrefix("zlib inflateReset failed: ", stream_.msg);
-    }
+    // inflate() will not automatically decode concatenated gzip members, keep 
calling
+    // inflate until reading all input data (GH-38271).
+    while (read_input_bytes < input_length) {
+      // Reset the stream for this block
+      if (inflateReset(&stream_) != Z_OK) {
+        return ZlibErrorPrefix("zlib inflateReset failed: ", stream_.msg);
+      }
 
-    int ret = 0;
-    // gzip can run in streaming mode or non-streaming mode.  We only
-    // support the non-streaming use case where we present it the entire
-    // compressed input and a buffer big enough to contain the entire
-    // compressed output.  In the case where we don't know the output,
-    // we just make a bigger buffer and try the non-streaming mode
-    // from the beginning again.
-    while (ret != Z_STREAM_END) {
-      stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const 
Bytef*>(input));
-      stream_.avail_in = static_cast<uInt>(input_length);
-      stream_.next_out = reinterpret_cast<Bytef*>(output);
-      stream_.avail_out = static_cast<uInt>(output_buffer_length);
+      int ret = 0;
+      // gzip can run in streaming mode or non-streaming mode.  We only
+      // support the non-streaming use case where we present it the entire
+      // compressed input and a buffer big enough to contain the entire
+      // compressed output.  In the case where we don't know the output,
+      // we just make a bigger buffer and try the non-streaming mode
+      // from the beginning again.
+      stream_.next_in =
+          const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input + 
read_input_bytes));
+      stream_.avail_in = static_cast<uInt>(input_length - read_input_bytes);
+      stream_.next_out = reinterpret_cast<Bytef*>(output + decompressed_bytes);
+      stream_.avail_out = static_cast<uInt>(output_buffer_length - 
decompressed_bytes);
 
       // We know the output size.  In this case, we can use Z_FINISH
       // which is more efficient.
       ret = inflate(&stream_, Z_FINISH);
-      if (ret == Z_STREAM_END || ret != Z_OK) break;
+      if (ret == Z_OK) {
+        // Failure, buffer was too small
+        return Status::IOError("Too small a buffer passed to GZipCodec. 
InputLength=",
+                               input_length, " OutputLength=", 
output_buffer_length);
+      }
 
-      // Failure, buffer was too small
-      return Status::IOError("Too small a buffer passed to GZipCodec. 
InputLength=",
-                             input_length, " OutputLength=", 
output_buffer_length);
-    }
+      // Failure for some other reason
+      if (ret != Z_STREAM_END) {
+        return ZlibErrorPrefix("GZipCodec failed: ", stream_.msg);
+      }
 
-    // Failure for some other reason
-    if (ret != Z_STREAM_END) {
-      return ZlibErrorPrefix("GZipCodec failed: ", stream_.msg);
+      read_input_bytes += stream_.total_in;
+      decompressed_bytes += stream_.total_out;
     }
 
-    return stream_.total_out;
+    return decompressed_bytes;
   }
 
   int64_t MaxCompressedLen(int64_t input_length,
diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc
index 8fe12d3de0..e1ed79ef2c 100644
--- a/cpp/src/parquet/reader_test.cc
+++ b/cpp/src/parquet/reader_test.cc
@@ -116,6 +116,10 @@ std::string rle_dict_uncompressed_corrupt_checksum() {
   return data_file("rle-dict-uncompressed-corrupt-checksum.parquet");
 }
 
+std::string concatenated_gzip_members() {
+  return data_file("concatenated_gzip_members.parquet");
+}
+
 // TODO: Assert on definition and repetition levels
 template <typename DType, typename ValueType>
 void AssertColumnValues(std::shared_ptr<TypedColumnReader<DType>> col, int64_t 
batch_size,
@@ -778,6 +782,25 @@ TEST_F(TestCheckDataPageCrc, CorruptDict) {
   }
 }
 
+TEST(TestGzipMembersRead, TwoConcatenatedMembers) {
+  auto file_reader = ParquetFileReader::OpenFile(concatenated_gzip_members(),
+                                                 /*memory_map=*/false);
+  auto col_reader = std::dynamic_pointer_cast<TypedColumnReader<Int64Type>>(
+      file_reader->RowGroup(0)->Column(0));
+  int64_t num_values = 0;
+  int64_t num_repdef = 0;
+  std::vector<int16_t> reps(1024);
+  std::vector<int16_t> defs(1024);
+  std::vector<int64_t> vals(1024);
+
+  num_repdef =
+      col_reader->ReadBatch(1024, defs.data(), reps.data(), vals.data(), 
&num_values);
+  EXPECT_EQ(num_repdef, 513);
+  for (int64_t i = 0; i < num_repdef; i++) {
+    EXPECT_EQ(i + 1, vals[i]);
+  }
+}
+
 TEST(TestFileReaderAdHoc, NationDictTruncatedDataPage) {
   // PARQUET-816. Some files generated by older Parquet implementations may
   // contain malformed data page metadata, and we can successfully decode them
diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index da467dac2f..d69d979223 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit da467dac2f095b979af37bcf40fa0d1dee5ff652
+Subproject commit d69d979223e883faef9dc6fe3cf573087243c28a

Reply via email to