Repository: parquet-cpp Updated Branches: refs/heads/master c6f5ebe52 -> 0bf72a96b
PARQUET-718: Fix I/O of non-dictionary encoded pages We have set dictionary_page_offset even though we did not have a dictionary page present; this caused too much data to be loaded into the SerializedPageReader. Author: Uwe L. Korn <[email protected]> Closes #159 from xhochy/PARQUET-718 and squashes the following commits: b3a7205 [Uwe L. Korn] PARQUET-718: Fix I/O of non-dictionary encoded pages 1e07b9f [Uwe L. Korn] PARQUET-718: Reading boolean pages fails Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/0bf72a96 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/0bf72a96 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/0bf72a96 Branch: refs/heads/master Commit: 0bf72a96b5b9c6643decd8bd1e45fd8dc9422e33 Parents: c6f5ebe Author: Uwe L. Korn <[email protected]> Authored: Thu Sep 15 09:23:20 2016 -0400 Committer: Wes McKinney <[email protected]> Committed: Thu Sep 15 09:23:20 2016 -0400 ---------------------------------------------------------------------- src/parquet/column/column-writer-test.cc | 96 +++----------------------- src/parquet/column/test-specialization.h | 97 +++++++++++++++++++++++++++ src/parquet/file/file-serialize-test.cc | 73 +++++++++----------- src/parquet/file/metadata.cc | 2 +- 4 files changed, 140 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0bf72a96/src/parquet/column/column-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc index a87dc48..230a843 100644 --- a/src/parquet/column/column-writer-test.cc +++ b/src/parquet/column/column-writer-test.cc @@ -43,39 +43,17 @@ const int LARGE_SIZE = 10000; const int VERY_LARGE_SIZE = 400000; template <typename TestType> -class TestPrimitiveWriter : public 
::testing::Test { +class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> { public: typedef typename TestType::c_type T; - void SetUpSchemaRequired() { - node_ = PrimitiveNode::Make("column", Repetition::REQUIRED, TestType::type_num, - LogicalType::NONE, FLBA_LENGTH); - schema_ = std::make_shared<ColumnDescriptor>(node_, 0, 0); - } - - void SetUpSchemaOptional() { - node_ = PrimitiveNode::Make("column", Repetition::OPTIONAL, TestType::type_num, - LogicalType::NONE, FLBA_LENGTH); - schema_ = std::make_shared<ColumnDescriptor>(node_, 1, 0); - } - - void SetUpSchemaRepeated() { - node_ = PrimitiveNode::Make("column", Repetition::REPEATED, TestType::type_num, - LogicalType::NONE, FLBA_LENGTH); - schema_ = std::make_shared<ColumnDescriptor>(node_, 1, 1); - } - - void GenerateData(int64_t num_values); - - void SetupValuesOut(); - void SetUp() { - SetupValuesOut(); + this->SetupValuesOut(SMALL_SIZE); writer_properties_ = default_writer_properties(); definition_levels_out_.resize(SMALL_SIZE); repetition_levels_out_.resize(SMALL_SIZE); - SetUpSchemaRequired(); + this->SetUpSchemaRequired(); } Type::type type_num() { return TestType::type_num; } @@ -85,14 +63,15 @@ class TestPrimitiveWriter : public ::testing::Test { std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer)); std::unique_ptr<SerializedPageReader> page_reader( new SerializedPageReader(std::move(source), Compression::UNCOMPRESSED)); - reader_.reset(new TypedColumnReader<TestType>(schema_.get(), std::move(page_reader))); + reader_.reset( + new TypedColumnReader<TestType>(this->descr_.get(), std::move(page_reader))); } std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter( int64_t output_size = SMALL_SIZE, Encoding::type encoding = Encoding::PLAIN) { sink_.reset(new InMemoryOutputStream()); - metadata_ = ColumnChunkMetaDataBuilder::Make( - writer_properties_, schema_.get(), reinterpret_cast<uint8_t*>(&thrift_metadata_)); + metadata_ = 
ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_.get(), + reinterpret_cast<uint8_t*>(&thrift_metadata_)); std::unique_ptr<SerializedPageWriter> pager(new SerializedPageWriter( sink_.get(), Compression::UNCOMPRESSED, metadata_.get())); WriterProperties::Builder wp_builder; @@ -104,16 +83,15 @@ class TestPrimitiveWriter : public ::testing::Test { } writer_properties_ = wp_builder.build(); std::shared_ptr<ColumnWriter> writer = ColumnWriter::Make( - schema_.get(), std::move(pager), output_size, writer_properties_.get()); + this->descr_.get(), std::move(pager), output_size, writer_properties_.get()); return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer); } - void SyncValuesOut(); void ReadColumn() { BuildReader(); - reader_->ReadBatch(values_out_.size(), definition_levels_out_.data(), - repetition_levels_out_.data(), values_out_ptr_, &values_read_); - SyncValuesOut(); + reader_->ReadBatch(this->values_out_.size(), definition_levels_out_.data(), + repetition_levels_out_.data(), this->values_out_ptr_, &values_read_); + this->SyncValuesOut(); } void TestRequiredWithEncoding(Encoding::type encoding) { @@ -156,22 +134,10 @@ class TestPrimitiveWriter : public ::testing::Test { // content is bound to the reader. 
std::unique_ptr<TypedColumnReader<TestType>> reader_; - // Input buffers - std::vector<T> values_; - std::vector<uint8_t> buffer_; - // Pointer to the values, needed as we cannot use vector<bool>::data() - T* values_ptr_; - std::vector<uint8_t> bool_buffer_; - - // Output buffers - std::vector<T> values_out_; - std::vector<uint8_t> bool_buffer_out_; - T* values_out_ptr_; std::vector<int16_t> definition_levels_out_; std::vector<int16_t> repetition_levels_out_; private: - NodePtr node_; format::ColumnChunk thrift_metadata_; std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_; std::shared_ptr<ColumnDescriptor> schema_; @@ -179,46 +145,6 @@ class TestPrimitiveWriter : public ::testing::Test { std::shared_ptr<WriterProperties> writer_properties_; }; -template <typename TestType> -void TestPrimitiveWriter<TestType>::SetupValuesOut() { - values_out_.resize(SMALL_SIZE); - values_out_ptr_ = values_out_.data(); -} - -template <> -void TestPrimitiveWriter<BooleanType>::SetupValuesOut() { - values_out_.resize(SMALL_SIZE); - bool_buffer_out_.resize(SMALL_SIZE); - // Write once to all values so we can copy it without getting Valgrind errors - // about uninitialised values. 
- std::fill(bool_buffer_out_.begin(), bool_buffer_out_.end(), true); - values_out_ptr_ = reinterpret_cast<bool*>(bool_buffer_out_.data()); -} - -template <typename TestType> -void TestPrimitiveWriter<TestType>::SyncValuesOut() {} - -template <> -void TestPrimitiveWriter<BooleanType>::SyncValuesOut() { - std::copy(bool_buffer_out_.begin(), bool_buffer_out_.end(), values_out_.begin()); -} - -template <typename TestType> -void TestPrimitiveWriter<TestType>::GenerateData(int64_t num_values) { - values_.resize(num_values); - InitValues<T>(num_values, values_, buffer_); - values_ptr_ = values_.data(); -} - -template <> -void TestPrimitiveWriter<BooleanType>::GenerateData(int64_t num_values) { - values_.resize(num_values); - InitValues<T>(num_values, values_, buffer_); - bool_buffer_.resize(num_values); - std::copy(values_.begin(), values_.end(), bool_buffer_.begin()); - values_ptr_ = reinterpret_cast<bool*>(bool_buffer_.data()); -} - typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType, BooleanType, ByteArrayType, FLBAType> TestTypes; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0bf72a96/src/parquet/column/test-specialization.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/test-specialization.h b/src/parquet/column/test-specialization.h index ab678b8..5803b65 100644 --- a/src/parquet/column/test-specialization.h +++ b/src/parquet/column/test-specialization.h @@ -22,6 +22,7 @@ #ifndef PARQUET_COLUMN_TEST_SPECIALIZATION_H #define PARQUET_COLUMN_TEST_SPECIALIZATION_H +#include <algorithm> #include <limits> #include <vector> @@ -59,6 +60,102 @@ void InitValues<Int96>(int num_values, vector<Int96>& values, vector<uint8_t>& b std::numeric_limits<int32_t>::max(), values.data()); } +// This class lives here because of its dependency on the InitValues specializations. 
+template <typename TestType> +class PrimitiveTypedTest : public ::testing::Test { + public: + typedef typename TestType::c_type T; + + void SetUpSchemaRequired() { + primitive_node_ = schema::PrimitiveNode::Make("column", Repetition::REQUIRED, + TestType::type_num, LogicalType::NONE, FLBA_LENGTH); + descr_ = std::make_shared<ColumnDescriptor>(primitive_node_, 0, 0); + node_ = schema::GroupNode::Make( + "schema", Repetition::REQUIRED, std::vector<schema::NodePtr>({primitive_node_})); + schema_.Init(node_); + } + + void SetUpSchemaOptional() { + primitive_node_ = schema::PrimitiveNode::Make("column", Repetition::OPTIONAL, + TestType::type_num, LogicalType::NONE, FLBA_LENGTH); + descr_ = std::make_shared<ColumnDescriptor>(primitive_node_, 1, 0); + node_ = schema::GroupNode::Make( + "schema", Repetition::REQUIRED, std::vector<schema::NodePtr>({primitive_node_})); + schema_.Init(node_); + } + + void SetUpSchemaRepeated() { + primitive_node_ = schema::PrimitiveNode::Make("column", Repetition::REPEATED, + TestType::type_num, LogicalType::NONE, FLBA_LENGTH); + descr_ = std::make_shared<ColumnDescriptor>(primitive_node_, 1, 1); + node_ = schema::GroupNode::Make( + "schema", Repetition::REQUIRED, std::vector<schema::NodePtr>({primitive_node_})); + schema_.Init(node_); + } + + void GenerateData(int64_t num_values); + void SetupValuesOut(int64_t num_values); + void SyncValuesOut(); + void SetUp() { SetUpSchemaRequired(); } + + protected: + schema::NodePtr primitive_node_; + schema::NodePtr node_; + SchemaDescriptor schema_; + std::shared_ptr<ColumnDescriptor> descr_; + + // Input buffers + std::vector<T> values_; + std::vector<uint8_t> buffer_; + // Pointer to the values, needed as we cannot use vector<bool>::data() + T* values_ptr_; + std::vector<uint8_t> bool_buffer_; + + // Output buffers + std::vector<T> values_out_; + std::vector<uint8_t> bool_buffer_out_; + T* values_out_ptr_; +}; + +template <typename TestType> +void PrimitiveTypedTest<TestType>::SyncValuesOut() {} + 
+template <> +void PrimitiveTypedTest<BooleanType>::SyncValuesOut() { + std::copy(bool_buffer_out_.begin(), bool_buffer_out_.end(), values_out_.begin()); +} + +template <typename TestType> +void PrimitiveTypedTest<TestType>::SetupValuesOut(int64_t num_values) { + values_out_.resize(num_values); + values_out_ptr_ = values_out_.data(); +} + +template <> +void PrimitiveTypedTest<BooleanType>::SetupValuesOut(int64_t num_values) { + values_out_.resize(num_values); + bool_buffer_out_.resize(num_values); + // Write once to all values so we can copy it without getting Valgrind errors + // about uninitialised values. + std::fill(bool_buffer_out_.begin(), bool_buffer_out_.end(), true); + values_out_ptr_ = reinterpret_cast<bool*>(bool_buffer_out_.data()); +} + +template <typename TestType> +void PrimitiveTypedTest<TestType>::GenerateData(int64_t num_values) { + values_.resize(num_values); + InitValues<T>(num_values, values_, buffer_); + values_ptr_ = values_.data(); +} + +template <> +void PrimitiveTypedTest<BooleanType>::GenerateData(int64_t num_values) { + values_.resize(num_values); + InitValues<T>(num_values, values_, buffer_); + bool_buffer_.resize(num_values); + std::copy(values_.begin(), values_.end(), bool_buffer_.begin()); + values_ptr_ = reinterpret_cast<bool*>(bool_buffer_.data()); +} } // namespace test } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0bf72a96/src/parquet/file/file-serialize-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc index bd41e1e..90ee7de 100644 --- a/src/parquet/file/file-serialize-test.cc +++ b/src/parquet/file/file-serialize-test.cc @@ -18,6 +18,8 @@ #include <gtest/gtest.h> #include "parquet/column/reader.h" +#include "parquet/column/test-util.h" +#include "parquet/column/test-specialization.h" #include "parquet/column/writer.h" #include "parquet/file/reader.h" #include 
"parquet/file/writer.h" @@ -33,45 +35,23 @@ using schema::PrimitiveNode; namespace test { -class TestSerialize : public ::testing::Test { +template <typename TestType> +class TestSerialize : public PrimitiveTypedTest<TestType> { public: - void SetUpSchemaRequired() { - auto pnode = PrimitiveNode::Make("int64", Repetition::REQUIRED, Type::INT64); - node_ = - GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode})); - schema_.Init(node_); - } - - void SetUpSchemaOptional() { - auto pnode = PrimitiveNode::Make("int64", Repetition::OPTIONAL, Type::INT64); - node_ = - GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode})); - schema_.Init(node_); - } - - void SetUpSchemaRepeated() { - auto pnode = PrimitiveNode::Make("int64", Repetition::REPEATED, Type::INT64); - node_ = - GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode})); - schema_.Init(node_); - } - - void SetUp() { SetUpSchemaRequired(); } + typedef typename TestType::c_type T; protected: - NodePtr node_; - SchemaDescriptor schema_; - void FileSerializeTest(Compression::type codec_type) { std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream()); - auto gnode = std::static_pointer_cast<GroupNode>(node_); + auto gnode = std::static_pointer_cast<GroupNode>(this->node_); std::shared_ptr<WriterProperties> writer_properties = - WriterProperties::Builder().compression("schema.int64", codec_type)->build(); + WriterProperties::Builder().compression("column", codec_type)->build(); auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties); auto row_group_writer = file_writer->AppendRowGroup(100); - auto column_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn()); - std::vector<int64_t> values(100, 128); - column_writer->WriteBatch(values.size(), nullptr, nullptr, values.data()); + auto column_writer = + static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn()); + this->GenerateData(100); + 
column_writer->WriteBatch(100, nullptr, nullptr, this->values_ptr_); column_writer->Close(); row_group_writer->Close(); file_writer->Close(); @@ -86,29 +66,38 @@ class TestSerialize : public ::testing::Test { auto rg_reader = file_reader->RowGroup(0); ASSERT_EQ(1, rg_reader->metadata()->num_columns()); ASSERT_EQ(100, rg_reader->metadata()->num_rows()); + // Check that the specified compression was actually used. + ASSERT_EQ(codec_type, rg_reader->metadata()->ColumnChunk(0)->compression()); - auto col_reader = std::static_pointer_cast<Int64Reader>(rg_reader->Column(0)); - std::vector<int64_t> values_out(100); + auto col_reader = + std::static_pointer_cast<TypedColumnReader<TestType>>(rg_reader->Column(0)); std::vector<int16_t> def_levels_out(100); std::vector<int16_t> rep_levels_out(100); int64_t values_read; - col_reader->ReadBatch(values_out.size(), def_levels_out.data(), rep_levels_out.data(), - values_out.data(), &values_read); + this->SetupValuesOut(100); + col_reader->ReadBatch(100, def_levels_out.data(), rep_levels_out.data(), + this->values_out_ptr_, &values_read); + this->SyncValuesOut(); ASSERT_EQ(100, values_read); - ASSERT_EQ(values, values_out); + ASSERT_EQ(this->values_, this->values_out_); } }; -TEST_F(TestSerialize, SmallFileUncompressed) { - FileSerializeTest(Compression::UNCOMPRESSED); +typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType, + BooleanType, ByteArrayType, FLBAType> TestTypes; + +TYPED_TEST_CASE(TestSerialize, TestTypes); + +TYPED_TEST(TestSerialize, SmallFileUncompressed) { + this->FileSerializeTest(Compression::UNCOMPRESSED); } -TEST_F(TestSerialize, SmallFileSnappy) { - FileSerializeTest(Compression::SNAPPY); +TYPED_TEST(TestSerialize, SmallFileSnappy) { + this->FileSerializeTest(Compression::SNAPPY); } -TEST_F(TestSerialize, SmallFileGzip) { - FileSerializeTest(Compression::GZIP); +TYPED_TEST(TestSerialize, SmallFileGzip) { + this->FileSerializeTest(Compression::GZIP); } } // namespace test 
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0bf72a96/src/parquet/file/metadata.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc index 00ce990..9964882 100644 --- a/src/parquet/file/metadata.cc +++ b/src/parquet/file/metadata.cc @@ -355,13 +355,13 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback) { if (dictionary_page_offset > 0) { + column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset); column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size); } else { column_chunk_->__set_file_offset(data_page_offset + compressed_size); } column_chunk_->__isset.meta_data = true; column_chunk_->meta_data.__set_num_values(num_values); - column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset); column_chunk_->meta_data.__set_index_page_offset(index_page_offset); column_chunk_->meta_data.__set_data_page_offset(data_page_offset); column_chunk_->meta_data.__set_total_uncompressed_size(uncompressed_size);
