This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new c16eccb  PARQUET-1482: [C++] Add branch to 
TypedRecordReader::ReadNewPage for …
c16eccb is described below

commit c16eccb1d220a749d7d1ec48b70e828fc74cce54
Author: Wes McKinney <[email protected]>
AuthorDate: Wed Mar 6 15:46:22 2019 -0600

    PARQUET-1482: [C++] Add branch to TypedRecordReader::ReadNewPage for PageType::DATA_PAGE_V2 to address incompatibility with parquetjs.
    
    **Tests**
    
    This commit doesn't include tests right now; I am working on adding tests 
and was hoping for some initial feedback on the code changes. I may need to use 
an actual file generated by `parquetjs` to test this issue, so I wonder if 
adding `feeds1kMicros.parquet` from the JIRA task to the parquet-testing 
repository is an option for this.
    
    **Description**
    
    `parquetjs` seems to be writing Parquet V2 files with
    [`DataPageV2`](https://github.com/apache/parquet-format/blob/e93dd628d90aa076745558998f0bf5d9c262bf22/src/main/thrift/parquet.thrift#L529)
    pages, while `parquet-cpp` writes Parquet V2 files with
    [`DataPage`](https://github.com/apache/parquet-format/blob/e93dd628d90aa076745558998f0bf5d9c262bf22/src/main/thrift/parquet.thrift#L492)
    pages.
    
    Since `TypedRecordReader::ReadNewPage()` only had a branch for
    `PageType::DATA_PAGE`, `DATA_PAGE_V2` pages were skipped as an unknown
    page type, and the reader would return without reading any data stored
    in `DATA_PAGE_V2` pages. This explains the behavior observed in
    [PARQUET-1482](https://issues.apache.org/jira/projects/PARQUET/issues/PARQUET-1482?filter=allopenissues).
    
    This commit adds a new else-if branch for the `DataPageV2` case in
    `TypedRecordReader::ReadNewPage()`. Since the `DataPageV2` branch needed to
    reuse the code from the `DataPage` case, I refactored the repetition/definition
    level decoder initialization and the data decoder initialization into two new
    methods in the `TypedRecordReader` class (`InitializeLevelDecoders()` and
    `InitializeDataDecoder()`). These new methods are now called by both the
    `DataPage` and `DataPageV2` initialization branches in
    `TypedRecordReader::ReadNewPage()`.
    
    An alternative implementation (with a smaller diff) is possible: share the
    same else-if branch between `DataPage` and `DataPageV2` by accessing the
    derived page object through a `shared_ptr<Page>`. However, since the `Page`
    superclass doesn't have the necessary `encoding()` or `num_values()` methods,
    I would need to add a common superclass to both `DataPage` and `DataPageV2`
    that defines these methods. I didn't do this because I was hesitant to modify
    the `Page` class hierarchy for this commit.
    
    Author: Wes McKinney <[email protected]>
    Author: rdmello <[email protected]>
    Author: Rylan Dmello <[email protected]>
    
    Closes #3312 from rdmello/parquet_1482 and squashes the following commits:
    
    c5cb0f37 <Wes McKinney> Add DataPage base class for DataPageV1 and 
DataPageV2
    8df8328e <rdmello> PARQUET-1482:  Adding basic unit test for DataPageV2 
serialization and deserialization.
    9df32222 <Rylan Dmello> PARQUET-1482:  Add branch to 
TypedRecordReader::ReadNewPage for PageType::DATA_PAGE_V2 to address 
incompatibility with parquetjs.
---
 cpp/src/parquet/arrow/record_reader.cc   | 186 ++++++++++++++++++-------------
 cpp/src/parquet/column_page.h            |  53 ++++-----
 cpp/src/parquet/column_reader-test.cc    |   2 +-
 cpp/src/parquet/column_reader.cc         |  16 +--
 cpp/src/parquet/file-deserialize-test.cc |  54 ++++++++-
 cpp/src/parquet/test-util.h              |  10 +-
 6 files changed, 198 insertions(+), 123 deletions(-)

diff --git a/cpp/src/parquet/arrow/record_reader.cc 
b/cpp/src/parquet/arrow/record_reader.cc
index 39945af..c800f36 100644
--- a/cpp/src/parquet/arrow/record_reader.cc
+++ b/cpp/src/parquet/arrow/record_reader.cc
@@ -48,7 +48,7 @@ namespace BitUtil = ::arrow::BitUtil;
 
 // PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
 // encoding.
-static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
+static bool IsDictionaryIndexEncoding(Encoding::type e) {
   return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
 }
 
@@ -86,7 +86,7 @@ class RecordReader::RecordReaderImpl {
 
   virtual ~RecordReaderImpl() = default;
 
-  virtual int64_t ReadRecordData(const int64_t num_records) = 0;
+  virtual int64_t ReadRecordData(int64_t num_records) = 0;
 
   // Returns true if there are still values in this column.
   bool HasNext() {
@@ -494,7 +494,7 @@ class TypedRecordReader : public 
RecordReader::RecordReaderImpl {
   }
 
   // Return number of logical records read
-  int64_t ReadRecordData(const int64_t num_records) override {
+  int64_t ReadRecordData(int64_t num_records) override {
     // Conservative upper bound
     const int64_t possible_num_values =
         std::max(num_records, levels_written_ - levels_position_);
@@ -580,6 +580,13 @@ class TypedRecordReader : public 
RecordReader::RecordReaderImpl {
 
   DecoderType* current_decoder_;
 
+  // Initialize repetition and definition level decoders on the next data page.
+  int64_t InitializeLevelDecoders(const DataPage& page,
+                                  Encoding::type repetition_level_encoding,
+                                  Encoding::type definition_level_encoding);
+
+  void InitializeDataDecoder(const DataPage& page, int64_t levels_bytes);
+
   // Advance to the next data page
   bool ReadNewPage() override;
 
@@ -717,11 +724,94 @@ inline void 
TypedRecordReader<DType>::ConfigureDictionary(const DictionaryPage*
   DCHECK(current_decoder_);
 }
 
+// If the data page includes repetition and definition levels, we
+// initialize the level decoders and return the number of encoded level bytes.
+// The return value helps determine the number of bytes in the encoded data.
+template <typename DType>
+int64_t TypedRecordReader<DType>::InitializeLevelDecoders(
+    const DataPage& page, Encoding::type repetition_level_encoding,
+    Encoding::type definition_level_encoding) {
+  // Read a data page.
+  num_buffered_values_ = page.num_values();
+
+  // Have not decoded any values from the data page yet
+  num_decoded_values_ = 0;
+
+  const uint8_t* buffer = page.data();
+  int64_t levels_byte_size = 0;
+
+  // Data page Layout: Repetition Levels - Definition Levels - encoded values.
+  // Levels are encoded as rle or bit-packed.
+  // Init repetition levels
+  if (descr_->max_repetition_level() > 0) {
+    int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
+        repetition_level_encoding, descr_->max_repetition_level(),
+        static_cast<int>(num_buffered_values_), buffer);
+    buffer += rep_levels_bytes;
+    levels_byte_size += rep_levels_bytes;
+  }
+  // TODO figure a way to set max_definition_level_ to 0
+  // if the initial value is invalid
+
+  // Init definition levels
+  if (descr_->max_definition_level() > 0) {
+    int64_t def_levels_bytes = definition_level_decoder_.SetData(
+        definition_level_encoding, descr_->max_definition_level(),
+        static_cast<int>(num_buffered_values_), buffer);
+    levels_byte_size += def_levels_bytes;
+  }
+
+  return levels_byte_size;
+}
+
+// Get a decoder object for this page or create a new decoder if this is the
+// first page with this encoding.
+template <typename DType>
+void TypedRecordReader<DType>::InitializeDataDecoder(const DataPage& page,
+                                                     int64_t levels_byte_size) 
{
+  const uint8_t* buffer = page.data() + levels_byte_size;
+  const int64_t data_size = page.size() - levels_byte_size;
+
+  Encoding::type encoding = page.encoding();
+
+  if (IsDictionaryIndexEncoding(encoding)) {
+    encoding = Encoding::RLE_DICTIONARY;
+  }
+
+  auto it = decoders_.find(static_cast<int>(encoding));
+  if (it != decoders_.end()) {
+    DCHECK(it->second.get() != nullptr);
+    if (encoding == Encoding::RLE_DICTIONARY) {
+      DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
+    }
+    current_decoder_ = it->second.get();
+  } else {
+    switch (encoding) {
+      case Encoding::PLAIN: {
+        auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
+        current_decoder_ = decoder.get();
+        decoders_[static_cast<int>(encoding)] = std::move(decoder);
+        break;
+      }
+      case Encoding::RLE_DICTIONARY:
+        throw ParquetException("Dictionary page must be before data page.");
+
+      case Encoding::DELTA_BINARY_PACKED:
+      case Encoding::DELTA_LENGTH_BYTE_ARRAY:
+      case Encoding::DELTA_BYTE_ARRAY:
+        ParquetException::NYI("Unsupported encoding");
+
+      default:
+        throw ParquetException("Unknown encoding type.");
+    }
+  }
+  current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
+                            static_cast<int>(data_size));
+}
+
 template <typename DType>
 bool TypedRecordReader<DType>::ReadNewPage() {
   // Loop until we find the next data page.
-  const uint8_t* buffer;
-
   while (true) {
     current_page_ = pager_->NextPage();
     if (!current_page_) {
@@ -733,80 +823,18 @@ bool TypedRecordReader<DType>::ReadNewPage() {
       ConfigureDictionary(static_cast<const 
DictionaryPage*>(current_page_.get()));
       continue;
     } else if (current_page_->type() == PageType::DATA_PAGE) {
-      const DataPage* page = static_cast<const DataPage*>(current_page_.get());
-
-      // Read a data page.
-      num_buffered_values_ = page->num_values();
-
-      // Have not decoded any values from the data page yet
-      num_decoded_values_ = 0;
-
-      buffer = page->data();
-
-      // If the data page includes repetition and definition levels, we
-      // initialize the level decoder and subtract the encoded level bytes from
-      // the page size to determine the number of bytes in the encoded data.
-      int64_t data_size = page->size();
-
-      // Data page Layout: Repetition Levels - Definition Levels - encoded 
values.
-      // Levels are encoded as rle or bit-packed.
-      // Init repetition levels
-      if (descr_->max_repetition_level() > 0) {
-        int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
-            page->repetition_level_encoding(), descr_->max_repetition_level(),
-            static_cast<int>(num_buffered_values_), buffer);
-        buffer += rep_levels_bytes;
-        data_size -= rep_levels_bytes;
-      }
-      // TODO figure a way to set max_definition_level_ to 0
-      // if the initial value is invalid
-
-      // Init definition levels
-      if (descr_->max_definition_level() > 0) {
-        int64_t def_levels_bytes = definition_level_decoder_.SetData(
-            page->definition_level_encoding(), descr_->max_definition_level(),
-            static_cast<int>(num_buffered_values_), buffer);
-        buffer += def_levels_bytes;
-        data_size -= def_levels_bytes;
-      }
-
-      // Get a decoder object for this page or create a new decoder if this is 
the
-      // first page with this encoding.
-      Encoding::type encoding = page->encoding();
-
-      if (IsDictionaryIndexEncoding(encoding)) {
-        encoding = Encoding::RLE_DICTIONARY;
-      }
-
-      auto it = decoders_.find(static_cast<int>(encoding));
-      if (it != decoders_.end()) {
-        DCHECK(it->second.get() != nullptr);
-        if (encoding == Encoding::RLE_DICTIONARY) {
-          DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
-        }
-        current_decoder_ = it->second.get();
-      } else {
-        switch (encoding) {
-          case Encoding::PLAIN: {
-            auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
-            current_decoder_ = decoder.get();
-            decoders_[static_cast<int>(encoding)] = std::move(decoder);
-            break;
-          }
-          case Encoding::RLE_DICTIONARY:
-            throw ParquetException("Dictionary page must be before data 
page.");
-
-          case Encoding::DELTA_BINARY_PACKED:
-          case Encoding::DELTA_LENGTH_BYTE_ARRAY:
-          case Encoding::DELTA_BYTE_ARRAY:
-            ParquetException::NYI("Unsupported encoding");
-
-          default:
-            throw ParquetException("Unknown encoding type.");
-        }
-      }
-      current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
-                                static_cast<int>(data_size));
+      const auto page = std::static_pointer_cast<DataPageV1>(current_page_);
+      const int64_t levels_byte_size = InitializeLevelDecoders(
+          *page, page->repetition_level_encoding(), 
page->definition_level_encoding());
+      InitializeDataDecoder(*page, levels_byte_size);
+      return true;
+    } else if (current_page_->type() == PageType::DATA_PAGE_V2) {
+      const auto page = std::static_pointer_cast<DataPageV2>(current_page_);
+      // Repetition and definition levels are always encoded using RLE encoding
+      // in the DataPageV2 format.
+      const int64_t levels_byte_size =
+          InitializeLevelDecoders(*page, Encoding::RLE, Encoding::RLE);
+      InitializeDataDecoder(*page, levels_byte_size);
       return true;
     } else {
       // We don't know what this page type is. We're allowed to skip non-data
diff --git a/cpp/src/parquet/column_page.h b/cpp/src/parquet/column_page.h
index c34eee7..3a0355a 100644
--- a/cpp/src/parquet/column_page.h
+++ b/cpp/src/parquet/column_page.h
@@ -59,45 +59,54 @@ class Page {
   PageType::type type_;
 };
 
+/// \brief Base type for DataPageV1 and DataPageV2 including common attributes
 class DataPage : public Page {
  public:
-  DataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
-           Encoding::type encoding, Encoding::type definition_level_encoding,
-           Encoding::type repetition_level_encoding,
+  int32_t num_values() const { return num_values_; }
+  Encoding::type encoding() const { return encoding_; }
+  const EncodedStatistics& statistics() const { return statistics_; }
+
+ protected:
+  DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t 
num_values,
+           Encoding::type encoding,
            const EncodedStatistics& statistics = EncodedStatistics())
-      : Page(buffer, PageType::DATA_PAGE),
+      : Page(buffer, type),
         num_values_(num_values),
         encoding_(encoding),
-        definition_level_encoding_(definition_level_encoding),
-        repetition_level_encoding_(repetition_level_encoding),
         statistics_(statistics) {}
 
-  int32_t num_values() const { return num_values_; }
+  int32_t num_values_;
+  Encoding::type encoding_;
+  EncodedStatistics statistics_;
+};
 
-  Encoding::type encoding() const { return encoding_; }
+class DataPageV1 : public DataPage {
+ public:
+  DataPageV1(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
+             Encoding::type encoding, Encoding::type definition_level_encoding,
+             Encoding::type repetition_level_encoding,
+             const EncodedStatistics& statistics = EncodedStatistics())
+      : DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, 
statistics),
+        definition_level_encoding_(definition_level_encoding),
+        repetition_level_encoding_(repetition_level_encoding) {}
 
   Encoding::type repetition_level_encoding() const { return 
repetition_level_encoding_; }
 
   Encoding::type definition_level_encoding() const { return 
definition_level_encoding_; }
 
-  const EncodedStatistics& statistics() const { return statistics_; }
-
  private:
-  int32_t num_values_;
-  Encoding::type encoding_;
   Encoding::type definition_level_encoding_;
   Encoding::type repetition_level_encoding_;
-  EncodedStatistics statistics_;
 };
 
-class CompressedDataPage : public DataPage {
+class CompressedDataPage : public DataPageV1 {
  public:
   CompressedDataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
                      Encoding::type encoding, Encoding::type 
definition_level_encoding,
                      Encoding::type repetition_level_encoding, int64_t 
uncompressed_size,
                      const EncodedStatistics& statistics = EncodedStatistics())
-      : DataPage(buffer, num_values, encoding, definition_level_encoding,
-                 repetition_level_encoding, statistics),
+      : DataPageV1(buffer, num_values, encoding, definition_level_encoding,
+                   repetition_level_encoding, statistics),
         uncompressed_size_(uncompressed_size) {}
 
   int64_t uncompressed_size() const { return uncompressed_size_; }
@@ -106,29 +115,23 @@ class CompressedDataPage : public DataPage {
   int64_t uncompressed_size_;
 };
 
-class DataPageV2 : public Page {
+class DataPageV2 : public DataPage {
  public:
   DataPageV2(const std::shared_ptr<Buffer>& buffer, int32_t num_values, 
int32_t num_nulls,
              int32_t num_rows, Encoding::type encoding,
              int32_t definition_levels_byte_length, int32_t 
repetition_levels_byte_length,
              bool is_compressed = false)
-      : Page(buffer, PageType::DATA_PAGE_V2),
-        num_values_(num_values),
+      : DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding),
         num_nulls_(num_nulls),
         num_rows_(num_rows),
-        encoding_(encoding),
         definition_levels_byte_length_(definition_levels_byte_length),
         repetition_levels_byte_length_(repetition_levels_byte_length),
         is_compressed_(is_compressed) {}
 
-  int32_t num_values() const { return num_values_; }
-
   int32_t num_nulls() const { return num_nulls_; }
 
   int32_t num_rows() const { return num_rows_; }
 
-  Encoding::type encoding() const { return encoding_; }
-
   int32_t definition_levels_byte_length() const { return 
definition_levels_byte_length_; }
 
   int32_t repetition_levels_byte_length() const { return 
repetition_levels_byte_length_; }
@@ -136,10 +139,8 @@ class DataPageV2 : public Page {
   bool is_compressed() const { return is_compressed_; }
 
  private:
-  int32_t num_values_;
   int32_t num_nulls_;
   int32_t num_rows_;
-  Encoding::type encoding_;
   int32_t definition_levels_byte_length_;
   int32_t repetition_levels_byte_length_;
   bool is_compressed_;
diff --git a/cpp/src/parquet/column_reader-test.cc 
b/cpp/src/parquet/column_reader-test.cc
index 0475ca5..49ca15b 100644
--- a/cpp/src/parquet/column_reader-test.cc
+++ b/cpp/src/parquet/column_reader-test.cc
@@ -332,7 +332,7 @@ TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
 
   shared_ptr<DictionaryPage> dict_page =
       std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
-  shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(
+  shared_ptr<DataPageV1> data_page = MakeDataPage<Int32Type>(
       &descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
   pages_.push_back(dict_page);
   pages_.push_back(data_page);
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 33d2f5c..9f3e52f 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -234,7 +234,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
 
       seen_num_rows_ += header.num_values;
 
-      return std::make_shared<DataPage>(
+      return std::make_shared<DataPageV1>(
           page_buffer, header.num_values, FromThrift(header.encoding),
           FromThrift(header.definition_level_encoding),
           FromThrift(header.repetition_level_encoding), page_statistics);
@@ -613,27 +613,27 @@ bool TypedColumnReaderImpl<DType>::ReadNewPage() {
       ConfigureDictionary(static_cast<const 
DictionaryPage*>(current_page_.get()));
       continue;
     } else if (current_page_->type() == PageType::DATA_PAGE) {
-      const DataPage* page = static_cast<const DataPage*>(current_page_.get());
+      const DataPageV1& page = static_cast<const DataPageV1&>(*current_page_);
 
       // Read a data page.
-      num_buffered_values_ = page->num_values();
+      num_buffered_values_ = page.num_values();
 
       // Have not decoded any values from the data page yet
       num_decoded_values_ = 0;
 
-      buffer = page->data();
+      buffer = page.data();
 
       // If the data page includes repetition and definition levels, we
       // initialize the level decoder and subtract the encoded level bytes from
       // the page size to determine the number of bytes in the encoded data.
-      int64_t data_size = page->size();
+      int64_t data_size = page.size();
 
       // Data page Layout: Repetition Levels - Definition Levels - encoded 
values.
       // Levels are encoded as rle or bit-packed.
       // Init repetition levels
       if (descr_->max_repetition_level() > 0) {
         int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
-            page->repetition_level_encoding(), descr_->max_repetition_level(),
+            page.repetition_level_encoding(), descr_->max_repetition_level(),
             static_cast<int>(num_buffered_values_), buffer);
         buffer += rep_levels_bytes;
         data_size -= rep_levels_bytes;
@@ -644,7 +644,7 @@ bool TypedColumnReaderImpl<DType>::ReadNewPage() {
       // Init definition levels
       if (descr_->max_definition_level() > 0) {
         int64_t def_levels_bytes = definition_level_decoder_.SetData(
-            page->definition_level_encoding(), descr_->max_definition_level(),
+            page.definition_level_encoding(), descr_->max_definition_level(),
             static_cast<int>(num_buffered_values_), buffer);
         buffer += def_levels_bytes;
         data_size -= def_levels_bytes;
@@ -652,7 +652,7 @@ bool TypedColumnReaderImpl<DType>::ReadNewPage() {
 
       // Get a decoder object for this page or create a new decoder if this is 
the
       // first page with this encoding.
-      Encoding::type encoding = page->encoding();
+      Encoding::type encoding = page.encoding();
 
       if (IsDictionaryIndexEncoding(encoding)) {
         encoding = Encoding::RLE_DICTIONARY;
diff --git a/cpp/src/parquet/file-deserialize-test.cc 
b/cpp/src/parquet/file-deserialize-test.cc
index e62968e..f2b10c7 100644
--- a/cpp/src/parquet/file-deserialize-test.cc
+++ b/cpp/src/parquet/file-deserialize-test.cc
@@ -90,6 +90,21 @@ class TestPageSerde : public ::testing::Test {
     ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get()));
   }
 
+  void WriteDataPageHeaderV2(int max_serialized_len = 1024, int32_t 
uncompressed_size = 0,
+                             int32_t compressed_size = 0) {
+    // Simplifying writing serialized data page V2 headers which may or may not
+    // have meaningful data associated with them
+
+    // Serialize the Page header
+    page_header_.__set_data_page_header_v2(data_page_header_v2_);
+    page_header_.uncompressed_page_size = uncompressed_size;
+    page_header_.compressed_page_size = compressed_size;
+    page_header_.type = format::PageType::DATA_PAGE_V2;
+
+    ThriftSerializer serializer;
+    ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get()));
+  }
+
   void ResetStream() { out_stream_.reset(new InMemoryOutputStream); }
 
   void EndStream() { out_buffer_ = out_stream_->GetBuffer(); }
@@ -101,12 +116,13 @@ class TestPageSerde : public ::testing::Test {
   std::unique_ptr<PageReader> page_reader_;
   format::PageHeader page_header_;
   format::DataPageHeader data_page_header_;
+  format::DataPageHeaderV2 data_page_header_v2_;
 };
 
 void CheckDataPageHeader(const format::DataPageHeader expected, const Page* 
page) {
   ASSERT_EQ(PageType::DATA_PAGE, page->type());
 
-  const DataPage* data_page = static_cast<const DataPage*>(page);
+  const DataPageV1* data_page = static_cast<const DataPageV1*>(page);
   ASSERT_EQ(expected.num_values, data_page->num_values());
   ASSERT_EQ(expected.encoding, data_page->encoding());
   ASSERT_EQ(expected.definition_level_encoding, 
data_page->definition_level_encoding());
@@ -120,7 +136,25 @@ void CheckDataPageHeader(const format::DataPageHeader 
expected, const Page* page
   }
 }
 
-TEST_F(TestPageSerde, DataPage) {
+// Overload for DataPageV2 tests.
+void CheckDataPageHeader(const format::DataPageHeaderV2 expected, const Page* 
page) {
+  ASSERT_EQ(PageType::DATA_PAGE_V2, page->type());
+
+  const DataPageV2* data_page = static_cast<const DataPageV2*>(page);
+  ASSERT_EQ(expected.num_values, data_page->num_values());
+  ASSERT_EQ(expected.num_nulls, data_page->num_nulls());
+  ASSERT_EQ(expected.num_rows, data_page->num_rows());
+  ASSERT_EQ(expected.encoding, data_page->encoding());
+  ASSERT_EQ(expected.definition_levels_byte_length,
+            data_page->definition_levels_byte_length());
+  ASSERT_EQ(expected.repetition_levels_byte_length,
+            data_page->repetition_levels_byte_length());
+  ASSERT_EQ(expected.is_compressed, data_page->is_compressed());
+
+  // TODO: Tests for DataPageHeaderV2 statistics.
+}
+
+TEST_F(TestPageSerde, DataPageV1) {
   format::PageHeader out_page_header;
 
   int stats_size = 512;
@@ -134,6 +168,18 @@ TEST_F(TestPageSerde, DataPage) {
   ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, 
current_page.get()));
 }
 
+TEST_F(TestPageSerde, DataPageV2) {
+  format::PageHeader out_page_header;
+
+  const int32_t num_rows = 4444;
+  data_page_header_.num_values = num_rows;
+
+  ASSERT_NO_FATAL_FAILURE(WriteDataPageHeaderV2());
+  InitSerializedPageReader(num_rows);
+  std::shared_ptr<Page> current_page = page_reader_->NextPage();
+  ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_v2_, 
current_page.get()));
+}
+
 TEST_F(TestPageSerde, TestLargePageHeaders) {
   int stats_size = 256 * 1024;  // 256 KB
   AddDummyStats(stats_size, data_page_header_);
@@ -218,11 +264,11 @@ TEST_F(TestPageSerde, Compression) {
     InitSerializedPageReader(num_rows * num_pages, codec_type);
 
     std::shared_ptr<Page> page;
-    const DataPage* data_page;
+    const DataPageV1* data_page;
     for (int i = 0; i < num_pages; ++i) {
       int data_size = static_cast<int>(faux_data[i].size());
       page = page_reader_->NextPage();
-      data_page = static_cast<const DataPage*>(page.get());
+      data_page = static_cast<const DataPageV1*>(page.get());
       ASSERT_EQ(data_size, data_page->size());
       ASSERT_EQ(0, memcmp(faux_data[i].data(), data_page->data(), data_size));
     }
diff --git a/cpp/src/parquet/test-util.h b/cpp/src/parquet/test-util.h
index ed7c7bb..a234fd4 100644
--- a/cpp/src/parquet/test-util.h
+++ b/cpp/src/parquet/test-util.h
@@ -216,7 +216,7 @@ void DataPageBuilder<BooleanType>::AppendValues(const 
ColumnDescriptor* d,
 }
 
 template <typename Type>
-static shared_ptr<DataPage> MakeDataPage(
+static shared_ptr<DataPageV1> MakeDataPage(
     const ColumnDescriptor* d, const vector<typename Type::c_type>& values, 
int num_vals,
     Encoding::type encoding, const uint8_t* indices, int indices_size,
     const vector<int16_t>& def_levels, int16_t max_def_level,
@@ -243,9 +243,9 @@ static shared_ptr<DataPage> MakeDataPage(
 
   auto buffer = page_stream.GetBuffer();
 
-  return std::make_shared<DataPage>(buffer, num_values, encoding,
-                                    page_builder.def_level_encoding(),
-                                    page_builder.rep_level_encoding());
+  return std::make_shared<DataPageV1>(buffer, num_values, encoding,
+                                      page_builder.def_level_encoding(),
+                                      page_builder.rep_level_encoding());
 }
 
 template <typename TYPE>
@@ -357,7 +357,7 @@ static void PaginateDict(const ColumnDescriptor* d,
       rep_level_start = i * num_levels_per_page;
       rep_level_end = (i + 1) * num_levels_per_page;
     }
-    shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(
+    shared_ptr<DataPageV1> data_page = MakeDataPage<Int32Type>(
         d, {}, values_per_page[i], encoding, rle_indices[i]->data(),
         static_cast<int>(rle_indices[i]->size()),
         slice(def_levels, def_level_start, def_level_end), max_def_level,

Reply via email to