This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 1d904d64ba GH-38271: [C++][Parquet] Support reading parquet files with multiple gzip members (#38272)
1d904d64ba is described below
commit 1d904d64ba89835fa6587ed08121a06df51f92b4
Author: Atheel Massalha <[email protected]>
AuthorDate: Wed Nov 29 07:07:42 2023 +0200
GH-38271: [C++][Parquet] Support reading parquet files with multiple gzip members (#38272)
### What changes are included in this PR?
Adding support in GZipCodec to decompress concatenated gzip members
### Are these changes tested?
test is attached
### Are there any user-facing changes?
no
* Closes: #38271
Lead-authored-by: amassalha <[email protected]>
Co-authored-by: Atheel Massalha <[email protected]>
Signed-off-by: mwish <[email protected]>
---
cpp/src/arrow/util/compression_test.cc | 41 +++++++++++++++++++++++
cpp/src/arrow/util/compression_zlib.cc | 59 ++++++++++++++++++++--------------
cpp/src/parquet/reader_test.cc | 23 +++++++++++++
cpp/submodules/parquet-testing | 2 +-
4 files changed, 99 insertions(+), 26 deletions(-)
diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc
index 8f2a7f052c..5289320e7c 100644
--- a/cpp/src/arrow/util/compression_test.cc
+++ b/cpp/src/arrow/util/compression_test.cc
@@ -368,6 +368,47 @@ TEST_P(CodecTest, CodecRoundtrip) {
}
}
+TEST(CodecTest, CodecRoundtripGzipMembers) {
+ std::unique_ptr<Codec> gzip_codec;
+ ASSERT_OK_AND_ASSIGN(gzip_codec, Codec::Create(Compression::GZIP));
+
+ for (int data_size : {0, 10000, 100000}) {
+ int64_t compressed_size_p1, compressed_size_p2;
+ uint32_t p1_size = data_size / 4;
+ uint32_t p2_size = data_size - p1_size;
+ std::vector<uint8_t> data_full = MakeRandomData(data_size);
+ std::vector<uint8_t> data_p1(data_full.begin(), data_full.begin() + p1_size);
+ std::vector<uint8_t> data_p2(data_full.begin() + p1_size, data_full.end());
+
+ int max_compressed_len_p1 =
+ static_cast<int>(gzip_codec->MaxCompressedLen(p1_size, data_p1.data()));
+ int max_compressed_len_p2 =
+ static_cast<int>(gzip_codec->MaxCompressedLen(p2_size, data_p2.data()));
+ std::vector<uint8_t> compressed(max_compressed_len_p1 + max_compressed_len_p2);
+
+ // Compress in 2 parts separately
+ ASSERT_OK_AND_ASSIGN(compressed_size_p1,
+ gzip_codec->Compress(p1_size, data_p1.data(),
+ max_compressed_len_p1, compressed.data()));
+ ASSERT_OK_AND_ASSIGN(
+ compressed_size_p2,
+ gzip_codec->Compress(p2_size, data_p2.data(), max_compressed_len_p2,
+ compressed.data() + compressed_size_p1));
+ compressed.resize(compressed_size_p1 + compressed_size_p2);
+
+ // Decompress the concatenated compressed gzip members
+ std::vector<uint8_t> decompressed(data_size);
+ int64_t actual_decompressed_size;
+ ASSERT_OK_AND_ASSIGN(
+ actual_decompressed_size,
+ gzip_codec->Decompress(compressed.size(), compressed.data(), decompressed.size(),
+ decompressed.data()));
+
+ ASSERT_EQ(data_size, actual_decompressed_size);
+ ASSERT_EQ(data_full, decompressed);
+ }
+}
+
TEST(TestCodecMisc, SpecifyCompressionLevel) {
struct CombinationOption {
Compression::type codec;
diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc
index 2b38bdceab..a51f783be3 100644
--- a/cpp/src/arrow/util/compression_zlib.cc
+++ b/cpp/src/arrow/util/compression_zlib.cc
@@ -381,6 +381,9 @@ class GZipCodec : public Codec {
Result<int64_t> Decompress(int64_t input_length, const uint8_t* input,
int64_t output_buffer_length, uint8_t* output) override {
+ int64_t read_input_bytes = 0;
+ int64_t decompressed_bytes = 0;
+
if (!decompressor_initialized_) {
RETURN_NOT_OK(InitDecompressor());
}
@@ -392,40 +395,46 @@ class GZipCodec : public Codec {
return 0;
}
- // Reset the stream for this block
- if (inflateReset(&stream_) != Z_OK) {
- return ZlibErrorPrefix("zlib inflateReset failed: ", stream_.msg);
- }
+ // inflate() will not automatically decode concatenated gzip members, keep calling
+ // inflate until reading all input data (GH-38271).
+ while (read_input_bytes < input_length) {
+ // Reset the stream for this block
+ if (inflateReset(&stream_) != Z_OK) {
+ return ZlibErrorPrefix("zlib inflateReset failed: ", stream_.msg);
+ }
- int ret = 0;
- // gzip can run in streaming mode or non-streaming mode. We only
- // support the non-streaming use case where we present it the entire
- // compressed input and a buffer big enough to contain the entire
- // compressed output. In the case where we don't know the output,
- // we just make a bigger buffer and try the non-streaming mode
- // from the beginning again.
- while (ret != Z_STREAM_END) {
- stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
- stream_.avail_in = static_cast<uInt>(input_length);
- stream_.next_out = reinterpret_cast<Bytef*>(output);
- stream_.avail_out = static_cast<uInt>(output_buffer_length);
+ int ret = 0;
+ // gzip can run in streaming mode or non-streaming mode. We only
+ // support the non-streaming use case where we present it the entire
+ // compressed input and a buffer big enough to contain the entire
+ // compressed output. In the case where we don't know the output,
+ // we just make a bigger buffer and try the non-streaming mode
+ // from the beginning again.
+ stream_.next_in =
+ const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input + read_input_bytes));
+ stream_.avail_in = static_cast<uInt>(input_length - read_input_bytes);
+ stream_.next_out = reinterpret_cast<Bytef*>(output + decompressed_bytes);
+ stream_.avail_out = static_cast<uInt>(output_buffer_length - decompressed_bytes);
// We know the output size. In this case, we can use Z_FINISH
// which is more efficient.
ret = inflate(&stream_, Z_FINISH);
- if (ret == Z_STREAM_END || ret != Z_OK) break;
+ if (ret == Z_OK) {
+ // Failure, buffer was too small
+ return Status::IOError("Too small a buffer passed to GZipCodec. InputLength=",
+ input_length, " OutputLength=", output_buffer_length);
+ }
- // Failure, buffer was too small
- return Status::IOError("Too small a buffer passed to GZipCodec. InputLength=",
- input_length, " OutputLength=", output_buffer_length);
- }
+ // Failure for some other reason
+ if (ret != Z_STREAM_END) {
+ return ZlibErrorPrefix("GZipCodec failed: ", stream_.msg);
+ }
- // Failure for some other reason
- if (ret != Z_STREAM_END) {
- return ZlibErrorPrefix("GZipCodec failed: ", stream_.msg);
+ read_input_bytes += stream_.total_in;
+ decompressed_bytes += stream_.total_out;
}
- return stream_.total_out;
+ return decompressed_bytes;
}
int64_t MaxCompressedLen(int64_t input_length,
diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc
index 8fe12d3de0..e1ed79ef2c 100644
--- a/cpp/src/parquet/reader_test.cc
+++ b/cpp/src/parquet/reader_test.cc
@@ -116,6 +116,10 @@ std::string rle_dict_uncompressed_corrupt_checksum() {
return data_file("rle-dict-uncompressed-corrupt-checksum.parquet");
}
+std::string concatenated_gzip_members() {
+ return data_file("concatenated_gzip_members.parquet");
+}
+
// TODO: Assert on definition and repetition levels
template <typename DType, typename ValueType>
void AssertColumnValues(std::shared_ptr<TypedColumnReader<DType>> col, int64_t batch_size,
@@ -778,6 +782,25 @@ TEST_F(TestCheckDataPageCrc, CorruptDict) {
}
}
+TEST(TestGzipMembersRead, TwoConcatenatedMembers) {
+ auto file_reader = ParquetFileReader::OpenFile(concatenated_gzip_members(), /*memory_map=*/false);
+ auto col_reader = std::dynamic_pointer_cast<TypedColumnReader<Int64Type>>(
+ file_reader->RowGroup(0)->Column(0));
+ int64_t num_values = 0;
+ int64_t num_repdef = 0;
+ std::vector<int16_t> reps(1024);
+ std::vector<int16_t> defs(1024);
+ std::vector<int64_t> vals(1024);
+
+ num_repdef =
+ col_reader->ReadBatch(1024, defs.data(), reps.data(), vals.data(), &num_values);
+ EXPECT_EQ(num_repdef, 513);
+ for (int64_t i = 0; i < num_repdef; i++) {
+ EXPECT_EQ(i + 1, vals[i]);
+ }
+}
+
TEST(TestFileReaderAdHoc, NationDictTruncatedDataPage) {
// PARQUET-816. Some files generated by older Parquet implementations may
// contain malformed data page metadata, and we can successfully decode them
diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index da467dac2f..d69d979223 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit da467dac2f095b979af37bcf40fa0d1dee5ff652
+Subproject commit d69d979223e883faef9dc6fe3cf573087243c28a