Repository: parquet-cpp Updated Branches: refs/heads/master 8fc24f861 -> 1f24e7658
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/reader.h b/src/parquet/reader.h index 4a40e04..e8a6806 100644 --- a/src/parquet/reader.h +++ b/src/parquet/reader.h @@ -19,14 +19,18 @@ #define PARQUET_FILE_READER_H #include <cstdint> +#include <memory> #include <string> #include <stdio.h> +#include <unordered_map> #include "parquet/thrift/parquet_types.h" -#include "parquet/parquet.h" +#include "parquet/types.h" namespace parquet_cpp { +class ColumnReader; + class FileLike { public: virtual ~FileLike() {} @@ -35,7 +39,9 @@ class FileLike { virtual size_t Size() = 0; virtual size_t Tell() = 0; virtual void Seek(size_t pos) = 0; - virtual void Read(size_t nbytes, uint8_t* out, size_t* bytes_read) = 0; + + // Returns the number of bytes actually read; may be fewer than nbytes + virtual size_t Read(size_t nbytes, uint8_t* out) = 0; }; @@ -50,36 +56,83 @@ class LocalFile : public FileLike { virtual size_t Size(); virtual size_t Tell(); virtual void Seek(size_t pos); - virtual void Read(size_t nbytes, uint8_t* out, size_t* bytes_read); + + // Returns the number of bytes actually read; may be fewer than nbytes + virtual size_t Read(size_t nbytes, uint8_t* out); bool is_open() const { return is_open_;} const std::string& path() const { return path_;} private: + void CloseFile(); + std::string path_; FILE* file_; bool is_open_; }; +class ParquetFileReader; + +class RowGroupReader { + public: + RowGroupReader(ParquetFileReader* parent, parquet::RowGroup* group) : + parent_(parent), + row_group_(group) {} + + // Construct a ColumnReader for the indicated row group-relative column.
The + // returned object is owned by the RowGroupReader + ColumnReader* Column(size_t i); + + size_t num_columns() const { + return row_group_->columns.size(); + } + + private: + friend class ParquetFileReader; + + ParquetFileReader* parent_; + parquet::RowGroup* row_group_; + + // Column index -> ColumnReader (NOTE(review): int key vs size_t Column(i)) + std::unordered_map<int, std::shared_ptr<ColumnReader> > column_readers_; +}; + class ParquetFileReader { public: - ParquetFileReader() : buffer_(nullptr) {} - ~ParquetFileReader() {} + ParquetFileReader(); + ~ParquetFileReader(); - // The class takes ownership of the passed file-like object + // This class does _not_ take ownership of the file. You must manage its + // lifetime separately void Open(FileLike* buffer); void Close(); void ParseMetaData(); + // The returned RowGroupReader is owned by this ParquetFileReader + RowGroupReader* RowGroup(size_t i); + + size_t num_row_groups() const { + return metadata_.row_groups.size(); + } + const parquet::FileMetaData& metadata() const { return metadata_; } + void DebugPrint(std::ostream& stream, bool print_values = true); + private: + friend class RowGroupReader; + parquet::FileMetaData metadata_; + bool parsed_metadata_; + + // Row group index -> RowGroupReader (NOTE(review): int key vs size_t RowGroup(i)) + std::unordered_map<int, std::shared_ptr<RowGroupReader> > row_group_readers_; + FileLike* buffer_; }; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/types.h ---------------------------------------------------------------------- diff --git a/src/parquet/types.h b/src/parquet/types.h new file mode 100644 index 0000000..37f538a --- /dev/null +++ b/src/parquet/types.h @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_TYPES_H +#define PARQUET_TYPES_H + +#include <algorithm> +#include <cstdint> +#include <cstring> +#include <string> + +#include "parquet/thrift/parquet_types.h" + +namespace parquet_cpp { + +struct ByteArray { + uint32_t len; + const uint8_t* ptr; +}; + + +static inline std::string ByteArrayToString(const ByteArray& a) { + return std::string(reinterpret_cast<const char*>(a.ptr), a.len); +} + +static inline int ByteCompare(const ByteArray& x1, const ByteArray& x2) { + int len = std::min(x1.len, x2.len); + int cmp = memcmp(x1.ptr, x2.ptr, len); + if (cmp != 0) return cmp; + if (len < x1.len) return 1; + if (len < x2.len) return -1; + return 0; +} + +template <int TYPE> +struct type_traits { +}; + +template <> +struct type_traits<parquet::Type::BOOLEAN> { + typedef bool value_type; + static constexpr parquet::Type::type parquet_type = parquet::Type::BOOLEAN; + + static constexpr size_t value_byte_size = 1; +}; + +template <> +struct type_traits<parquet::Type::INT32> { + typedef int32_t value_type; + static constexpr parquet::Type::type parquet_type = parquet::Type::INT32; + + static constexpr size_t value_byte_size = 4; +}; + +template <> +struct type_traits<parquet::Type::INT64> { + typedef int64_t value_type; + static constexpr parquet::Type::type parquet_type = parquet::Type::INT64; + + static constexpr size_t value_byte_size = 8; +}; + +template <> +struct 
type_traits<parquet::Type::INT96> { + // TODO + typedef void* value_type; + static constexpr parquet::Type::type parquet_type = parquet::Type::INT96; + + static constexpr size_t value_byte_size = 12; +}; + +template <> +struct type_traits<parquet::Type::FLOAT> { + typedef float value_type; + static constexpr parquet::Type::type parquet_type = parquet::Type::FLOAT; + + static constexpr size_t value_byte_size = 4; +}; + +template <> +struct type_traits<parquet::Type::DOUBLE> { + typedef double value_type; + static constexpr parquet::Type::type parquet_type = parquet::Type::DOUBLE; + + static constexpr size_t value_byte_size = 8; +}; + +template <> +struct type_traits<parquet::Type::BYTE_ARRAY> { + typedef ByteArray value_type; + static constexpr parquet::Type::type parquet_type = parquet::Type::BYTE_ARRAY; + + static constexpr size_t value_byte_size = sizeof(ByteArray); +}; + +} // namespace parquet_cpp + +#endif // PARQUET_TYPES_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt index 766214b..b3c817d 100644 --- a/src/parquet/util/CMakeLists.txt +++ b/src/parquet/util/CMakeLists.txt @@ -21,8 +21,13 @@ install(FILES logging.h rle-encoding.h stopwatch.h + input_stream.h DESTINATION include/parquet/util) +add_library(parquet_util STATIC + input_stream.cc +) + add_library(parquet_test_main test_main.cc) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/util/input_stream.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/input_stream.cc b/src/parquet/util/input_stream.cc new file mode 100644 index 0000000..d0e53ed --- /dev/null +++ b/src/parquet/util/input_stream.cc @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/util/input_stream.h" + +#include <algorithm> + +#include "parquet/exception.h" + +namespace parquet_cpp { + +InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) : + buffer_(buffer), len_(len), offset_(0) {} + +const uint8_t* InMemoryInputStream::Peek(int num_to_peek, int* num_bytes) { + *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_); + return buffer_ + offset_; +} + +const uint8_t* InMemoryInputStream::Read(int num_to_read, int* num_bytes) { + const uint8_t* result = Peek(num_to_read, num_bytes); + offset_ += *num_bytes; + return result; +} + +ScopedInMemoryInputStream::ScopedInMemoryInputStream(int64_t len) { + buffer_.resize(len); + stream_.reset(new InMemoryInputStream(buffer_.data(), buffer_.size())); +} + +uint8_t* ScopedInMemoryInputStream::data() { + return buffer_.data(); +} + +int64_t ScopedInMemoryInputStream::size() { + return buffer_.size(); +} + +const uint8_t* ScopedInMemoryInputStream::Peek(int num_to_peek, + int* num_bytes) { + return stream_->Peek(num_to_peek, num_bytes); +} + +const uint8_t* ScopedInMemoryInputStream::Read(int num_to_read, + int* num_bytes) { + return stream_->Read(num_to_read, num_bytes); +} + +} // namespace parquet_cpp 
// http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/util/input_stream.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/input_stream.h b/src/parquet/util/input_stream.h new file mode 100644 index 0000000..ece2488 --- /dev/null +++ b/src/parquet/util/input_stream.h @@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef PARQUET_INPUT_STREAM_H
#define PARQUET_INPUT_STREAM_H

#include <cstdint>
#include <memory>
#include <vector>

namespace parquet_cpp {

// Interface through which the column reader obtains its bytes. It is a stream
// interface: bytes are delivered in order, and once a byte has been read it
// does not need to be read again.
class InputStream {
 public:
  // Returns a pointer to the next 'num_to_peek' bytes without advancing the
  // current position. *num_bytes receives the number of bytes actually
  // returned, which can be less than num_to_peek only at the end of the stream.
  // Since the position is not advanced, calls to this function are idempotent.
  // The returned buffer is still owned by the input stream and must stay
  // valid until the next call to Peek() or Read().
  virtual const uint8_t* Peek(int num_to_peek, int* num_bytes) = 0;

  // Identical to Peek(), except the current position in the stream is
  // advanced by *num_bytes.
  virtual const uint8_t* Read(int num_to_read, int* num_bytes) = 0;

  virtual ~InputStream() = default;

 protected:
  InputStream() = default;
};

// Implementation of an InputStream when all the bytes are in memory. Only a
// pointer to the bytes is kept, so the caller must keep them alive.
class InMemoryInputStream : public InputStream {
 public:
  InMemoryInputStream(const uint8_t* buffer, int64_t len);
  const uint8_t* Peek(int num_to_peek, int* num_bytes) override;
  const uint8_t* Read(int num_to_read, int* num_bytes) override;

 private:
  const uint8_t* buffer_;
  int64_t len_;
  int64_t offset_;
};


// A wrapper for InMemoryInputStream that owns the memory: it holds a buffer of
// 'len' bytes (writable through data()) and serves Peek()/Read() from it.
// Not copyable, because it owns the wrapped stream through a unique_ptr.
class ScopedInMemoryInputStream : public InputStream {
 public:
  explicit ScopedInMemoryInputStream(int64_t len);
  uint8_t* data();
  int64_t size();
  const uint8_t* Peek(int num_to_peek, int* num_bytes) override;
  const uint8_t* Read(int num_to_read, int* num_bytes) override;

 private:
  std::vector<uint8_t> buffer_;
  std::unique_ptr<InMemoryInputStream> stream_;
};

}  // namespace parquet_cpp

#endif  // PARQUET_INPUT_STREAM_H
