PARQUET-446: Hide Thrift compiled headers and Boost from public API, #include scrubbing
This is the completion of work I started in PARQUET-442. This also resolves PARQUET-277, as no Boost headers are included in the public API anymore. I've done some scrubbing of #includes using Google's Clang-based include-what-you-use tool. PARQUET-522 can also be resolved when this is merged.

Author: Wes McKinney <[email protected]>

Closes #49 from wesm/PARQUET-446 and squashes the following commits:

e805a0c [Wes McKinney] Use int64_t for scanner batch sizes
503b1c1 [Wes McKinney] Fix mixed-up include guard names
4c02d2b [Wes McKinney] Refactor monolithic encodings/encodings.h
9e28fc3 [Wes McKinney] Finished IWYU path. Some imported impala code left unchanged for now
6d4af8e [Wes McKinney] Some initial IWYU
5be40d6 [Wes McKinney] Remove outdated TODO
2e39062 [Wes McKinney] Remove any boost #include dependencies
07059ca [Wes McKinney] Remove serialized-page.* files, move serialized-page-test to parquet/file
9458b36 [Wes McKinney] Add more headers to parquet.h public API
b4b0412 [Wes McKinney] Remove Thrift compiled headers from public API and general use outside of deserialized-related internal headers and code paths.
Add unit test to enforce this Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/b71e826f Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/b71e826f Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/b71e826f Branch: refs/heads/master Commit: b71e826f0e3a392001124f5c730d2eaa29f5a44f Parents: 05cd4ec Author: Wes McKinney <[email protected]> Authored: Mon Feb 15 15:55:18 2016 -0800 Committer: Julien Le Dem <[email protected]> Committed: Mon Feb 15 15:55:18 2016 -0800 ---------------------------------------------------------------------- CMakeLists.txt | 1 - example/decode_benchmark.cc | 6 +- src/parquet/CMakeLists.txt | 1 + src/parquet/column/CMakeLists.txt | 2 - src/parquet/column/column-reader-test.cc | 10 +- src/parquet/column/levels-test.cc | 124 ++++++++------- src/parquet/column/levels.h | 29 ++-- src/parquet/column/page.h | 149 +++++++++++++------ src/parquet/column/reader.cc | 32 ++-- src/parquet/column/reader.h | 16 +- src/parquet/column/scanner.cc | 3 +- src/parquet/column/scanner.h | 18 ++- src/parquet/column/serialized-page-test.cc | 109 -------------- src/parquet/column/serialized-page.cc | 122 --------------- src/parquet/column/serialized-page.h | 71 --------- src/parquet/column/test-util.h | 93 +++++++----- src/parquet/compression/lz4-codec.cc | 5 +- src/parquet/compression/snappy-codec.cc | 4 + src/parquet/encodings/CMakeLists.txt | 3 +- src/parquet/encodings/decoder.h | 70 +++++++++ src/parquet/encodings/delta-bit-pack-encoding.h | 8 +- .../encodings/delta-byte-array-encoding.h | 8 +- .../delta-length-byte-array-encoding.h | 9 +- src/parquet/encodings/dictionary-encoding.h | 9 +- src/parquet/encodings/encoder.h | 61 ++++++++ src/parquet/encodings/encodings.h | 111 -------------- src/parquet/encodings/plain-encoding-test.cc | 8 +- src/parquet/encodings/plain-encoding.h | 16 +- src/parquet/file/CMakeLists.txt | 2 + 
src/parquet/file/file-deserialize-test.cc | 111 ++++++++++++++ src/parquet/file/reader-internal.cc | 127 +++++++++++++++- src/parquet/file/reader-internal.h | 49 +++++- src/parquet/file/reader.cc | 6 +- src/parquet/file/reader.h | 8 +- src/parquet/parquet.h | 10 ++ src/parquet/public-api-test.cc | 29 ++++ src/parquet/reader-test.cc | 3 +- src/parquet/schema/CMakeLists.txt | 3 +- src/parquet/schema/converter.cc | 28 +--- src/parquet/schema/converter.h | 22 +-- src/parquet/schema/descriptor.h | 2 + src/parquet/schema/printer.cc | 14 +- src/parquet/schema/printer.h | 6 +- src/parquet/schema/schema-converter-test.cc | 10 +- src/parquet/schema/schema-descriptor-test.cc | 7 +- src/parquet/schema/schema-printer-test.cc | 10 +- src/parquet/schema/schema-types-test.cc | 11 +- src/parquet/schema/types.cc | 23 +-- src/parquet/schema/types.h | 5 +- src/parquet/thrift/serializer-test.cc | 17 +-- src/parquet/thrift/util.h | 30 +++- src/parquet/types.h | 1 - src/parquet/util/bit-util.h | 31 +++- src/parquet/util/input.h | 1 + src/parquet/util/output-test.cc | 6 +- src/parquet/util/output.cc | 2 - src/parquet/util/output.h | 1 - 57 files changed, 916 insertions(+), 757 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index ec7d66b..62182c4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -296,7 +296,6 @@ endif() # Library config set(LIBPARQUET_SRCS - src/parquet/column/serialized-page.cc src/parquet/column/reader.cc src/parquet/column/scanner.cc http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/example/decode_benchmark.cc ---------------------------------------------------------------------- diff --git a/example/decode_benchmark.cc b/example/decode_benchmark.cc index b9076bf..ce16588 100644 --- a/example/decode_benchmark.cc +++ 
b/example/decode_benchmark.cc @@ -20,7 +20,11 @@ #include <stdio.h> #include "parquet/compression/codec.h" -#include "parquet/encodings/encodings.h" +#include "parquet/encodings/plain-encoding.h" +#include "parquet/encodings/dictionary-encoding.h" +#include "parquet/encodings/delta-bit-pack-encoding.h" +#include "parquet/encodings/delta-byte-array-encoding.h" +#include "parquet/encodings/delta-length-byte-array-encoding.h" #include "parquet/util/stopwatch.h" using namespace parquet_cpp; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt index 6a47917..97547ce 100644 --- a/src/parquet/CMakeLists.txt +++ b/src/parquet/CMakeLists.txt @@ -22,4 +22,5 @@ install(FILES types.h DESTINATION include/parquet) +ADD_PARQUET_TEST(public-api-test) ADD_PARQUET_TEST(reader-test) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/column/CMakeLists.txt b/src/parquet/column/CMakeLists.txt index 32ec11c..99b4ed2 100644 --- a/src/parquet/column/CMakeLists.txt +++ b/src/parquet/column/CMakeLists.txt @@ -20,10 +20,8 @@ install(FILES page.h levels.h reader.h - serialized-page.h scanner.h DESTINATION include/parquet/column) ADD_PARQUET_TEST(column-reader-test) ADD_PARQUET_TEST(levels-test) -ADD_PARQUET_TEST(serialized-page-test) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/column-reader-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc index 84a36db..0abdf79 100644 --- a/src/parquet/column/column-reader-test.cc +++ b/src/parquet/column/column-reader-test.cc @@ -15,10 +15,10 @@ // specific language governing 
permissions and limitations // under the License. +#include <algorithm> #include <cstdint> #include <cstdlib> -#include <iostream> -#include <sstream> +#include <memory> #include <string> #include <vector> @@ -28,15 +28,13 @@ #include "parquet/column/page.h" #include "parquet/column/reader.h" #include "parquet/column/test-util.h" - -#include "parquet/util/output.h" +#include "parquet/schema/descriptor.h" +#include "parquet/schema/types.h" #include "parquet/util/test-common.h" using std::string; using std::vector; using std::shared_ptr; -using parquet::FieldRepetitionType; -using parquet::SchemaElement; namespace parquet_cpp { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/levels-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/levels-test.cc b/src/parquet/column/levels-test.cc index 99cc21e..6061d23 100644 --- a/src/parquet/column/levels-test.cc +++ b/src/parquet/column/levels-test.cc @@ -15,98 +15,94 @@ // specific language governing permissions and limitations // under the License. 
-#include <cstdlib> -#include <iostream> -#include <sstream> +#include <cstdint> #include <string> +#include <vector> #include <gtest/gtest.h> -#include "parquet/thrift/parquet_types.h" #include "parquet/column/levels.h" +#include "parquet/types.h" using std::string; namespace parquet_cpp { -class TestLevels : public ::testing::Test { - public: - int GenerateLevels(int min_repeat_factor, int max_repeat_factor, - int max_level, std::vector<int16_t>& input_levels) { - int total_count = 0; - // for each repetition count upto max_repeat_factor - for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) { - // repeat count increase by a factor of 2 for every iteration - int repeat_count = (1 << repeat); - // generate levels for repetition count upto the maximum level - int value = 0; - int bwidth = 0; - while (value <= max_level) { - for (int i = 0; i < repeat_count; i++) { - input_levels[total_count++] = value; - } - value = (2 << bwidth) - 1; - bwidth++; +int GenerateLevels(int min_repeat_factor, int max_repeat_factor, + int max_level, std::vector<int16_t>& input_levels) { + int total_count = 0; + // for each repetition count upto max_repeat_factor + for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) { + // repeat count increase by a factor of 2 for every iteration + int repeat_count = (1 << repeat); + // generate levels for repetition count upto the maximum level + int value = 0; + int bwidth = 0; + while (value <= max_level) { + for (int i = 0; i < repeat_count; i++) { + input_levels[total_count++] = value; } + value = (2 << bwidth) - 1; + bwidth++; } - return total_count; } + return total_count; +} - void VerifyLevelsEncoding(parquet::Encoding::type encoding, int max_level, - std::vector<int16_t>& input_levels) { - LevelEncoder encoder; - LevelDecoder decoder; - int levels_count = 0; - std::vector<int16_t> output_levels; - std::vector<uint8_t> bytes; - int num_levels = input_levels.size(); - 
output_levels.resize(num_levels); - bytes.resize(2 * num_levels); - ASSERT_EQ(num_levels, output_levels.size()); - ASSERT_EQ(2 * num_levels, bytes.size()); - // start encoding and decoding - if (encoding == parquet::Encoding::RLE) { - // leave space to write the rle length value - encoder.Init(encoding, max_level, num_levels, - bytes.data() + sizeof(uint32_t), bytes.size()); - - levels_count = encoder.Encode(num_levels, input_levels.data()); - (reinterpret_cast<uint32_t*>(bytes.data()))[0] = encoder.len(); - - } else { - encoder.Init(encoding, max_level, num_levels, - bytes.data(), bytes.size()); - levels_count = encoder.Encode(num_levels, input_levels.data()); - } +void VerifyLevelsEncoding(Encoding::type encoding, int max_level, + std::vector<int16_t>& input_levels) { + LevelEncoder encoder; + LevelDecoder decoder; + int levels_count = 0; + std::vector<int16_t> output_levels; + std::vector<uint8_t> bytes; + int num_levels = input_levels.size(); + output_levels.resize(num_levels); + bytes.resize(2 * num_levels); + ASSERT_EQ(num_levels, output_levels.size()); + ASSERT_EQ(2 * num_levels, bytes.size()); + // start encoding and decoding + if (encoding == Encoding::RLE) { + // leave space to write the rle length value + encoder.Init(encoding, max_level, num_levels, + bytes.data() + sizeof(uint32_t), bytes.size()); + + levels_count = encoder.Encode(num_levels, input_levels.data()); + (reinterpret_cast<uint32_t*>(bytes.data()))[0] = encoder.len(); + + } else { + encoder.Init(encoding, max_level, num_levels, + bytes.data(), bytes.size()); + levels_count = encoder.Encode(num_levels, input_levels.data()); + } - ASSERT_EQ(num_levels, levels_count); + ASSERT_EQ(num_levels, levels_count); - decoder.Init(encoding, max_level, num_levels, bytes.data()); - levels_count = decoder.Decode(num_levels, output_levels.data()); + decoder.Init(encoding, max_level, num_levels, bytes.data()); + levels_count = decoder.Decode(num_levels, output_levels.data()); - ASSERT_EQ(num_levels, 
levels_count); + ASSERT_EQ(num_levels, levels_count); - for (int i = 0; i < num_levels; i++) { - EXPECT_EQ(input_levels[i], output_levels[i]); - } + for (int i = 0; i < num_levels; i++) { + EXPECT_EQ(input_levels[i], output_levels[i]); } -}; +} + +TEST(TestLevels, TestEncodeDecodeLevels) { + // test levels with maximum bit-width from 1 to 8 + // increase the repetition count for each iteration by a factor of 2 -// test levels with maximum bit-width from 1 to 8 -// increase the repetition count for each iteration by a factor of 2 -TEST_F(TestLevels, TestEncodeDecodeLevels) { int min_repeat_factor = 0; int max_repeat_factor = 7; // 128 int max_bit_width = 8; std::vector<int16_t> input_levels; - parquet::Encoding::type encodings[2] = {parquet::Encoding::RLE, - parquet::Encoding::BIT_PACKED}; + Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED}; // for each encoding for (int encode = 0; encode < 2; encode++) { - parquet::Encoding::type encoding = encodings[encode]; + Encoding::type encoding = encodings[encode]; // BIT_PACKED requires a sequence of atleast 8 - if (encoding == parquet::Encoding::BIT_PACKED) min_repeat_factor = 3; + if (encoding == Encoding::BIT_PACKED) min_repeat_factor = 3; // for each maximum bit-width for (int bit_width = 1; bit_width <= max_bit_width; bit_width++) { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/levels.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/levels.h b/src/parquet/column/levels.h index 4056223..18fd0bb 100644 --- a/src/parquet/column/levels.h +++ b/src/parquet/column/levels.h @@ -18,9 +18,10 @@ #ifndef PARQUET_COLUMN_LEVELS_H #define PARQUET_COLUMN_LEVELS_H +#include <memory> + #include "parquet/exception.h" -#include "parquet/thrift/parquet_types.h" -#include "parquet/encodings/encodings.h" +#include "parquet/types.h" #include "parquet/util/rle-encoding.h" namespace parquet_cpp { @@ -30,16 +31,16 @@ class 
LevelEncoder { LevelEncoder() {} // Initialize the LevelEncoder. - void Init(parquet::Encoding::type encoding, int16_t max_level, + void Init(Encoding::type encoding, int16_t max_level, int num_buffered_values, uint8_t* data, int data_size) { bit_width_ = BitUtil::Log2(max_level + 1); encoding_ = encoding; switch (encoding) { - case parquet::Encoding::RLE: { + case Encoding::RLE: { rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_)); break; } - case parquet::Encoding::BIT_PACKED: { + case Encoding::BIT_PACKED: { int num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8); bit_packed_encoder_.reset(new BitWriter(data, num_bytes)); break; @@ -56,7 +57,7 @@ class LevelEncoder { throw ParquetException("Level encoders are not initialized."); } - if (encoding_ == parquet::Encoding::RLE) { + if (encoding_ == Encoding::RLE) { for (size_t i = 0; i < batch_size; ++i) { if (!rle_encoder_->Put(*(levels + i))) { break; @@ -78,14 +79,16 @@ class LevelEncoder { } int32_t len() { - assert(encoding_ == parquet::Encoding::RLE); + if (encoding_ != Encoding::RLE) { + throw ParquetException("Only implemented for RLE encoding"); + } return rle_length_; } private: int bit_width_; int rle_length_; - parquet::Encoding::type encoding_; + Encoding::type encoding_; std::unique_ptr<RleEncoder> rle_encoder_; std::unique_ptr<BitWriter> bit_packed_encoder_; }; @@ -96,20 +99,20 @@ class LevelDecoder { LevelDecoder() {} // Initialize the LevelDecoder and return the number of bytes consumed - size_t Init(parquet::Encoding::type encoding, int16_t max_level, + size_t Init(Encoding::type encoding, int16_t max_level, int num_buffered_values, const uint8_t* data) { uint32_t num_bytes = 0; uint32_t total_bytes = 0; bit_width_ = BitUtil::Log2(max_level + 1); encoding_ = encoding; switch (encoding) { - case parquet::Encoding::RLE: { + case Encoding::RLE: { num_bytes = *reinterpret_cast<const uint32_t*>(data); const uint8_t* decoder_data = data + sizeof(uint32_t); rle_decoder_.reset(new 
RleDecoder(decoder_data, num_bytes, bit_width_)); return sizeof(uint32_t) + num_bytes; } - case parquet::Encoding::BIT_PACKED: { + case Encoding::BIT_PACKED: { num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8); bit_packed_decoder_.reset(new BitReader(data, num_bytes)); return num_bytes; @@ -127,7 +130,7 @@ class LevelDecoder { throw ParquetException("Level decoders are not initialized."); } - if (encoding_ == parquet::Encoding::RLE) { + if (encoding_ == Encoding::RLE) { for (size_t i = 0; i < batch_size; ++i) { if (!rle_decoder_->Get(levels + i)) { break; @@ -147,7 +150,7 @@ class LevelDecoder { private: int bit_width_; - parquet::Encoding::type encoding_; + Encoding::type encoding_; std::unique_ptr<RleDecoder> rle_decoder_; std::unique_ptr<BitReader> bit_packed_decoder_; }; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/page.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h index f2740b6..3308a1c 100644 --- a/src/parquet/column/page.h +++ b/src/parquet/column/page.h @@ -22,32 +22,28 @@ #ifndef PARQUET_COLUMN_PAGE_H #define PARQUET_COLUMN_PAGE_H -#include "parquet/thrift/parquet_types.h" +#include <cstdint> +#include <memory> + +#include "parquet/types.h" namespace parquet_cpp { -// Note: Copying the specific page header Thrift metadata to the Page object -// (instead of using a pointer) presently so that data pages can be -// decompressed and processed in parallel. We can turn the header members of -// these classes into pointers at some point, but the downside is that -// applications materializing multiple data pages at once will have to have a -// data container that manages the lifetime of the deserialized -// parquet::PageHeader structs. 
-// // TODO: Parallel processing is not yet safe because of memory-ownership // semantics (the PageReader may or may not own the memory referenced by a // page) +// +// TODO(wesm): In the future Parquet implementations may store the crc code +// in parquet::PageHeader. parquet-mr currently does not, so we also skip it +// here, both on the read and write path class Page { - // TODO(wesm): In the future Parquet implementations may store the crc code - // in parquet::PageHeader. parquet-mr currently does not, so we also skip it - // here, both on the read and write path public: - Page(const uint8_t* buffer, size_t buffer_size, parquet::PageType::type type) : + Page(const uint8_t* buffer, int32_t buffer_size, PageType::type type) : buffer_(buffer), buffer_size_(buffer_size), type_(type) {} - parquet::PageType::type type() const { + PageType::type type() const { return type_; } @@ -57,71 +53,138 @@ class Page { } // @returns: the total size in bytes of the page's data buffer - size_t size() const { + int32_t size() const { return buffer_size_; } private: const uint8_t* buffer_; - size_t buffer_size_; + int32_t buffer_size_; - parquet::PageType::type type_; + PageType::type type_; }; class DataPage : public Page { public: - DataPage(const uint8_t* buffer, size_t buffer_size, - const parquet::DataPageHeader& header) : - Page(buffer, buffer_size, parquet::PageType::DATA_PAGE), - header_(header) {} - - size_t num_values() const { - return header_.num_values; + DataPage(const uint8_t* buffer, int32_t buffer_size, + int32_t num_values, Encoding::type encoding, + Encoding::type definition_level_encoding, + Encoding::type repetition_level_encoding) : + Page(buffer, buffer_size, PageType::DATA_PAGE), + num_values_(num_values), + encoding_(encoding), + definition_level_encoding_(definition_level_encoding), + repetition_level_encoding_(repetition_level_encoding) {} + + int32_t num_values() const { + return num_values_; } - parquet::Encoding::type encoding() const { - return 
header_.encoding; + Encoding::type encoding() const { + return encoding_; } - parquet::Encoding::type repetition_level_encoding() const { - return header_.repetition_level_encoding; + Encoding::type repetition_level_encoding() const { + return repetition_level_encoding_; } - parquet::Encoding::type definition_level_encoding() const { - return header_.definition_level_encoding; + Encoding::type definition_level_encoding() const { + return definition_level_encoding_; } private: - parquet::DataPageHeader header_; + int32_t num_values_; + Encoding::type encoding_; + Encoding::type definition_level_encoding_; + Encoding::type repetition_level_encoding_; + + // TODO(wesm): parquet::DataPageHeader.statistics }; class DataPageV2 : public Page { public: - DataPageV2(const uint8_t* buffer, size_t buffer_size, - const parquet::DataPageHeaderV2& header) : - Page(buffer, buffer_size, parquet::PageType::DATA_PAGE_V2), - header_(header) {} + DataPageV2(const uint8_t* buffer, int32_t buffer_size, + int32_t num_values, int32_t num_nulls, int32_t num_rows, + Encoding::type encoding, + int32_t definition_levels_byte_length, + int32_t repetition_levels_byte_length, bool is_compressed = false) : + Page(buffer, buffer_size, PageType::DATA_PAGE_V2), + num_values_(num_values), + num_nulls_(num_nulls), + num_rows_(num_rows), + encoding_(encoding), + definition_levels_byte_length_(definition_levels_byte_length), + repetition_levels_byte_length_(repetition_levels_byte_length), + is_compressed_(is_compressed) {} + + int32_t num_values() const { + return num_values_; + } + + int32_t num_nulls() const { + return num_nulls_; + } + + int32_t num_rows() const { + return num_rows_; + } + + Encoding::type encoding() const { + return encoding_; + } + + int32_t definition_levels_byte_length() const { + return definition_levels_byte_length_; + } + + int32_t repetition_levels_byte_length() const { + return repetition_levels_byte_length_; + } + + bool is_compressed() const { + return is_compressed_; + } 
private: - parquet::DataPageHeaderV2 header_; + int32_t num_values_; + int32_t num_nulls_; + int32_t num_rows_; + Encoding::type encoding_; + int32_t definition_levels_byte_length_; + int32_t repetition_levels_byte_length_; + bool is_compressed_; + + // TODO(wesm): parquet::DataPageHeaderV2.statistics }; class DictionaryPage : public Page { public: - DictionaryPage(const uint8_t* buffer, size_t buffer_size, - const parquet::DictionaryPageHeader& header) : - Page(buffer, buffer_size, parquet::PageType::DICTIONARY_PAGE), - header_(header) {} + DictionaryPage(const uint8_t* buffer, int32_t buffer_size, + int32_t num_values, Encoding::type encoding, bool is_sorted = false) : + Page(buffer, buffer_size, PageType::DICTIONARY_PAGE), + num_values_(num_values), + encoding_(encoding), + is_sorted_(is_sorted) {} + + int32_t num_values() const { + return num_values_; + } + + Encoding::type encoding() const { + return encoding_; + } - size_t num_values() const { - return header_.num_values; + bool is_sorted() const { + return is_sorted_; } private: - parquet::DictionaryPageHeader header_; + int32_t num_values_; + Encoding::type encoding_; + bool is_sorted_; }; // Abstract page iterator interface. 
This way, we can feed column pages to the http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc index 878bd4f..4ba0616 100644 --- a/src/parquet/column/reader.cc +++ b/src/parquet/column/reader.cc @@ -18,13 +18,13 @@ #include "parquet/column/reader.h" #include <algorithm> +#include <cstdint> #include <memory> -#include <string> -#include <string.h> #include "parquet/column/page.h" -#include "parquet/encodings/encodings.h" +#include "parquet/encodings/dictionary-encoding.h" +#include "parquet/encodings/plain-encoding.h" namespace parquet_cpp { @@ -37,7 +37,7 @@ ColumnReader::ColumnReader(const ColumnDescriptor* descr, template <int TYPE> void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) { - int encoding = static_cast<int>(parquet::Encoding::RLE_DICTIONARY); + int encoding = static_cast<int>(Encoding::RLE_DICTIONARY); auto it = decoders_.find(encoding); if (it != decoders_.end()) { @@ -61,9 +61,9 @@ void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) { // PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index // encoding. 
-static bool IsDictionaryIndexEncoding(const parquet::Encoding::type& e) { - return e == parquet::Encoding::RLE_DICTIONARY || - e == parquet::Encoding::PLAIN_DICTIONARY; +static bool IsDictionaryIndexEncoding(const Encoding::type& e) { + return e == Encoding::RLE_DICTIONARY || + e == Encoding::PLAIN_DICTIONARY; } template <int TYPE> @@ -78,10 +78,10 @@ bool TypedColumnReader<TYPE>::ReadNewPage() { return false; } - if (current_page_->type() == parquet::PageType::DICTIONARY_PAGE) { + if (current_page_->type() == PageType::DICTIONARY_PAGE) { ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get())); continue; - } else if (current_page_->type() == parquet::PageType::DATA_PAGE) { + } else if (current_page_->type() == PageType::DATA_PAGE) { const DataPage* page = static_cast<const DataPage*>(current_page_.get()); // Read a data page. @@ -123,10 +123,10 @@ bool TypedColumnReader<TYPE>::ReadNewPage() { // Get a decoder object for this page or create a new decoder if this is the // first page with this encoding. 
- parquet::Encoding::type encoding = page->encoding(); + Encoding::type encoding = page->encoding(); if (IsDictionaryIndexEncoding(encoding)) { - encoding = parquet::Encoding::RLE_DICTIONARY; + encoding = Encoding::RLE_DICTIONARY; } auto it = decoders_.find(static_cast<int>(encoding)); @@ -134,18 +134,18 @@ bool TypedColumnReader<TYPE>::ReadNewPage() { current_decoder_ = it->second.get(); } else { switch (encoding) { - case parquet::Encoding::PLAIN: { + case Encoding::PLAIN: { std::shared_ptr<DecoderType> decoder(new PlainDecoder<TYPE>(descr_)); decoders_[static_cast<int>(encoding)] = decoder; current_decoder_ = decoder.get(); break; } - case parquet::Encoding::RLE_DICTIONARY: + case Encoding::RLE_DICTIONARY: throw ParquetException("Dictionary page must be before data page."); - case parquet::Encoding::DELTA_BINARY_PACKED: - case parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY: - case parquet::Encoding::DELTA_BYTE_ARRAY: + case Encoding::DELTA_BINARY_PACKED: + case Encoding::DELTA_LENGTH_BYTE_ARRAY: + case Encoding::DELTA_BYTE_ARRAY: ParquetException::NYI("Unsupported encoding"); default: http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h index 4585de8..d11a13c 100644 --- a/src/parquet/column/reader.h +++ b/src/parquet/column/reader.h @@ -22,25 +22,17 @@ #include <cstdint> #include <cstring> #include <memory> -#include <string> #include <unordered_map> -#include <vector> - -#include "parquet/exception.h" -#include "parquet/types.h" +#include "parquet/column/levels.h" #include "parquet/column/page.h" -#include "parquet/encodings/encodings.h" +#include "parquet/encodings/decoder.h" +#include "parquet/exception.h" #include "parquet/schema/descriptor.h" -#include "parquet/util/rle-encoding.h" -#include "parquet/column/levels.h" +#include "parquet/types.h" namespace parquet_cpp { - -class 
Codec; -class Scanner; - class ColumnReader { public: ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/scanner.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/scanner.cc b/src/parquet/column/scanner.cc index 58f1460..4a0b32f 100644 --- a/src/parquet/column/scanner.cc +++ b/src/parquet/column/scanner.cc @@ -17,6 +17,7 @@ #include "parquet/column/scanner.h" +#include <cstdint> #include <memory> #include "parquet/column/reader.h" @@ -24,7 +25,7 @@ namespace parquet_cpp { std::shared_ptr<Scanner> Scanner::Make(std::shared_ptr<ColumnReader> col_reader, - size_t batch_size) { + int64_t batch_size) { switch (col_reader->type()) { case Type::BOOLEAN: return std::make_shared<BoolScanner>(col_reader, batch_size); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/scanner.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h index 17fd5f6..512f540 100644 --- a/src/parquet/column/scanner.h +++ b/src/parquet/column/scanner.h @@ -18,24 +18,26 @@ #ifndef PARQUET_COLUMN_SCANNER_H #define PARQUET_COLUMN_SCANNER_H +#include <stdio.h> +#include <cstdint> #include <memory> #include <ostream> #include <string> #include <vector> #include "parquet/column/reader.h" - +#include "parquet/exception.h" #include "parquet/schema/descriptor.h" #include "parquet/types.h" namespace parquet_cpp { -static constexpr size_t DEFAULT_SCANNER_BATCH_SIZE = 128; +static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128; class Scanner { public: explicit Scanner(std::shared_ptr<ColumnReader> reader, - size_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) : + int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) : batch_size_(batch_size), level_offset_(0), levels_buffered_(0), @@ -50,7 +52,7 @@ class Scanner { virtual ~Scanner() {} static 
std::shared_ptr<Scanner> Make(std::shared_ptr<ColumnReader> col_reader, - size_t batch_size = DEFAULT_SCANNER_BATCH_SIZE); + int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE); virtual void PrintNext(std::ostream& out, int width) = 0; @@ -62,14 +64,14 @@ class Scanner { return reader_->descr(); } - size_t batch_size() const { return batch_size_;} + int64_t batch_size() const { return batch_size_;} - void SetBatchSize(size_t batch_size) { + void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; } protected: - size_t batch_size_; + int64_t batch_size_; std::vector<int16_t> def_levels_; std::vector<int16_t> rep_levels_; @@ -91,7 +93,7 @@ class TypedScanner : public Scanner { typedef typename type_traits<TYPE>::value_type T; explicit TypedScanner(std::shared_ptr<ColumnReader> reader, - size_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) : + int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) : Scanner(reader, batch_size) { typed_reader_ = static_cast<TypedColumnReader<TYPE>*>(reader.get()); size_t value_byte_size = type_traits<TYPE>::value_byte_size; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/serialized-page-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/serialized-page-test.cc b/src/parquet/column/serialized-page-test.cc deleted file mode 100644 index 5c49021..0000000 --- a/src/parquet/column/serialized-page-test.cc +++ /dev/null @@ -1,109 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <cstdlib> -#include <iostream> -#include <sstream> -#include <string> - -#include <gtest/gtest.h> - -#include "parquet/types.h" -#include "parquet/thrift/parquet_types.h" -#include "parquet/thrift/util.h" -#include "parquet/column/serialized-page.h" -#include "parquet/column/page.h" -#include "parquet/column/reader.h" -#include "parquet/column/test-util.h" - - -namespace parquet_cpp { - -class TestSerializedPage : public ::testing::Test { - public: - void InitSerializedPageReader(const uint8_t* buffer, size_t header_size, - parquet::CompressionCodec::type codec) { - std::unique_ptr<InputStream> stream; - stream.reset(new InMemoryInputStream(buffer, header_size)); - page_reader_.reset(new SerializedPageReader(std::move(stream), codec)); - } - - protected: - std::unique_ptr<SerializedPageReader> page_reader_; -}; - -TEST_F(TestSerializedPage, TestLargePageHeaders) { - parquet::PageHeader in_page_header; - parquet::DataPageHeader data_page_header; - parquet::PageHeader out_page_header; - parquet::Statistics stats; - int expected_header_size = 512 * 1024; //512 KB - int stats_size = 256 * 1024; // 256 KB - std::string serialized_buffer; - int num_values = 4141; - - InitStats(stats_size, stats); - InitDataPage(stats, data_page_header, num_values); - InitPageHeader(data_page_header, in_page_header); - - // Serialize the Page header - ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header, - expected_header_size)); - // check header size is between 256 KB to 16 MB - ASSERT_LE(stats_size, serialized_buffer.length()); - 
ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length()); - - InitSerializedPageReader(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()), - serialized_buffer.length(), parquet::CompressionCodec::UNCOMPRESSED); - - std::shared_ptr<Page> current_page = page_reader_->NextPage(); - ASSERT_EQ(parquet::PageType::DATA_PAGE, current_page->type()); - const DataPage* page = static_cast<const DataPage*>(current_page.get()); - ASSERT_EQ(num_values, page->num_values()); -} - -TEST_F(TestSerializedPage, TestFailLargePageHeaders) { - parquet::PageHeader in_page_header; - parquet::DataPageHeader data_page_header; - parquet::PageHeader out_page_header; - parquet::Statistics stats; - int expected_header_size = 512 * 1024; // 512 KB - int stats_size = 256 * 1024; // 256 KB - int max_header_size = 128 * 1024; // 128 KB - int num_values = 4141; - std::string serialized_buffer; - - InitStats(stats_size, stats); - InitDataPage(stats, data_page_header, num_values); - InitPageHeader(data_page_header, in_page_header); - - // Serialize the Page header - ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header, - expected_header_size)); - // check header size is between 256 KB to 16 MB - ASSERT_LE(stats_size, serialized_buffer.length()); - ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length()); - - InitSerializedPageReader(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()), - serialized_buffer.length(), parquet::CompressionCodec::UNCOMPRESSED); - - // Set the max page header size to 128 KB, which is less than the current header size - page_reader_->set_max_page_header_size(max_header_size); - - ASSERT_THROW(page_reader_->NextPage(), ParquetException); -} -} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/serialized-page.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/serialized-page.cc 
b/src/parquet/column/serialized-page.cc deleted file mode 100644 index 56b73a7..0000000 --- a/src/parquet/column/serialized-page.cc +++ /dev/null @@ -1,122 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "parquet/column/serialized-page.h" - -#include <memory> - -#include "parquet/exception.h" -#include "parquet/thrift/util.h" - -using parquet::PageType; - -namespace parquet_cpp { - -// ---------------------------------------------------------------------- -// SerializedPageReader deserializes Thrift metadata and pages that have been -// assembled in a serialized stream for storing in a Parquet files - -SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream, - parquet::CompressionCodec::type codec) : - stream_(std::move(stream)) { - max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE; - switch (codec) { - case parquet::CompressionCodec::UNCOMPRESSED: - break; - case parquet::CompressionCodec::SNAPPY: - decompressor_.reset(new SnappyCodec()); - break; - default: - ParquetException::NYI("Reading compressed data"); - } -} - - -std::shared_ptr<Page> SerializedPageReader::NextPage() { - // Loop here because there may be unhandled page types that we skip until - // finding a page 
that we do know what to do with - while (true) { - int64_t bytes_read = 0; - int64_t bytes_available = 0; - uint32_t header_size = 0; - const uint8_t* buffer; - uint32_t allowed_page_size = DEFAULT_PAGE_HEADER_SIZE; - std::stringstream ss; - - // Page headers can be very large because of page statistics - // We try to deserialize a larger buffer progressively - // until a maximum allowed header limit - while (true) { - buffer = stream_->Peek(allowed_page_size, &bytes_available); - if (bytes_available == 0) { - return std::shared_ptr<Page>(nullptr); - } - - // This gets used, then set by DeserializeThriftMsg - header_size = bytes_available; - try { - DeserializeThriftMsg(buffer, &header_size, &current_page_header_); - break; - } catch (std::exception& e) { - // Failed to deserialize. Double the allowed page header size and try again - ss << e.what(); - allowed_page_size *= 2; - if (allowed_page_size > max_page_header_size_) { - ss << "Deserializing page header failed.\n"; - throw ParquetException(ss.str()); - } - } - } - // Advance the stream offset - stream_->Read(header_size, &bytes_read); - - int compressed_len = current_page_header_.compressed_page_size; - int uncompressed_len = current_page_header_.uncompressed_page_size; - - // Read the compressed data page. - buffer = stream_->Read(compressed_len, &bytes_read); - if (bytes_read != compressed_len) ParquetException::EofException(); - - // Uncompress it if we need to - if (decompressor_ != NULL) { - // Grow the uncompressed buffer if we need to. 
- if (uncompressed_len > decompression_buffer_.size()) { - decompression_buffer_.resize(uncompressed_len); - } - decompressor_->Decompress(compressed_len, buffer, uncompressed_len, - &decompression_buffer_[0]); - buffer = &decompression_buffer_[0]; - } - - if (current_page_header_.type == PageType::DICTIONARY_PAGE) { - return std::make_shared<DictionaryPage>(buffer, uncompressed_len, - current_page_header_.dictionary_page_header); - } else if (current_page_header_.type == PageType::DATA_PAGE) { - return std::make_shared<DataPage>(buffer, uncompressed_len, - current_page_header_.data_page_header); - } else if (current_page_header_.type == PageType::DATA_PAGE_V2) { - ParquetException::NYI("data page v2"); - } else { - // We don't know what this page type is. We're allowed to skip non-data - // pages. - continue; - } - } - return std::shared_ptr<Page>(nullptr); -} - -} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/serialized-page.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/serialized-page.h b/src/parquet/column/serialized-page.h deleted file mode 100644 index 62bf66d..0000000 --- a/src/parquet/column/serialized-page.h +++ /dev/null @@ -1,71 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -// This module defines an abstract interface for iterating through pages in a -// Parquet column chunk within a row group. It could be extended in the future -// to iterate through all data pages in all chunks in a file. - -#ifndef PARQUET_COLUMN_SERIALIZED_PAGE_H -#define PARQUET_COLUMN_SERIALIZED_PAGE_H - -#include <memory> -#include <vector> - -#include "parquet/column/page.h" -#include "parquet/compression/codec.h" -#include "parquet/util/input.h" -#include "parquet/thrift/parquet_types.h" - -namespace parquet_cpp { - -// 16 MB is the default maximum page header size -static constexpr uint32_t DEFAULT_MAX_PAGE_HEADER_SIZE = 16 * 1024 * 1024; -// 16 KB is the default expected page header size -static constexpr uint32_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024; -// This subclass delimits pages appearing in a serialized stream, each preceded -// by a serialized Thrift parquet::PageHeader indicating the type of each page -// and the page metadata. -class SerializedPageReader : public PageReader { - public: - SerializedPageReader(std::unique_ptr<InputStream> stream, - parquet::CompressionCodec::type codec); - - virtual ~SerializedPageReader() {} - - // Implement the PageReader interface - virtual std::shared_ptr<Page> NextPage(); - - void set_max_page_header_size(uint32_t size) { - max_page_header_size_ = size; - } - - private: - std::unique_ptr<InputStream> stream_; - - parquet::PageHeader current_page_header_; - std::shared_ptr<Page> current_page_; - - // Compression codec to use. 
- std::unique_ptr<Codec> decompressor_; - std::vector<uint8_t> decompression_buffer_; - // Maximum allowed page size - uint32_t max_page_header_size_; -}; - -} // namespace parquet_cpp - -#endif // PARQUET_COLUMN_SERIALIZED_PAGE_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/test-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h index 90dde3b..b346fc2 100644 --- a/src/parquet/column/test-util.h +++ b/src/parquet/column/test-util.h @@ -27,7 +27,14 @@ #include <vector> #include <string> +#include "parquet/column/levels.h" #include "parquet/column/page.h" + +// Depended on by SerializedPageReader test utilities for now +#include "parquet/encodings/plain-encoding.h" +#include "parquet/thrift/util.h" +#include "parquet/util/input.h" + namespace parquet_cpp { namespace test { @@ -61,36 +68,38 @@ class DataPageBuilder { typedef typename type_traits<TYPE>::value_type T; // This class writes data and metadata to the passed inputs - explicit DataPageBuilder(InMemoryOutputStream* sink, parquet::DataPageHeader* header) : + explicit DataPageBuilder(InMemoryOutputStream* sink) : sink_(sink), - header_(header), num_values_(0), + encoding_(Encoding::PLAIN), + definition_level_encoding_(Encoding::RLE), + repetition_level_encoding_(Encoding::RLE), have_def_levels_(false), have_rep_levels_(false), have_values_(false) { } - void AppendDefLevels(const std::vector<int16_t>& levels, - int16_t max_level, parquet::Encoding::type encoding) { + void AppendDefLevels(const std::vector<int16_t>& levels, int16_t max_level, + Encoding::type encoding = Encoding::RLE) { AppendLevels(levels, max_level, encoding); - num_values_ = std::max(levels.size(), num_values_); - header_->__set_definition_level_encoding(encoding); + num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_); + definition_level_encoding_ = encoding; have_def_levels_ = true; 
} - void AppendRepLevels(const std::vector<int16_t>& levels, - int16_t max_level, parquet::Encoding::type encoding) { + void AppendRepLevels(const std::vector<int16_t>& levels, int16_t max_level, + Encoding::type encoding = Encoding::RLE) { AppendLevels(levels, max_level, encoding); - num_values_ = std::max(levels.size(), num_values_); - header_->__set_repetition_level_encoding(encoding); + num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_); + repetition_level_encoding_ = encoding; have_rep_levels_ = true; } void AppendValues(const std::vector<T>& values, - parquet::Encoding::type encoding) { - if (encoding != parquet::Encoding::PLAIN) { + Encoding::type encoding = Encoding::PLAIN) { + if (encoding != Encoding::PLAIN) { ParquetException::NYI("only plain encoding currently implemented"); } size_t bytes_to_encode = values.size() * sizeof(T); @@ -98,31 +107,43 @@ class DataPageBuilder { PlainEncoder<TYPE> encoder(nullptr); encoder.Encode(&values[0], values.size(), sink_); - num_values_ = std::max(values.size(), num_values_); - header_->__set_encoding(encoding); + num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_); + encoding_ = encoding; have_values_ = true; } - void Finish() { - if (!have_values_) { - throw ParquetException("A data page must at least contain values"); - } - header_->__set_num_values(num_values_); + int32_t num_values() const { + return num_values_; + } + + Encoding::type encoding() const { + return encoding_; + } + + Encoding::type rep_level_encoding() const { + return repetition_level_encoding_; + } + + Encoding::type def_level_encoding() const { + return definition_level_encoding_; } private: InMemoryOutputStream* sink_; - parquet::DataPageHeader* header_; - size_t num_values_; + int32_t num_values_; + Encoding::type encoding_; + Encoding::type definition_level_encoding_; + Encoding::type repetition_level_encoding_; + bool have_def_levels_; bool have_rep_levels_; bool have_values_; // Used internally for 
both repetition and definition levels void AppendLevels(const std::vector<int16_t>& levels, int16_t max_level, - parquet::Encoding::type encoding) { - if (encoding != parquet::Encoding::RLE) { + Encoding::type encoding) { + if (encoding != Encoding::RLE) { ParquetException::NYI("only rle encoding currently implemented"); } @@ -152,32 +173,32 @@ static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values, size_t num_values = values.size(); InMemoryOutputStream page_stream; - parquet::DataPageHeader page_header; - - test::DataPageBuilder<TYPE> page_builder(&page_stream, &page_header); + test::DataPageBuilder<TYPE> page_builder(&page_stream); if (!rep_levels.empty()) { - page_builder.AppendRepLevels(rep_levels, max_rep_level, - parquet::Encoding::RLE); + page_builder.AppendRepLevels(rep_levels, max_rep_level); } if (!def_levels.empty()) { - page_builder.AppendDefLevels(def_levels, max_def_level, - parquet::Encoding::RLE); + page_builder.AppendDefLevels(def_levels, max_def_level); } - page_builder.AppendValues(values, parquet::Encoding::PLAIN); - page_builder.Finish(); - - // Hand off the data stream to the passed std::vector + page_builder.AppendValues(values); page_stream.Transfer(out_buffer); - return std::make_shared<DataPage>(&(*out_buffer)[0], out_buffer->size(), page_header); + return std::make_shared<DataPage>(&(*out_buffer)[0], out_buffer->size(), + page_builder.num_values(), + page_builder.encoding(), + page_builder.def_level_encoding(), + page_builder.rep_level_encoding()); } + } // namespace test +// Utilities for testing the SerializedPageReader internally + static inline void InitDataPage(const parquet::Statistics& stat, - parquet::DataPageHeader& data_page, int nvalues) { + parquet::DataPageHeader& data_page, int32_t nvalues) { data_page.encoding = parquet::Encoding::PLAIN; data_page.definition_level_encoding = parquet::Encoding::RLE; data_page.repetition_level_encoding = parquet::Encoding::RLE; 
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/compression/lz4-codec.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression/lz4-codec.cc b/src/parquet/compression/lz4-codec.cc index dfd50f6..a131031 100644 --- a/src/parquet/compression/lz4-codec.cc +++ b/src/parquet/compression/lz4-codec.cc @@ -18,6 +18,9 @@ #include "parquet/compression/codec.h" #include <lz4.h> +#include <cstdint> + +#include "parquet/exception.h" namespace parquet_cpp { @@ -26,7 +29,7 @@ void Lz4Codec::Decompress(int64_t input_len, const uint8_t* input, int64_t n = LZ4_decompress_fast(reinterpret_cast<const char*>(input), reinterpret_cast<char*>(output_buffer), output_len); if (n != input_len) { - throw parquet_cpp::ParquetException("Corrupt lz4 compressed data."); + throw ParquetException("Corrupt lz4 compressed data."); } } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/compression/snappy-codec.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression/snappy-codec.cc b/src/parquet/compression/snappy-codec.cc index 4135a15..91590db 100644 --- a/src/parquet/compression/snappy-codec.cc +++ b/src/parquet/compression/snappy-codec.cc @@ -18,6 +18,10 @@ #include "parquet/compression/codec.h" #include <snappy.h> +#include <cstdint> +#include <cstdlib> + +#include "parquet/exception.h" namespace parquet_cpp { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/CMakeLists.txt b/src/parquet/encodings/CMakeLists.txt index 638fba0..c9349af 100644 --- a/src/parquet/encodings/CMakeLists.txt +++ b/src/parquet/encodings/CMakeLists.txt @@ -17,7 +17,8 @@ # Headers: encodings install(FILES - encodings.h + decoder.h + encoder.h delta-bit-pack-encoding.h delta-byte-array-encoding.h 
delta-length-byte-array-encoding.h http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/decoder.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/decoder.h b/src/parquet/encodings/decoder.h new file mode 100644 index 0000000..55b29e8 --- /dev/null +++ b/src/parquet/encodings/decoder.h @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_ENCODINGS_DECODER_H +#define PARQUET_ENCODINGS_DECODER_H + +#include <cstdint> + +#include "parquet/exception.h" +#include "parquet/types.h" + +namespace parquet_cpp { + +class ColumnDescriptor; + +// The Decoder template is parameterized on parquet_cpp::Type::type +template <int TYPE> +class Decoder { + public: + typedef typename type_traits<TYPE>::value_type T; + + virtual ~Decoder() {} + + // Sets the data for a new page. This will be called multiple times on the same + // decoder and should reset all internal state. + virtual void SetData(int num_values, const uint8_t* data, int len) = 0; + + // Subclasses should override the ones they support. In each of these functions, + // the decoder would decode put to 'max_values', storing the result in 'buffer'. 
+ // The function returns the number of values decoded, which should be max_values + // except for end of the current data page. + virtual int Decode(T* buffer, int max_values) { + throw ParquetException("Decoder does not implement this type."); + } + + // Returns the number of values left (for the last call to SetData()). This is + // the number of values left in this page. + int values_left() const { return num_values_; } + + const Encoding::type encoding() const { return encoding_; } + + protected: + explicit Decoder(const ColumnDescriptor* descr, + const Encoding::type& encoding) + : descr_(descr), encoding_(encoding), num_values_(0) {} + + // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY + const ColumnDescriptor* descr_; + + const Encoding::type encoding_; + int num_values_; +}; + +} // namespace parquet_cpp + +#endif // PARQUET_ENCODINGS_DECODER_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/delta-bit-pack-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/delta-bit-pack-encoding.h b/src/parquet/encodings/delta-bit-pack-encoding.h index 4eb762b..d512db9 100644 --- a/src/parquet/encodings/delta-bit-pack-encoding.h +++ b/src/parquet/encodings/delta-bit-pack-encoding.h @@ -18,11 +18,13 @@ #ifndef PARQUET_DELTA_BIT_PACK_ENCODING_H #define PARQUET_DELTA_BIT_PACK_ENCODING_H -#include "parquet/encodings/encodings.h" - #include <algorithm> +#include <cstdint> #include <vector> +#include "parquet/encodings/decoder.h" +#include "parquet/util/bit-stream-utils.inline.h" + namespace parquet_cpp { template <int TYPE> @@ -31,7 +33,7 @@ class DeltaBitPackDecoder : public Decoder<TYPE> { typedef typename type_traits<TYPE>::value_type T; explicit DeltaBitPackDecoder(const ColumnDescriptor* descr) - : Decoder<TYPE>(descr, parquet::Encoding::DELTA_BINARY_PACKED) { + : Decoder<TYPE>(descr, Encoding::DELTA_BINARY_PACKED) { if (TYPE != Type::INT32 && TYPE != 
Type::INT64) { throw ParquetException("Delta bit pack encoding should only be for integer data."); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/delta-byte-array-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/delta-byte-array-encoding.h b/src/parquet/encodings/delta-byte-array-encoding.h index 2763f16..01dceea 100644 --- a/src/parquet/encodings/delta-byte-array-encoding.h +++ b/src/parquet/encodings/delta-byte-array-encoding.h @@ -18,16 +18,18 @@ #ifndef PARQUET_DELTA_BYTE_ARRAY_ENCODING_H #define PARQUET_DELTA_BYTE_ARRAY_ENCODING_H -#include "parquet/encodings/encodings.h" - #include <algorithm> +#include "parquet/encodings/decoder.h" +#include "parquet/encodings/delta-length-byte-array-encoding.h" +#include "parquet/encodings/delta-bit-pack-encoding.h" + namespace parquet_cpp { class DeltaByteArrayDecoder : public Decoder<Type::BYTE_ARRAY> { public: explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr) - : Decoder<Type::BYTE_ARRAY>(descr, parquet::Encoding::DELTA_BYTE_ARRAY), + : Decoder<Type::BYTE_ARRAY>(descr, Encoding::DELTA_BYTE_ARRAY), prefix_len_decoder_(nullptr), suffix_decoder_(nullptr) { } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/delta-length-byte-array-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/delta-length-byte-array-encoding.h b/src/parquet/encodings/delta-length-byte-array-encoding.h index 0868924..a1b4fd3 100644 --- a/src/parquet/encodings/delta-length-byte-array-encoding.h +++ b/src/parquet/encodings/delta-length-byte-array-encoding.h @@ -18,9 +18,12 @@ #ifndef PARQUET_DELTA_LENGTH_BYTE_ARRAY_ENCODING_H #define PARQUET_DELTA_LENGTH_BYTE_ARRAY_ENCODING_H -#include "parquet/encodings/encodings.h" - #include <algorithm> +#include <cstdint> +#include <vector> + +#include "parquet/encodings/decoder.h" +#include 
"parquet/encodings/delta-bit-pack-encoding.h" namespace parquet_cpp { @@ -28,7 +31,7 @@ class DeltaLengthByteArrayDecoder : public Decoder<Type::BYTE_ARRAY> { public: explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr) : Decoder<Type::BYTE_ARRAY>(descr, - parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY), + Encoding::DELTA_LENGTH_BYTE_ARRAY), len_decoder_(nullptr) { } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/dictionary-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h index 0547eb3..b52aefb 100644 --- a/src/parquet/encodings/dictionary-encoding.h +++ b/src/parquet/encodings/dictionary-encoding.h @@ -18,11 +18,14 @@ #ifndef PARQUET_DICTIONARY_ENCODING_H #define PARQUET_DICTIONARY_ENCODING_H -#include "parquet/encodings/encodings.h" - #include <algorithm> +#include <cstdint> #include <vector> +#include "parquet/encodings/decoder.h" +#include "parquet/encodings/encoder.h" +#include "parquet/util/rle-encoding.h" + namespace parquet_cpp { template <int TYPE> @@ -35,7 +38,7 @@ class DictionaryDecoder : public Decoder<TYPE> { // dictionary decoder needs to copy the data out if necessary. DictionaryDecoder(const ColumnDescriptor* descr, Decoder<TYPE>* dictionary) - : Decoder<TYPE>(descr, parquet::Encoding::RLE_DICTIONARY) { + : Decoder<TYPE>(descr, Encoding::RLE_DICTIONARY) { Init(dictionary); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/encoder.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/encoder.h b/src/parquet/encodings/encoder.h new file mode 100644 index 0000000..50ba48f --- /dev/null +++ b/src/parquet/encodings/encoder.h @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_ENCODINGS_ENCODER_H +#define PARQUET_ENCODINGS_ENCODER_H + +#include <cstdint> + +#include "parquet/exception.h" +#include "parquet/types.h" + +namespace parquet_cpp { + +class ColumnDescriptor; +class OutputStream; + +// Base class for value encoders. Since encoders may or not have state (e.g., +// dictionary encoding) we use a class instance to maintain any state. 
+// +// TODO(wesm): Encode interface API is temporary +template <int TYPE> +class Encoder { + public: + typedef typename type_traits<TYPE>::value_type T; + + virtual ~Encoder() {} + + // Subclasses should override the ones they support + virtual void Encode(const T* src, int num_values, OutputStream* dst) { + throw ParquetException("Encoder does not implement this type."); + } + + const Encoding::type encoding() const { return encoding_; } + + protected: + explicit Encoder(const ColumnDescriptor* descr, + const Encoding::type& encoding) + : descr_(descr), encoding_(encoding) {} + + // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY + const ColumnDescriptor* descr_; + const Encoding::type encoding_; +}; + +} // namespace parquet_cpp + +#endif // PARQUET_ENCODINGS_ENCODER_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/encodings.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/encodings.h b/src/parquet/encodings/encodings.h deleted file mode 100644 index 46c61b6..0000000 --- a/src/parquet/encodings/encodings.h +++ /dev/null @@ -1,111 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#ifndef PARQUET_ENCODINGS_ENCODINGS_H -#define PARQUET_ENCODINGS_ENCODINGS_H - -#include <cstdint> - -#include "parquet/exception.h" -#include "parquet/types.h" - -#include "parquet/util/output.h" -#include "parquet/util/rle-encoding.h" -#include "parquet/util/bit-stream-utils.inline.h" - -#include "parquet/schema/descriptor.h" - -#include "parquet/thrift/parquet_types.h" - -namespace parquet_cpp { - -// The Decoder template is parameterized on parquet_cpp::Type::type -template <int TYPE> -class Decoder { - public: - typedef typename type_traits<TYPE>::value_type T; - - virtual ~Decoder() {} - - // Sets the data for a new page. This will be called multiple times on the same - // decoder and should reset all internal state. - virtual void SetData(int num_values, const uint8_t* data, int len) = 0; - - // Subclasses should override the ones they support. In each of these functions, - // the decoder would decode put to 'max_values', storing the result in 'buffer'. - // The function returns the number of values decoded, which should be max_values - // except for end of the current data page. - virtual int Decode(T* buffer, int max_values) { - throw ParquetException("Decoder does not implement this type."); - } - - // Returns the number of values left (for the last call to SetData()). This is - // the number of values left in this page. - int values_left() const { return num_values_; } - - const parquet::Encoding::type encoding() const { return encoding_; } - - protected: - explicit Decoder(const ColumnDescriptor* descr, - const parquet::Encoding::type& encoding) - : descr_(descr), encoding_(encoding), num_values_(0) {} - - // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY - const ColumnDescriptor* descr_; - - const parquet::Encoding::type encoding_; - int num_values_; -}; - - -// Base class for value encoders. Since encoders may or not have state (e.g., -// dictionary encoding) we use a class instance to maintain any state. 
-// -// TODO(wesm): Encode interface API is temporary -template <int TYPE> -class Encoder { - public: - typedef typename type_traits<TYPE>::value_type T; - - virtual ~Encoder() {} - - // Subclasses should override the ones they support - virtual void Encode(const T* src, int num_values, OutputStream* dst) { - throw ParquetException("Encoder does not implement this type."); - } - - const parquet::Encoding::type encoding() const { return encoding_; } - - protected: - explicit Encoder(const ColumnDescriptor* descr, - const parquet::Encoding::type& encoding) - : descr_(descr), encoding_(encoding) {} - - // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY - const ColumnDescriptor* descr_; - const parquet::Encoding::type encoding_; -}; - -} // namespace parquet_cpp - -#include "parquet/encodings/plain-encoding.h" -#include "parquet/encodings/dictionary-encoding.h" -#include "parquet/encodings/delta-bit-pack-encoding.h" -#include "parquet/encodings/delta-length-byte-array-encoding.h" -#include "parquet/encodings/delta-byte-array-encoding.h" - -#endif // PARQUET_ENCODINGS_ENCODINGS_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/plain-encoding-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/plain-encoding-test.cc b/src/parquet/encodings/plain-encoding-test.cc index 16862b8..955a415 100644 --- a/src/parquet/encodings/plain-encoding-test.cc +++ b/src/parquet/encodings/plain-encoding-test.cc @@ -16,13 +16,17 @@ // under the License. 
#include <cstdint> +#include <cstdlib> #include <string> #include <vector> #include <gtest/gtest.h> -#include "parquet/util/test-common.h" -#include "parquet/encodings/encodings.h" +#include "parquet/encodings/plain-encoding.h" +#include "parquet/types.h" +#include "parquet/util/bit-util.h" +#include "parquet/util/output.h" +#include "parquet/util/test-common.h" using std::string; using std::vector; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/plain-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h index a450eb4..78560fd 100644 --- a/src/parquet/encodings/plain-encoding.h +++ b/src/parquet/encodings/plain-encoding.h @@ -18,11 +18,15 @@ #ifndef PARQUET_PLAIN_ENCODING_H #define PARQUET_PLAIN_ENCODING_H -#include "parquet/encodings/encodings.h" - #include <algorithm> #include <vector> +#include "parquet/encodings/decoder.h" +#include "parquet/encodings/encoder.h" +#include "parquet/schema/descriptor.h" +#include "parquet/util/bit-stream-utils.inline.h" +#include "parquet/util/output.h" + namespace parquet_cpp { // ---------------------------------------------------------------------- @@ -35,7 +39,7 @@ class PlainDecoder : public Decoder<TYPE> { using Decoder<TYPE>::num_values_; explicit PlainDecoder(const ColumnDescriptor* descr) : - Decoder<TYPE>(descr, parquet::Encoding::PLAIN), + Decoder<TYPE>(descr, Encoding::PLAIN), data_(NULL), len_(0) {} virtual void SetData(int num_values, const uint8_t* data, int len) { @@ -98,7 +102,7 @@ template <> class PlainDecoder<Type::BOOLEAN> : public Decoder<Type::BOOLEAN> { public: explicit PlainDecoder(const ColumnDescriptor* descr) : - Decoder<Type::BOOLEAN>(descr, parquet::Encoding::PLAIN) {} + Decoder<Type::BOOLEAN>(descr, Encoding::PLAIN) {} virtual void SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; @@ -145,7 +149,7 @@ class 
PlainEncoder : public Encoder<TYPE> { typedef typename type_traits<TYPE>::value_type T; explicit PlainEncoder(const ColumnDescriptor* descr) : - Encoder<TYPE>(descr, parquet::Encoding::PLAIN) {} + Encoder<TYPE>(descr, Encoding::PLAIN) {} virtual void Encode(const T* src, int num_values, OutputStream* dst); }; @@ -154,7 +158,7 @@ template <> class PlainEncoder<Type::BOOLEAN> : public Encoder<Type::BOOLEAN> { public: explicit PlainEncoder(const ColumnDescriptor* descr) : - Encoder<Type::BOOLEAN>(descr, parquet::Encoding::PLAIN) {} + Encoder<Type::BOOLEAN>(descr, Encoding::PLAIN) {} virtual void Encode(const bool* src, int num_values, OutputStream* dst) { throw ParquetException("this API for encoding bools not implemented"); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/file/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/file/CMakeLists.txt b/src/parquet/file/CMakeLists.txt index ef6ac01..b7a65f1 100644 --- a/src/parquet/file/CMakeLists.txt +++ b/src/parquet/file/CMakeLists.txt @@ -18,3 +18,5 @@ install(FILES reader.h DESTINATION include/parquet/file) + +ADD_PARQUET_TEST(file-deserialize-test) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/file/file-deserialize-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc new file mode 100644 index 0000000..e90889d --- /dev/null +++ b/src/parquet/file/file-deserialize-test.cc @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> + +#include <algorithm> +#include <cstdlib> +#include <cstdint> +#include <exception> +#include <memory> +#include <string> + +#include "parquet/column/page.h" +#include "parquet/column/test-util.h" + +#include "parquet/file/reader-internal.h" +#include "parquet/thrift/parquet_types.h" +#include "parquet/thrift/util.h" +#include "parquet/types.h" +#include "parquet/util/input.h" + +namespace parquet_cpp { + +class TestSerializedPage : public ::testing::Test { + public: + void InitSerializedPageReader(const uint8_t* buffer, size_t header_size, + Compression::type codec) { + std::unique_ptr<InputStream> stream; + stream.reset(new InMemoryInputStream(buffer, header_size)); + page_reader_.reset(new SerializedPageReader(std::move(stream), codec)); + } + + protected: + std::unique_ptr<SerializedPageReader> page_reader_; +}; + +TEST_F(TestSerializedPage, TestLargePageHeaders) { + parquet::PageHeader in_page_header; + parquet::DataPageHeader data_page_header; + parquet::PageHeader out_page_header; + parquet::Statistics stats; + int expected_header_size = 512 * 1024; //512 KB + int stats_size = 256 * 1024; // 256 KB + std::string serialized_buffer; + int num_values = 4141; + + InitStats(stats_size, stats); + InitDataPage(stats, data_page_header, num_values); + InitPageHeader(data_page_header, in_page_header); + + // Serialize the Page header + ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header, + expected_header_size)); + // check header size is between 256 KB to 16 MB + ASSERT_LE(stats_size, 
serialized_buffer.length()); + ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length()); + + InitSerializedPageReader(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()), + serialized_buffer.length(), Compression::UNCOMPRESSED); + + std::shared_ptr<Page> current_page = page_reader_->NextPage(); + ASSERT_EQ(PageType::DATA_PAGE, current_page->type()); + const DataPage* page = static_cast<const DataPage*>(current_page.get()); + ASSERT_EQ(num_values, page->num_values()); +} + +TEST_F(TestSerializedPage, TestFailLargePageHeaders) { + parquet::PageHeader in_page_header; + parquet::DataPageHeader data_page_header; + parquet::PageHeader out_page_header; + parquet::Statistics stats; + int expected_header_size = 512 * 1024; // 512 KB + int stats_size = 256 * 1024; // 256 KB + int max_header_size = 128 * 1024; // 128 KB + int num_values = 4141; + std::string serialized_buffer; + + InitStats(stats_size, stats); + InitDataPage(stats, data_page_header, num_values); + InitPageHeader(data_page_header, in_page_header); + + // Serialize the Page header + ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header, + expected_header_size)); + // check header size is between 256 KB to 16 MB + ASSERT_LE(stats_size, serialized_buffer.length()); + ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length()); + + InitSerializedPageReader(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()), + serialized_buffer.length(), Compression::UNCOMPRESSED); + + // Set the max page header size to 128 KB, which is less than the current header size + page_reader_->set_max_page_header_size(max_header_size); + + ASSERT_THROW(page_reader_->NextPage(), ParquetException); +} +} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/file/reader-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc index 
7b0a719..47092a5 100644 --- a/src/parquet/file/reader-internal.cc +++ b/src/parquet/file/reader-internal.cc @@ -17,17 +17,138 @@ #include "parquet/file/reader-internal.h" -#include <memory> +#include <string.h> +#include <algorithm> +#include <exception> +#include <ostream> #include <vector> -#include "parquet/column/serialized-page.h" +#include "parquet/column/page.h" +#include "parquet/compression/codec.h" +#include "parquet/exception.h" #include "parquet/schema/converter.h" +#include "parquet/schema/descriptor.h" +#include "parquet/schema/types.h" #include "parquet/thrift/util.h" +#include "parquet/types.h" #include "parquet/util/input.h" namespace parquet_cpp { // ---------------------------------------------------------------------- +// SerializedPageReader deserializes Thrift metadata and pages that have been +// assembled in a serialized stream for storing in a Parquet files + +SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream, + Compression::type codec) : + stream_(std::move(stream)) { + max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE; + // TODO(wesm): add GZIP after PARQUET-456 + switch (codec) { + case Compression::UNCOMPRESSED: + break; + case Compression::SNAPPY: + decompressor_.reset(new SnappyCodec()); + break; + case Compression::LZO: + decompressor_.reset(new Lz4Codec()); + break; + default: + ParquetException::NYI("Reading compressed data"); + } +} + +std::shared_ptr<Page> SerializedPageReader::NextPage() { + // Loop here because there may be unhandled page types that we skip until + // finding a page that we do know what to do with + while (true) { + int64_t bytes_read = 0; + int64_t bytes_available = 0; + uint32_t header_size = 0; + const uint8_t* buffer; + uint32_t allowed_page_size = DEFAULT_PAGE_HEADER_SIZE; + std::stringstream ss; + + // Page headers can be very large because of page statistics + // We try to deserialize a larger buffer progressively + // until a maximum allowed header limit + while (true) { 
+ buffer = stream_->Peek(allowed_page_size, &bytes_available); + if (bytes_available == 0) { + return std::shared_ptr<Page>(nullptr); + } + + // This gets used, then set by DeserializeThriftMsg + header_size = bytes_available; + try { + DeserializeThriftMsg(buffer, &header_size, &current_page_header_); + break; + } catch (std::exception& e) { + // Failed to deserialize. Double the allowed page header size and try again + ss << e.what(); + allowed_page_size *= 2; + if (allowed_page_size > max_page_header_size_) { + ss << "Deserializing page header failed.\n"; + throw ParquetException(ss.str()); + } + } + } + // Advance the stream offset + stream_->Read(header_size, &bytes_read); + + int compressed_len = current_page_header_.compressed_page_size; + int uncompressed_len = current_page_header_.uncompressed_page_size; + + // Read the compressed data page. + buffer = stream_->Read(compressed_len, &bytes_read); + if (bytes_read != compressed_len) ParquetException::EofException(); + + // Uncompress it if we need to + if (decompressor_ != NULL) { + // Grow the uncompressed buffer if we need to. + if (uncompressed_len > decompression_buffer_.size()) { + decompression_buffer_.resize(uncompressed_len); + } + decompressor_->Decompress(compressed_len, buffer, uncompressed_len, + &decompression_buffer_[0]); + buffer = &decompression_buffer_[0]; + } + + if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) { + const parquet::DictionaryPageHeader& dict_header = + current_page_header_.dictionary_page_header; + + bool is_sorted = dict_header.__isset.is_sorted? 
dict_header.is_sorted : false; + + return std::make_shared<DictionaryPage>(buffer, uncompressed_len, + dict_header.num_values, FromThrift(dict_header.encoding), + is_sorted); + } else if (current_page_header_.type == parquet::PageType::DATA_PAGE) { + const parquet::DataPageHeader& header = current_page_header_.data_page_header; + + return std::make_shared<DataPage>(buffer, uncompressed_len, + header.num_values, + FromThrift(header.encoding), + FromThrift(header.definition_level_encoding), + FromThrift(header.repetition_level_encoding)); + } else if (current_page_header_.type == parquet::PageType::DATA_PAGE_V2) { + const parquet::DataPageHeaderV2& header = current_page_header_.data_page_header_v2; + bool is_compressed = header.__isset.is_compressed? header.is_compressed : false; + return std::make_shared<DataPageV2>(buffer, uncompressed_len, + header.num_values, header.num_nulls, header.num_rows, + FromThrift(header.encoding), + header.definition_levels_byte_length, + header.repetition_levels_byte_length, is_compressed); + } else { + // We don't know what this page type is. We're allowed to skip non-data + // pages. 
+ continue; + } + } + return std::shared_ptr<Page>(nullptr); +} + +// ---------------------------------------------------------------------- // SerializedRowGroup int SerializedRowGroup::num_columns() const { @@ -62,7 +183,7 @@ std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) { const ColumnDescriptor* descr = schema_->Column(i); return std::unique_ptr<PageReader>(new SerializedPageReader(std::move(input), - col.meta_data.codec)); + FromThrift(col.meta_data.codec))); } RowGroupStatistics SerializedRowGroup::GetColumnStats(int i) { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/file/reader-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h index 8ba105e..b7e9154 100644 --- a/src/parquet/file/reader-internal.h +++ b/src/parquet/file/reader-internal.h @@ -18,16 +18,57 @@ #ifndef PARQUET_FILE_READER_INTERNAL_H #define PARQUET_FILE_READER_INTERNAL_H -#include "parquet/file/reader.h" - +#include <cstdint> #include <memory> +#include <vector> -#include "parquet/schema/descriptor.h" -#include "parquet/util/input.h" +#include "parquet/column/page.h" +#include "parquet/compression/codec.h" +#include "parquet/file/reader.h" #include "parquet/thrift/parquet_types.h" +#include "parquet/types.h" +#include "parquet/util/input.h" namespace parquet_cpp { +class SchemaDescriptor; + +// 16 MB is the default maximum page header size +static constexpr uint32_t DEFAULT_MAX_PAGE_HEADER_SIZE = 16 * 1024 * 1024; + +// 16 KB is the default expected page header size +static constexpr uint32_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024; + +// This subclass delimits pages appearing in a serialized stream, each preceded +// by a serialized Thrift parquet::PageHeader indicating the type of each page +// and the page metadata. 
+class SerializedPageReader : public PageReader { + public: + SerializedPageReader(std::unique_ptr<InputStream> stream, + Compression::type codec); + + virtual ~SerializedPageReader() {} + + // Implement the PageReader interface + virtual std::shared_ptr<Page> NextPage(); + + void set_max_page_header_size(uint32_t size) { + max_page_header_size_ = size; + } + + private: + std::unique_ptr<InputStream> stream_; + + parquet::PageHeader current_page_header_; + std::shared_ptr<Page> current_page_; + + // Compression codec to use. + std::unique_ptr<Codec> decompressor_; + std::vector<uint8_t> decompression_buffer_; + // Maximum allowed page size + uint32_t max_page_header_size_; +}; + // RowGroupReader::Contents implementation for the Parquet file specification class SerializedRowGroup : public RowGroupReader::Contents { public: http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/file/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc index 6ef59ed..9da0f09 100644 --- a/src/parquet/file/reader.cc +++ b/src/parquet/file/reader.cc @@ -18,17 +18,19 @@ #include "parquet/file/reader.h" #include <cstdio> -#include <cstring> #include <memory> #include <sstream> #include <string> +#include <utility> #include <vector> +#include "parquet/column/page.h" #include "parquet/column/reader.h" #include "parquet/column/scanner.h" - #include "parquet/exception.h" #include "parquet/file/reader-internal.h" +#include "parquet/util/input.h" +#include "parquet/types.h" using std::string; using std::vector; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/file/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h index 3ff8697..32ae429 100644 --- a/src/parquet/file/reader.h +++ b/src/parquet/file/reader.h @@ -19,21 +19,17 @@ #define PARQUET_FILE_READER_H 
#include <cstdint> +#include <iosfwd> #include <memory> #include <string> -#include <stdio.h> #include <unordered_map> -#include "parquet/types.h" -#include "parquet/schema/descriptor.h" - -// TODO(wesm): Still depends on Thrift #include "parquet/column/page.h" +#include "parquet/schema/descriptor.h" namespace parquet_cpp { class ColumnReader; -class ParquetFileReader; struct RowGroupStatistics { int64_t num_values; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/parquet.h ---------------------------------------------------------------------- diff --git a/src/parquet/parquet.h b/src/parquet/parquet.h index b8624ae..ea8ab5e 100644 --- a/src/parquet/parquet.h +++ b/src/parquet/parquet.h @@ -27,9 +27,19 @@ #include <vector> #include "parquet/exception.h" + +// Column reader API #include "parquet/column/reader.h" + +// File API #include "parquet/file/reader.h" +// Schemas +#include "parquet/schema/descriptor.h" +#include "parquet/schema/printer.h" +#include "parquet/schema/types.h" + +// IO #include "parquet/util/input.h" #include "parquet/util/output.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/public-api-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/public-api-test.cc b/src/parquet/public-api-test.cc new file mode 100644 index 0000000..4103714 --- /dev/null +++ b/src/parquet/public-api-test.cc @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> +#include "parquet/parquet.h" + +namespace parquet_cpp { + +TEST(TestPublicAPI, DoesNotIncludeThrift) { +#ifdef _THRIFT_THRIFT_H_ + FAIL() << "Thrift headers should not be in the public API"; +#endif +} + +} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/reader-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc index 8599e7e..97a5f79 100644 --- a/src/parquet/reader-test.cc +++ b/src/parquet/reader-test.cc @@ -16,9 +16,9 @@ // under the License. #include <cstdlib> +#include <cstdint> #include <iostream> #include <memory> -#include <sstream> #include <string> #include <gtest/gtest.h> @@ -26,7 +26,6 @@ #include "parquet/file/reader.h" #include "parquet/column/reader.h" #include "parquet/column/scanner.h" -#include "parquet/util/input.h" using std::string;
