PARQUET-451: Add RowGroupReader helper class and refactor parquet_reader.cc into DebugPrint
This also addresses PARQUET-433 and PARQUET-453. Author: Wes McKinney <[email protected]> Closes #23 from wesm/PARQUET-451 and squashes the following commits: 748ee0c [Wes McKinney] Turn MakeColumnReader into static ColumnReader::Make 6528497 [Wes McKinney] Incorporate code review comments 4b5575d [Wes McKinney] [PARQUET-451/453]: Implement RowGroupReader class and refactor parquet_reader.cc into ParquetFileReader::DebugPrint 2985e2e [Wes McKinney] [PARQUET-433]: Templatize decoders and column readers and remove most switch-on-type statements. Add parquet::SchemaElement* member to Decoder<T>, for FLBA metadata. Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/1f24e765 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/1f24e765 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/1f24e765 Branch: refs/heads/master Commit: 1f24e7658b9e9d41f95e6ce3a0d7a2fe3ace1abf Parents: 8fc24f8 Author: Wes McKinney <[email protected]> Authored: Wed Jan 27 21:13:15 2016 -0800 Committer: Nong Li <[email protected]> Committed: Wed Jan 27 21:13:15 2016 -0800 ---------------------------------------------------------------------- CMakeLists.txt | 3 +- example/CMakeLists.txt | 8 +- example/decode_benchmark.cc | 19 +- example/example_util.cc | 84 ------ example/example_util.h | 44 --- example/parquet_reader.cc | 256 ++--------------- setup_build_env.sh | 3 +- src/parquet.cc | 272 ------------------ src/parquet/CMakeLists.txt | 2 + src/parquet/column_reader.cc | 194 +++++++++++++ src/parquet/column_reader.h | 183 ++++++++++++ src/parquet/compression/codec.h | 6 +- src/parquet/encodings/CMakeLists.txt | 1 - src/parquet/encodings/bool-encoding.h | 48 ---- src/parquet/encodings/delta-bit-pack-encoding.h | 20 +- .../encodings/delta-byte-array-encoding.h | 20 +- .../delta-length-byte-array-encoding.h | 16 +- src/parquet/encodings/dictionary-encoding.h | 131 ++++----- 
src/parquet/encodings/encodings.h | 34 +-- src/parquet/encodings/plain-encoding.h | 82 +++--- src/parquet/parquet.h | 197 +------------ src/parquet/reader-test.cc | 14 +- src/parquet/reader.cc | 283 ++++++++++++++++++- src/parquet/reader.h | 65 ++++- src/parquet/types.h | 112 ++++++++ src/parquet/util/CMakeLists.txt | 5 + src/parquet/util/input_stream.cc | 63 +++++ src/parquet/util/input_stream.h | 80 ++++++ 28 files changed, 1171 insertions(+), 1074 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index a2f7e6a..b66e296 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -213,13 +213,14 @@ set(PARQUET_TEST_LINK_LIBS ${PARQUET_MIN_TEST_LIBS}) # Library config set(LIBPARQUET_SRCS - src/parquet.cc + src/parquet/column_reader.cc src/parquet/reader.cc ) set(LIBPARQUET_LINK_LIBS parquet_compression parquet_thrift + parquet_util thriftstatic ) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/example/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index a020184..05c541a 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -12,16 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
- -add_library(Example STATIC - example_util.cc -) - SET(LINK_LIBS parquet snappystatic - thriftstatic - Example) + thriftstatic) add_executable(decode_benchmark decode_benchmark.cc) target_link_libraries(decode_benchmark ${LINK_LIBS}) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/example/decode_benchmark.cc ---------------------------------------------------------------------- diff --git a/example/decode_benchmark.cc b/example/decode_benchmark.cc index ed4077a..f33232d 100644 --- a/example/decode_benchmark.cc +++ b/example/decode_benchmark.cc @@ -16,7 +16,6 @@ #include <iostream> #include <stdio.h> -#include "example_util.h" #include "parquet/compression/codec.h" #include "parquet/encodings/encodings.h" #include "parquet/util/stopwatch.h" @@ -198,11 +197,11 @@ class DeltaByteArrayEncoder { uint64_t TestPlainIntEncoding(const uint8_t* data, int num_values, int batch_size) { uint64_t result = 0; - PlainDecoder decoder(Type::INT64); + PlainDecoder<Type::INT64> decoder(nullptr); decoder.SetData(num_values, data, num_values * sizeof(int64_t)); int64_t values[batch_size]; for (int i = 0; i < num_values;) { - int n = decoder.GetInt64(values, batch_size); + int n = decoder.Decode(values, batch_size); for (int j = 0; j < n; ++j) { result += values[j]; } @@ -221,7 +220,7 @@ uint64_t TestBinaryPackedEncoding(const char* name, const vector<int64_t>& value } else { mini_block_size = 32; } - DeltaBitPackDecoder decoder(Type::INT64); + DeltaBitPackDecoder<Type::INT64> decoder(nullptr); DeltaBitPackEncoder encoder(mini_block_size); for (int i = 0; i < values.size(); ++i) { encoder.Add(values[i]); @@ -238,7 +237,7 @@ uint64_t TestBinaryPackedEncoding(const char* name, const vector<int64_t>& value decoder.SetData(encoder.num_values(), buffer, len); for (int i = 0; i < encoder.num_values(); ++i) { int64_t x = 0; - decoder.GetInt64(&x, 1); + decoder.Decode(&x, 1); if (values[i] != x) { cerr << "Bad: " << i << endl; cerr << " " << x << " != " << values[i] << endl; 
@@ -258,7 +257,7 @@ uint64_t TestBinaryPackedEncoding(const char* name, const vector<int64_t>& value for (int k = 0; k < benchmark_iters; ++k) { decoder.SetData(encoder.num_values(), buffer, len); for (int i = 0; i < values.size();) { - int n = decoder.GetInt64(buf, benchmark_batch_size); + int n = decoder.Decode(buf, benchmark_batch_size); for (int j = 0; j < n; ++j) { result += buf[j]; } @@ -349,7 +348,7 @@ void TestBinaryPacking() { } void TestDeltaLengthByteArray() { - DeltaLengthByteArrayDecoder decoder; + DeltaLengthByteArrayDecoder decoder(nullptr); DeltaLengthByteArrayEncoder encoder; vector<string> values; @@ -369,7 +368,7 @@ void TestDeltaLengthByteArray() { decoder.SetData(encoder.num_values(), buffer, len); for (int i = 0; i < encoder.num_values(); ++i) { ByteArray v; - decoder.GetByteArray(&v, 1); + decoder.Decode(&v, 1); string r = string((char*)v.ptr, v.len); if (r != values[i]) { cout << "Bad " << r << " != " << values[i] << endl; @@ -378,7 +377,7 @@ void TestDeltaLengthByteArray() { } void TestDeltaByteArray() { - DeltaByteArrayDecoder decoder; + DeltaByteArrayDecoder decoder(nullptr); DeltaByteArrayEncoder encoder; vector<string> values; @@ -407,7 +406,7 @@ void TestDeltaByteArray() { decoder.SetData(encoder.num_values(), buffer, len); for (int i = 0; i < encoder.num_values(); ++i) { ByteArray v; - decoder.GetByteArray(&v, 1); + decoder.Decode(&v, 1); string r = string((char*)v.ptr, v.len); if (r != values[i]) { cout << "Bad " << r << " != " << values[i] << endl; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/example/example_util.cc ---------------------------------------------------------------------- diff --git a/example/example_util.cc b/example/example_util.cc deleted file mode 100644 index 07d8129..0000000 --- a/example/example_util.cc +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright 2012 Cloudera Inc. 
-// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "example_util.h" -#include <iostream> -#include <stdlib.h> -#include <stdio.h> -#include <string.h> - -#include "parquet/thrift/util.h" - -using namespace parquet; -using namespace parquet_cpp; -using namespace std; - -// 4 byte constant + 4 byte metadata len -const uint32_t FOOTER_SIZE = 8; -const uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'}; - -struct ScopedFile { - public: - ScopedFile(FILE* f) : file_(f) { } - ~ScopedFile() { fclose(file_); } - - private: - FILE* file_; -}; - -bool GetFileMetadata(const string& path, FileMetaData* metadata) { - FILE* file = fopen(path.c_str(), "r"); - if (!file) { - cerr << "Could not open file: " << path << endl; - return false; - } - ScopedFile cleanup(file); - fseek(file, 0L, SEEK_END); - size_t file_len = ftell(file); - if (file_len < FOOTER_SIZE) { - cerr << "Invalid parquet file. Corrupt footer." << endl; - return false; - } - - uint8_t footer_buffer[FOOTER_SIZE]; - fseek(file, file_len - FOOTER_SIZE, SEEK_SET); - size_t bytes_read = fread(footer_buffer, 1, FOOTER_SIZE, file); - if (bytes_read != FOOTER_SIZE) { - cerr << "Invalid parquet file. Corrupt footer." << endl; - return false; - } - if (memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) { - cerr << "Invalid parquet file. Corrupt footer." 
<< endl; - return false; - } - - uint32_t metadata_len = *reinterpret_cast<uint32_t*>(footer_buffer); - size_t metadata_start = file_len - FOOTER_SIZE - metadata_len; - if (metadata_start < 0) { - cerr << "Invalid parquet file. File is less than file metadata size." << endl; - return false; - } - - fseek(file, metadata_start, SEEK_SET); - uint8_t metadata_buffer[metadata_len]; - bytes_read = fread(metadata_buffer, 1, metadata_len, file); - if (bytes_read != metadata_len) { - cerr << "Invalid parquet file. Could not read metadata bytes." << endl; - return false; - } - - DeserializeThriftMsg(metadata_buffer, &metadata_len, metadata); - return true; - -} http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/example/example_util.h ---------------------------------------------------------------------- diff --git a/example/example_util.h b/example/example_util.h deleted file mode 100644 index a8b58fc..0000000 --- a/example/example_util.h +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2012 Cloudera Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -#ifndef PARQUET_EXAMPLE_UTIL_H -#define PARQUET_EXAMPLE_UTIL_H - -#include <string> -#include <parquet/parquet.h> -#include <stdio.h> - -bool GetFileMetadata(const std::string& path, parquet::FileMetaData* metadata); - -class InputFile { -private: - FILE* file; - std::string filename; - -public: - InputFile(const std::string& _filename): filename(_filename) { - file = fopen(_filename.c_str(), "r"); - } - ~InputFile() { - if (file != NULL) { - fclose(file); - } - } - - FILE* getFileHandle() { return file; } - bool isOpen() { return file != NULL; } - std::string getFilename() { return filename; } -}; - -#endif // PARQUET_EXAMPLE_UTIL_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/example/parquet_reader.cc ---------------------------------------------------------------------- diff --git a/example/parquet_reader.cc b/example/parquet_reader.cc index 5379c5e..7b476b5 100644 --- a/example/parquet_reader.cc +++ b/example/parquet_reader.cc @@ -13,255 +13,49 @@ // limitations under the License. 
#include <parquet/parquet.h> -#include "example_util.h" #include <iostream> -// the fixed initial size is just for an example -#define COL_WIDTH "17" - -using namespace parquet; -using namespace parquet_cpp; -using namespace std; - -struct AnyType { - union { - bool bool_val; - int32_t int32_val; - int64_t int64_val; - float float_val; - double double_val; - ByteArray byte_array_val; - }; -}; - -static string ByteArrayToString(const ByteArray& a) { - return string(reinterpret_cast<const char*>(a.ptr), a.len); -} - -int ByteCompare(const ByteArray& x1, const ByteArray& x2) { - int len = ::min(x1.len, x2.len); - int cmp = memcmp(x1.ptr, x2.ptr, len); - if (cmp != 0) return cmp; - if (len < x1.len) return 1; - if (len < x2.len) return -1; - return 0; -} - -string type2String(Type::type t) { - switch(t) { - case Type::BOOLEAN: - return "BOOLEAN"; - break; - case Type::INT32: - return "INT32"; - break; - case Type::INT64: - return "INT64"; - break; - case Type::FLOAT: - return "FLOAT"; - break; - case Type::DOUBLE: - return "DOUBLE"; - break; - case Type::BYTE_ARRAY: - return "BYTE_ARRAY"; - break; - case Type::INT96: - return "INT96"; - break; - case Type::FIXED_LEN_BYTE_ARRAY: - return "FIXED_LEN_BYTE_ARRAY"; - break; - default: - return "UNKNOWN"; - break; - } -} - -void readParquet(const string& filename, const bool printValues) { - InputFile file(filename); - if (!file.isOpen()) { - cerr << "Could not open file " << file.getFilename() << endl; - return; - } - - FileMetaData metadata; - if (!GetFileMetadata(file.getFilename().c_str(), &metadata)) { - cerr << "Could not read metadata from file " << file.getFilename() << endl; - return; - } - - cout << "File statistics:\n" ; - cout << "Total rows: " << metadata.num_rows << "\n"; - for (int c = 1; c < metadata.schema.size(); ++c) { - cout << "Column " << c-1 << ": " << metadata.schema[c].name << " (" - << type2String(metadata.schema[c].type); - if (metadata.schema[c].type == Type::INT96 || - metadata.schema[c].type == 
Type::FIXED_LEN_BYTE_ARRAY) { - cout << " - not supported"; - } - cout << ")\n"; - } - - for (int i = 0; i < metadata.row_groups.size(); ++i) { - cout << "--- Row Group " << i << " ---\n"; - - // Print column metadata - const RowGroup& row_group = metadata.row_groups[i]; - size_t nColumns = row_group.columns.size(); - - for (int c = 0; c < nColumns; ++c) { - const ColumnMetaData& meta_data = row_group.columns[c].meta_data; - cout << "Column " << c - << ": " << meta_data.num_values << " rows, " - << meta_data.statistics.null_count << " null values, " - << meta_data.statistics.distinct_count << " distinct values, " - << "min value: " << (meta_data.statistics.min.length()>0 ? - meta_data.statistics.min : "N/A") - << ", max value: " << (meta_data.statistics.max.length()>0 ? - meta_data.statistics.max : "N/A") << ".\n"; - } - - if (!printValues) { - continue; - } - - // Create readers for all columns and print contents - vector<ColumnReader*> readers(nColumns, NULL); - try { - for (int c = 0; c < nColumns; ++c) { - const ColumnChunk& col = row_group.columns[c]; - printf("%-" COL_WIDTH"s", metadata.schema[c+1].name.c_str()); - - if (col.meta_data.type == Type::INT96 || - col.meta_data.type == Type::FIXED_LEN_BYTE_ARRAY) { - continue; - } - - size_t col_start = col.meta_data.data_page_offset; - if (col.meta_data.__isset.dictionary_page_offset && - col_start > col.meta_data.dictionary_page_offset) { - col_start = col.meta_data.dictionary_page_offset; - } - - std::unique_ptr<ScopedInMemoryInputStream> input( - new ScopedInMemoryInputStream(col.meta_data.total_compressed_size)); - fseek(file.getFileHandle(), col_start, SEEK_SET); - size_t num_read = fread(input->data(), - 1, - input->size(), - file.getFileHandle()); - if (num_read != input->size()) { - cerr << "Could not read column data." 
<< endl; - continue; - } - - readers[c] = new ColumnReader(&col.meta_data, - &metadata.schema[c+1], - input.release()); - } - cout << "\n"; - - vector<int> def_level(nColumns, 0); - vector<int> rep_level(nColumns, 0); - - bool hasRow; - do { - hasRow = false; - for (int c = 0; c < nColumns; ++c) { - if (readers[c] == NULL) { - printf("%-" COL_WIDTH"s", " "); - continue; - } - const ColumnChunk& col = row_group.columns[c]; - if (readers[c]->HasNext()) { - hasRow = true; - switch (col.meta_data.type) { - case Type::BOOLEAN: { - bool val = readers[c]->GetBool(&def_level[c], &rep_level[c]); - if (def_level[c] >= rep_level[c]) { - printf("%-" COL_WIDTH"d",val); - } - break; - } - case Type::INT32: { - int32_t val = readers[c]->GetInt32(&def_level[c], &rep_level[c]); - if (def_level[c] >= rep_level[c]) { - printf("%-" COL_WIDTH"d",val); - } - break; - } - case Type::INT64: { - int64_t val = readers[c]->GetInt64(&def_level[c], &rep_level[c]); - if (def_level[c] >= rep_level[c]) { - printf("%-" COL_WIDTH"ld",val); - } - break; - } - case Type::FLOAT: { - float val = readers[c]->GetFloat(&def_level[c], &rep_level[c]); - if (def_level[c] >= rep_level[c]) { - printf("%-" COL_WIDTH"f",val); - } - break; - } - case Type::DOUBLE: { - double val = readers[c]->GetDouble(&def_level[c], &rep_level[c]); - if (def_level[c] >= rep_level[c]) { - printf("%-" COL_WIDTH"lf",val); - } - break; - } - case Type::BYTE_ARRAY: { - ByteArray val = readers[c]->GetByteArray(&def_level[c], &rep_level[c]); - if (def_level[c] >= rep_level[c]) { - string result = ByteArrayToString(val); - printf("%-" COL_WIDTH"s", result.c_str()); - } - break; - } - default: - continue; - } - } - } - cout << "\n"; - } while (hasRow); - } catch (exception& e) { - cout << "Caught an exception: " << e.what() << "\n"; - } catch (...) 
{ - cout << "Caught an exception.\n"; - } - - for(vector<ColumnReader*>::iterator it = readers.begin(); it != readers.end(); it++) { - delete *it; - } - } -} - int main(int argc, char** argv) { if (argc > 3) { - cerr << "Usage: parquet_reader [--only-stats] <file>" << endl; + std::cerr << "Usage: parquet_reader [--only-stats] <file>" + << std::endl; return -1; } - string filename; - bool printContents = true; + std::string filename; + bool print_values = true; // Read command-line options char *param, *value; for (int i = 1; i < argc; i++) { if ( (param = std::strstr(argv[i], "--only-stats")) ) { - printContents = false; + print_values = false; } else { filename = argv[i]; } } - readParquet(filename, printContents); + parquet_cpp::ParquetFileReader reader; + parquet_cpp::LocalFile file; + + file.Open(filename); + if (!file.is_open()) { + std::cerr << "Could not open file " << file.path() + << std::endl; + return -1; + } + + try { + reader.Open(&file); + reader.ParseMetaData(); + reader.DebugPrint(std::cout, print_values); + } catch (const std::exception& e) { + std::cerr << "Parquet error: " + << e.what() + << std::endl; + return -1; + } return 0; } - http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/setup_build_env.sh ---------------------------------------------------------------------- diff --git a/setup_build_env.sh b/setup_build_env.sh index 1cd7bb2..c95b889 100755 --- a/setup_build_env.sh +++ b/setup_build_env.sh @@ -17,9 +17,8 @@ if [ "$(uname)" != "Darwin" ]; then export THRIFT_HOME=$BUILD_DIR/thirdparty/installed fi -export GTEST_HOME=$BUILD_DIR/thirdparty/$GTEST_BASEDIR - export PARQUET_TEST_DATA=$SOURCE_DIR/data +export GTEST_HOME=$BUILD_DIR/thirdparty/$GTEST_BASEDIR cmake $SOURCE_DIR http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet.cc ---------------------------------------------------------------------- diff --git a/src/parquet.cc b/src/parquet.cc deleted file mode 100644 index 6b6adaa..0000000 --- 
a/src/parquet.cc +++ /dev/null @@ -1,272 +0,0 @@ -// Copyright 2012 Cloudera Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "parquet/parquet.h" - -#include <algorithm> -#include <string> -#include <string.h> - -#include <thrift/protocol/TDebugProtocol.h> - -#include "parquet/encodings/encodings.h" -#include "parquet/compression/codec.h" -#include "parquet/thrift/util.h" - -const int DATA_PAGE_SIZE = 64 * 1024; - -namespace parquet_cpp { - -using parquet::CompressionCodec; -using parquet::Encoding; -using parquet::FieldRepetitionType; -using parquet::PageType; -using parquet::SchemaElement; -using parquet::Type; - -InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) : - buffer_(buffer), len_(len), offset_(0) {} - -const uint8_t* InMemoryInputStream::Peek(int num_to_peek, int* num_bytes) { - *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_); - return buffer_ + offset_; -} - -const uint8_t* InMemoryInputStream::Read(int num_to_read, int* num_bytes) { - const uint8_t* result = Peek(num_to_read, num_bytes); - offset_ += *num_bytes; - return result; -} - -ScopedInMemoryInputStream::ScopedInMemoryInputStream(int64_t len) { - buffer_.resize(len); - stream_.reset(new InMemoryInputStream(buffer_.data(), buffer_.size())); -} - -uint8_t* ScopedInMemoryInputStream::data() { - return buffer_.data(); -} - -int64_t ScopedInMemoryInputStream::size() { - return buffer_.size(); -} - -const uint8_t* 
ScopedInMemoryInputStream::Peek(int num_to_peek, - int* num_bytes) { - return stream_->Peek(num_to_peek, num_bytes); -} - -const uint8_t* ScopedInMemoryInputStream::Read(int num_to_read, - int* num_bytes) { - return stream_->Read(num_to_read, num_bytes); -} - - -ColumnReader::~ColumnReader() { - delete stream_; -} - -ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata, - const SchemaElement* schema, InputStream* stream) - : metadata_(metadata), - schema_(schema), - stream_(stream), - current_decoder_(NULL), - num_buffered_values_(0), - num_decoded_values_(0), - buffered_values_offset_(0) { - int value_byte_size; - switch (metadata->type) { - case parquet::Type::BOOLEAN: - value_byte_size = 1; - break; - case parquet::Type::INT32: - value_byte_size = sizeof(int32_t); - break; - case parquet::Type::INT64: - value_byte_size = sizeof(int64_t); - break; - case parquet::Type::FLOAT: - value_byte_size = sizeof(float); - break; - case parquet::Type::DOUBLE: - value_byte_size = sizeof(double); - break; - case parquet::Type::BYTE_ARRAY: - value_byte_size = sizeof(ByteArray); - break; - default: - ParquetException::NYI("Unsupported type"); - } - - switch (metadata->codec) { - case CompressionCodec::UNCOMPRESSED: - break; - case CompressionCodec::SNAPPY: - decompressor_.reset(new SnappyCodec()); - break; - default: - ParquetException::NYI("Reading compressed data"); - } - - config_ = Config::DefaultConfig(); - values_buffer_.resize(config_.batch_size * value_byte_size); -} - -void ColumnReader::BatchDecode() { - buffered_values_offset_ = 0; - uint8_t* buf = &values_buffer_[0]; - int batch_size = config_.batch_size; - switch (metadata_->type) { - case parquet::Type::BOOLEAN: - num_decoded_values_ = - current_decoder_->GetBool(reinterpret_cast<bool*>(buf), batch_size); - break; - case parquet::Type::INT32: - num_decoded_values_ = - current_decoder_->GetInt32(reinterpret_cast<int32_t*>(buf), batch_size); - break; - case parquet::Type::INT64: - num_decoded_values_ = 
- current_decoder_->GetInt64(reinterpret_cast<int64_t*>(buf), batch_size); - break; - case parquet::Type::FLOAT: - num_decoded_values_ = - current_decoder_->GetFloat(reinterpret_cast<float*>(buf), batch_size); - break; - case parquet::Type::DOUBLE: - num_decoded_values_ = - current_decoder_->GetDouble(reinterpret_cast<double*>(buf), batch_size); - break; - case parquet::Type::BYTE_ARRAY: - num_decoded_values_ = - current_decoder_->GetByteArray(reinterpret_cast<ByteArray*>(buf), batch_size); - break; - default: - ParquetException::NYI("Unsupported type."); - } -} - -// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index -// encoding. -static bool IsDictionaryIndexEncoding(const Encoding::type& e) { - return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY; -} - -bool ColumnReader::ReadNewPage() { - // Loop until we find the next data page. - - while (true) { - int bytes_read = 0; - const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read); - if (bytes_read == 0) return false; - uint32_t header_size = bytes_read; - DeserializeThriftMsg(buffer, &header_size, &current_page_header_); - stream_->Read(header_size, &bytes_read); - - int compressed_len = current_page_header_.compressed_page_size; - int uncompressed_len = current_page_header_.uncompressed_page_size; - - // Read the compressed data page. 
- if (uncompressed_len > decompression_buffer_.size()) { - decompression_buffer_.resize(uncompressed_len); - } - decompressor_->Decompress( - compressed_len, buffer, uncompressed_len, &decompression_buffer_[0]); - buffer = &decompression_buffer_[0]; - } - - if (current_page_header_.type == PageType::DICTIONARY_PAGE) { - std::unordered_map<Encoding::type, std::shared_ptr<Decoder> >::iterator it = - decoders_.find(Encoding::RLE_DICTIONARY); - if (it != decoders_.end()) { - throw ParquetException("Column cannot have more than one dictionary."); - } - - PlainDecoder dictionary(schema_->type); - dictionary.SetData(current_page_header_.dictionary_page_header.num_values, - buffer, uncompressed_len); - std::shared_ptr<Decoder> decoder( - new DictionaryDecoder(schema_->type, &dictionary)); - decoders_[Encoding::RLE_DICTIONARY] = decoder; - current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get(); - continue; - } else if (current_page_header_.type == PageType::DATA_PAGE) { - // Read a data page. - num_buffered_values_ = current_page_header_.data_page_header.num_values; - - // Read definition levels. - if (schema_->repetition_type != FieldRepetitionType::REQUIRED) { - int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer); - buffer += sizeof(uint32_t); - definition_level_decoder_.reset( - new RleDecoder(buffer, num_definition_bytes, 1)); - buffer += num_definition_bytes; - uncompressed_len -= sizeof(uint32_t); - uncompressed_len -= num_definition_bytes; - } - - // TODO: repetition levels - - // Get a decoder object for this page or create a new decoder if this is the - // first page with this encoding. 
- Encoding::type encoding = current_page_header_.data_page_header.encoding; - if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY; - - std::unordered_map<Encoding::type, std::shared_ptr<Decoder> >::iterator it = - decoders_.find(encoding); - if (it != decoders_.end()) { - current_decoder_ = it->second.get(); - } else { - switch (encoding) { - case Encoding::PLAIN: { - std::shared_ptr<Decoder> decoder; - if (schema_->type == Type::BOOLEAN) { - decoder.reset(new BoolDecoder()); - } else { - decoder.reset(new PlainDecoder(schema_->type)); - } - decoders_[encoding] = decoder; - current_decoder_ = decoder.get(); - break; - } - case Encoding::RLE_DICTIONARY: - throw ParquetException("Dictionary page must be before data page."); - - case Encoding::DELTA_BINARY_PACKED: - case Encoding::DELTA_LENGTH_BYTE_ARRAY: - case Encoding::DELTA_BYTE_ARRAY: - ParquetException::NYI("Unsupported encoding"); - - default: - throw ParquetException("Unknown encoding type."); - } - } - current_decoder_->SetData(num_buffered_values_, buffer, uncompressed_len); - return true; - } else { - // We don't know what this page type is. We're allowed to skip non-data pages. 
- continue; - } - } - return true; -} - -} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt index f08901e..1809ea1 100644 --- a/src/parquet/CMakeLists.txt +++ b/src/parquet/CMakeLists.txt @@ -18,8 +18,10 @@ # Headers: top level install(FILES parquet.h + column_reader.h reader.h exception.h + types.h DESTINATION include/parquet) ADD_PARQUET_TEST(reader-test) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/column_reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc new file mode 100644 index 0000000..b7ececb --- /dev/null +++ b/src/parquet/column_reader.cc @@ -0,0 +1,194 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "parquet/column_reader.h" + +#include <algorithm> +#include <string> +#include <string.h> + +#include "parquet/encodings/encodings.h" +#include "parquet/compression/codec.h" +#include "parquet/thrift/util.h" +#include "parquet/util/input_stream.h" + +const int DATA_PAGE_SIZE = 64 * 1024; + +namespace parquet_cpp { + +using parquet::CompressionCodec; +using parquet::Encoding; +using parquet::FieldRepetitionType; +using parquet::PageType; +using parquet::Type; + + +ColumnReader::~ColumnReader() { + delete stream_; +} + +ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata, + const parquet::SchemaElement* schema, InputStream* stream) + : metadata_(metadata), + schema_(schema), + stream_(stream), + num_buffered_values_(0), + num_decoded_values_(0), + buffered_values_offset_(0) { + + switch (metadata->codec) { + case CompressionCodec::UNCOMPRESSED: + break; + case CompressionCodec::SNAPPY: + decompressor_.reset(new SnappyCodec()); + break; + default: + ParquetException::NYI("Reading compressed data"); + } + + config_ = Config::DefaultConfig(); +} + + +// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index +// encoding. +static bool IsDictionaryIndexEncoding(const Encoding::type& e) { + return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY; +} + +template <int TYPE> +bool TypedColumnReader<TYPE>::ReadNewPage() { + // Loop until we find the next data page. + + + while (true) { + int bytes_read = 0; + const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read); + if (bytes_read == 0) return false; + uint32_t header_size = bytes_read; + DeserializeThriftMsg(buffer, &header_size, &current_page_header_); + stream_->Read(header_size, &bytes_read); + + int compressed_len = current_page_header_.compressed_page_size; + int uncompressed_len = current_page_header_.uncompressed_page_size; + + // Read the compressed data page. 
+ buffer = stream_->Read(compressed_len, &bytes_read); + if (bytes_read != compressed_len) ParquetException::EofException(); + + // Uncompress it if we need to + if (decompressor_ != NULL) { + // Grow the uncompressed buffer if we need to. + if (uncompressed_len > decompression_buffer_.size()) { + decompression_buffer_.resize(uncompressed_len); + } + decompressor_->Decompress(compressed_len, buffer, uncompressed_len, + &decompression_buffer_[0]); + buffer = &decompression_buffer_[0]; + } + + if (current_page_header_.type == PageType::DICTIONARY_PAGE) { + auto it = decoders_.find(Encoding::RLE_DICTIONARY); + if (it != decoders_.end()) { + throw ParquetException("Column cannot have more than one dictionary."); + } + + PlainDecoder<TYPE> dictionary(schema_); + dictionary.SetData(current_page_header_.dictionary_page_header.num_values, + buffer, uncompressed_len); + std::shared_ptr<DecoderType> decoder(new DictionaryDecoder<TYPE>(schema_, &dictionary)); + + decoders_[Encoding::RLE_DICTIONARY] = decoder; + current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get(); + continue; + } else if (current_page_header_.type == PageType::DATA_PAGE) { + // Read a data page. + num_buffered_values_ = current_page_header_.data_page_header.num_values; + + // Read definition levels. + if (schema_->repetition_type != FieldRepetitionType::REQUIRED) { + int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer); + buffer += sizeof(uint32_t); + definition_level_decoder_.reset( + new RleDecoder(buffer, num_definition_bytes, 1)); + buffer += num_definition_bytes; + uncompressed_len -= sizeof(uint32_t); + uncompressed_len -= num_definition_bytes; + } + + // TODO: repetition levels + + // Get a decoder object for this page or create a new decoder if this is the + // first page with this encoding. 
+ Encoding::type encoding = current_page_header_.data_page_header.encoding; + if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY; + + auto it = decoders_.find(encoding); + if (it != decoders_.end()) { + current_decoder_ = it->second.get(); + } else { + switch (encoding) { + case Encoding::PLAIN: { + std::shared_ptr<DecoderType> decoder(new PlainDecoder<TYPE>(schema_)); + decoders_[encoding] = decoder; + current_decoder_ = decoder.get(); + break; + } + case Encoding::RLE_DICTIONARY: + throw ParquetException("Dictionary page must be before data page."); + + case Encoding::DELTA_BINARY_PACKED: + case Encoding::DELTA_LENGTH_BYTE_ARRAY: + case Encoding::DELTA_BYTE_ARRAY: + ParquetException::NYI("Unsupported encoding"); + + default: + throw ParquetException("Unknown encoding type."); + } + } + current_decoder_->SetData(num_buffered_values_, buffer, uncompressed_len); + return true; + } else { + // We don't know what this page type is. We're allowed to skip non-data pages. 
+ continue; + } + } + return true; +} + +std::shared_ptr<ColumnReader> ColumnReader::Make(const parquet::ColumnMetaData* metadata, + const parquet::SchemaElement* element, InputStream* stream) { + switch (metadata->type) { + case Type::BOOLEAN: + return std::make_shared<BoolReader>(metadata, element, stream); + case Type::INT32: + return std::make_shared<Int32Reader>(metadata, element, stream); + case Type::INT64: + return std::make_shared<Int64Reader>(metadata, element, stream); + case Type::INT96: + return std::make_shared<Int96Reader>(metadata, element, stream); + case Type::FLOAT: + return std::make_shared<FloatReader>(metadata, element, stream); + case Type::DOUBLE: + return std::make_shared<DoubleReader>(metadata, element, stream); + case Type::BYTE_ARRAY: + return std::make_shared<ByteArrayReader>(metadata, element, stream); + default: + ParquetException::NYI("type reader not implemented"); + } + // Unreachable code, but suppress compiler warning + return std::shared_ptr<ColumnReader>(nullptr); +} + +} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/column_reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h new file mode 100644 index 0000000..cd6cc02 --- /dev/null +++ b/src/parquet/column_reader.h @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_COLUMN_READER_H +#define PARQUET_COLUMN_READER_H + +#include <exception> +#include <cstdint> +#include <cstring> +#include <memory> +#include <string> +#include <unordered_map> +#include <vector> + +#include "parquet/exception.h" +#include "parquet/types.h" +#include "parquet/thrift/parquet_constants.h" +#include "parquet/thrift/parquet_types.h" +#include "parquet/util/input_stream.h" +#include "parquet/encodings/encodings.h" +#include "parquet/util/rle-encoding.h" + +namespace std { + +template <> +struct hash<parquet::Encoding::type> { + std::size_t operator()(const parquet::Encoding::type& k) const { + return hash<int>()(static_cast<int>(k)); + } +}; + +} // namespace std + +namespace parquet_cpp { + +class Codec; + +class ColumnReader { + public: + + struct Config { + int batch_size; + + static Config DefaultConfig() { + Config config; + config.batch_size = 128; + return config; + } + }; + + ColumnReader(const parquet::ColumnMetaData*, + const parquet::SchemaElement*, InputStream* stream); + + virtual ~ColumnReader(); + + static std::shared_ptr<ColumnReader> Make(const parquet::ColumnMetaData*, + const parquet::SchemaElement*, InputStream* stream); + + virtual bool ReadNewPage() = 0; + + // Returns true if there are still values in this column. 
+ bool HasNext() { + if (num_buffered_values_ == 0) { + ReadNewPage(); + if (num_buffered_values_ == 0) return false; + } + return true; + } + + parquet::Type::type type() const { + return metadata_->type; + } + + const parquet::ColumnMetaData* metadata() const { + return metadata_; + } + + protected: + // Reads the next definition and repetition level. Returns true if the value is NULL. + bool ReadDefinitionRepetitionLevels(int* def_level, int* rep_level); + + Config config_; + + const parquet::ColumnMetaData* metadata_; + const parquet::SchemaElement* schema_; + InputStream* stream_; + + // Compression codec to use. + std::unique_ptr<Codec> decompressor_; + std::vector<uint8_t> decompression_buffer_; + + parquet::PageHeader current_page_header_; + + // Not set if field is required. + std::unique_ptr<RleDecoder> definition_level_decoder_; + // Not set for flat schemas. + std::unique_ptr<RleDecoder> repetition_level_decoder_; + int num_buffered_values_; + + int num_decoded_values_; + int buffered_values_offset_; +}; + + +// API to read values from a single column. This is the main client facing API. +template <int TYPE> +class TypedColumnReader : public ColumnReader { + public: + typedef typename type_traits<TYPE>::value_type T; + + TypedColumnReader(const parquet::ColumnMetaData* metadata, + const parquet::SchemaElement* schema, InputStream* stream) : + ColumnReader(metadata, schema, stream), + current_decoder_(NULL) { + size_t value_byte_size = type_traits<TYPE>::value_byte_size; + values_buffer_.resize(config_.batch_size * value_byte_size); + } + + // Returns the next value of this type. + // TODO: batchify this interface. 
+ T NextValue(int* def_level, int* rep_level) { + if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return T(); + if (buffered_values_offset_ == num_decoded_values_) BatchDecode(); + return reinterpret_cast<T*>(&values_buffer_[0])[buffered_values_offset_++]; + } + + private: + void BatchDecode(); + + virtual bool ReadNewPage(); + + typedef Decoder<TYPE> DecoderType; + + // Map of encoding type to decoder object. + std::unordered_map<parquet::Encoding::type, std::shared_ptr<DecoderType> > decoders_; + + DecoderType* current_decoder_; + std::vector<uint8_t> values_buffer_; +}; + +typedef TypedColumnReader<parquet::Type::BOOLEAN> BoolReader; +typedef TypedColumnReader<parquet::Type::INT32> Int32Reader; +typedef TypedColumnReader<parquet::Type::INT64> Int64Reader; +typedef TypedColumnReader<parquet::Type::INT96> Int96Reader; +typedef TypedColumnReader<parquet::Type::FLOAT> FloatReader; +typedef TypedColumnReader<parquet::Type::DOUBLE> DoubleReader; +typedef TypedColumnReader<parquet::Type::BYTE_ARRAY> ByteArrayReader; + + +template <int TYPE> +void TypedColumnReader<TYPE>::BatchDecode() { + buffered_values_offset_ = 0; + T* buf = reinterpret_cast<T*>(&values_buffer_[0]); + int batch_size = config_.batch_size; + num_decoded_values_ = current_decoder_->Decode(buf, batch_size); +} + +inline bool ColumnReader::ReadDefinitionRepetitionLevels(int* def_level, int* rep_level) { + *rep_level = 1; + if (definition_level_decoder_ && !definition_level_decoder_->Get(def_level)) { + ParquetException::EofException(); + } + --num_buffered_values_; + return *def_level == 0; +} + +} // namespace parquet_cpp + +#endif // PARQUET_COLUMN_READER_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/compression/codec.h ---------------------------------------------------------------------- diff --git a/src/parquet/compression/codec.h b/src/parquet/compression/codec.h index 8166847..07648d7 100644 --- a/src/parquet/compression/codec.h +++ 
b/src/parquet/compression/codec.h @@ -15,11 +15,9 @@ #ifndef PARQUET_COMPRESSION_CODEC_H #define PARQUET_COMPRESSION_CODEC_H -#include "parquet/parquet.h" - #include <cstdint> -#include "parquet/thrift/parquet_constants.h" -#include "parquet/thrift/parquet_types.h" + +#include "parquet/exception.h" namespace parquet_cpp { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/CMakeLists.txt b/src/parquet/encodings/CMakeLists.txt index 72baf48..544b1e1 100644 --- a/src/parquet/encodings/CMakeLists.txt +++ b/src/parquet/encodings/CMakeLists.txt @@ -15,7 +15,6 @@ # Headers: encodings install(FILES encodings.h - bool-encoding.h delta-bit-pack-encoding.h delta-byte-array-encoding.h delta-length-byte-array-encoding.h http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/bool-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/bool-encoding.h b/src/parquet/encodings/bool-encoding.h deleted file mode 100644 index 8eb55bc..0000000 --- a/src/parquet/encodings/bool-encoding.h +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2012 Cloudera Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -#ifndef PARQUET_BOOL_ENCODING_H -#define PARQUET_BOOL_ENCODING_H - -#include "parquet/encodings/encodings.h" - -#include <algorithm> - -namespace parquet_cpp { - -class BoolDecoder : public Decoder { - public: - BoolDecoder() : Decoder(parquet::Type::BOOLEAN, parquet::Encoding::PLAIN) { } - - virtual void SetData(int num_values, const uint8_t* data, int len) { - num_values_ = num_values; - decoder_ = RleDecoder(data, len, 1); - } - - virtual int GetBool(bool* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - for (int i = 0; i < max_values; ++i) { - if (!decoder_.Get(&buffer[i])) ParquetException::EofException(); - } - num_values_ -= max_values; - return max_values; - } - - private: - RleDecoder decoder_; -}; - -} // namespace parquet_cpp - -#endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/delta-bit-pack-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/delta-bit-pack-encoding.h b/src/parquet/encodings/delta-bit-pack-encoding.h index 77a3b26..b437734 100644 --- a/src/parquet/encodings/delta-bit-pack-encoding.h +++ b/src/parquet/encodings/delta-bit-pack-encoding.h @@ -22,10 +22,16 @@ namespace parquet_cpp { -class DeltaBitPackDecoder : public Decoder { +template <int TYPE> +class DeltaBitPackDecoder : public Decoder<TYPE> { public: - explicit DeltaBitPackDecoder(const parquet::Type::type& type) - : Decoder(type, parquet::Encoding::DELTA_BINARY_PACKED) { + typedef typename type_traits<TYPE>::value_type T; + + explicit DeltaBitPackDecoder(const parquet::SchemaElement* schema) + : Decoder<TYPE>(schema, parquet::Encoding::DELTA_BINARY_PACKED) { + + parquet::Type::type type = type_traits<TYPE>::parquet_type; + if (type != parquet::Type::INT32 && type != parquet::Type::INT64) { throw ParquetException("Delta bit pack encoding should only be for integer data."); } @@ -38,15 +44,13 @@ class DeltaBitPackDecoder : public Decoder { 
values_current_mini_block_ = 0; } - virtual int GetInt32(int32_t* buffer, int max_values) { - return GetInternal(buffer, max_values); - } - - virtual int GetInt64(int64_t* buffer, int max_values) { + virtual int Decode(T* buffer, int max_values) { return GetInternal(buffer, max_values); } private: + using Decoder<TYPE>::num_values_; + void InitBlock() { uint64_t block_size; if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/delta-byte-array-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/delta-byte-array-encoding.h b/src/parquet/encodings/delta-byte-array-encoding.h index 3396586..a1b5b48 100644 --- a/src/parquet/encodings/delta-byte-array-encoding.h +++ b/src/parquet/encodings/delta-byte-array-encoding.h @@ -21,12 +21,12 @@ namespace parquet_cpp { -class DeltaByteArrayDecoder : public Decoder { +class DeltaByteArrayDecoder : public Decoder<parquet::Type::BYTE_ARRAY> { public: - DeltaByteArrayDecoder() - : Decoder(parquet::Type::BYTE_ARRAY, parquet::Encoding::DELTA_BYTE_ARRAY), - prefix_len_decoder_(parquet::Type::INT32), - suffix_decoder_() { + explicit DeltaByteArrayDecoder(const parquet::SchemaElement* schema) + : Decoder<parquet::Type::BYTE_ARRAY>(schema, parquet::Encoding::DELTA_BYTE_ARRAY), + prefix_len_decoder_(nullptr), + suffix_decoder_(nullptr) { } virtual void SetData(int num_values, const uint8_t* data, int len) { @@ -43,13 +43,13 @@ class DeltaByteArrayDecoder : public Decoder { // TODO: this doesn't work and requires memory management. We need to allocate // new strings to store the results. 
- virtual int GetByteArray(ByteArray* buffer, int max_values) { + virtual int Decode(ByteArray* buffer, int max_values) { max_values = std::min(max_values, num_values_); for (int i = 0; i < max_values; ++i) { int prefix_len = 0; - prefix_len_decoder_.GetInt32(&prefix_len, 1); + prefix_len_decoder_.Decode(&prefix_len, 1); ByteArray suffix; - suffix_decoder_.GetByteArray(&suffix, 1); + suffix_decoder_.Decode(&suffix, 1); buffer[i].len = prefix_len + suffix.len; uint8_t* result = reinterpret_cast<uint8_t*>(malloc(buffer[i].len)); @@ -64,7 +64,9 @@ class DeltaByteArrayDecoder : public Decoder { } private: - DeltaBitPackDecoder prefix_len_decoder_; + using Decoder<parquet::Type::BYTE_ARRAY>::num_values_; + + DeltaBitPackDecoder<parquet::Type::INT32> prefix_len_decoder_; DeltaLengthByteArrayDecoder suffix_decoder_; ByteArray last_value_; }; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/delta-length-byte-array-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/delta-length-byte-array-encoding.h b/src/parquet/encodings/delta-length-byte-array-encoding.h index 06bf39d..a6e4c58 100644 --- a/src/parquet/encodings/delta-length-byte-array-encoding.h +++ b/src/parquet/encodings/delta-length-byte-array-encoding.h @@ -21,11 +21,12 @@ namespace parquet_cpp { -class DeltaLengthByteArrayDecoder : public Decoder { +class DeltaLengthByteArrayDecoder : public Decoder<parquet::Type::BYTE_ARRAY> { public: - DeltaLengthByteArrayDecoder() - : Decoder(parquet::Type::BYTE_ARRAY, parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY), - len_decoder_(parquet::Type::INT32) { + explicit DeltaLengthByteArrayDecoder(const parquet::SchemaElement* schema) + : Decoder<parquet::Type::BYTE_ARRAY>( + schema, parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY), + len_decoder_(nullptr) { } virtual void SetData(int num_values, const uint8_t* data, int len) { @@ -38,10 +39,10 @@ class DeltaLengthByteArrayDecoder : public 
Decoder { len_ = len - 4 - total_lengths_len; } - virtual int GetByteArray(ByteArray* buffer, int max_values) { + virtual int Decode(ByteArray* buffer, int max_values) { max_values = std::min(max_values, num_values_); int lengths[max_values]; - len_decoder_.GetInt32(lengths, max_values); + len_decoder_.Decode(lengths, max_values); for (int i = 0; i < max_values; ++i) { buffer[i].len = lengths[i]; buffer[i].ptr = data_; @@ -53,7 +54,8 @@ class DeltaLengthByteArrayDecoder : public Decoder { } private: - DeltaBitPackDecoder len_decoder_; + using Decoder<parquet::Type::BYTE_ARRAY>::num_values_; + DeltaBitPackDecoder<parquet::Type::INT32> len_decoder_; const uint8_t* data_; int len_; }; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/dictionary-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h index 2501b2a..cb8fb30 100644 --- a/src/parquet/encodings/dictionary-encoding.h +++ b/src/parquet/encodings/dictionary-encoding.h @@ -22,56 +22,22 @@ namespace parquet_cpp { -class DictionaryDecoder : public Decoder { +template <int TYPE> +class DictionaryDecoder : public Decoder<TYPE> { public: + typedef typename type_traits<TYPE>::value_type T; + // Initializes the dictionary with values from 'dictionary'. The data in dictionary // is not guaranteed to persist in memory after this call so the dictionary decoder // needs to copy the data out if necessary. 
- DictionaryDecoder(const parquet::Type::type& type, Decoder* dictionary) - : Decoder(type, parquet::Encoding::RLE_DICTIONARY) { - int num_dictionary_values = dictionary->values_left(); - switch (type) { - case parquet::Type::BOOLEAN: - throw ParquetException("Boolean cols should not be dictionary encoded."); - - case parquet::Type::INT32: - int32_dictionary_.resize(num_dictionary_values); - dictionary->GetInt32(&int32_dictionary_[0], num_dictionary_values); - break; - case parquet::Type::INT64: - int64_dictionary_.resize(num_dictionary_values); - dictionary->GetInt64(&int64_dictionary_[0], num_dictionary_values); - break; - case parquet::Type::FLOAT: - float_dictionary_.resize(num_dictionary_values); - dictionary->GetFloat(&float_dictionary_[0], num_dictionary_values); - break; - case parquet::Type::DOUBLE: - double_dictionary_.resize(num_dictionary_values); - dictionary->GetDouble(&double_dictionary_[0], num_dictionary_values); - break; - case parquet::Type::BYTE_ARRAY: { - byte_array_dictionary_.resize(num_dictionary_values); - dictionary->GetByteArray(&byte_array_dictionary_[0], num_dictionary_values); - int total_size = 0; - for (int i = 0; i < num_dictionary_values; ++i) { - total_size += byte_array_dictionary_[i].len; - } - byte_array_data_.resize(total_size); - int offset = 0; - for (int i = 0; i < num_dictionary_values; ++i) { - memcpy(&byte_array_data_[offset], - byte_array_dictionary_[i].ptr, byte_array_dictionary_[i].len); - byte_array_dictionary_[i].ptr = &byte_array_data_[offset]; - offset += byte_array_dictionary_[i].len; - } - break; - } - default: - ParquetException::NYI("Unsupported dictionary type"); - } + DictionaryDecoder(const parquet::SchemaElement* schema, Decoder<TYPE>* dictionary) + : Decoder<TYPE>(schema, parquet::Encoding::RLE_DICTIONARY) { + Init(dictionary); } + // Perform type-specific initialization + void Init(Decoder<TYPE>* dictionary); + virtual void SetData(int num_values, const uint8_t* data, int len) { num_values_ = 
num_values; if (len == 0) return; @@ -81,47 +47,17 @@ class DictionaryDecoder : public Decoder { idx_decoder_ = RleDecoder(data, len, bit_width); } - virtual int GetInt32(int32_t* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - for (int i = 0; i < max_values; ++i) { - buffer[i] = int32_dictionary_[index()]; - } - return max_values; - } - - virtual int GetInt64(int64_t* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - for (int i = 0; i < max_values; ++i) { - buffer[i] = int64_dictionary_[index()]; - } - return max_values; - } - - virtual int GetFloat(float* buffer, int max_values) { + virtual int Decode(T* buffer, int max_values) { max_values = std::min(max_values, num_values_); for (int i = 0; i < max_values; ++i) { - buffer[i] = float_dictionary_[index()]; - } - return max_values; - } - - virtual int GetDouble(double* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - for (int i = 0; i < max_values; ++i) { - buffer[i] = double_dictionary_[index()]; - } - return max_values; - } - - virtual int GetByteArray(ByteArray* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - for (int i = 0; i < max_values; ++i) { - buffer[i] = byte_array_dictionary_[index()]; + buffer[i] = dictionary_[index()]; } return max_values; } private: + using Decoder<TYPE>::num_values_; + int index() { int idx = 0; if (!idx_decoder_.Get(&idx)) ParquetException::EofException(); @@ -130,11 +66,7 @@ class DictionaryDecoder : public Decoder { } // Only one is set. - std::vector<int32_t> int32_dictionary_; - std::vector<int64_t> int64_dictionary_; - std::vector<float> float_dictionary_; - std::vector<double> double_dictionary_; - std::vector<ByteArray> byte_array_dictionary_; + std::vector<T> dictionary_; // Data that contains the byte array data (byte_array_dictionary_ just has the // pointers). 
@@ -143,6 +75,39 @@ class DictionaryDecoder : public Decoder { RleDecoder idx_decoder_; }; +template <int TYPE> +inline void DictionaryDecoder<TYPE>::Init(Decoder<TYPE>* dictionary) { + int num_dictionary_values = dictionary->values_left(); + dictionary_.resize(num_dictionary_values); + dictionary->Decode(&dictionary_[0], num_dictionary_values); +} + +template <> +inline void DictionaryDecoder<parquet::Type::BOOLEAN>::Init( + Decoder<parquet::Type::BOOLEAN>* dictionary) { + ParquetException::NYI("Dictionary encoding is not implemented for boolean values"); +} + +template <> +inline void DictionaryDecoder<parquet::Type::BYTE_ARRAY>::Init( + Decoder<parquet::Type::BYTE_ARRAY>* dictionary) { + int num_dictionary_values = dictionary->values_left(); + dictionary_.resize(num_dictionary_values); + dictionary->Decode(&dictionary_[0], num_dictionary_values); + + int total_size = 0; + for (int i = 0; i < num_dictionary_values; ++i) { + total_size += dictionary_[i].len; + } + byte_array_data_.resize(total_size); + int offset = 0; + for (int i = 0; i < num_dictionary_values; ++i) { + memcpy(&byte_array_data_[offset], dictionary_[i].ptr, dictionary_[i].len); + dictionary_[i].ptr = &byte_array_data_[offset]; + offset += dictionary_[i].len; + } +} + } // namespace parquet_cpp #endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/encodings.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/encodings.h b/src/parquet/encodings/encodings.h index 9211bf8..2017fca 100644 --- a/src/parquet/encodings/encodings.h +++ b/src/parquet/encodings/encodings.h @@ -17,6 +17,8 @@ #include <cstdint> +#include "parquet/types.h" + #include "parquet/thrift/parquet_constants.h" #include "parquet/thrift/parquet_types.h" #include "parquet/util/rle-encoding.h" @@ -24,8 +26,12 @@ namespace parquet_cpp { +// The Decoder template is parameterized on parquet::Type::type +template <int TYPE> class Decoder { public: 
+ typedef typename type_traits<TYPE>::value_type T; + virtual ~Decoder() {} // Sets the data for a new page. This will be called multiple times on the same @@ -36,22 +42,7 @@ class Decoder { // the decoder would decode put to 'max_values', storing the result in 'buffer'. // The function returns the number of values decoded, which should be max_values // except for end of the current data page. - virtual int GetBool(bool* buffer, int max_values) { - throw ParquetException("Decoder does not implement this type."); - } - virtual int GetInt32(int32_t* buffer, int max_values) { - throw ParquetException("Decoder does not implement this type."); - } - virtual int GetInt64(int64_t* buffer, int max_values) { - throw ParquetException("Decoder does not implement this type."); - } - virtual int GetFloat(float* buffer, int max_values) { - throw ParquetException("Decoder does not implement this type."); - } - virtual int GetDouble(double* buffer, int max_values) { - throw ParquetException("Decoder does not implement this type."); - } - virtual int GetByteArray(ByteArray* buffer, int max_values) { + virtual int Decode(T* buffer, int max_values) { throw ParquetException("Decoder does not implement this type."); } @@ -62,19 +53,22 @@ class Decoder { const parquet::Encoding::type encoding() const { return encoding_; } protected: - Decoder(const parquet::Type::type& type, const parquet::Encoding::type& encoding) - : type_(type), encoding_(encoding), num_values_(0) {} + explicit Decoder(const parquet::SchemaElement* schema, + const parquet::Encoding::type& encoding) + : schema_(schema), encoding_(encoding), num_values_(0) {} + + // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY + const parquet::SchemaElement* schema_; - const parquet::Type::type type_; const parquet::Encoding::type encoding_; int num_values_; }; } // namespace parquet_cpp -#include "parquet/encodings/bool-encoding.h" #include "parquet/encodings/plain-encoding.h" #include 
"parquet/encodings/dictionary-encoding.h" + #include "parquet/encodings/delta-bit-pack-encoding.h" #include "parquet/encodings/delta-length-byte-array-encoding.h" #include "parquet/encodings/delta-byte-array-encoding.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/plain-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h index b094cdb..5fb460e 100644 --- a/src/parquet/encodings/plain-encoding.h +++ b/src/parquet/encodings/plain-encoding.h @@ -21,11 +21,15 @@ namespace parquet_cpp { -class PlainDecoder : public Decoder { +template <int TYPE> +class PlainDecoder : public Decoder<TYPE> { public: - explicit PlainDecoder(const parquet::Type::type& type) - : Decoder(type, parquet::Encoding::PLAIN), data_(NULL), len_(0) { - } + typedef typename type_traits<TYPE>::value_type T; + using Decoder<TYPE>::num_values_; + + explicit PlainDecoder(const parquet::SchemaElement* schema) : + Decoder<TYPE>(schema, parquet::Encoding::PLAIN), + data_(NULL), len_(0) {} virtual void SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; @@ -33,49 +37,61 @@ class PlainDecoder : public Decoder { len_ = len; } - int GetValues(void* buffer, int max_values, int byte_size) { - max_values = std::min(max_values, num_values_); - int size = max_values * byte_size; - if (len_ < size) ParquetException::EofException(); - memcpy(buffer, data_, size); - data_ += size; - len_ -= size; - num_values_ -= max_values; - return max_values; - } + virtual int Decode(T* buffer, int max_values); + private: + const uint8_t* data_; + int len_; +}; - virtual int GetInt32(int32_t* buffer, int max_values) { - return GetValues(buffer, max_values, sizeof(int32_t)); - } +template <int TYPE> +inline int PlainDecoder<TYPE>::Decode(T* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + int size = max_values * 
sizeof(T); + if (len_ < size) ParquetException::EofException(); + memcpy(buffer, data_, size); + data_ += size; + len_ -= size; + num_values_ -= max_values; + return max_values; +} - virtual int GetInt64(int64_t* buffer, int max_values) { - return GetValues(buffer, max_values, sizeof(int64_t)); +// Template specialization for BYTE_ARRAY +template <> +inline int PlainDecoder<parquet::Type::BYTE_ARRAY>::Decode(ByteArray* buffer, + int max_values) { + max_values = std::min(max_values, num_values_); + for (int i = 0; i < max_values; ++i) { + buffer[i].len = *reinterpret_cast<const uint32_t*>(data_); + if (len_ < sizeof(uint32_t) + buffer[i].len) ParquetException::EofException(); + buffer[i].ptr = data_ + sizeof(uint32_t); + data_ += sizeof(uint32_t) + buffer[i].len; + len_ -= sizeof(uint32_t) + buffer[i].len; } + num_values_ -= max_values; + return max_values; +} - virtual int GetFloat(float* buffer, int max_values) { - return GetValues(buffer, max_values, sizeof(float)); - } +template <> +class PlainDecoder<parquet::Type::BOOLEAN> : public Decoder<parquet::Type::BOOLEAN> { + public: + explicit PlainDecoder(const parquet::SchemaElement* schema) : + Decoder<parquet::Type::BOOLEAN>(schema, parquet::Encoding::PLAIN) {} - virtual int GetDouble(double* buffer, int max_values) { - return GetValues(buffer, max_values, sizeof(double)); + virtual void SetData(int num_values, const uint8_t* data, int len) { + num_values_ = num_values; + decoder_ = RleDecoder(data, len, 1); } - virtual int GetByteArray(ByteArray* buffer, int max_values) { + virtual int Decode(bool* buffer, int max_values) { max_values = std::min(max_values, num_values_); for (int i = 0; i < max_values; ++i) { - buffer[i].len = *reinterpret_cast<const uint32_t*>(data_); - if (len_ < sizeof(uint32_t) + buffer[i].len) ParquetException::EofException(); - buffer[i].ptr = data_ + sizeof(uint32_t); - data_ += sizeof(uint32_t) + buffer[i].len; - len_ -= sizeof(uint32_t) + buffer[i].len; + if (!decoder_.Get(&buffer[i])) 
ParquetException::EofException(); } num_values_ -= max_values; return max_values; } - private: - const uint8_t* data_; - int len_; + RleDecoder decoder_; }; } // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/parquet.h ---------------------------------------------------------------------- diff --git a/src/parquet/parquet.h b/src/parquet/parquet.h index 4469a82..0fd3e97 100644 --- a/src/parquet/parquet.h +++ b/src/parquet/parquet.h @@ -27,199 +27,8 @@ #include <vector> #include "parquet/exception.h" -#include "parquet/thrift/parquet_constants.h" -#include "parquet/thrift/parquet_types.h" -#include "parquet/util/rle-encoding.h" - -namespace std { - -template <> -struct hash<parquet::Encoding::type> { - std::size_t operator()(const parquet::Encoding::type& k) const { - return hash<int>()(static_cast<int>(k)); - } -}; - -} // namespace std - -namespace parquet_cpp { - -class Codec; -class Decoder; - -struct ByteArray { - uint32_t len; - const uint8_t* ptr; -}; - -// Interface for the column reader to get the bytes. The interface is a stream -// interface, meaning the bytes in order and once a byte is read, it does not -// need to be read again. -class InputStream { - public: - // Returns the next 'num_to_peek' without advancing the current position. - // *num_bytes will contain the number of bytes returned which can only be - // less than num_to_peek at end of stream cases. - // Since the position is not advanced, calls to this function are idempotent. - // The buffer returned to the caller is still owned by the input stream and must - // stay valid until the next call to Peek() or Read(). - virtual const uint8_t* Peek(int num_to_peek, int* num_bytes) = 0; - - // Identical to Peek(), except the current position in the stream is advanced by - // *num_bytes. 
- virtual const uint8_t* Read(int num_to_read, int* num_bytes) = 0; - - virtual ~InputStream() {} - - protected: - InputStream() {} -}; - -// Implementation of an InputStream when all the bytes are in memory. -class InMemoryInputStream : public InputStream { - public: - InMemoryInputStream(const uint8_t* buffer, int64_t len); - virtual const uint8_t* Peek(int num_to_peek, int* num_bytes); - virtual const uint8_t* Read(int num_to_read, int* num_bytes); - - private: - const uint8_t* buffer_; - int64_t len_; - int64_t offset_; -}; - -// A wrapper for InMemoryInputStream to manage the memory. -class ScopedInMemoryInputStream : public InputStream { - public: - ScopedInMemoryInputStream(int64_t len); - uint8_t* data(); - int64_t size(); - virtual const uint8_t* Peek(int num_to_peek, int* num_bytes); - virtual const uint8_t* Read(int num_to_read, int* num_bytes); - - private: - std::vector<uint8_t> buffer_; - std::unique_ptr<InMemoryInputStream> stream_; -}; - -// API to read values from a single column. This is the main client facing API. -class ColumnReader { - public: - struct Config { - int batch_size; - - static Config DefaultConfig() { - Config config; - config.batch_size = 128; - return config; - } - }; - - ColumnReader(const parquet::ColumnMetaData*, - const parquet::SchemaElement*, InputStream* stream); - - ~ColumnReader(); - - // Returns true if there are still values in this column. - bool HasNext(); - - // Returns the next value of this type. - // TODO: batchify this interface. - bool GetBool(int* definition_level, int* repetition_level); - int32_t GetInt32(int* definition_level, int* repetition_level); - int64_t GetInt64(int* definition_level, int* repetition_level); - float GetFloat(int* definition_level, int* repetition_level); - double GetDouble(int* definition_level, int* repetition_level); - ByteArray GetByteArray(int* definition_level, int* repetition_level); - - private: - bool ReadNewPage(); - // Reads the next definition and repetition level. 
Returns true if the value is NULL. - bool ReadDefinitionRepetitionLevels(int* def_level, int* rep_level); - - void BatchDecode(); - - Config config_; - - const parquet::ColumnMetaData* metadata_; - const parquet::SchemaElement* schema_; - InputStream* stream_; - - // Compression codec to use. - std::unique_ptr<Codec> decompressor_; - std::vector<uint8_t> decompression_buffer_; - - // Map of compression type to decompressor object. - std::unordered_map<parquet::Encoding::type, std::shared_ptr<Decoder> > decoders_; - - parquet::PageHeader current_page_header_; - - // Not set if field is required. - std::unique_ptr<RleDecoder> definition_level_decoder_; - // Not set for flat schemas. - std::unique_ptr<RleDecoder> repetition_level_decoder_; - Decoder* current_decoder_; - int num_buffered_values_; - - std::vector<uint8_t> values_buffer_; - int num_decoded_values_; - int buffered_values_offset_; -}; - - -inline bool ColumnReader::HasNext() { - if (num_buffered_values_ == 0) { - ReadNewPage(); - if (num_buffered_values_ == 0) return false; - } - return true; -} - -inline bool ColumnReader::GetBool(int* def_level, int* rep_level) { - if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return bool(); - if (buffered_values_offset_ == num_decoded_values_) BatchDecode(); - return reinterpret_cast<bool*>(&values_buffer_[0])[buffered_values_offset_++]; -} - -inline int32_t ColumnReader::GetInt32(int* def_level, int* rep_level) { - if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return int32_t(); - if (buffered_values_offset_ == num_decoded_values_) BatchDecode(); - return reinterpret_cast<int32_t*>(&values_buffer_[0])[buffered_values_offset_++]; -} - -inline int64_t ColumnReader::GetInt64(int* def_level, int* rep_level) { - if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return int64_t(); - if (buffered_values_offset_ == num_decoded_values_) BatchDecode(); - return reinterpret_cast<int64_t*>(&values_buffer_[0])[buffered_values_offset_++]; -} - -inline 
float ColumnReader::GetFloat(int* def_level, int* rep_level) { - if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return float(); - if (buffered_values_offset_ == num_decoded_values_) BatchDecode(); - return reinterpret_cast<float*>(&values_buffer_[0])[buffered_values_offset_++]; -} - -inline double ColumnReader::GetDouble(int* def_level, int* rep_level) { - if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return double(); - if (buffered_values_offset_ == num_decoded_values_) BatchDecode(); - return reinterpret_cast<double*>(&values_buffer_[0])[buffered_values_offset_++]; -} - -inline ByteArray ColumnReader::GetByteArray(int* def_level, int* rep_level) { - if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return ByteArray(); - if (buffered_values_offset_ == num_decoded_values_) BatchDecode(); - return reinterpret_cast<ByteArray*>(&values_buffer_[0])[buffered_values_offset_++]; -} - -inline bool ColumnReader::ReadDefinitionRepetitionLevels(int* def_level, int* rep_level) { - *rep_level = 1; - if (definition_level_decoder_ && !definition_level_decoder_->Get(def_level)) { - ParquetException::EofException(); - } - --num_buffered_values_; - return *def_level == 0; -} - -} // namespace parquet_cpp +#include "parquet/reader.h" +#include "parquet/column_reader.h" +#include "parquet/util/input_stream.h" #endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/reader-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc index 0f06f3f..1459afc 100644 --- a/src/parquet/reader-test.cc +++ b/src/parquet/reader-test.cc @@ -42,9 +42,7 @@ class TestAllTypesPlain : public ::testing::Test { reader_.Open(&file_); } - void TearDown() { - reader_.Close(); - } + void TearDown() {} protected: LocalFile file_; @@ -56,4 +54,14 @@ TEST_F(TestAllTypesPlain, ParseMetaData) { reader_.ParseMetaData(); } +TEST_F(TestAllTypesPlain, DebugPrintWorks) { + 
std::stringstream ss; + + // Automatically parses metadata + reader_.DebugPrint(ss); + + std::string result = ss.str(); + ASSERT_TRUE(result.size() > 0); +} + } // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc index 7ccd98c..7c727ba 100644 --- a/src/parquet/reader.cc +++ b/src/parquet/reader.cc @@ -18,18 +18,30 @@ #include "parquet/reader.h" #include <cstdio> +#include <cstring> +#include <memory> +#include <sstream> +#include <string> #include <vector> +#include "parquet/column_reader.h" #include "parquet/exception.h" + #include "parquet/thrift/util.h" +#include "parquet/util/input_stream.h" + +using std::string; +using std::vector; +using parquet::Type; + namespace parquet_cpp { // ---------------------------------------------------------------------- // LocalFile methods LocalFile::~LocalFile() { - // You must explicitly call Close + CloseFile(); } void LocalFile::Open(const std::string& path) { @@ -39,6 +51,11 @@ void LocalFile::Open(const std::string& path) { } void LocalFile::Close() { + // Pure virtual + CloseFile(); +} + +void LocalFile::CloseFile() { if (is_open_) { fclose(file_); is_open_ = false; @@ -58,9 +75,51 @@ size_t LocalFile::Tell() { return ftell(file_); } -void LocalFile::Read(size_t nbytes, uint8_t* buffer, - size_t* bytes_read) { - *bytes_read = fread(buffer, 1, nbytes, file_); +size_t LocalFile::Read(size_t nbytes, uint8_t* buffer) { + return fread(buffer, 1, nbytes, file_); +} + +// ---------------------------------------------------------------------- +// RowGroupReader + +ColumnReader* RowGroupReader::Column(size_t i) { + // TODO: boundschecking + auto it = column_readers_.find(i); + if (it != column_readers_.end()) { + // Already have constructed the ColumnReader + return it->second.get(); + } + + const parquet::ColumnChunk& col = 
row_group_->columns[i]; + + size_t col_start = col.meta_data.data_page_offset; + if (col.meta_data.__isset.dictionary_page_offset && + col_start > col.meta_data.dictionary_page_offset) { + col_start = col.meta_data.dictionary_page_offset; + } + + std::unique_ptr<ScopedInMemoryInputStream> input( + new ScopedInMemoryInputStream(col.meta_data.total_compressed_size)); + + FileLike* source = this->parent_->buffer_; + + source->Seek(col_start); + + // TODO(wesm): Law of demeter violation + size_t bytes_read = source->Read(input->size(), input->data()); + + if (bytes_read != input->size()) { + std::cout << "Bytes needed: " << col.meta_data.total_compressed_size << std::endl; + std::cout << "Bytes read: " << bytes_read << std::endl; + throw ParquetException("Unable to read column chunk data"); + } + + // TODO(wesm): This presumes a flat schema + std::shared_ptr<ColumnReader> reader = ColumnReader::Make(&col.meta_data, + &this->parent_->metadata_.schema[i + 1], input.release()); + column_readers_[i] = reader; + + return reader.get(); } // ---------------------------------------------------------------------- @@ -70,6 +129,12 @@ void LocalFile::Read(size_t nbytes, uint8_t* buffer, static constexpr uint32_t FOOTER_SIZE = 8; static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'}; +ParquetFileReader::ParquetFileReader() : + parsed_metadata_(false), + buffer_(nullptr) {} + +ParquetFileReader::~ParquetFileReader() {} + void ParquetFileReader::Open(FileLike* buffer) { buffer_ = buffer; } @@ -78,6 +143,29 @@ void ParquetFileReader::Close() { buffer_->Close(); } +RowGroupReader* ParquetFileReader::RowGroup(size_t i) { + if (i >= num_row_groups()) { + std::stringstream ss; + ss << "The file only has " << num_row_groups() + << "row groups, requested reader for: " + << i; + throw ParquetException(ss.str()); + } + + auto it = row_group_readers_.find(i); + if (it != row_group_readers_.end()) { + // Constructed the RowGroupReader already + return it->second.get(); + } + if 
(!parsed_metadata_) { + ParseMetaData(); + } + + // Construct the RowGroupReader + row_group_readers_[i] = std::make_shared<RowGroupReader>(this, &metadata_.row_groups[i]); + return row_group_readers_[i].get(); +} + void ParquetFileReader::ParseMetaData() { size_t filesize = buffer_->Size(); @@ -85,11 +173,11 @@ void ParquetFileReader::ParseMetaData() { throw ParquetException("Corrupted file, smaller than file footer"); } - size_t bytes_read; uint8_t footer_buffer[FOOTER_SIZE]; buffer_->Seek(filesize - FOOTER_SIZE); - buffer_->Read(FOOTER_SIZE, footer_buffer, &bytes_read); + + size_t bytes_read = buffer_->Read(FOOTER_SIZE, footer_buffer); if (bytes_read != FOOTER_SIZE) { throw ParquetException("Invalid parquet file. Corrupt footer."); @@ -107,11 +195,192 @@ void ParquetFileReader::ParseMetaData() { buffer_->Seek(metadata_start); std::vector<uint8_t> metadata_buffer(metadata_len); - buffer_->Read(metadata_len, &metadata_buffer[0], &bytes_read); + bytes_read = buffer_->Read(metadata_len, &metadata_buffer[0]); if (bytes_read != metadata_len) { throw ParquetException("Invalid parquet file. 
Could not read metadata bytes."); } DeserializeThriftMsg(&metadata_buffer[0], &metadata_len, &metadata_); + parsed_metadata_ = true; +} + +// ---------------------------------------------------------------------- +// ParquetFileReader::DebugPrint + +static string parquet_type_to_string(Type::type t) { + switch (t) { + case Type::BOOLEAN: + return "BOOLEAN"; + break; + case Type::INT32: + return "INT32"; + break; + case Type::INT64: + return "INT64"; + break; + case Type::FLOAT: + return "FLOAT"; + break; + case Type::DOUBLE: + return "DOUBLE"; + break; + case Type::BYTE_ARRAY: + return "BYTE_ARRAY"; + break; + case Type::INT96: + return "INT96"; + break; + case Type::FIXED_LEN_BYTE_ARRAY: + return "FIXED_LEN_BYTE_ARRAY"; + break; + default: + return "UNKNOWN"; + break; + } +} + +// the fixed initial size is just for an example +#define COL_WIDTH "17" + +void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) { + if (!parsed_metadata_) { + ParseMetaData(); + } + + stream << "File statistics:\n"; + stream << "Total rows: " << metadata_.num_rows << "\n"; + for (int c = 1; c < metadata_.schema.size(); ++c) { + stream << "Column " << c-1 << ": " << metadata_.schema[c].name << " (" + << parquet_type_to_string(metadata_.schema[c].type); + if (metadata_.schema[c].type == Type::INT96 || + metadata_.schema[c].type == Type::FIXED_LEN_BYTE_ARRAY) { + stream << " - not supported"; + } + stream << ")\n"; + } + + for (int i = 0; i < metadata_.row_groups.size(); ++i) { + stream << "--- Row Group " << i << " ---\n"; + + RowGroupReader* group_reader = RowGroup(i); + + // Print column metadata + size_t nColumns = group_reader->num_columns(); + + for (int c = 0; c < group_reader->num_columns(); ++c) { + const parquet::ColumnMetaData* meta_data = group_reader->Column(c)->metadata(); + stream << "Column " << c + << ": " << meta_data->num_values << " rows, " + << meta_data->statistics.null_count << " null values, " + << meta_data->statistics.distinct_count << " 
distinct values, " + << "min value: " << (meta_data->statistics.min.length()>0 ? + meta_data->statistics.min : "N/A") + << ", max value: " << (meta_data->statistics.max.length()>0 ? + meta_data->statistics.max : "N/A") << ".\n"; + } + + if (!print_values) { + continue; + } + + // Create readers for all columns and print contents + vector<ColumnReader*> readers(nColumns, NULL); + for (int c = 0; c < nColumns; ++c) { + ColumnReader* col_reader = group_reader->Column(c); + + Type::type col_type = col_reader->type(); + + printf("%-" COL_WIDTH"s", metadata_.schema[c+1].name.c_str()); + + if (col_type == Type::INT96 || col_type == Type::FIXED_LEN_BYTE_ARRAY) { + continue; + } + + // This is OK in this method as long as the RowGroupReader does not get deleted + readers[c] = col_reader; + } + stream << "\n"; + + vector<int> def_level(nColumns, 0); + vector<int> rep_level(nColumns, 0); + + static constexpr size_t bufsize = 25; + char buffer[bufsize]; + + bool hasRow; + do { + hasRow = false; + for (int c = 0; c < nColumns; ++c) { + if (readers[c] == NULL) { + snprintf(buffer, bufsize, "%-" COL_WIDTH"s", " "); + stream << buffer; + continue; + } + if (readers[c]->HasNext()) { + hasRow = true; + switch (readers[c]->type()) { + case Type::BOOLEAN: { + bool val = reinterpret_cast<BoolReader*>(readers[c])->NextValue( + &def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + snprintf(buffer, bufsize, "%-" COL_WIDTH"d",val); + stream << buffer; + } + break; + } + case Type::INT32: { + int32_t val = reinterpret_cast<Int32Reader*>(readers[c])->NextValue( + &def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + snprintf(buffer, bufsize, "%-" COL_WIDTH"d",val); + stream << buffer; + } + break; + } + case Type::INT64: { + int64_t val = reinterpret_cast<Int64Reader*>(readers[c])->NextValue( + &def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + snprintf(buffer, bufsize, "%-" COL_WIDTH"ld",val); + stream << buffer; + } + break; + } + case 
Type::FLOAT: { + float val = reinterpret_cast<FloatReader*>(readers[c])->NextValue( + &def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + snprintf(buffer, bufsize, "%-" COL_WIDTH"f",val); + stream << buffer; + } + break; + } + case Type::DOUBLE: { + double val = reinterpret_cast<DoubleReader*>(readers[c])->NextValue( + &def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + snprintf(buffer, bufsize, "%-" COL_WIDTH"lf",val); + stream << buffer; + } + break; + } + case Type::BYTE_ARRAY: { + ByteArray val = reinterpret_cast<ByteArrayReader*>(readers[c])->NextValue( + &def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + string result = ByteArrayToString(val); + snprintf(buffer, bufsize, "%-" COL_WIDTH"s", result.c_str()); + stream << buffer; + } + break; + } + default: + continue; + } + } + } + stream << "\n"; + } while (hasRow); + } } } // namespace parquet_cpp
