This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new c16eccb PARQUET-1482: [C++] Add branch to
TypedRecordReader::ReadNewPage for …
c16eccb is described below
commit c16eccb1d220a749d7d1ec48b70e828fc74cce54
Author: Wes McKinney <[email protected]>
AuthorDate: Wed Mar 6 15:46:22 2019 -0600
PARQUET-1482: [C++] Add branch to TypedRecordReader::ReadNewPage for
PageType::DATA_PAGE_V2 to address incompatibility with parquetjs.
**Tests**
This commit doesn't include tests right now; I am working on adding tests
and was hoping for some initial feedback on the code changes. I may need to use
an actual file generated by `parquetjs` to test this issue, so I wonder if
adding `feeds1kMicros.parquet` from the JIRA task to the parquet-testing
repository is an option for this.
**Description**
`parquetjs` seems to be writing Parquet V2 files with
[`DataPageV2`](https://github.com/apache/parquet-format/blob/e93dd628d90aa076745558998f0bf5d9c262bf22/src/main/thrift/parquet.thrift#L529)
pages, while `parquet-cpp` writes Parquet V2 files with
[`DataPage`](https://github.com/apache/parquet-format/blob/e93dd628d90aa076745558998f0bf5d9c262bf22/src/main/thrift/parquet.thrift#L492)
pages.
Since `TypedRecordReader::ReadNewPage()` only had a branch for
`PageType::DATA_PAGE`, the reader would return without reading any data for
records that have `DATA_PAGE_V2` pages. This explains the behavior observed in
[PARQUET-1482](https://issues.apache.org/jira/projects/PARQUET/issues/PARQUET-1482?filter=allopenissues).
This commit adds a new if-else branch for the `DataPageV2` case in
`TypedRecordReader::ReadNewPage()`. Since the `DataPageV2` branch needed to
reuse the code from the `DataPage` case, I refactored the repetition/definition
level decoder initialization and the data decoder initialization into two new
methods in the `TypedRecordReader` class. These new methods are now called by
the `DataPage` and `DataPageV2` initialization branches in
`TypedRecordReader::ReadNewPage()`.
An alternative implementation (with a smaller diff) is possible: share the
same else-if branch between `DataPage` and `DataPageV2` using a
pointer-to-derived `shared_ptr<Page>`. However, since the `Page` superclass
doesn't have the necessary `encoding()` or `num_values()` methods, I would need
to add a common superclass for both `DataPage` and `DataPageV2` that defines
these methods. I didn't do this because I was hesitant to modify the `Page`
class hierarchy for this commit.
Author: Wes McKinney <[email protected]>
Author: rdmello <[email protected]>
Author: Rylan Dmello <[email protected]>
Closes #3312 from rdmello/parquet_1482 and squashes the following commits:
c5cb0f37 <Wes McKinney> Add DataPage base class for DataPageV1 and
DataPageV2
8df8328e <rdmello> PARQUET-1482: Adding basic unit test for DataPageV2
serialization and deserialization.
9df32222 <Rylan Dmello> PARQUET-1482: Add branch to
TypedRecordReader::ReadNewPage for PageType::DATA_PAGE_V2 to address
incompatibility with parquetjs.
---
cpp/src/parquet/arrow/record_reader.cc | 186 ++++++++++++++++++-------------
cpp/src/parquet/column_page.h | 53 ++++-----
cpp/src/parquet/column_reader-test.cc | 2 +-
cpp/src/parquet/column_reader.cc | 16 +--
cpp/src/parquet/file-deserialize-test.cc | 54 ++++++++-
cpp/src/parquet/test-util.h | 10 +-
6 files changed, 198 insertions(+), 123 deletions(-)
diff --git a/cpp/src/parquet/arrow/record_reader.cc
b/cpp/src/parquet/arrow/record_reader.cc
index 39945af..c800f36 100644
--- a/cpp/src/parquet/arrow/record_reader.cc
+++ b/cpp/src/parquet/arrow/record_reader.cc
@@ -48,7 +48,7 @@ namespace BitUtil = ::arrow::BitUtil;
// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
// encoding.
-static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
+static bool IsDictionaryIndexEncoding(Encoding::type e) {
return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
}
@@ -86,7 +86,7 @@ class RecordReader::RecordReaderImpl {
virtual ~RecordReaderImpl() = default;
- virtual int64_t ReadRecordData(const int64_t num_records) = 0;
+ virtual int64_t ReadRecordData(int64_t num_records) = 0;
// Returns true if there are still values in this column.
bool HasNext() {
@@ -494,7 +494,7 @@ class TypedRecordReader : public
RecordReader::RecordReaderImpl {
}
// Return number of logical records read
- int64_t ReadRecordData(const int64_t num_records) override {
+ int64_t ReadRecordData(int64_t num_records) override {
// Conservative upper bound
const int64_t possible_num_values =
std::max(num_records, levels_written_ - levels_position_);
@@ -580,6 +580,13 @@ class TypedRecordReader : public
RecordReader::RecordReaderImpl {
DecoderType* current_decoder_;
+ // Initialize repetition and definition level decoders on the next data page.
+ int64_t InitializeLevelDecoders(const DataPage& page,
+ Encoding::type repetition_level_encoding,
+ Encoding::type definition_level_encoding);
+
+ void InitializeDataDecoder(const DataPage& page, int64_t levels_bytes);
+
// Advance to the next data page
bool ReadNewPage() override;
@@ -717,11 +724,94 @@ inline void
TypedRecordReader<DType>::ConfigureDictionary(const DictionaryPage*
DCHECK(current_decoder_);
}
+// If the data page includes repetition and definition levels, we
+// initialize the level decoders and return the number of encoded level bytes.
+// The return value helps determine the number of bytes in the encoded data.
+template <typename DType>
+int64_t TypedRecordReader<DType>::InitializeLevelDecoders(
+ const DataPage& page, Encoding::type repetition_level_encoding,
+ Encoding::type definition_level_encoding) {
+ // Read a data page.
+ num_buffered_values_ = page.num_values();
+
+ // Have not decoded any values from the data page yet
+ num_decoded_values_ = 0;
+
+ const uint8_t* buffer = page.data();
+ int64_t levels_byte_size = 0;
+
+ // Data page Layout: Repetition Levels - Definition Levels - encoded values.
+ // Levels are encoded as rle or bit-packed.
+ // Init repetition levels
+ if (descr_->max_repetition_level() > 0) {
+ int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
+ repetition_level_encoding, descr_->max_repetition_level(),
+ static_cast<int>(num_buffered_values_), buffer);
+ buffer += rep_levels_bytes;
+ levels_byte_size += rep_levels_bytes;
+ }
+ // TODO figure a way to set max_definition_level_ to 0
+ // if the initial value is invalid
+
+ // Init definition levels
+ if (descr_->max_definition_level() > 0) {
+ int64_t def_levels_bytes = definition_level_decoder_.SetData(
+ definition_level_encoding, descr_->max_definition_level(),
+ static_cast<int>(num_buffered_values_), buffer);
+ levels_byte_size += def_levels_bytes;
+ }
+
+ return levels_byte_size;
+}
+
+// Get a decoder object for this page or create a new decoder if this is the
+// first page with this encoding.
+template <typename DType>
+void TypedRecordReader<DType>::InitializeDataDecoder(const DataPage& page,
+ int64_t levels_byte_size)
{
+ const uint8_t* buffer = page.data() + levels_byte_size;
+ const int64_t data_size = page.size() - levels_byte_size;
+
+ Encoding::type encoding = page.encoding();
+
+ if (IsDictionaryIndexEncoding(encoding)) {
+ encoding = Encoding::RLE_DICTIONARY;
+ }
+
+ auto it = decoders_.find(static_cast<int>(encoding));
+ if (it != decoders_.end()) {
+ DCHECK(it->second.get() != nullptr);
+ if (encoding == Encoding::RLE_DICTIONARY) {
+ DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
+ }
+ current_decoder_ = it->second.get();
+ } else {
+ switch (encoding) {
+ case Encoding::PLAIN: {
+ auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
+ current_decoder_ = decoder.get();
+ decoders_[static_cast<int>(encoding)] = std::move(decoder);
+ break;
+ }
+ case Encoding::RLE_DICTIONARY:
+ throw ParquetException("Dictionary page must be before data page.");
+
+ case Encoding::DELTA_BINARY_PACKED:
+ case Encoding::DELTA_LENGTH_BYTE_ARRAY:
+ case Encoding::DELTA_BYTE_ARRAY:
+ ParquetException::NYI("Unsupported encoding");
+
+ default:
+ throw ParquetException("Unknown encoding type.");
+ }
+ }
+ current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
+ static_cast<int>(data_size));
+}
+
template <typename DType>
bool TypedRecordReader<DType>::ReadNewPage() {
// Loop until we find the next data page.
- const uint8_t* buffer;
-
while (true) {
current_page_ = pager_->NextPage();
if (!current_page_) {
@@ -733,80 +823,18 @@ bool TypedRecordReader<DType>::ReadNewPage() {
ConfigureDictionary(static_cast<const
DictionaryPage*>(current_page_.get()));
continue;
} else if (current_page_->type() == PageType::DATA_PAGE) {
- const DataPage* page = static_cast<const DataPage*>(current_page_.get());
-
- // Read a data page.
- num_buffered_values_ = page->num_values();
-
- // Have not decoded any values from the data page yet
- num_decoded_values_ = 0;
-
- buffer = page->data();
-
- // If the data page includes repetition and definition levels, we
- // initialize the level decoder and subtract the encoded level bytes from
- // the page size to determine the number of bytes in the encoded data.
- int64_t data_size = page->size();
-
- // Data page Layout: Repetition Levels - Definition Levels - encoded
values.
- // Levels are encoded as rle or bit-packed.
- // Init repetition levels
- if (descr_->max_repetition_level() > 0) {
- int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
- page->repetition_level_encoding(), descr_->max_repetition_level(),
- static_cast<int>(num_buffered_values_), buffer);
- buffer += rep_levels_bytes;
- data_size -= rep_levels_bytes;
- }
- // TODO figure a way to set max_definition_level_ to 0
- // if the initial value is invalid
-
- // Init definition levels
- if (descr_->max_definition_level() > 0) {
- int64_t def_levels_bytes = definition_level_decoder_.SetData(
- page->definition_level_encoding(), descr_->max_definition_level(),
- static_cast<int>(num_buffered_values_), buffer);
- buffer += def_levels_bytes;
- data_size -= def_levels_bytes;
- }
-
- // Get a decoder object for this page or create a new decoder if this is
the
- // first page with this encoding.
- Encoding::type encoding = page->encoding();
-
- if (IsDictionaryIndexEncoding(encoding)) {
- encoding = Encoding::RLE_DICTIONARY;
- }
-
- auto it = decoders_.find(static_cast<int>(encoding));
- if (it != decoders_.end()) {
- DCHECK(it->second.get() != nullptr);
- if (encoding == Encoding::RLE_DICTIONARY) {
- DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
- }
- current_decoder_ = it->second.get();
- } else {
- switch (encoding) {
- case Encoding::PLAIN: {
- auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
- current_decoder_ = decoder.get();
- decoders_[static_cast<int>(encoding)] = std::move(decoder);
- break;
- }
- case Encoding::RLE_DICTIONARY:
- throw ParquetException("Dictionary page must be before data
page.");
-
- case Encoding::DELTA_BINARY_PACKED:
- case Encoding::DELTA_LENGTH_BYTE_ARRAY:
- case Encoding::DELTA_BYTE_ARRAY:
- ParquetException::NYI("Unsupported encoding");
-
- default:
- throw ParquetException("Unknown encoding type.");
- }
- }
- current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
- static_cast<int>(data_size));
+ const auto page = std::static_pointer_cast<DataPageV1>(current_page_);
+ const int64_t levels_byte_size = InitializeLevelDecoders(
+ *page, page->repetition_level_encoding(),
page->definition_level_encoding());
+ InitializeDataDecoder(*page, levels_byte_size);
+ return true;
+ } else if (current_page_->type() == PageType::DATA_PAGE_V2) {
+ const auto page = std::static_pointer_cast<DataPageV2>(current_page_);
+ // Repetition and definition levels are always encoded using RLE encoding
+ // in the DataPageV2 format.
+ const int64_t levels_byte_size =
+ InitializeLevelDecoders(*page, Encoding::RLE, Encoding::RLE);
+ InitializeDataDecoder(*page, levels_byte_size);
return true;
} else {
// We don't know what this page type is. We're allowed to skip non-data
diff --git a/cpp/src/parquet/column_page.h b/cpp/src/parquet/column_page.h
index c34eee7..3a0355a 100644
--- a/cpp/src/parquet/column_page.h
+++ b/cpp/src/parquet/column_page.h
@@ -59,45 +59,54 @@ class Page {
PageType::type type_;
};
+/// \brief Base type for DataPageV1 and DataPageV2 including common attributes
class DataPage : public Page {
public:
- DataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
- Encoding::type encoding, Encoding::type definition_level_encoding,
- Encoding::type repetition_level_encoding,
+ int32_t num_values() const { return num_values_; }
+ Encoding::type encoding() const { return encoding_; }
+ const EncodedStatistics& statistics() const { return statistics_; }
+
+ protected:
+ DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t
num_values,
+ Encoding::type encoding,
const EncodedStatistics& statistics = EncodedStatistics())
- : Page(buffer, PageType::DATA_PAGE),
+ : Page(buffer, type),
num_values_(num_values),
encoding_(encoding),
- definition_level_encoding_(definition_level_encoding),
- repetition_level_encoding_(repetition_level_encoding),
statistics_(statistics) {}
- int32_t num_values() const { return num_values_; }
+ int32_t num_values_;
+ Encoding::type encoding_;
+ EncodedStatistics statistics_;
+};
- Encoding::type encoding() const { return encoding_; }
+class DataPageV1 : public DataPage {
+ public:
+ DataPageV1(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
+ Encoding::type encoding, Encoding::type definition_level_encoding,
+ Encoding::type repetition_level_encoding,
+ const EncodedStatistics& statistics = EncodedStatistics())
+ : DataPage(PageType::DATA_PAGE, buffer, num_values, encoding,
statistics),
+ definition_level_encoding_(definition_level_encoding),
+ repetition_level_encoding_(repetition_level_encoding) {}
Encoding::type repetition_level_encoding() const { return
repetition_level_encoding_; }
Encoding::type definition_level_encoding() const { return
definition_level_encoding_; }
- const EncodedStatistics& statistics() const { return statistics_; }
-
private:
- int32_t num_values_;
- Encoding::type encoding_;
Encoding::type definition_level_encoding_;
Encoding::type repetition_level_encoding_;
- EncodedStatistics statistics_;
};
-class CompressedDataPage : public DataPage {
+class CompressedDataPage : public DataPageV1 {
public:
CompressedDataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type
definition_level_encoding,
Encoding::type repetition_level_encoding, int64_t
uncompressed_size,
const EncodedStatistics& statistics = EncodedStatistics())
- : DataPage(buffer, num_values, encoding, definition_level_encoding,
- repetition_level_encoding, statistics),
+ : DataPageV1(buffer, num_values, encoding, definition_level_encoding,
+ repetition_level_encoding, statistics),
uncompressed_size_(uncompressed_size) {}
int64_t uncompressed_size() const { return uncompressed_size_; }
@@ -106,29 +115,23 @@ class CompressedDataPage : public DataPage {
int64_t uncompressed_size_;
};
-class DataPageV2 : public Page {
+class DataPageV2 : public DataPage {
public:
DataPageV2(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
int32_t num_nulls,
int32_t num_rows, Encoding::type encoding,
int32_t definition_levels_byte_length, int32_t
repetition_levels_byte_length,
bool is_compressed = false)
- : Page(buffer, PageType::DATA_PAGE_V2),
- num_values_(num_values),
+ : DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding),
num_nulls_(num_nulls),
num_rows_(num_rows),
- encoding_(encoding),
definition_levels_byte_length_(definition_levels_byte_length),
repetition_levels_byte_length_(repetition_levels_byte_length),
is_compressed_(is_compressed) {}
- int32_t num_values() const { return num_values_; }
-
int32_t num_nulls() const { return num_nulls_; }
int32_t num_rows() const { return num_rows_; }
- Encoding::type encoding() const { return encoding_; }
-
int32_t definition_levels_byte_length() const { return
definition_levels_byte_length_; }
int32_t repetition_levels_byte_length() const { return
repetition_levels_byte_length_; }
@@ -136,10 +139,8 @@ class DataPageV2 : public Page {
bool is_compressed() const { return is_compressed_; }
private:
- int32_t num_values_;
int32_t num_nulls_;
int32_t num_rows_;
- Encoding::type encoding_;
int32_t definition_levels_byte_length_;
int32_t repetition_levels_byte_length_;
bool is_compressed_;
diff --git a/cpp/src/parquet/column_reader-test.cc
b/cpp/src/parquet/column_reader-test.cc
index 0475ca5..49ca15b 100644
--- a/cpp/src/parquet/column_reader-test.cc
+++ b/cpp/src/parquet/column_reader-test.cc
@@ -332,7 +332,7 @@ TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
shared_ptr<DictionaryPage> dict_page =
std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
- shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(
+ shared_ptr<DataPageV1> data_page = MakeDataPage<Int32Type>(
&descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
pages_.push_back(dict_page);
pages_.push_back(data_page);
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 33d2f5c..9f3e52f 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -234,7 +234,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
seen_num_rows_ += header.num_values;
- return std::make_shared<DataPage>(
+ return std::make_shared<DataPageV1>(
page_buffer, header.num_values, FromThrift(header.encoding),
FromThrift(header.definition_level_encoding),
FromThrift(header.repetition_level_encoding), page_statistics);
@@ -613,27 +613,27 @@ bool TypedColumnReaderImpl<DType>::ReadNewPage() {
ConfigureDictionary(static_cast<const
DictionaryPage*>(current_page_.get()));
continue;
} else if (current_page_->type() == PageType::DATA_PAGE) {
- const DataPage* page = static_cast<const DataPage*>(current_page_.get());
+ const DataPageV1& page = static_cast<const DataPageV1&>(*current_page_);
// Read a data page.
- num_buffered_values_ = page->num_values();
+ num_buffered_values_ = page.num_values();
// Have not decoded any values from the data page yet
num_decoded_values_ = 0;
- buffer = page->data();
+ buffer = page.data();
// If the data page includes repetition and definition levels, we
// initialize the level decoder and subtract the encoded level bytes from
// the page size to determine the number of bytes in the encoded data.
- int64_t data_size = page->size();
+ int64_t data_size = page.size();
// Data page Layout: Repetition Levels - Definition Levels - encoded
values.
// Levels are encoded as rle or bit-packed.
// Init repetition levels
if (descr_->max_repetition_level() > 0) {
int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
- page->repetition_level_encoding(), descr_->max_repetition_level(),
+ page.repetition_level_encoding(), descr_->max_repetition_level(),
static_cast<int>(num_buffered_values_), buffer);
buffer += rep_levels_bytes;
data_size -= rep_levels_bytes;
@@ -644,7 +644,7 @@ bool TypedColumnReaderImpl<DType>::ReadNewPage() {
// Init definition levels
if (descr_->max_definition_level() > 0) {
int64_t def_levels_bytes = definition_level_decoder_.SetData(
- page->definition_level_encoding(), descr_->max_definition_level(),
+ page.definition_level_encoding(), descr_->max_definition_level(),
static_cast<int>(num_buffered_values_), buffer);
buffer += def_levels_bytes;
data_size -= def_levels_bytes;
@@ -652,7 +652,7 @@ bool TypedColumnReaderImpl<DType>::ReadNewPage() {
// Get a decoder object for this page or create a new decoder if this is
the
// first page with this encoding.
- Encoding::type encoding = page->encoding();
+ Encoding::type encoding = page.encoding();
if (IsDictionaryIndexEncoding(encoding)) {
encoding = Encoding::RLE_DICTIONARY;
diff --git a/cpp/src/parquet/file-deserialize-test.cc
b/cpp/src/parquet/file-deserialize-test.cc
index e62968e..f2b10c7 100644
--- a/cpp/src/parquet/file-deserialize-test.cc
+++ b/cpp/src/parquet/file-deserialize-test.cc
@@ -90,6 +90,21 @@ class TestPageSerde : public ::testing::Test {
ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get()));
}
+ void WriteDataPageHeaderV2(int max_serialized_len = 1024, int32_t
uncompressed_size = 0,
+ int32_t compressed_size = 0) {
+ // Simplifying writing serialized data page V2 headers which may or may not
+ // have meaningful data associated with them
+
+ // Serialize the Page header
+ page_header_.__set_data_page_header_v2(data_page_header_v2_);
+ page_header_.uncompressed_page_size = uncompressed_size;
+ page_header_.compressed_page_size = compressed_size;
+ page_header_.type = format::PageType::DATA_PAGE_V2;
+
+ ThriftSerializer serializer;
+ ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get()));
+ }
+
void ResetStream() { out_stream_.reset(new InMemoryOutputStream); }
void EndStream() { out_buffer_ = out_stream_->GetBuffer(); }
@@ -101,12 +116,13 @@ class TestPageSerde : public ::testing::Test {
std::unique_ptr<PageReader> page_reader_;
format::PageHeader page_header_;
format::DataPageHeader data_page_header_;
+ format::DataPageHeaderV2 data_page_header_v2_;
};
void CheckDataPageHeader(const format::DataPageHeader expected, const Page*
page) {
ASSERT_EQ(PageType::DATA_PAGE, page->type());
- const DataPage* data_page = static_cast<const DataPage*>(page);
+ const DataPageV1* data_page = static_cast<const DataPageV1*>(page);
ASSERT_EQ(expected.num_values, data_page->num_values());
ASSERT_EQ(expected.encoding, data_page->encoding());
ASSERT_EQ(expected.definition_level_encoding,
data_page->definition_level_encoding());
@@ -120,7 +136,25 @@ void CheckDataPageHeader(const format::DataPageHeader
expected, const Page* page
}
}
-TEST_F(TestPageSerde, DataPage) {
+// Overload for DataPageV2 tests.
+void CheckDataPageHeader(const format::DataPageHeaderV2 expected, const Page*
page) {
+ ASSERT_EQ(PageType::DATA_PAGE_V2, page->type());
+
+ const DataPageV2* data_page = static_cast<const DataPageV2*>(page);
+ ASSERT_EQ(expected.num_values, data_page->num_values());
+ ASSERT_EQ(expected.num_nulls, data_page->num_nulls());
+ ASSERT_EQ(expected.num_rows, data_page->num_rows());
+ ASSERT_EQ(expected.encoding, data_page->encoding());
+ ASSERT_EQ(expected.definition_levels_byte_length,
+ data_page->definition_levels_byte_length());
+ ASSERT_EQ(expected.repetition_levels_byte_length,
+ data_page->repetition_levels_byte_length());
+ ASSERT_EQ(expected.is_compressed, data_page->is_compressed());
+
+ // TODO: Tests for DataPageHeaderV2 statistics.
+}
+
+TEST_F(TestPageSerde, DataPageV1) {
format::PageHeader out_page_header;
int stats_size = 512;
@@ -134,6 +168,18 @@ TEST_F(TestPageSerde, DataPage) {
ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_,
current_page.get()));
}
+TEST_F(TestPageSerde, DataPageV2) {
+ format::PageHeader out_page_header;
+
+ const int32_t num_rows = 4444;
+ data_page_header_.num_values = num_rows;
+
+ ASSERT_NO_FATAL_FAILURE(WriteDataPageHeaderV2());
+ InitSerializedPageReader(num_rows);
+ std::shared_ptr<Page> current_page = page_reader_->NextPage();
+ ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_v2_,
current_page.get()));
+}
+
TEST_F(TestPageSerde, TestLargePageHeaders) {
int stats_size = 256 * 1024; // 256 KB
AddDummyStats(stats_size, data_page_header_);
@@ -218,11 +264,11 @@ TEST_F(TestPageSerde, Compression) {
InitSerializedPageReader(num_rows * num_pages, codec_type);
std::shared_ptr<Page> page;
- const DataPage* data_page;
+ const DataPageV1* data_page;
for (int i = 0; i < num_pages; ++i) {
int data_size = static_cast<int>(faux_data[i].size());
page = page_reader_->NextPage();
- data_page = static_cast<const DataPage*>(page.get());
+ data_page = static_cast<const DataPageV1*>(page.get());
ASSERT_EQ(data_size, data_page->size());
ASSERT_EQ(0, memcmp(faux_data[i].data(), data_page->data(), data_size));
}
diff --git a/cpp/src/parquet/test-util.h b/cpp/src/parquet/test-util.h
index ed7c7bb..a234fd4 100644
--- a/cpp/src/parquet/test-util.h
+++ b/cpp/src/parquet/test-util.h
@@ -216,7 +216,7 @@ void DataPageBuilder<BooleanType>::AppendValues(const
ColumnDescriptor* d,
}
template <typename Type>
-static shared_ptr<DataPage> MakeDataPage(
+static shared_ptr<DataPageV1> MakeDataPage(
const ColumnDescriptor* d, const vector<typename Type::c_type>& values,
int num_vals,
Encoding::type encoding, const uint8_t* indices, int indices_size,
const vector<int16_t>& def_levels, int16_t max_def_level,
@@ -243,9 +243,9 @@ static shared_ptr<DataPage> MakeDataPage(
auto buffer = page_stream.GetBuffer();
- return std::make_shared<DataPage>(buffer, num_values, encoding,
- page_builder.def_level_encoding(),
- page_builder.rep_level_encoding());
+ return std::make_shared<DataPageV1>(buffer, num_values, encoding,
+ page_builder.def_level_encoding(),
+ page_builder.rep_level_encoding());
}
template <typename TYPE>
@@ -357,7 +357,7 @@ static void PaginateDict(const ColumnDescriptor* d,
rep_level_start = i * num_levels_per_page;
rep_level_end = (i + 1) * num_levels_per_page;
}
- shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(
+ shared_ptr<DataPageV1> data_page = MakeDataPage<Int32Type>(
d, {}, values_per_page[i], encoding, rle_indices[i]->data(),
static_cast<int>(rle_indices[i]->size()),
slice(def_levels, def_level_start, def_level_end), max_def_level,