This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch maint-9.0.0
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 0d8c1d5d98be6ac38da42409a98c1f08b6f9db8c
Author: Will Jones <[email protected]>
AuthorDate: Wed Jul 27 08:11:01 2022 -0400

    ARROW-17100: [C++][Parquet] Fix backwards compatibility for ParquetV2 data 
pages written prior to 3.0.0 per ARROW-10353 (#13665)
    
    With these changes I can successfully read the parquet file provides in the 
original report.
    
    Parquet file: https://www.dropbox.com/s/portxgch3fpovnz/test2.parq?dl=0
    Gist to generate: 
https://gist.github.com/bivald/f93448eaf25808284c4029c691a58a6a
    Original report: 
https://lists.apache.org/thread/wtbqozdhj2hwn6f0sps2j70lr07grk06
    
    Based off of changes in ARROW-10353
    
    Authored-by: Will Jones <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 16 ++++++++++++++++
 cpp/src/parquet/column_reader.cc                  | 19 ++++++++++++++-----
 cpp/src/parquet/column_reader.h                   |  4 +++-
 cpp/src/parquet/file_reader.cc                    | 11 ++++++++---
 cpp/src/parquet/metadata.cc                       |  9 +++++++++
 cpp/src/parquet/metadata.h                        |  1 +
 testing                                           |  2 +-
 7 files changed, 52 insertions(+), 10 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc 
b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index db8b685fa5..d719f0e642 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -3943,6 +3943,22 @@ TEST(TestArrowReaderAdHoc, 
WriteBatchedNestedNullableStringColumn) {
   ::arrow::AssertTablesEqual(*expected, *actual, /*same_chunk_layout=*/false);
 }
 
+TEST(TestArrowReaderAdHoc, OldDataPageV2) {
+  // ARROW-17100
+#ifndef ARROW_WITH_SNAPPY
+  GTEST_SKIP() << "Test requires Snappy compression";
+#endif
+  const char* c_root = std::getenv("ARROW_TEST_DATA");
+  if (!c_root) {
+    GTEST_SKIP() << "ARROW_TEST_DATA not set.";
+  }
+  std::stringstream ss;
+  ss << c_root << "/"
+     << "parquet/ARROW-17100.parquet";
+  std::string path = ss.str();
+  TryReadDataFile(path);
+}
+
 class TestArrowReaderAdHocSparkAndHvr
     : public ::testing::TestWithParam<
           std::tuple<std::string, std::shared_ptr<DataType>>> {};
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index b8d3b767b0..523030fd78 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -224,7 +224,7 @@ class SerializedPageReader : public PageReader {
  public:
   SerializedPageReader(std::shared_ptr<ArrowInputStream> stream, int64_t 
total_num_rows,
                        Compression::type codec, const ReaderProperties& 
properties,
-                       const CryptoContext* crypto_ctx)
+                       const CryptoContext* crypto_ctx, bool always_compressed)
       : properties_(properties),
         stream_(std::move(stream)),
         decompression_buffer_(AllocateBuffer(properties_.memory_pool(), 0)),
@@ -238,6 +238,7 @@ class SerializedPageReader : public PageReader {
     }
     max_page_header_size_ = kDefaultMaxPageHeaderSize;
     decompressor_ = GetCodec(codec);
+    always_compressed_ = always_compressed;
   }
 
   // Implement the PageReader interface
@@ -265,6 +266,8 @@ class SerializedPageReader : public PageReader {
   std::unique_ptr<::arrow::util::Codec> decompressor_;
   std::shared_ptr<ResizableBuffer> decompression_buffer_;
 
+  bool always_compressed_;
+
   // The fields below are used for calculation of AAD (additional 
authenticated data)
   // suffix which is part of the Parquet Modular Encryption.
   // The AAD suffix for a parquet module is built internally by
@@ -449,7 +452,10 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
           header.repetition_levels_byte_length < 0) {
         throw ParquetException("Invalid page header (negative levels byte 
length)");
       }
-      bool is_compressed = header.__isset.is_compressed ? header.is_compressed 
: false;
+      // Arrow prior to 3.0.0 set is_compressed to false but still compressed.
+      bool is_compressed =
+          (header.__isset.is_compressed ? header.is_compressed : false) ||
+          always_compressed_;
       EncodedStatistics page_statistics = ExtractStatsFromHeader(header);
       seen_num_rows_ += header.num_values;
 
@@ -516,18 +522,21 @@ std::unique_ptr<PageReader> 
PageReader::Open(std::shared_ptr<ArrowInputStream> s
                                              int64_t total_num_rows,
                                              Compression::type codec,
                                              const ReaderProperties& 
properties,
+                                             bool always_compressed,
                                              const CryptoContext* ctx) {
   return std::unique_ptr<PageReader>(new SerializedPageReader(
-      std::move(stream), total_num_rows, codec, properties, ctx));
+      std::move(stream), total_num_rows, codec, properties, ctx, 
always_compressed));
 }
 
 std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> 
