Repository: parquet-cpp Updated Branches: refs/heads/master 2b935ae96 -> 08088af76
PARQUET-485: Decouple page deserialization from column reader to facilitate unit testing. Several things in this patch: * Adds PageReader abstraction, and a SerializedPageReader implementation according to the Parquet file format * Adds a MockPageReader and a couple of unit tests demonstrating end-to-end testing without creating a Parquet file * Adds a DataPageBuilder test fixture tool, which may become part of the main write path later * Adds PlainEncoder implementation for a few primitive types * Fixes a few ColumnReader bugs exposed by the unit tests Author: Wes McKinney <[email protected]> Closes #32 from wesm/PARQUET-485 and squashes the following commits: aa33078 [Wes McKinney] Fix function doc e897a81 [Wes McKinney] Restore NumRequiredBits function after rebase ee4d97a [Wes McKinney] Change PageReader::NextPage API to return shared_ptr<Page>(nullptr) on eos 0324021 [Wes McKinney] Clarify some comments ec871c4 [Wes McKinney] Add include guards e63bbdd [Wes McKinney] Move vector_equal to util/test-common.h 44a78a1 [Wes McKinney] Refactor to decouple page deserialization from column reader so that mock data pages can be constructed in unit tests. 
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/08088af7 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/08088af7 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/08088af7 Branch: refs/heads/master Commit: 08088af76ec2357318e045f0696901e2e6e79fbf Parents: 2b935ae Author: Wes McKinney <[email protected]> Authored: Tue Feb 2 10:50:17 2016 -0800 Committer: Nong Li <[email protected]> Committed: Tue Feb 2 10:50:17 2016 -0800 ---------------------------------------------------------------------- CMakeLists.txt | 2 + src/parquet/column/CMakeLists.txt | 4 + src/parquet/column/column-reader-test.cc | 165 +++++++++++++++++++++++ src/parquet/column/page.h | 132 ++++++++++++++++++ src/parquet/column/reader.cc | 164 +++++++++++------------ src/parquet/column/reader.h | 80 +++++------ src/parquet/column/serialized-page.cc | 103 ++++++++++++++ src/parquet/column/serialized-page.h | 61 +++++++++ src/parquet/column/test-util.h | 184 ++++++++++++++++++++++++++ src/parquet/encodings/encodings.h | 34 +++++ src/parquet/encodings/plain-encoding.h | 56 +++++++- src/parquet/reader.cc | 13 +- src/parquet/reader.h | 4 + src/parquet/util/bit-util.h | 8 ++ src/parquet/util/test-common.h | 53 ++++++++ 15 files changed, 924 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 173d676..d379e3d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -214,6 +214,7 @@ set(PARQUET_TEST_LINK_LIBS ${PARQUET_MIN_TEST_LIBS}) # Library config set(LIBPARQUET_SRCS + src/parquet/column/serialized-page.cc src/parquet/column/reader.cc src/parquet/column/scanner.cc src/parquet/reader.cc @@ -246,6 +247,7 @@ if(APPLE) endif() add_subdirectory(src/parquet) 
+add_subdirectory(src/parquet/column) add_subdirectory(src/parquet/compression) add_subdirectory(src/parquet/encodings) add_subdirectory(src/parquet/thrift) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/column/CMakeLists.txt b/src/parquet/column/CMakeLists.txt index 20f6167..7eb334e 100644 --- a/src/parquet/column/CMakeLists.txt +++ b/src/parquet/column/CMakeLists.txt @@ -17,6 +17,10 @@ # Headers: top level install(FILES + page.h reader.h + serialized-page.h scanner.h DESTINATION include/parquet/column) + +ADD_PARQUET_TEST(column-reader-test) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/column-reader-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc new file mode 100644 index 0000000..88f4465 --- /dev/null +++ b/src/parquet/column/column-reader-test.cc @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <cstdint> +#include <cstdlib> +#include <iostream> +#include <sstream> +#include <string> +#include <vector> + +#include <gtest/gtest.h> + +#include "parquet/types.h" +#include "parquet/column/page.h" +#include "parquet/column/reader.h" +#include "parquet/column/test-util.h" + +#include "parquet/util/test-common.h" + +using std::string; +using std::vector; +using std::shared_ptr; +using parquet::FieldRepetitionType; +using parquet::SchemaElement; +using parquet::Encoding; +using parquet::Type; + +namespace parquet_cpp { + +namespace test { + +class TestPrimitiveReader : public ::testing::Test { + public: + void SetUp() {} + + void TearDown() {} + + void InitReader(const SchemaElement* element) { + pager_.reset(new test::MockPageReader(pages_)); + reader_ = ColumnReader::Make(element, std::move(pager_)); + } + + protected: + std::shared_ptr<ColumnReader> reader_; + std::unique_ptr<PageReader> pager_; + vector<shared_ptr<Page> > pages_; +}; + +template <typename T> +static vector<T> slice(const vector<T>& values, size_t start, size_t end) { + if (end < start) { + return vector<T>(0); + } + + vector<T> out(end - start); + for (size_t i = start; i < end; ++i) { + out[i - start] = values[i]; + } + return out; +} + + +TEST_F(TestPrimitiveReader, TestInt32FlatRequired) { + vector<int32_t> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + size_t num_values = values.size(); + Encoding::type value_encoding = Encoding::PLAIN; + + vector<uint8_t> page1; + test::DataPageBuilder<Type::INT32> page_builder(&page1); + page_builder.AppendValues(values, Encoding::PLAIN); + pages_.push_back(page_builder.Finish()); + + // TODO: simplify this + SchemaElement element; + element.__set_type(Type::INT32); + element.__set_repetition_type(FieldRepetitionType::REQUIRED); + InitReader(&element); + + Int32Reader* reader = static_cast<Int32Reader*>(reader_.get()); + + vector<int32_t> result(10, -1); + + size_t values_read = 0; + size_t batch_actual = reader->ReadBatch(10, nullptr, nullptr, 
+ &result[0], &values_read); + ASSERT_EQ(10, batch_actual); + ASSERT_EQ(10, values_read); + + ASSERT_TRUE(vector_equal(result, values)); +} + +TEST_F(TestPrimitiveReader, TestInt32FlatOptional) { + vector<int32_t> values = {1, 2, 3, 4, 5}; + vector<int16_t> def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1}; + + size_t num_values = values.size(); + Encoding::type value_encoding = Encoding::PLAIN; + + vector<uint8_t> page1; + test::DataPageBuilder<Type::INT32> page_builder(&page1); + + // Definition levels precede the values + page_builder.AppendDefLevels(def_levels, 1, Encoding::RLE); + page_builder.AppendValues(values, Encoding::PLAIN); + + pages_.push_back(page_builder.Finish()); + + // TODO: simplify this + SchemaElement element; + element.__set_type(Type::INT32); + element.__set_repetition_type(FieldRepetitionType::OPTIONAL); + InitReader(&element); + + Int32Reader* reader = static_cast<Int32Reader*>(reader_.get()); + + std::vector<int32_t> vexpected; + std::vector<int16_t> dexpected; + + size_t values_read = 0; + size_t batch_actual = 0; + + vector<int32_t> vresult(3, -1); + vector<int16_t> dresult(5, -1); + + batch_actual = reader->ReadBatch(5, &dresult[0], nullptr, + &vresult[0], &values_read); + ASSERT_EQ(5, batch_actual); + ASSERT_EQ(3, values_read); + + ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3))); + ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5))); + + batch_actual = reader->ReadBatch(5, &dresult[0], nullptr, + &vresult[0], &values_read); + ASSERT_EQ(5, batch_actual); + ASSERT_EQ(2, values_read); + + ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5))); + ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10))); + + // EOS, pass all nullptrs to check for improper writes. 
Do not segfault / + // core dump + batch_actual = reader->ReadBatch(5, nullptr, nullptr, + nullptr, &values_read); + ASSERT_EQ(0, batch_actual); + ASSERT_EQ(0, values_read); +} + +} // namespace test + +} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/page.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h new file mode 100644 index 0000000..46f5d62 --- /dev/null +++ b/src/parquet/column/page.h @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module defines an abstract interface for iterating through pages in a +// Parquet column chunk within a row group. It could be extended in the future +// to iterate through all data pages in all chunks in a file. + +#ifndef PARQUET_COLUMN_PAGE_H +#define PARQUET_COLUMN_PAGE_H + +#include "parquet/thrift/parquet_types.h" + +namespace parquet_cpp { + +// Note: Copying the specific page header Thrift metadata to the Page object +// (instead of using a pointer) presently so that data pages can be +// decompressed and processed in parallel. 
We can turn the header members of +// these classes into pointers at some point, but the downside is that +// applications materializing multiple data pages at once will have to have a +// data container that manages the lifetime of the deserialized +// parquet::PageHeader structs. +// +// TODO: Parallel processing is not yet safe because of memory-ownership +// semantics (the PageReader may or may not own the memory referenced by a +// page) +class Page { + // TODO(wesm): In the future Parquet implementations may store the crc code + // in parquet::PageHeader. parquet-mr currently does not, so we also skip it + // here, both on the read and write path + public: + Page(const uint8_t* buffer, size_t buffer_size, parquet::PageType::type type) : + buffer_(buffer), + buffer_size_(buffer_size), + type_(type) {} + + parquet::PageType::type type() const { + return type_; + } + + // @returns: a pointer to the page's data + const uint8_t* data() const { + return buffer_; + } + + // @returns: the total size in bytes of the page's data buffer + size_t size() const { + return buffer_size_; + } + + private: + const uint8_t* buffer_; + size_t buffer_size_; + + parquet::PageType::type type_; +}; + + +class DataPage : public Page { + public: + DataPage(const uint8_t* buffer, size_t buffer_size, + const parquet::DataPageHeader& header) : + Page(buffer, buffer_size, parquet::PageType::DATA_PAGE), + header_(header) {} + + size_t num_values() const { + return header_.num_values; + } + + parquet::Encoding::type encoding() const { + return header_.encoding; + } + + private: + parquet::DataPageHeader header_; +}; + + +class DataPageV2 : public Page { + public: + DataPageV2(const uint8_t* buffer, size_t buffer_size, + const parquet::DataPageHeaderV2& header) : + Page(buffer, buffer_size, parquet::PageType::DATA_PAGE_V2), + header_(header) {} + + private: + parquet::DataPageHeaderV2 header_; +}; + + +class DictionaryPage : public Page { + public: + DictionaryPage(const uint8_t* buffer, 
size_t buffer_size, + const parquet::DictionaryPageHeader& header) : + Page(buffer, buffer_size, parquet::PageType::DICTIONARY_PAGE), + header_(header) {} + + size_t num_values() const { + return header_.num_values; + } + + private: + parquet::DictionaryPageHeader header_; +}; + +// Abstract page iterator interface. This way, we can feed column pages to the +// ColumnReader through whatever mechanism we choose +class PageReader { + public: + virtual ~PageReader() {} + + // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page> + // containing new Page otherwise + virtual std::shared_ptr<Page> NextPage() = 0; +}; + +} // namespace parquet_cpp + +#endif // PARQUET_COLUMN_PAGE_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc index edfea49..91e026a 100644 --- a/src/parquet/column/reader.cc +++ b/src/parquet/column/reader.cc @@ -18,45 +18,62 @@ #include "parquet/column/reader.h" #include <algorithm> +#include <memory> #include <string> #include <string.h> -#include "parquet/compression/codec.h" -#include "parquet/encodings/encodings.h" -#include "parquet/thrift/util.h" -#include "parquet/util/input_stream.h" +#include "parquet/column/page.h" -const int DATA_PAGE_SIZE = 64 * 1024; +#include "parquet/encodings/encodings.h" namespace parquet_cpp { -using parquet::CompressionCodec; using parquet::Encoding; using parquet::FieldRepetitionType; using parquet::PageType; using parquet::Type; -ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata, - const parquet::SchemaElement* schema, std::unique_ptr<InputStream> stream) - : metadata_(metadata), - schema_(schema), - stream_(std::move(stream)), +ColumnReader::ColumnReader(const parquet::SchemaElement* schema, + std::unique_ptr<PageReader> pager) + : schema_(schema), + pager_(std::move(pager)), num_buffered_values_(0), 
- num_decoded_values_(0) { - switch (metadata->codec) { - case CompressionCodec::UNCOMPRESSED: - break; - case CompressionCodec::SNAPPY: - decompressor_.reset(new SnappyCodec()); - break; - default: - ParquetException::NYI("Reading compressed data"); + num_decoded_values_(0) {} + +template <int TYPE> +void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) { + auto it = decoders_.find(Encoding::RLE_DICTIONARY); + if (it != decoders_.end()) { + throw ParquetException("Column cannot have more than one dictionary."); } - config_ = Config::DefaultConfig(); + PlainDecoder<TYPE> dictionary(schema_); + dictionary.SetData(page->num_values(), page->data(), page->size()); + + // The dictionary is fully decoded during DictionaryDecoder::Init, so the + // DictionaryPage buffer is no longer required after this step + // + // TODO(wesm): investigate whether this all-or-nothing decoding of the + // dictionary makes sense and whether performance can be improved + std::shared_ptr<DecoderType> decoder( + new DictionaryDecoder<TYPE>(schema_, &dictionary)); + + decoders_[Encoding::RLE_DICTIONARY] = decoder; + current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get(); } +static size_t InitializeLevelDecoder(const uint8_t* buffer, + int16_t max_level, std::unique_ptr<RleDecoder>& decoder) { + int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer); + + decoder.reset(new RleDecoder(buffer + sizeof(uint32_t), + num_definition_bytes, + BitUtil::NumRequiredBits(max_level))); + + return sizeof(uint32_t) + num_definition_bytes; +} + // PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index // encoding. static bool IsDictionaryIndexEncoding(const Encoding::type& e) { @@ -66,68 +83,44 @@ static bool IsDictionaryIndexEncoding(const Encoding::type& e) { template <int TYPE> bool TypedColumnReader<TYPE>::ReadNewPage() { // Loop until we find the next data page. 
+ const uint8_t* buffer; while (true) { - int bytes_read = 0; - const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read); - if (bytes_read == 0) return false; - uint32_t header_size = bytes_read; - DeserializeThriftMsg(buffer, &header_size, ¤t_page_header_); - stream_->Read(header_size, &bytes_read); - - int compressed_len = current_page_header_.compressed_page_size; - int uncompressed_len = current_page_header_.uncompressed_page_size; - - // Read the compressed data page. - buffer = stream_->Read(compressed_len, &bytes_read); - if (bytes_read != compressed_len) ParquetException::EofException(); - - // Uncompress it if we need to - if (decompressor_ != NULL) { - // Grow the uncompressed buffer if we need to. - if (uncompressed_len > decompression_buffer_.size()) { - decompression_buffer_.resize(uncompressed_len); - } - decompressor_->Decompress(compressed_len, buffer, uncompressed_len, - &decompression_buffer_[0]); - buffer = &decompression_buffer_[0]; + current_page_ = pager_->NextPage(); + if (!current_page_) { + // EOS + return false; } - if (current_page_header_.type == PageType::DICTIONARY_PAGE) { - auto it = decoders_.find(Encoding::RLE_DICTIONARY); - if (it != decoders_.end()) { - throw ParquetException("Column cannot have more than one dictionary."); - } - - PlainDecoder<TYPE> dictionary(schema_); - dictionary.SetData(current_page_header_.dictionary_page_header.num_values, - buffer, uncompressed_len); - std::shared_ptr<DecoderType> decoder( - new DictionaryDecoder<TYPE>(schema_, &dictionary)); - - decoders_[Encoding::RLE_DICTIONARY] = decoder; - current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get(); + if (current_page_->type() == PageType::DICTIONARY_PAGE) { + ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get())); continue; - } else if (current_page_header_.type == PageType::DATA_PAGE) { + } else if (current_page_->type() == PageType::DATA_PAGE) { + const DataPage* page = static_cast<const 
DataPage*>(current_page_.get()); + // Read a data page. - num_buffered_values_ = current_page_header_.data_page_header.num_values; + num_buffered_values_ = page->num_values(); // Have not decoded any values from the data page yet num_decoded_values_ = 0; + buffer = page->data(); + + // If the data page includes repetition and definition levels, we + // initialize the level decoder and subtract the encoded level bytes from + // the page size to determine the number of bytes in the encoded data. + size_t data_size = page->size(); + // Read definition levels. if (schema_->repetition_type != FieldRepetitionType::REQUIRED) { - int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer); - - // Temporary hack until schema resolution + // Temporary hack until schema resolution implemented max_definition_level_ = 1; - buffer += sizeof(uint32_t); - definition_level_decoder_.reset( - new RleDecoder(buffer, num_definition_bytes, 1)); - buffer += num_definition_bytes; - uncompressed_len -= sizeof(uint32_t); - uncompressed_len -= num_definition_bytes; + size_t def_levels_bytes = InitializeLevelDecoder(buffer, + max_definition_level_, definition_level_decoder_); + + buffer += def_levels_bytes; + data_size -= def_levels_bytes; } else { // REQUIRED field max_definition_level_ = 0; @@ -137,7 +130,8 @@ bool TypedColumnReader<TYPE>::ReadNewPage() { // Get a decoder object for this page or create a new decoder if this is the // first page with this encoding. 
- Encoding::type encoding = current_page_header_.data_page_header.encoding; + Encoding::type encoding = page->encoding(); + if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY; auto it = decoders_.find(encoding); @@ -163,10 +157,11 @@ bool TypedColumnReader<TYPE>::ReadNewPage() { throw ParquetException("Unknown encoding type."); } } - current_decoder_->SetData(num_buffered_values_, buffer, uncompressed_len); + current_decoder_->SetData(num_buffered_values_, buffer, data_size); return true; } else { - // We don't know what this page type is. We're allowed to skip non-data pages. + // We don't know what this page type is. We're allowed to skip non-data + // pages. continue; } } @@ -206,27 +201,26 @@ size_t ColumnReader::ReadRepetitionLevels(size_t batch_size, int16_t* levels) { // ---------------------------------------------------------------------- // Dynamic column reader constructor -std::shared_ptr<ColumnReader> ColumnReader::Make(const parquet::ColumnMetaData* metadata, - const parquet::SchemaElement* element, std::unique_ptr<InputStream> stream) { - switch (metadata->type) { +std::shared_ptr<ColumnReader> ColumnReader::Make( + const parquet::SchemaElement* element, + std::unique_ptr<PageReader> pager) { + switch (element->type) { case Type::BOOLEAN: - return std::make_shared<BoolReader>(metadata, element, std::move(stream)); + return std::make_shared<BoolReader>(element, std::move(pager)); case Type::INT32: - return std::make_shared<Int32Reader>(metadata, element, std::move(stream)); + return std::make_shared<Int32Reader>(element, std::move(pager)); case Type::INT64: - return std::make_shared<Int64Reader>(metadata, element, std::move(stream)); + return std::make_shared<Int64Reader>(element, std::move(pager)); case Type::INT96: - return std::make_shared<Int96Reader>(metadata, element, std::move(stream)); + return std::make_shared<Int96Reader>(element, std::move(pager)); case Type::FLOAT: - return std::make_shared<FloatReader>(metadata, 
element, std::move(stream)); + return std::make_shared<FloatReader>(element, std::move(pager)); case Type::DOUBLE: - return std::make_shared<DoubleReader>(metadata, element, std::move(stream)); + return std::make_shared<DoubleReader>(element, std::move(pager)); case Type::BYTE_ARRAY: - return std::make_shared<ByteArrayReader>(metadata, element, - std::move(stream)); + return std::make_shared<ByteArrayReader>(element, std::move(pager)); case Type::FIXED_LEN_BYTE_ARRAY: - return std::make_shared<FixedLenByteArrayReader>(metadata, element, - std::move(stream)); + return std::make_shared<FixedLenByteArrayReader>(element, std::move(pager)); default: ParquetException::NYI("type reader not implemented"); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h index 8f857c4..27ff678 100644 --- a/src/parquet/column/reader.h +++ b/src/parquet/column/reader.h @@ -28,9 +28,11 @@ #include "parquet/exception.h" #include "parquet/types.h" + +#include "parquet/column/page.h" + #include "parquet/thrift/parquet_constants.h" #include "parquet/thrift/parquet_types.h" -#include "parquet/util/input_stream.h" #include "parquet/encodings/encodings.h" #include "parquet/util/rle-encoding.h" @@ -52,21 +54,10 @@ class Scanner; class ColumnReader { public: - struct Config { - int batch_size; - - static Config DefaultConfig() { - Config config; - config.batch_size = 128; - return config; - } - }; - - ColumnReader(const parquet::ColumnMetaData*, const parquet::SchemaElement*, - std::unique_ptr<InputStream> stream); + ColumnReader(const parquet::SchemaElement*, std::unique_ptr<PageReader>); - static std::shared_ptr<ColumnReader> Make(const parquet::ColumnMetaData*, - const parquet::SchemaElement*, std::unique_ptr<InputStream> stream); + static std::shared_ptr<ColumnReader> Make(const parquet::SchemaElement*, + 
std::unique_ptr<PageReader>); // Returns true if there are still values in this column. bool HasNext() { @@ -81,11 +72,7 @@ class ColumnReader { } parquet::Type::type type() const { - return metadata_->type; - } - - const parquet::ColumnMetaData* metadata() const { - return metadata_; + return schema_->type; } const parquet::SchemaElement* schema() const { @@ -105,17 +92,10 @@ class ColumnReader { // Returns the number of decoded repetition levels size_t ReadRepetitionLevels(size_t batch_size, int16_t* levels); - Config config_; - - const parquet::ColumnMetaData* metadata_; const parquet::SchemaElement* schema_; - std::unique_ptr<InputStream> stream_; - // Compression codec to use. - std::unique_ptr<Codec> decompressor_; - std::vector<uint8_t> decompression_buffer_; - - parquet::PageHeader current_page_header_; + std::unique_ptr<PageReader> pager_; + std::shared_ptr<Page> current_page_; // Not set if full schema for this field has no optional or repeated elements std::unique_ptr<RleDecoder> definition_level_decoder_; @@ -145,12 +125,10 @@ class TypedColumnReader : public ColumnReader { public: typedef typename type_traits<TYPE>::value_type T; - TypedColumnReader(const parquet::ColumnMetaData* metadata, - const parquet::SchemaElement* schema, std::unique_ptr<InputStream> stream) : - ColumnReader(metadata, schema, std::move(stream)), + TypedColumnReader(const parquet::SchemaElement* schema, + std::unique_ptr<PageReader> pager) : + ColumnReader(schema, std::move(pager)), current_decoder_(NULL) { - size_t value_byte_size = type_traits<TYPE>::value_byte_size; - values_buffer_.resize(config_.batch_size * value_byte_size); } // Read a batch of repetition levels, definition levels, and values from the @@ -181,18 +159,20 @@ class TypedColumnReader : public ColumnReader { // @returns: the number of values read into the out buffer size_t ReadValues(size_t batch_size, T* out); - // Map of compression type to decompressor object. 
+ // Map of encoding type to the respective decoder object. For example, a + // column chunk's data pages may include both dictionary-encoded and + // plain-encoded data. std::unordered_map<parquet::Encoding::type, std::shared_ptr<DecoderType> > decoders_; + void ConfigureDictionary(const DictionaryPage* page); + DecoderType* current_decoder_; - std::vector<uint8_t> values_buffer_; }; template <int TYPE> inline size_t TypedColumnReader<TYPE>::ReadValues(size_t batch_size, T* out) { size_t num_decoded = current_decoder_->Decode(out, batch_size); - num_decoded_values_ += num_decoded; return num_decoded; } @@ -212,9 +192,22 @@ inline size_t TypedColumnReader<TYPE>::ReadBatch(int batch_size, int16_t* def_le size_t num_def_levels = 0; size_t num_rep_levels = 0; + size_t values_to_read = 0; + // If the field is required and non-repeated, there are no definition levels if (definition_level_decoder_) { num_def_levels = ReadDefinitionLevels(batch_size, def_levels); + + // TODO(wesm): this tallying of values-to-decode can be performed with better + // cache-efficiency if fused with the level decoding. + for (size_t i = 0; i < num_def_levels; ++i) { + if (def_levels[i] == max_definition_level_) { + ++values_to_read; + } + } + } else { + // Required field, read all values + values_to_read = batch_size; } // Not present for non-repeated fields @@ -226,18 +219,11 @@ inline size_t TypedColumnReader<TYPE>::ReadBatch(int batch_size, int16_t* def_le } } - // TODO(wesm): this tallying of values-to-decode can be performed with better - // cache-efficiency if fused with the level decoding. 
- size_t values_to_read = 0; - for (size_t i = 0; i < num_def_levels; ++i) { - if (def_levels[i] == max_definition_level_) { - ++values_to_read; - } - } - *values_read = ReadValues(values_to_read, values); + size_t total_values = std::max(num_def_levels, *values_read); + num_decoded_values_ += total_values; - return num_def_levels; + return total_values; } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/serialized-page.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/serialized-page.cc b/src/parquet/column/serialized-page.cc new file mode 100644 index 0000000..1cbaf4d --- /dev/null +++ b/src/parquet/column/serialized-page.cc @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "parquet/column/serialized-page.h" + +#include <memory> + +#include "parquet/exception.h" +#include "parquet/thrift/util.h" +#include "parquet/util/input_stream.h" + +using parquet::PageType; + +namespace parquet_cpp { + +// ---------------------------------------------------------------------- +// SerializedPageReader deserializes Thrift metadata and pages that have been +// assembled in a serialized stream for storing in a Parquet files + +SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream, + parquet::CompressionCodec::type codec) : + stream_(std::move(stream)) { + switch (codec) { + case parquet::CompressionCodec::UNCOMPRESSED: + break; + case parquet::CompressionCodec::SNAPPY: + decompressor_.reset(new SnappyCodec()); + break; + default: + ParquetException::NYI("Reading compressed data"); + } +} + +// TODO(wesm): this may differ from file to file +static constexpr int DATA_PAGE_SIZE = 64 * 1024; + +std::shared_ptr<Page> SerializedPageReader::NextPage() { + // Loop here because there may be unhandled page types that we skip until + // finding a page that we do know what to do with + while (true) { + int bytes_read = 0; + const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read); + if (bytes_read == 0) { + return std::shared_ptr<Page>(nullptr); + } + + // This gets used, then set by DeserializeThriftMsg + uint32_t header_size = bytes_read; + DeserializeThriftMsg(buffer, &header_size, ¤t_page_header_); + + // Advance the stream offset + stream_->Read(header_size, &bytes_read); + + int compressed_len = current_page_header_.compressed_page_size; + int uncompressed_len = current_page_header_.uncompressed_page_size; + + // Read the compressed data page. + buffer = stream_->Read(compressed_len, &bytes_read); + if (bytes_read != compressed_len) ParquetException::EofException(); + + // Uncompress it if we need to + if (decompressor_ != NULL) { + // Grow the uncompressed buffer if we need to. 
+ if (uncompressed_len > decompression_buffer_.size()) { + decompression_buffer_.resize(uncompressed_len); + } + decompressor_->Decompress(compressed_len, buffer, uncompressed_len, + &decompression_buffer_[0]); + buffer = &decompression_buffer_[0]; + } + + if (current_page_header_.type == PageType::DICTIONARY_PAGE) { + return std::make_shared<DictionaryPage>(buffer, uncompressed_len, + current_page_header_.dictionary_page_header); + } else if (current_page_header_.type == PageType::DATA_PAGE) { + return std::make_shared<DataPage>(buffer, uncompressed_len, + current_page_header_.data_page_header); + } else if (current_page_header_.type == PageType::DATA_PAGE_V2) { + ParquetException::NYI("data page v2"); + } else { + // We don't know what this page type is. We're allowed to skip non-data + // pages. + continue; + } + } + return std::shared_ptr<Page>(nullptr); +} + +} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/serialized-page.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/serialized-page.h b/src/parquet/column/serialized-page.h new file mode 100644 index 0000000..2735c3c --- /dev/null +++ b/src/parquet/column/serialized-page.h @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines SerializedPageReader, a PageReader that reads each page
+// header and page body in turn from a serialized input stream, decompressing
+// page bodies when the column chunk uses a supported compression codec.
+
+#ifndef PARQUET_COLUMN_SERIALIZED_PAGE_H
+#define PARQUET_COLUMN_SERIALIZED_PAGE_H
+
+#include <memory>
+#include <vector>
+
+#include "parquet/column/page.h"
+#include "parquet/compression/codec.h"
+#include "parquet/util/input_stream.h"
+#include "parquet/thrift/parquet_types.h"
+
+namespace parquet_cpp {
+
+// This subclass delimits pages appearing in a serialized stream, each preceded
+// by a serialized Thrift parquet::PageHeader indicating the type of each page
+// and the page metadata.
+class SerializedPageReader : public PageReader {
+ public:
+  SerializedPageReader(std::unique_ptr<InputStream> stream,
+      parquet::CompressionCodec::type codec);
+
+  virtual ~SerializedPageReader() {}
+
+  // PageReader interface: returns the next page, or nullptr at end of stream
+  std::shared_ptr<Page> NextPage() override;
+
+ private:
+  std::unique_ptr<InputStream> stream_;
+
+  parquet::PageHeader current_page_header_;
+  std::shared_ptr<Page> current_page_;
+
+  // Decompressor for the chunk's codec; null when the data is uncompressed.
+  std::unique_ptr<Codec> decompressor_;
+  std::vector<uint8_t> decompression_buffer_;
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_COLUMN_SERIALIZED_PAGE_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
new file mode 100644
index 0000000..80f3fa1
--- /dev/null
+++ b/src/parquet/column/test-util.h
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test fixtures for the column reader: MockPageReader serves a fixed sequence
+// of in-memory pages, and DataPageBuilder assembles a DataPage from
+// RLE-encoded levels and PLAIN-encoded values without writing a file.
+
+#ifndef PARQUET_COLUMN_TEST_UTIL_H
+#define PARQUET_COLUMN_TEST_UTIL_H
+
+#include <algorithm>
+#include <memory>
+#include <vector>
+
+#include "parquet/column/page.h"
+
+using parquet::Encoding;
+
+namespace parquet_cpp {
+
+namespace test {
+
+class MockPageReader : public PageReader {
+ public:
+  explicit MockPageReader(const std::vector<std::shared_ptr<Page> >& pages) :
+      pages_(pages),
+      page_index_(0) {}
+
+  // PageReader interface: hands out the pages in order, then nullptr (EOS)
+  std::shared_ptr<Page> NextPage() override {
+    if (page_index_ == pages_.size()) {
+      // EOS to consumer
+      return std::shared_ptr<Page>(nullptr);
+    }
+    return pages_[page_index_++];
+  }
+
+ private:
+  std::vector<std::shared_ptr<Page> > pages_;
+  size_t page_index_;
+};
+
+// TODO(wesm): this is only used for testing for now
+
+static constexpr int DEFAULT_DATA_PAGE_SIZE = 64 * 1024;
+static constexpr int INIT_BUFFER_SIZE = 1024;
+
+template <int TYPE>
+class DataPageBuilder {
+ public:
+  typedef typename type_traits<TYPE>::value_type T;
+
+  // The passed vector is the owner of the page's data
+  explicit DataPageBuilder(std::vector<uint8_t>* out) :
+      out_(out),
+      buffer_size_(0),
+      num_values_(0),
+      have_def_levels_(false),
+      have_rep_levels_(false),
+      have_values_(false) {
+    out_->resize(INIT_BUFFER_SIZE);
+    buffer_capacity_ = INIT_BUFFER_SIZE;
+  }
+
+  void AppendDefLevels(const std::vector<int16_t>& levels,
+      int16_t max_level, parquet::Encoding::type encoding) {
+    AppendLevels(levels, max_level, encoding);
+
+    num_values_ = std::max(levels.size(), num_values_);
+    header_.__set_definition_level_encoding(encoding);
+    have_def_levels_ = true;
+  }
+
+  void AppendRepLevels(const std::vector<int16_t>& levels,
+      int16_t max_level, parquet::Encoding::type encoding) {
+    AppendLevels(levels, max_level, encoding);
+
+    num_values_ = std::max(levels.size(), num_values_);
+    header_.__set_repetition_level_encoding(encoding);
+    have_rep_levels_ = true;
+  }
+
+  void AppendValues(const std::vector<T>& values,
+      parquet::Encoding::type encoding) {
+    if (encoding != Encoding::PLAIN) {
+      ParquetException::NYI("only plain encoding currently implemented");
+    }
+    size_t bytes_to_encode = values.size() * sizeof(T);
+    Reserve(bytes_to_encode);
+
+    PlainEncoder<TYPE> encoder(nullptr);
+    size_t nbytes = encoder.Encode(values.data(), values.size(), Head());
+    // In case for some reason it's fewer than bytes_to_encode
+    buffer_size_ += nbytes;
+
+    num_values_ = std::max(values.size(), num_values_);
+    header_.__set_encoding(encoding);
+    have_values_ = true;
+  }
+
+  std::shared_ptr<Page> Finish() {
+    if (!have_values_) {
+      throw ParquetException("A data page must at least contain values");
+    }
+    header_.__set_num_values(num_values_);
+    return std::make_shared<DataPage>(out_->data(), buffer_size_, header_);
+  }
+
+ private:
+  std::vector<uint8_t>* out_;
+
+  size_t buffer_size_;
+  size_t buffer_capacity_;
+
+  parquet::DataPageHeader header_;
+
+  size_t num_values_;
+
+  bool have_def_levels_;
+  bool have_rep_levels_;
+  bool have_values_;
+
+  void Reserve(size_t nbytes) {
+    while ((nbytes + buffer_size_) > buffer_capacity_) {
+      // TODO(wesm): limit to one reserve when this loop runs more than once
+      size_t new_capacity = 2 * buffer_capacity_;
+      out_->resize(new_capacity);
+      buffer_capacity_ = new_capacity;
+    }
+  }
+
+  uint8_t* Head() {
+    return out_->data() + buffer_size_;
+  }
+
+  // Used internally for both repetition and definition levels
+  void AppendLevels(const std::vector<int16_t>& levels, int16_t max_level,
+      parquet::Encoding::type encoding) {
+    if (encoding != Encoding::RLE) {
+      ParquetException::NYI("only rle encoding currently implemented");
+    }
+
+    // TODO: compute a more precise maximum size for the encoded levels
+    std::vector<uint8_t> encode_buffer(DEFAULT_DATA_PAGE_SIZE);
+
+    RleEncoder encoder(&encode_buffer[0], encode_buffer.size(),
+        BitUtil::NumRequiredBits(max_level));
+
+    // TODO(wesm): push down vector encoding
+    for (int16_t level : levels) {
+      if (!encoder.Put(level)) {
+        throw ParquetException("out of space");
+      }
+    }
+
+    uint32_t rle_bytes = encoder.Flush();
+    size_t levels_footprint = sizeof(uint32_t) + rle_bytes;
+    Reserve(levels_footprint);
+
+    memcpy(Head(), &rle_bytes, sizeof(uint32_t));  // Head() may be unaligned
+    memcpy(Head() + sizeof(uint32_t), encoder.buffer(), rle_bytes);
+    buffer_size_ += levels_footprint;
+  }
+};
+
+} // namespace test
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_COLUMN_TEST_UTIL_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/encodings/encodings.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encodings.h b/src/parquet/encodings/encodings.h
index b30146a..4fb3d9a 100644
--- a/src/parquet/encodings/encodings.h
+++ b/src/parquet/encodings/encodings.h
@@ -67,6 +67,40 @@ class Decoder {
   int num_values_;
 };
 
+
+// Base class for value encoders. Since encoders may or may not have state (e.g.,
+// dictionary encoding) we use a class instance to maintain any state.
+//
+// TODO(wesm): Encode interface API is temporary
+template <int TYPE>
+class Encoder {
+ public:
+  typedef typename type_traits<TYPE>::value_type T;
+
+  virtual ~Encoder() {}
+
+  // TODO(wesm): use an output stream
+
+  // Subclasses override this for the types they support; the base throws.
+  //
+  // @returns: the number of bytes written to dst
+  virtual size_t Encode(const T* src, int num_values, uint8_t* dst) {
+    throw ParquetException("Encoder does not implement this type.");
+    return 0;
+  }
+
+  parquet::Encoding::type encoding() const { return encoding_; }
+
+ protected:
+  explicit Encoder(const parquet::SchemaElement* schema,
+      const parquet::Encoding::type& encoding)
+      : schema_(schema), encoding_(encoding) {}
+
+  // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
+  const parquet::SchemaElement* schema_;
+  const parquet::Encoding::type encoding_;
+};
+
 } // namespace parquet_cpp
 
 #include "parquet/encodings/plain-encoding.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index e8f8977..11e70c7 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -22,8 +22,13 @@
 
 #include <algorithm>
 
+using parquet::Type;
+
 namespace parquet_cpp {
 
+// ----------------------------------------------------------------------
+// Encoding::PLAIN decoder implementation
+
 template <int TYPE>
 class PlainDecoder : public Decoder<TYPE> {
  public:
@@ -60,7 +65,7 @@ inline int PlainDecoder<TYPE>::Decode(T* buffer, int max_values) {
 
 // Template specialization for BYTE_ARRAY
 template <>
-inline int PlainDecoder<parquet::Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
+inline int PlainDecoder<Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
     int max_values) {
   max_values = std::min(max_values, num_values_);
   for (int i = 0; i < max_values; ++i) {
@@ -76,7 +81,7 @@ inline int PlainDecoder<parquet::Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
 
 // Template specialization for FIXED_LEN_BYTE_ARRAY
 template <>
-inline int PlainDecoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>::Decode(
+inline int PlainDecoder<Type::FIXED_LEN_BYTE_ARRAY>::Decode(
     FixedLenByteArray* buffer, int max_values) {
   max_values = std::min(max_values, num_values_);
   int len = schema_->type_length;
@@ -91,10 +96,10 @@ inline int PlainDecoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>::Decode(
 }
 
 template <>
-class PlainDecoder<parquet::Type::BOOLEAN> : public Decoder<parquet::Type::BOOLEAN> {
+class PlainDecoder<Type::BOOLEAN> : public Decoder<Type::BOOLEAN> {
  public:
   explicit PlainDecoder(const parquet::SchemaElement* schema) :
-      Decoder<parquet::Type::BOOLEAN>(schema, parquet::Encoding::PLAIN) {}
+      Decoder<Type::BOOLEAN>(schema, parquet::Encoding::PLAIN) {}
 
   virtual void SetData(int num_values, const uint8_t* data, int len) {
     num_values_ = num_values;
@@ -113,6 +118,49 @@ class PlainDecoder<parquet::Type::BOOLEAN> : public Decoder<parquet::Type::BOOLE
   RleDecoder decoder_;
 };
 
+// ----------------------------------------------------------------------
+// Encoding::PLAIN encoder implementation
+
+template <int TYPE>
+class PlainEncoder : public Encoder<TYPE> {
+ public:
+  typedef typename type_traits<TYPE>::value_type T;
+
+  explicit PlainEncoder(const parquet::SchemaElement* schema) :
+      Encoder<TYPE>(schema, parquet::Encoding::PLAIN) {}
+
+  size_t Encode(const T* src, int num_values, uint8_t* dst) override;
+};
+
+template <int TYPE>
+inline size_t PlainEncoder<TYPE>::Encode(const T* src, int num_values,
+    uint8_t* dst) {
+  size_t nbytes = num_values * sizeof(T);
+  if (nbytes > 0) memcpy(dst, src, nbytes);  // src may be null when empty
+  return nbytes;
+}
+
+template <>
+inline size_t PlainEncoder<Type::BOOLEAN>::Encode(
+    const bool* src, int num_values, uint8_t* dst) {
+  ParquetException::NYI("bool encoding");
+  return 0;
+}
+
+template <>
+inline size_t PlainEncoder<Type::BYTE_ARRAY>::Encode(const ByteArray* src,
+    int num_values, uint8_t* dst) {
+  ParquetException::NYI("byte array encoding");
+  return 0;
+}
+
+template <>
+inline size_t PlainEncoder<Type::FIXED_LEN_BYTE_ARRAY>::Encode(
+    const FixedLenByteArray* src, int num_values, uint8_t* dst) {
+  ParquetException::NYI("FLBA encoding");
+  return 0;
+}
+
 } // namespace parquet_cpp
 
 #endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc
index a43a2a5..a4e767e 100644
--- a/src/parquet/reader.cc
+++ b/src/parquet/reader.cc
@@ -25,7 +25,9 @@
 #include <vector>
 
 #include "parquet/column/reader.h"
+#include "parquet/column/serialized-page.h"
 #include "parquet/column/scanner.h"
+
 #include "parquet/exception.h"
 #include "parquet/thrift/util.h"
 #include "parquet/util/input_stream.h"
@@ -115,8 +117,13 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(size_t i) {
   }
 
   // TODO(wesm): This presumes a flat schema
-  std::shared_ptr<ColumnReader> reader = ColumnReader::Make(&col.meta_data,
-      &this->parent_->metadata_.schema[i + 1], std::move(input));
+  const parquet::SchemaElement* schema = &parent_->metadata_.schema[i + 1];
+
+  std::unique_ptr<PageReader> pager(
+      new SerializedPageReader(std::move(input), col.meta_data.codec));
+
+  std::shared_ptr<ColumnReader> reader = ColumnReader::Make(schema,
+      std::move(pager));
   column_readers_[i] = reader;
 
   return reader;
@@ -269,7 +276,7 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
     size_t nColumns = group_reader->num_columns();
 
     for (int c = 0; c < group_reader->num_columns(); ++c) {
-      const parquet::ColumnMetaData* meta_data = group_reader->Column(c)->metadata();
+      const parquet::ColumnMetaData* meta_data = group_reader->column_metadata(c);
       stream << "Column " << c
              << ": " << meta_data->num_values << " rows, "
              << meta_data->statistics.null_count << " null values, "
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/reader.h b/src/parquet/reader.h
index 4c92119..16927a7 100644
--- a/src/parquet/reader.h
+++ b/src/parquet/reader.h
@@ -83,6 +83,10 @@ class RowGroupReader {
   // column. Ownership is shared with the RowGroupReader.
   std::shared_ptr<ColumnReader> Column(size_t i);
 
+  const parquet::ColumnMetaData* column_metadata(size_t i) const {
+    return &row_group_->columns[i].meta_data;
+  }
+
   size_t num_columns() const {
     return row_group_->columns.size();
   }
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/util/bit-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/bit-util.h b/src/parquet/util/bit-util.h
index 4db585a..7a2e921 100644
--- a/src/parquet/util/bit-util.h
+++ b/src/parquet/util/bit-util.h
@@ -276,6 +276,14 @@ class BitUtil {
   static T UnsetBit(T v, int bitpos) {
     return v & ~(static_cast<T>(0x1) << bitpos);
   }
+
+  // Returns the minimum number of bits needed to represent 'x' (0 when x == 0)
+  static inline int NumRequiredBits(uint64_t x) {
+    for (int i = 63; i >= 0; --i) {
+      if (x & (static_cast<uint64_t>(1) << i)) return i + 1;
+    }
+    return 0;
+  }
 };
 
 } // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/util/test-common.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h
new file mode 100644
index 0000000..38bc32c
--- /dev/null
+++ b/src/parquet/util/test-common.h
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_UTIL_TEST_COMMON_H
+#define PARQUET_UTIL_TEST_COMMON_H
+
+#include <iostream>
+#include <vector>
+
+using std::vector;  // NOTE(review): header-scope using reaches every includer
+
+namespace parquet_cpp {
+
+namespace test {
+// Element-wise equality for tests; the first unequal element is logged to cerr.
+template <typename T>  // T needs operator!= and operator<< to std::ostream
+static inline bool vector_equal(const vector<T>& left, const vector<T>& right) {
+  if (left.size() != right.size()) {  // a size mismatch is not logged
+    return false;
+  }
+
+  for (size_t i = 0; i < left.size(); ++i) {
+    if (left[i] != right[i]) {
+      std::cerr << "index " << i
+                << " left was " << left[i]
+                << " right was " << right[i]
+                << std::endl;
+      return false;
+    }
+  }
+
+  return true;
+}
+
+} // namespace test
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_TEST_COMMON_H
