This is an automated email from the ASF dual-hosted git repository. kszucs pushed a commit to branch maint-9.0.0 in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 0d8c1d5d98be6ac38da42409a98c1f08b6f9db8c Author: Will Jones <[email protected]> AuthorDate: Wed Jul 27 08:11:01 2022 -0400 ARROW-17100: [C++][Parquet] Fix backwards compatibility for ParquetV2 data pages written prior to 3.0.0 per ARROW-10353 (#13665) With these changes I can successfully read the parquet file provided in the original report. Parquet file: https://www.dropbox.com/s/portxgch3fpovnz/test2.parq?dl=0 Gist to generate: https://gist.github.com/bivald/f93448eaf25808284c4029c691a58a6a Original report: https://lists.apache.org/thread/wtbqozdhj2hwn6f0sps2j70lr07grk06 Based off of changes in ARROW-10353 Authored-by: Will Jones <[email protected]> Signed-off-by: David Li <[email protected]> --- cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 16 ++++++++++++++++ cpp/src/parquet/column_reader.cc | 19 ++++++++++++++----- cpp/src/parquet/column_reader.h | 4 +++- cpp/src/parquet/file_reader.cc | 11 ++++++++--- cpp/src/parquet/metadata.cc | 9 +++++++++ cpp/src/parquet/metadata.h | 1 + testing | 2 +- 7 files changed, 52 insertions(+), 10 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index db8b685fa5..d719f0e642 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -3943,6 +3943,22 @@ TEST(TestArrowReaderAdHoc, WriteBatchedNestedNullableStringColumn) { ::arrow::AssertTablesEqual(*expected, *actual, /*same_chunk_layout=*/false); } +TEST(TestArrowReaderAdHoc, OldDataPageV2) { + // ARROW-17100 +#ifndef ARROW_WITH_SNAPPY + GTEST_SKIP() << "Test requires Snappy compression"; +#endif + const char* c_root = std::getenv("ARROW_TEST_DATA"); + if (!c_root) { + GTEST_SKIP() << "ARROW_TEST_DATA not set."; + } + std::stringstream ss; + ss << c_root << "/" + << "parquet/ARROW-17100.parquet"; + std::string path = ss.str(); + TryReadDataFile(path); +} + class TestArrowReaderAdHocSparkAndHvr : public ::testing::TestWithParam< 
std::tuple<std::string, std::shared_ptr<DataType>>> {}; diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index b8d3b767b0..523030fd78 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -224,7 +224,7 @@ class SerializedPageReader : public PageReader { public: SerializedPageReader(std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows, Compression::type codec, const ReaderProperties& properties, - const CryptoContext* crypto_ctx) + const CryptoContext* crypto_ctx, bool always_compressed) : properties_(properties), stream_(std::move(stream)), decompression_buffer_(AllocateBuffer(properties_.memory_pool(), 0)), @@ -238,6 +238,7 @@ class SerializedPageReader : public PageReader { } max_page_header_size_ = kDefaultMaxPageHeaderSize; decompressor_ = GetCodec(codec); + always_compressed_ = always_compressed; } // Implement the PageReader interface @@ -265,6 +266,8 @@ class SerializedPageReader : public PageReader { std::unique_ptr<::arrow::util::Codec> decompressor_; std::shared_ptr<ResizableBuffer> decompression_buffer_; + bool always_compressed_; + // The fields below are used for calculation of AAD (additional authenticated data) // suffix which is part of the Parquet Modular Encryption. // The AAD suffix for a parquet module is built internally by @@ -449,7 +452,10 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() { header.repetition_levels_byte_length < 0) { throw ParquetException("Invalid page header (negative levels byte length)"); } - bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false; + // Arrow prior to 3.0.0 set is_compressed to false but still compressed. + bool is_compressed = + (header.__isset.is_compressed ? 
header.is_compressed : false) || + always_compressed_; EncodedStatistics page_statistics = ExtractStatsFromHeader(header); seen_num_rows_ += header.num_values; @@ -516,18 +522,21 @@ std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> s int64_t total_num_rows, Compression::type codec, const ReaderProperties& properties, + bool always_compressed, const CryptoContext* ctx) { return std::unique_ptr<PageReader>(new SerializedPageReader( - std::move(stream), total_num_rows, codec, properties, ctx)); + std::move(stream), total_num_rows, codec, properties, ctx, always_compressed)); } std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows, Compression::type codec, + bool always_compressed, ::arrow::MemoryPool* pool, const CryptoContext* ctx) { - return std::unique_ptr<PageReader>(new SerializedPageReader( - std::move(stream), total_num_rows, codec, ReaderProperties(pool), ctx)); + return std::unique_ptr<PageReader>( + new SerializedPageReader(std::move(stream), total_num_rows, codec, + ReaderProperties(pool), ctx, always_compressed)); } namespace { diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index c22f9f2fc7..1d35e3988c 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -105,11 +105,13 @@ class PARQUET_EXPORT PageReader { static std::unique_ptr<PageReader> Open( std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows, - Compression::type codec, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), + Compression::type codec, bool always_compressed = false, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), const CryptoContext* ctx = NULLPTR); static std::unique_ptr<PageReader> Open(std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows, Compression::type codec, const ReaderProperties& properties, + bool always_compressed = false, const CryptoContext* ctx = NULLPTR); // @returns: 
shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page> diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index 8086b0a280..90e19e594e 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -208,10 +208,15 @@ class SerializedRowGroup : public RowGroupReader::Contents { std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = col->crypto_metadata(); + // Prior to Arrow 3.0.0, is_compressed was always set to false in column headers, + // even if compression was used. See ARROW-17100. + bool always_compressed = file_metadata_->writer_version().VersionLt( + ApplicationVersion::PARQUET_CPP_10353_FIXED_VERSION()); + // Column is encrypted only if crypto_metadata exists. if (!crypto_metadata) { return PageReader::Open(stream, col->num_values(), col->compression(), - properties_.memory_pool()); + always_compressed, properties_.memory_pool()); } if (file_decryptor_ == nullptr) { @@ -233,7 +238,7 @@ class SerializedRowGroup : public RowGroupReader::Contents { CryptoContext ctx(col->has_dictionary_page(), row_group_ordinal_, static_cast<int16_t>(i), meta_decryptor, data_decryptor); return PageReader::Open(stream, col->num_values(), col->compression(), - properties_.memory_pool(), &ctx); + always_compressed, properties_.memory_pool(), &ctx); } // The column is encrypted with its own key @@ -248,7 +253,7 @@ class SerializedRowGroup : public RowGroupReader::Contents { CryptoContext ctx(col->has_dictionary_page(), row_group_ordinal_, static_cast<int16_t>(i), meta_decryptor, data_decryptor); return PageReader::Open(stream, col->num_values(), col->compression(), - properties_.memory_pool(), &ctx); + always_compressed, properties_.memory_pool(), &ctx); } private: diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc index 6226c3ad09..1b2a3df9c4 100644 --- a/cpp/src/parquet/metadata.cc +++ b/cpp/src/parquet/metadata.cc @@ -58,6 +58,15 @@ const ApplicationVersion& 
ApplicationVersion::PARQUET_MR_FIXED_STATS_VERSION() { return version; } +const ApplicationVersion& ApplicationVersion::PARQUET_CPP_10353_FIXED_VERSION() { + // parquet-cpp versions released prior to Arrow 3.0 would write DataPageV2 pages + // with is_compressed==0 but still write compressed data. (See: ARROW-10353). + // Parquet 1.5.1 had this problem, and after that we switched to the + // application name "parquet-cpp-arrow", so this version is fake. + static ApplicationVersion version("parquet-cpp", 2, 0, 0); + return version; +} + std::string ParquetVersionToString(ParquetVersion::type ver) { switch (ver) { case ParquetVersion::PARQUET_1_0: diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h index 89dca5667b..bd59c628dc 100644 --- a/cpp/src/parquet/metadata.h +++ b/cpp/src/parquet/metadata.h @@ -57,6 +57,7 @@ class PARQUET_EXPORT ApplicationVersion { static const ApplicationVersion& PARQUET_816_FIXED_VERSION(); static const ApplicationVersion& PARQUET_CPP_FIXED_STATS_VERSION(); static const ApplicationVersion& PARQUET_MR_FIXED_STATS_VERSION(); + static const ApplicationVersion& PARQUET_CPP_10353_FIXED_VERSION(); // Application that wrote the file. e.g. "IMPALA" std::string application_; diff --git a/testing b/testing index 53b4980471..5bab2f264a 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 53b498047109d9940fcfab388bd9d6aeb8c57425 +Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88