stream,
                                              int64_t total_num_rows,
                                              Compression::type codec,
+                                             bool always_compressed,
                                              ::arrow::MemoryPool* pool,
                                              const CryptoContext* ctx) {
-  return std::unique_ptr<PageReader>(new SerializedPageReader(
-      std::move(stream), total_num_rows, codec, ReaderProperties(pool), ctx));
+  return std::unique_ptr<PageReader>(
+      new SerializedPageReader(std::move(stream), total_num_rows, codec,
+                               ReaderProperties(pool), ctx, 
always_compressed));
 }
 
 namespace {
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index c22f9f2fc7..1d35e3988c 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -105,11 +105,13 @@ class PARQUET_EXPORT PageReader {
 
   static std::unique_ptr<PageReader> Open(
       std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows,
-      Compression::type codec, ::arrow::MemoryPool* pool = 
::arrow::default_memory_pool(),
+      Compression::type codec, bool always_compressed = false,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
       const CryptoContext* ctx = NULLPTR);
   static std::unique_ptr<PageReader> Open(std::shared_ptr<ArrowInputStream> 
stream,
                                           int64_t total_num_rows, 
Compression::type codec,
                                           const ReaderProperties& properties,
+                                          bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
   // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index 8086b0a280..90e19e594e 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -208,10 +208,15 @@ class SerializedRowGroup : public 
RowGroupReader::Contents {
 
     std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = 
col->crypto_metadata();
 
+    // Prior to Arrow 3.0.0, is_compressed was always set to false in column 
headers,
+    // even if compression was used. See ARROW-17100.
+    bool always_compressed = file_metadata_->writer_version().VersionLt(
+        ApplicationVersion::PARQUET_CPP_10353_FIXED_VERSION());
+
     // Column is encrypted only if crypto_metadata exists.
     if (!crypto_metadata) {
       return PageReader::Open(stream, col->num_values(), col->compression(),
-                              properties_.memory_pool());
+                              always_compressed, properties_.memory_pool());
     }
 
     if (file_decryptor_ == nullptr) {
@@ -233,7 +238,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
       CryptoContext ctx(col->has_dictionary_page(), row_group_ordinal_,
                         static_cast<int16_t>(i), meta_decryptor, 
data_decryptor);
       return PageReader::Open(stream, col->num_values(), col->compression(),
-                              properties_.memory_pool(), &ctx);
+                              always_compressed, properties_.memory_pool(), 
&ctx);
     }
 
     // The column is encrypted with its own key
@@ -248,7 +253,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
     CryptoContext ctx(col->has_dictionary_page(), row_group_ordinal_,
                       static_cast<int16_t>(i), meta_decryptor, data_decryptor);
     return PageReader::Open(stream, col->num_values(), col->compression(),
-                            properties_.memory_pool(), &ctx);
+                            always_compressed, properties_.memory_pool(), 
&ctx);
   }
 
  private:
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index 6226c3ad09..1b2a3df9c4 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -58,6 +58,15 @@ const ApplicationVersion& 
ApplicationVersion::PARQUET_MR_FIXED_STATS_VERSION() {
   return version;
 }
 
+const ApplicationVersion& 
ApplicationVersion::PARQUET_CPP_10353_FIXED_VERSION() {
+  // parquet-cpp versions released prior to Arrow 3.0 would write DataPageV2 
pages
+  // with is_compressed==0 but still write compressed data. (See: ARROW-10353).
+  // Parquet 1.5.1 had this problem, and after that we switched to the
+  // application name "parquet-cpp-arrow", so this version is fake.
+  static ApplicationVersion version("parquet-cpp", 2, 0, 0);
+  return version;
+}
+
 std::string ParquetVersionToString(ParquetVersion::type ver) {
   switch (ver) {
     case ParquetVersion::PARQUET_1_0:
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index 89dca5667b..bd59c628dc 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -57,6 +57,7 @@ class PARQUET_EXPORT ApplicationVersion {
   static const ApplicationVersion& PARQUET_816_FIXED_VERSION();
   static const ApplicationVersion& PARQUET_CPP_FIXED_STATS_VERSION();
   static const ApplicationVersion& PARQUET_MR_FIXED_STATS_VERSION();
+  static const ApplicationVersion& PARQUET_CPP_10353_FIXED_VERSION();
 
   // Application that wrote the file. e.g. "IMPALA"
   std::string application_;
diff --git a/testing b/testing
index 53b4980471..5bab2f264a 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 53b498047109d9940fcfab388bd9d6aeb8c57425
+Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88

Reply via email to