Repository: parquet-cpp Updated Branches: refs/heads/master 89f5a5439 -> 5e938171b
PARQUET-472: Changed the ownership of InputStream in ColumnReader. Author: Aliaksei Sandryhaila <[email protected]> Closes #29 from asandryh/parquet-472 and squashes the following commits: 4bcbbb1 [Aliaksei Sandryhaila] Addressed review comments. 58c2da2 [Aliaksei Sandryhaila] PARQUET-472: Changed the ownership of InputStream in ColumnReader. Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/5e938171 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/5e938171 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/5e938171 Branch: refs/heads/master Commit: 5e938171b5b6d1effad27559b001c42f7ee0bb2c Parents: 89f5a54 Author: Aliaksei Sandryhaila <[email protected]> Authored: Fri Jan 29 09:01:44 2016 -0800 Committer: Julien Le Dem <[email protected]> Committed: Fri Jan 29 09:01:44 2016 -0800 ---------------------------------------------------------------------- src/parquet/column_reader.cc | 27 +++++++++++---------------- src/parquet/column_reader.h | 14 ++++++-------- src/parquet/reader.cc | 10 +++++----- 3 files changed, 22 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e938171/src/parquet/column_reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc index 4f565cf..7c88a32 100644 --- a/src/parquet/column_reader.cc +++ b/src/parquet/column_reader.cc @@ -36,16 +36,11 @@ using parquet::FieldRepetitionType; using parquet::PageType; using parquet::Type; - -ColumnReader::~ColumnReader() { - delete stream_; -} - ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata, - const parquet::SchemaElement* schema, InputStream* stream) + const parquet::SchemaElement* schema, std::unique_ptr<InputStream> stream) : metadata_(metadata), schema_(schema), - stream_(stream), + stream_(std::move(stream)), num_buffered_values_(0), num_decoded_values_(0), buffered_values_offset_(0) { @@ -171,24 +166,24 @@ bool TypedColumnReader<TYPE>::ReadNewPage() { } std::shared_ptr<ColumnReader> ColumnReader::Make(const parquet::ColumnMetaData* metadata, - const parquet::SchemaElement* element, InputStream* stream) { + const parquet::SchemaElement* element, std::unique_ptr<InputStream> stream) { switch (metadata->type) { case Type::BOOLEAN: - return std::make_shared<BoolReader>(metadata, element, stream); + return std::make_shared<BoolReader>(metadata, element, std::move(stream)); case Type::INT32: - return std::make_shared<Int32Reader>(metadata, element, stream); + return std::make_shared<Int32Reader>(metadata, element, std::move(stream)); case Type::INT64: - return std::make_shared<Int64Reader>(metadata, element, stream); + return std::make_shared<Int64Reader>(metadata, element, std::move(stream)); case Type::INT96: - return std::make_shared<Int96Reader>(metadata, element, stream); + return std::make_shared<Int96Reader>(metadata, element, std::move(stream)); case Type::FLOAT: - return std::make_shared<FloatReader>(metadata, element, stream); + return std::make_shared<FloatReader>(metadata, element, std::move(stream)); case Type::DOUBLE: - return std::make_shared<DoubleReader>(metadata, element, stream); + return std::make_shared<DoubleReader>(metadata, element, std::move(stream)); case Type::BYTE_ARRAY: - return std::make_shared<ByteArrayReader>(metadata, element, stream); + return std::make_shared<ByteArrayReader>(metadata, element, std::move(stream)); case Type::FIXED_LEN_BYTE_ARRAY: - return std::make_shared<FixedLenByteArrayReader>(metadata, element, stream); + return std::make_shared<FixedLenByteArrayReader>(metadata, element, std::move(stream)); default: ParquetException::NYI("type reader not implemented"); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e938171/src/parquet/column_reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h index 00722f5..3d27549 100644 --- a/src/parquet/column_reader.h +++ b/src/parquet/column_reader.h @@ -62,13 +62,11 @@ class ColumnReader { } }; - ColumnReader(const parquet::ColumnMetaData*, - const parquet::SchemaElement*, InputStream* stream); - - virtual ~ColumnReader(); + ColumnReader(const parquet::ColumnMetaData*, const parquet::SchemaElement*, + std::unique_ptr<InputStream> stream); static std::shared_ptr<ColumnReader> Make(const parquet::ColumnMetaData*, - const parquet::SchemaElement*, InputStream* stream); + const parquet::SchemaElement*, std::unique_ptr<InputStream> stream); virtual bool ReadNewPage() = 0; @@ -97,7 +95,7 @@ class ColumnReader { const parquet::ColumnMetaData* metadata_; const parquet::SchemaElement* schema_; - InputStream* stream_; + std::unique_ptr<InputStream> stream_; // Compression codec to use. std::unique_ptr<Codec> decompressor_; @@ -123,8 +121,8 @@ class TypedColumnReader : public ColumnReader { typedef typename type_traits<TYPE>::value_type T; TypedColumnReader(const parquet::ColumnMetaData* metadata, - const parquet::SchemaElement* schema, InputStream* stream) : - ColumnReader(metadata, schema, stream), + const parquet::SchemaElement* schema, std::unique_ptr<InputStream> stream) : + ColumnReader(metadata, schema, std::move(stream)), current_decoder_(NULL) { size_t value_byte_size = type_traits<TYPE>::value_byte_size; values_buffer_.resize(config_.batch_size * value_byte_size); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e938171/src/parquet/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc index 4654f14..d467b75 100644 --- a/src/parquet/reader.cc +++ b/src/parquet/reader.cc @@ -98,7 +98,7 @@ ColumnReader* RowGroupReader::Column(size_t i) { col_start = col.meta_data.dictionary_page_offset; } - std::unique_ptr<ScopedInMemoryInputStream> input( + std::unique_ptr<InputStream> input( new ScopedInMemoryInputStream(col.meta_data.total_compressed_size)); FileLike* source = this->parent_->buffer_; @@ -106,9 +106,9 @@ ColumnReader* RowGroupReader::Column(size_t i) { source->Seek(col_start); // TODO(wesm): Law of demeter violation - size_t bytes_read = source->Read(input->size(), input->data()); - - if (bytes_read != input->size()) { + ScopedInMemoryInputStream* scoped_input = static_cast<ScopedInMemoryInputStream*>(input.get()); + size_t bytes_read = source->Read(scoped_input->size(), scoped_input->data()); + if (bytes_read != scoped_input->size()) { std::cout << "Bytes needed: " << col.meta_data.total_compressed_size << std::endl; std::cout << "Bytes read: " << bytes_read << std::endl; throw ParquetException("Unable to read column chunk data"); @@ -116,7 +116,7 @@ ColumnReader* RowGroupReader::Column(size_t i) { // TODO(wesm): This presumes a flat schema std::shared_ptr<ColumnReader> reader = ColumnReader::Make(&col.meta_data, - &this->parent_->metadata_.schema[i + 1], input.release()); + &this->parent_->metadata_.schema[i + 1], std::move(input)); column_readers_[i] = reader; return reader.get();
