http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/scanner.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h index 184c74d..13fb01b 100644 --- a/src/parquet/column/scanner.h +++ b/src/parquet/column/scanner.h @@ -29,7 +29,7 @@ #include "parquet/exception.h" #include "parquet/schema/descriptor.h" #include "parquet/types.h" -#include "parquet/util/mem-allocator.h" +#include "parquet/util/memory.h" #include "parquet/util/visibility.h" namespace parquet { @@ -44,7 +44,7 @@ class PARQUET_EXPORT Scanner { : batch_size_(batch_size), level_offset_(0), levels_buffered_(0), - value_buffer_(0, allocator), + value_buffer_(std::make_shared<PoolBuffer>(allocator)), value_offset_(0), values_buffered_(0), reader_(reader) { @@ -76,7 +76,7 @@ class PARQUET_EXPORT Scanner { int level_offset_; int levels_buffered_; - OwnedMutableBuffer value_buffer_; + std::shared_ptr<PoolBuffer> value_buffer_; int value_offset_; int64_t values_buffered_; @@ -95,8 +95,8 @@ class PARQUET_EXPORT TypedScanner : public Scanner { : Scanner(reader, batch_size, allocator) { typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader.get()); int value_byte_size = type_traits<DType::type_num>::value_byte_size; - value_buffer_.Resize(batch_size_ * value_byte_size); - values_ = reinterpret_cast<T*>(&value_buffer_[0]); + PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size)); + values_ = reinterpret_cast<T*>(value_buffer_->mutable_data()); } virtual ~TypedScanner() {}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/statistics-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/statistics-test.cc b/src/parquet/column/statistics-test.cc index 364d9d4..c8641a1 100644 --- a/src/parquet/column/statistics-test.cc +++ b/src/parquet/column/statistics-test.cc @@ -33,9 +33,7 @@ #include "parquet/file/writer.h" #include "parquet/schema/descriptor.h" #include "parquet/types.h" -#include "parquet/util/input.h" -#include "parquet/util/mem-allocator.h" -#include "parquet/util/output.h" +#include "parquet/util/memory.h" namespace parquet { @@ -150,8 +148,8 @@ class TestRowGroupStatistics : public PrimitiveTypedTest<TestType> { file_writer->Close(); auto buffer = sink->GetBuffer(); - std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer)); - auto file_reader = ParquetFileReader::Open(std::move(source)); + auto source = std::make_shared<::arrow::io::BufferReader>(buffer); + auto file_reader = ParquetFileReader::Open(source); auto rg_reader = file_reader->RowGroup(0); auto column_chunk = rg_reader->metadata()->ColumnChunk(0); std::shared_ptr<RowGroupStatistics> stats = column_chunk->statistics(); @@ -191,7 +189,8 @@ std::vector<FLBA> TestRowGroupStatistics<FLBAType>::GetDeepCopy( std::vector<FLBA> copy; MemoryAllocator* allocator = default_allocator(); for (const FLBA& flba : values) { - uint8_t* ptr = allocator->Malloc(FLBA_LENGTH); + uint8_t* ptr; + PARQUET_THROW_NOT_OK(allocator->Allocate(FLBA_LENGTH, &ptr)); memcpy(ptr, flba.ptr, FLBA_LENGTH); copy.emplace_back(ptr); } @@ -204,7 +203,8 @@ std::vector<ByteArray> TestRowGroupStatistics<ByteArrayType>::GetDeepCopy( std::vector<ByteArray> copy; MemoryAllocator* allocator = default_allocator(); for (const ByteArray& ba : values) { - uint8_t* ptr = allocator->Malloc(ba.len); + uint8_t* ptr; + PARQUET_THROW_NOT_OK(allocator->Allocate(ba.len, &ptr)); memcpy(ptr, ba.ptr, ba.len); 
copy.emplace_back(ba.len, ptr); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/statistics.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/statistics.cc b/src/parquet/column/statistics.cc index 0330ac1..9b76fab 100644 --- a/src/parquet/column/statistics.cc +++ b/src/parquet/column/statistics.cc @@ -21,16 +21,17 @@ #include "parquet/column/statistics.h" #include "parquet/encodings/plain-encoding.h" #include "parquet/exception.h" -#include "parquet/util/buffer.h" #include "parquet/util/comparison.h" -#include "parquet/util/output.h" +#include "parquet/util/memory.h" namespace parquet { template <typename DType> TypedRowGroupStatistics<DType>::TypedRowGroupStatistics( const ColumnDescriptor* schema, MemoryAllocator* allocator) - : allocator_(allocator), min_buffer_(0, allocator_), max_buffer_(0, allocator_) { + : allocator_(allocator), + min_buffer_(AllocateBuffer(allocator_, 0)), + max_buffer_(AllocateBuffer(allocator_, 0)) { SetDescr(schema); Reset(); } @@ -40,14 +41,14 @@ TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const typename DType::c_ const typename DType::c_type& max, int64_t num_values, int64_t null_count, int64_t distinct_count) : allocator_(default_allocator()), - min_buffer_(0, allocator_), - max_buffer_(0, allocator_) { + min_buffer_(AllocateBuffer(allocator_, 0)), + max_buffer_(AllocateBuffer(allocator_, 0)) { IncrementNumValues(num_values); IncrementNullCount(null_count); IncrementDistinctCount(distinct_count); - Copy(min, &min_, min_buffer_); - Copy(max, &max_, max_buffer_); + Copy(min, &min_, min_buffer_.get()); + Copy(max, &max_, max_buffer_.get()); has_min_max_ = true; } @@ -56,7 +57,9 @@ TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const ColumnDescriptor* const std::string& encoded_min, const std::string& encoded_max, int64_t num_values, int64_t null_count, int64_t distinct_count, bool has_min_max, MemoryAllocator* allocator) - : 
allocator_(allocator), min_buffer_(0, allocator_), max_buffer_(0, allocator_) { + : allocator_(allocator), + min_buffer_(AllocateBuffer(allocator_, 0)), + max_buffer_(AllocateBuffer(allocator_, 0)) { IncrementNumValues(num_values); IncrementNullCount(null_count); IncrementDistinctCount(distinct_count); @@ -94,11 +97,11 @@ void TypedRowGroupStatistics<DType>::Update( auto batch_minmax = std::minmax_element(values, values + num_not_null, compare); if (!has_min_max_) { has_min_max_ = true; - Copy(*batch_minmax.first, &min_, min_buffer_); - Copy(*batch_minmax.second, &max_, max_buffer_); + Copy(*batch_minmax.first, &min_, min_buffer_.get()); + Copy(*batch_minmax.second, &max_, max_buffer_.get()); } else { - Copy(std::min(min_, *batch_minmax.first, compare), &min_, min_buffer_); - Copy(std::max(max_, *batch_minmax.second, compare), &max_, max_buffer_); + Copy(std::min(min_, *batch_minmax.first, compare), &min_, min_buffer_.get()); + Copy(std::max(max_, *batch_minmax.second, compare), &max_, max_buffer_.get()); } } @@ -119,15 +122,15 @@ void TypedRowGroupStatistics<DType>::Merge(const TypedRowGroupStatistics<DType>& if (!other.HasMinMax()) return; if (!has_min_max_) { - Copy(other.min_, &this->min_, min_buffer_); - Copy(other.max_, &this->max_, max_buffer_); + Copy(other.min_, &this->min_, min_buffer_.get()); + Copy(other.max_, &this->max_, max_buffer_.get()); has_min_max_ = true; return; } Compare<T> compare(descr_); - Copy(std::min(this->min_, other.min_, compare), &this->min_, min_buffer_); - Copy(std::max(this->max_, other.max_, compare), &this->max_, max_buffer_); + Copy(std::min(this->min_, other.min_, compare), &this->min_, min_buffer_.get()); + Copy(std::max(this->max_, other.max_, compare), &this->max_, max_buffer_.get()); } template <typename DType> http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/statistics.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/statistics.h 
b/src/parquet/column/statistics.h index a21a0fa..cf41dc0 100644 --- a/src/parquet/column/statistics.h +++ b/src/parquet/column/statistics.h @@ -24,8 +24,7 @@ #include "parquet/schema/descriptor.h" #include "parquet/types.h" -#include "parquet/util/buffer.h" -#include "parquet/util/mem-allocator.h" +#include "parquet/util/memory.h" #include "parquet/util/visibility.h" namespace parquet { @@ -166,34 +165,33 @@ class TypedRowGroupStatistics : public RowGroupStatistics { void PlainEncode(const T& src, std::string* dst); void PlainDecode(const std::string& src, T* dst); - void Copy(const T& src, T* dst, OwnedMutableBuffer& buffer); + void Copy(const T& src, T* dst, PoolBuffer* buffer); - OwnedMutableBuffer min_buffer_, max_buffer_; + std::shared_ptr<PoolBuffer> min_buffer_, max_buffer_; }; template <typename DType> -inline void TypedRowGroupStatistics<DType>::Copy( - const T& src, T* dst, OwnedMutableBuffer&) { +inline void TypedRowGroupStatistics<DType>::Copy(const T& src, T* dst, PoolBuffer*) { *dst = src; } template <> inline void TypedRowGroupStatistics<FLBAType>::Copy( - const FLBA& src, FLBA* dst, OwnedMutableBuffer& buffer) { + const FLBA& src, FLBA* dst, PoolBuffer* buffer) { if (dst->ptr == src.ptr) return; uint32_t len = descr_->type_length(); - buffer.Resize(len); - std::memcpy(&buffer[0], src.ptr, len); - *dst = FLBA(buffer.data()); + PARQUET_THROW_NOT_OK(buffer->Resize(len)); + std::memcpy(buffer->mutable_data(), src.ptr, len); + *dst = FLBA(buffer->data()); } template <> inline void TypedRowGroupStatistics<ByteArrayType>::Copy( - const ByteArray& src, ByteArray* dst, OwnedMutableBuffer& buffer) { + const ByteArray& src, ByteArray* dst, PoolBuffer* buffer) { if (dst->ptr == src.ptr) return; - buffer.Resize(src.len); - std::memcpy(&buffer[0], src.ptr, src.len); - *dst = ByteArray(src.len, buffer.data()); + PARQUET_THROW_NOT_OK(buffer->Resize(src.len)); + std::memcpy(buffer->mutable_data(), src.ptr, src.len); + *dst = ByteArray(src.len, buffer->data()); } 
template <> http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/test-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h index 10632d2..9efa623 100644 --- a/src/parquet/column/test-util.h +++ b/src/parquet/column/test-util.h @@ -28,13 +28,15 @@ #include <string> #include <vector> +#include <gtest/gtest.h> + #include "parquet/column/levels.h" #include "parquet/column/page.h" // Depended on by SerializedPageReader test utilities for now #include "parquet/encodings/dictionary-encoding.h" #include "parquet/encodings/plain-encoding.h" -#include "parquet/util/input.h" +#include "parquet/util/memory.h" #include "parquet/util/test-common.h" using std::vector; @@ -253,8 +255,8 @@ class DictionaryPageBuilder { } shared_ptr<Buffer> WriteDict() { - shared_ptr<OwnedMutableBuffer> dict_buffer = - std::make_shared<OwnedMutableBuffer>(encoder_->dict_encoded_size()); + std::shared_ptr<PoolBuffer> dict_buffer = + AllocateBuffer(default_allocator(), encoder_->dict_encoded_size()); encoder_->WriteDict(dict_buffer->mutable_data()); return dict_buffer; } @@ -262,7 +264,7 @@ class DictionaryPageBuilder { int32_t num_values() const { return num_dict_values_; } private: - MemPool pool_; + ChunkedAllocator pool_; shared_ptr<DictEncoder<TYPE>> encoder_; int32_t num_dict_values_; bool have_values_; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc index 6112efe..7319d46 100644 --- a/src/parquet/column/writer.cc +++ b/src/parquet/column/writer.cc @@ -21,6 +21,7 @@ #include "parquet/column/statistics.h" #include "parquet/encodings/dictionary-encoding.h" #include "parquet/encodings/plain-encoding.h" +#include "parquet/util/memory.h" namespace parquet { @@ -55,8 +56,8 @@ 
ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata, } void ColumnWriter::InitSinks() { - definition_levels_sink_.reset(new InMemoryOutputStream()); - repetition_levels_sink_.reset(new InMemoryOutputStream()); + definition_levels_sink_.reset(new InMemoryOutputStream(properties_->allocator())); + repetition_levels_sink_.reset(new InMemoryOutputStream(properties_->allocator())); } void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) { @@ -77,7 +78,8 @@ std::shared_ptr<Buffer> ColumnWriter::RleEncodeLevels( int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, num_buffered_values_) + sizeof(uint32_t); - auto buffer_rle = std::make_shared<OwnedMutableBuffer>(rle_size, allocator_); + std::shared_ptr<PoolBuffer> buffer_rle = + AllocateBuffer(properties_->allocator(), rle_size); level_encoder_.Init(Encoding::RLE, max_level, num_buffered_values_, buffer_rle->mutable_data() + sizeof(uint32_t), buffer_rle->size() - sizeof(uint32_t)); @@ -87,7 +89,7 @@ std::shared_ptr<Buffer> ColumnWriter::RleEncodeLevels( reinterpret_cast<uint32_t*>(buffer_rle->mutable_data())[0] = level_encoder_.len(); int64_t encoded_size = level_encoder_.len() + sizeof(uint32_t); DCHECK(rle_size >= encoded_size); - buffer_rle->Resize(encoded_size); + PARQUET_THROW_NOT_OK(buffer_rle->Resize(encoded_size)); return std::static_pointer_cast<Buffer>(buffer_rle); } @@ -110,8 +112,8 @@ void ColumnWriter::AddDataPage() { definition_levels->size() + repetition_levels->size() + values->size(); // Concatenate data into a single buffer - std::shared_ptr<OwnedMutableBuffer> uncompressed_data = - std::make_shared<OwnedMutableBuffer>(uncompressed_size, allocator_); + std::shared_ptr<PoolBuffer> uncompressed_data = + AllocateBuffer(allocator_, uncompressed_size); uint8_t* uncompressed_ptr = uncompressed_data->mutable_data(); memcpy(uncompressed_ptr, repetition_levels->data(), repetition_levels->size()); uncompressed_ptr += repetition_levels->size(); @@ -223,7 
+225,8 @@ void TypedColumnWriter<Type>::CheckDictionarySizeLimit() { template <typename Type> void TypedColumnWriter<Type>::WriteDictionaryPage() { auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get()); - auto buffer = std::make_shared<OwnedMutableBuffer>(dict_encoder->dict_encoded_size()); + std::shared_ptr<PoolBuffer> buffer = + AllocateBuffer(properties_->allocator(), dict_encoder->dict_encoded_size()); dict_encoder->WriteDict(buffer->mutable_data()); // TODO Get rid of this deep call dict_encoder->mem_pool()->FreeAll(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h index 67a29bc..39d5934 100644 --- a/src/parquet/column/writer.h +++ b/src/parquet/column/writer.h @@ -28,9 +28,7 @@ #include "parquet/file/metadata.h" #include "parquet/schema/descriptor.h" #include "parquet/types.h" -#include "parquet/util/mem-allocator.h" -#include "parquet/util/mem-pool.h" -#include "parquet/util/output.h" +#include "parquet/util/memory.h" #include "parquet/util/visibility.h" namespace parquet { @@ -111,7 +109,7 @@ class PARQUET_EXPORT ColumnWriter { LevelEncoder level_encoder_; MemoryAllocator* allocator_; - MemPool pool_; + ChunkedAllocator pool_; // The total number of values stored in the data page. This is the maximum of // the number of encoded definition levels or encoded values. 
For http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/decoder.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/decoder.h b/src/parquet/encodings/decoder.h index 4442507..1ac9f35 100644 --- a/src/parquet/encodings/decoder.h +++ b/src/parquet/encodings/decoder.h @@ -22,7 +22,7 @@ #include "parquet/exception.h" #include "parquet/types.h" -#include "parquet/util/mem-allocator.h" +#include "parquet/util/memory.h" namespace parquet { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/delta-bit-pack-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/delta-bit-pack-encoding.h b/src/parquet/encodings/delta-bit-pack-encoding.h index 5353817..59774a4 100644 --- a/src/parquet/encodings/delta-bit-pack-encoding.h +++ b/src/parquet/encodings/delta-bit-pack-encoding.h @@ -24,7 +24,7 @@ #include "parquet/encodings/decoder.h" #include "parquet/util/bit-stream-utils.inline.h" -#include "parquet/util/buffer.h" +#include "parquet/util/memory.h" namespace parquet { @@ -36,7 +36,7 @@ class DeltaBitPackDecoder : public Decoder<DType> { explicit DeltaBitPackDecoder( const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) : Decoder<DType>(descr, Encoding::DELTA_BINARY_PACKED), - delta_bit_widths_(0, allocator) { + delta_bit_widths_(new PoolBuffer(allocator)) { if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) { throw ParquetException("Delta bit pack encoding should only be for integer data."); } @@ -62,28 +62,31 @@ class DeltaBitPackDecoder : public Decoder<DType> { if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException(); if (!decoder_.GetVlqInt(&values_current_block_)) { ParquetException::EofException(); } if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException(); - delta_bit_widths_.Resize(num_mini_blocks_); + 
PARQUET_THROW_NOT_OK(delta_bit_widths_->Resize(num_mini_blocks_)); + + uint8_t* bit_width_data = delta_bit_widths_->mutable_data(); if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException(); for (int i = 0; i < num_mini_blocks_; ++i) { - if (!decoder_.GetAligned<uint8_t>(1, &delta_bit_widths_[i])) { + if (!decoder_.GetAligned<uint8_t>(1, bit_width_data + i)) { ParquetException::EofException(); } } values_per_mini_block_ = block_size / num_mini_blocks_; mini_block_idx_ = 0; - delta_bit_width_ = delta_bit_widths_[0]; + delta_bit_width_ = bit_width_data[0]; values_current_mini_block_ = values_per_mini_block_; } template <typename T> int GetInternal(T* buffer, int max_values) { max_values = std::min(max_values, num_values_); + const uint8_t* bit_width_data = delta_bit_widths_->data(); for (int i = 0; i < max_values; ++i) { if (UNLIKELY(values_current_mini_block_ == 0)) { ++mini_block_idx_; - if (mini_block_idx_ < static_cast<size_t>(delta_bit_widths_.size())) { - delta_bit_width_ = delta_bit_widths_[mini_block_idx_]; + if (mini_block_idx_ < static_cast<size_t>(delta_bit_widths_->size())) { + delta_bit_width_ = bit_width_data[mini_block_idx_]; values_current_mini_block_ = values_per_mini_block_; } else { InitBlock(); @@ -112,7 +115,7 @@ class DeltaBitPackDecoder : public Decoder<DType> { int32_t min_delta_; size_t mini_block_idx_; - OwnedMutableBuffer delta_bit_widths_; + std::unique_ptr<PoolBuffer> delta_bit_widths_; int delta_bit_width_; int32_t last_value_; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/dictionary-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h index 7823307..b79744a 100644 --- a/src/parquet/encodings/dictionary-encoding.h +++ b/src/parquet/encodings/dictionary-encoding.h @@ -27,11 +27,9 @@ #include "parquet/encodings/decoder.h" #include 
"parquet/encodings/encoder.h" #include "parquet/encodings/plain-encoding.h" -#include "parquet/util/buffer.h" #include "parquet/util/cpu-info.h" #include "parquet/util/hash-util.h" -#include "parquet/util/mem-allocator.h" -#include "parquet/util/mem-pool.h" +#include "parquet/util/memory.h" #include "parquet/util/rle-encoding.h" namespace parquet { @@ -48,7 +46,7 @@ class DictionaryDecoder : public Decoder<Type> { const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) : Decoder<Type>(descr, Encoding::RLE_DICTIONARY), dictionary_(0, allocator), - byte_array_data_(0, allocator) {} + byte_array_data_(AllocateBuffer(allocator, 0)) {} // Perform type-specific initiatialization void SetDict(Decoder<Type>* dictionary); @@ -78,7 +76,7 @@ class DictionaryDecoder : public Decoder<Type> { // Data that contains the byte array data (byte_array_dictionary_ just has the // pointers). - OwnedMutableBuffer byte_array_data_; + std::shared_ptr<PoolBuffer> byte_array_data_; RleDecoder idx_decoder_; }; @@ -106,11 +104,13 @@ inline void DictionaryDecoder<ByteArrayType>::SetDict( for (int i = 0; i < num_dictionary_values; ++i) { total_size += dictionary_[i].len; } - byte_array_data_.Resize(total_size); + PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size)); int offset = 0; + + uint8_t* bytes_data = byte_array_data_->mutable_data(); for (int i = 0; i < num_dictionary_values; ++i) { - memcpy(&byte_array_data_[offset], dictionary_[i].ptr, dictionary_[i].len); - dictionary_[i].ptr = &byte_array_data_[offset]; + memcpy(bytes_data + offset, dictionary_[i].ptr, dictionary_[i].len); + dictionary_[i].ptr = bytes_data + offset; offset += dictionary_[i].len; } } @@ -124,11 +124,12 @@ inline void DictionaryDecoder<FLBAType>::SetDict(Decoder<FLBAType>* dictionary) int fixed_len = descr_->type_length(); int total_size = num_dictionary_values * fixed_len; - byte_array_data_.Resize(total_size); + PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size)); + uint8_t* 
bytes_data = byte_array_data_->mutable_data(); int offset = 0; for (int i = 0; i < num_dictionary_values; ++i) { - memcpy(&byte_array_data_[offset], dictionary_[i].ptr, fixed_len); - dictionary_[i].ptr = &byte_array_data_[offset]; + memcpy(bytes_data + offset, dictionary_[i].ptr, fixed_len); + dictionary_[i].ptr = bytes_data + offset; offset += fixed_len; } } @@ -158,7 +159,7 @@ class DictEncoder : public Encoder<DType> { public: typedef typename DType::c_type T; - explicit DictEncoder(const ColumnDescriptor* desc, MemPool* pool = nullptr, + explicit DictEncoder(const ColumnDescriptor* desc, ChunkedAllocator* pool = nullptr, MemoryAllocator* allocator = default_allocator()) : Encoder<DType>(desc, Encoding::PLAIN_DICTIONARY, allocator), allocator_(allocator), @@ -176,7 +177,7 @@ class DictEncoder : public Encoder<DType> { // TODO(wesm): think about how to address the construction semantics in // encodings/dictionary-encoding.h - void set_mem_pool(MemPool* pool) { pool_ = pool; } + void set_mem_pool(ChunkedAllocator* pool) { pool_ = pool; } void set_type_length(int type_length) { type_length_ = type_length; } @@ -215,11 +216,11 @@ class DictEncoder : public Encoder<DType> { void Put(const T& value); std::shared_ptr<Buffer> FlushValues() override { - auto buffer = std::make_shared<OwnedMutableBuffer>( - EstimatedDataEncodedSize(), this->allocator_); + std::shared_ptr<PoolBuffer> buffer = + AllocateBuffer(this->allocator_, EstimatedDataEncodedSize()); int result_size = WriteIndices(buffer->mutable_data(), EstimatedDataEncodedSize()); ClearIndices(); - buffer->Resize(result_size); + PARQUET_THROW_NOT_OK(buffer->Resize(result_size)); return buffer; }; @@ -233,7 +234,7 @@ class DictEncoder : public Encoder<DType> { /// dict_encoded_size() bytes. void WriteDict(uint8_t* buffer); - MemPool* mem_pool() { return pool_; } + ChunkedAllocator* mem_pool() { return pool_; } /// The number of entries in the dictionary. 
int num_entries() const { return uniques_.size(); } @@ -242,7 +243,7 @@ class DictEncoder : public Encoder<DType> { MemoryAllocator* allocator_; // For ByteArray / FixedLenByteArray data. Not owned - MemPool* pool_; + ChunkedAllocator* pool_; /// Size of the table. Must be a power of 2. int hash_table_size_; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/encoder.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/encoder.h b/src/parquet/encodings/encoder.h index a325ab5..c51f8d5 100644 --- a/src/parquet/encodings/encoder.h +++ b/src/parquet/encodings/encoder.h @@ -23,12 +23,11 @@ #include "parquet/exception.h" #include "parquet/types.h" +#include "parquet/util/memory.h" namespace parquet { -class Buffer; class ColumnDescriptor; -class OutputStream; // Base class for value encoders. Since encoders may or not have state (e.g., // dictionary encoding) we use a class instance to maintain any state. 
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/encoding-benchmark.cc ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/encoding-benchmark.cc b/src/parquet/encodings/encoding-benchmark.cc index e62d758..516e453 100644 --- a/src/parquet/encodings/encoding-benchmark.cc +++ b/src/parquet/encodings/encoding-benchmark.cc @@ -19,7 +19,7 @@ #include "parquet/encodings/dictionary-encoding.h" #include "parquet/file/reader-internal.h" -#include "parquet/util/mem-pool.h" +#include "parquet/util/memory.h" namespace parquet { @@ -101,23 +101,25 @@ static void DecodeDict( typedef typename Type::c_type T; int num_values = values.size(); - MemPool pool; + ChunkedAllocator pool; MemoryAllocator* allocator = default_allocator(); std::shared_ptr<ColumnDescriptor> descr = Int64Schema(Repetition::REQUIRED); - std::shared_ptr<OwnedMutableBuffer> dict_buffer = - std::make_shared<OwnedMutableBuffer>(); - auto indices = std::make_shared<OwnedMutableBuffer>(); DictEncoder<Type> encoder(descr.get(), &pool, allocator); for (int i = 0; i < num_values; ++i) { encoder.Put(values[i]); } - dict_buffer->Resize(encoder.dict_encoded_size()); + std::shared_ptr<PoolBuffer> dict_buffer = + AllocateBuffer(allocator, encoder.dict_encoded_size()); + + std::shared_ptr<PoolBuffer> indices = + AllocateBuffer(allocator, encoder.EstimatedDataEncodedSize()); + encoder.WriteDict(dict_buffer->mutable_data()); - indices->Resize(encoder.EstimatedDataEncodedSize()); int actual_bytes = encoder.WriteIndices(indices->mutable_data(), indices->size()); - indices->Resize(actual_bytes); + + PARQUET_THROW_NOT_OK(indices->Resize(actual_bytes)); while (state.KeepRunning()) { PlainDecoder<Type> dict_decoder(descr.get()); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/encoding-test.cc ---------------------------------------------------------------------- diff --git 
a/src/parquet/encodings/encoding-test.cc b/src/parquet/encodings/encoding-test.cc index daa25cb..eccfc5d 100644 --- a/src/parquet/encodings/encoding-test.cc +++ b/src/parquet/encodings/encoding-test.cc @@ -28,8 +28,7 @@ #include "parquet/schema/types.h" #include "parquet/types.h" #include "parquet/util/bit-util.h" -#include "parquet/util/buffer.h" -#include "parquet/util/output.h" +#include "parquet/util/memory.h" #include "parquet/util/test-common.h" using std::string; @@ -178,7 +177,7 @@ class TestEncodingBase : public ::testing::Test { } protected: - MemPool pool_; + ChunkedAllocator pool_; MemoryAllocator* allocator_; int num_values_; @@ -250,10 +249,9 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> { void CheckRoundtrip() { DictEncoder<Type> encoder(descr_.get(), &pool_); - dict_buffer_ = std::make_shared<OwnedMutableBuffer>(); - ASSERT_NO_THROW(encoder.Put(draws_, num_values_)); - dict_buffer_->Resize(encoder.dict_encoded_size()); + dict_buffer_ = AllocateBuffer(default_allocator(), encoder.dict_encoded_size()); + encoder.WriteDict(dict_buffer_->mutable_data()); std::shared_ptr<Buffer> indices = encoder.FlushValues(); @@ -277,7 +275,7 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> { protected: USING_BASE_MEMBERS(); - std::shared_ptr<OwnedMutableBuffer> dict_buffer_; + std::shared_ptr<PoolBuffer> dict_buffer_; }; TYPED_TEST_CASE(TestDictionaryEncoding, DictEncodedTypes); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/plain-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h index a3d7b69..d2127ef 100644 --- a/src/parquet/encodings/plain-encoding.h +++ b/src/parquet/encodings/plain-encoding.h @@ -25,8 +25,7 @@ #include "parquet/encodings/encoder.h" #include "parquet/schema/descriptor.h" #include "parquet/util/bit-stream-utils.inline.h" -#include "parquet/util/buffer.h" 
-#include "parquet/util/output.h" +#include "parquet/util/memory.h" namespace parquet { @@ -163,8 +162,9 @@ class PlainEncoder : public Encoder<DType> { explicit PlainEncoder( const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) - : Encoder<DType>(descr, Encoding::PLAIN, allocator), - values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, allocator)) {} + : Encoder<DType>(descr, Encoding::PLAIN, allocator) { + values_sink_.reset(new InMemoryOutputStream(allocator)); + } int64_t EstimatedDataEncodedSize() override { return values_sink_->Tell(); } @@ -172,7 +172,7 @@ class PlainEncoder : public Encoder<DType> { void Put(const T* src, int num_values) override; protected: - std::shared_ptr<InMemoryOutputStream> values_sink_; + std::unique_ptr<InMemoryOutputStream> values_sink_; }; template <> @@ -181,10 +181,10 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> { explicit PlainEncoder( const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) : Encoder<BooleanType>(descr, Encoding::PLAIN, allocator), - bits_available_(IN_MEMORY_DEFAULT_CAPACITY * 8), - bits_buffer_(IN_MEMORY_DEFAULT_CAPACITY, allocator), - values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, allocator)) { - bit_writer_.reset(new BitWriter(bits_buffer_.mutable_data(), bits_buffer_.size())); + bits_available_(kInMemoryDefaultCapacity * 8), + bits_buffer_(AllocateBuffer(allocator, kInMemoryDefaultCapacity)), + values_sink_(new InMemoryOutputStream(allocator)) { + bit_writer_.reset(new BitWriter(bits_buffer_->mutable_data(), bits_buffer_->size())); } int64_t EstimatedDataEncodedSize() override { @@ -196,12 +196,11 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> { bit_writer_->Flush(); values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); bit_writer_->Clear(); - bits_available_ = bits_buffer_.size() * 8; + bits_available_ = bits_buffer_->size() * 8; } std::shared_ptr<Buffer> buffer = 
values_sink_->GetBuffer(); - values_sink_.reset( - new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, this->allocator_)); + values_sink_.reset(new InMemoryOutputStream(this->allocator_)); return buffer; } @@ -225,7 +224,7 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> { \ int bits_remaining = num_values - bit_offset; \ while (bit_offset < num_values) { \ - bits_available_ = bits_buffer_.size() * 8; \ + bits_available_ = bits_buffer_->size() * 8; \ \ int bits_to_write = std::min(bits_available_, bits_remaining); \ for (int i = bit_offset; i < bit_offset + bits_to_write; i++) { \ @@ -249,15 +248,14 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> { protected: int bits_available_; std::unique_ptr<BitWriter> bit_writer_; - OwnedMutableBuffer bits_buffer_; - std::shared_ptr<InMemoryOutputStream> values_sink_; + std::shared_ptr<PoolBuffer> bits_buffer_; + std::unique_ptr<InMemoryOutputStream> values_sink_; }; template <typename DType> inline std::shared_ptr<Buffer> PlainEncoder<DType>::FlushValues() { std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer(); - values_sink_.reset( - new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, this->allocator_)); + values_sink_.reset(new InMemoryOutputStream(this->allocator_)); return buffer; } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/file-deserialize-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc index 5287885..fbb511a 100644 --- a/src/parquet/file/file-deserialize-test.cc +++ b/src/parquet/file/file-deserialize-test.cc @@ -33,12 +33,13 @@ #include "parquet/thrift/parquet_types.h" #include "parquet/thrift/util.h" #include "parquet/types.h" -#include "parquet/util/input.h" -#include "parquet/util/output.h" +#include "parquet/util/memory.h" #include "parquet/util/test-common.h" namespace parquet { +using 
::arrow::io::BufferReader; + // Adds page statistics occupying a certain amount of bytes (for testing very // large page headers) static inline void AddDummyStats(int stat_size, format::DataPageHeader& data_page) { @@ -234,11 +235,13 @@ TEST_F(TestPageSerde, LZONotSupported) { class TestParquetFileReader : public ::testing::Test { public: void AssertInvalidFileThrows(const std::shared_ptr<Buffer>& buffer) { - std::unique_ptr<BufferReader> reader(new BufferReader(buffer)); reader_.reset(new ParquetFileReader()); + auto reader = std::make_shared<BufferReader>(buffer); + auto wrapper = std::unique_ptr<ArrowInputFile>(new ArrowInputFile(reader)); + ASSERT_THROW( - reader_->Open(SerializedFile::Open(std::move(reader))), ParquetException); + reader_->Open(SerializedFile::Open(std::move(wrapper))), ParquetException); } protected: http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/file-serialize-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc index 3a11cd8..7a90eeb 100644 --- a/src/parquet/file/file-serialize-test.cc +++ b/src/parquet/file/file-serialize-test.cc @@ -24,8 +24,7 @@ #include "parquet/file/reader.h" #include "parquet/file/writer.h" #include "parquet/types.h" -#include "parquet/util/input.h" -#include "parquet/util/output.h" +#include "parquet/util/memory.h" namespace parquet { @@ -75,8 +74,9 @@ class TestSerialize : public PrimitiveTypedTest<TestType> { file_writer->Close(); auto buffer = sink->GetBuffer(); - std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer)); - auto file_reader = ParquetFileReader::Open(std::move(source)); + + auto source = std::make_shared<::arrow::io::BufferReader>(buffer); + auto file_reader = ParquetFileReader::Open(source); ASSERT_EQ(num_columns_, file_reader->metadata()->num_columns()); ASSERT_EQ(1, file_reader->metadata()->num_row_groups()); ASSERT_EQ(100, 
file_reader->metadata()->num_rows()); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/metadata.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc index adfcb69..692a0f5 100644 --- a/src/parquet/file/metadata.cc +++ b/src/parquet/file/metadata.cc @@ -23,6 +23,7 @@ #include "parquet/file/metadata.h" #include "parquet/schema/converter.h" #include "parquet/thrift/util.h" +#include "parquet/util/memory.h" #include <boost/algorithm/string.hpp> http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/metadata.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h index c5dd03a..ef19c98 100644 --- a/src/parquet/file/metadata.h +++ b/src/parquet/file/metadata.h @@ -27,7 +27,7 @@ #include "parquet/compression/codec.h" #include "parquet/schema/descriptor.h" #include "parquet/types.h" -#include "parquet/util/output.h" +#include "parquet/util/memory.h" #include "parquet/util/visibility.h" namespace parquet { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/reader-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc index 37c790c..2c3ebb3 100644 --- a/src/parquet/file/reader-internal.cc +++ b/src/parquet/file/reader-internal.cc @@ -32,8 +32,7 @@ #include "parquet/schema/types.h" #include "parquet/thrift/util.h" #include "parquet/types.h" -#include "parquet/util/buffer.h" -#include "parquet/util/input.h" +#include "parquet/util/memory.h" namespace parquet { @@ -44,7 +43,7 @@ namespace parquet { SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream, int64_t total_num_rows, Compression::type codec_type, MemoryAllocator* allocator) : stream_(std::move(stream)), - 
decompression_buffer_(0, allocator), + decompression_buffer_(AllocateBuffer(allocator, 0)), seen_num_rows_(0), total_num_rows_(total_num_rows) { max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE; @@ -97,12 +96,12 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() { // Uncompress it if we need to if (decompressor_ != NULL) { // Grow the uncompressed buffer if we need to. - if (uncompressed_len > static_cast<int>(decompression_buffer_.size())) { - decompression_buffer_.Resize(uncompressed_len); + if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) { + PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len)); } - decompressor_->Decompress( - compressed_len, buffer, uncompressed_len, &decompression_buffer_[0]); - buffer = &decompression_buffer_[0]; + decompressor_->Decompress(compressed_len, buffer, uncompressed_len, + decompression_buffer_->mutable_data()); + buffer = decompression_buffer_->data(); } auto page_buffer = std::make_shared<Buffer>(buffer, uncompressed_len); @@ -207,7 +206,7 @@ static constexpr uint32_t FOOTER_SIZE = 8; static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'}; std::unique_ptr<ParquetFileReader::Contents> SerializedFile::Open( - std::unique_ptr<RandomAccessSource> source, ReaderProperties props) { + std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props) { std::unique_ptr<ParquetFileReader::Contents> result( new SerializedFile(std::move(source), props)); @@ -239,39 +238,40 @@ const FileMetaData* SerializedFile::metadata() const { } SerializedFile::SerializedFile(std::unique_ptr<RandomAccessSource> source, - ReaderProperties props = default_reader_properties()) + const ReaderProperties& props = default_reader_properties()) : source_(std::move(source)), properties_(props) {} void SerializedFile::ParseMetaData() { - int64_t filesize = source_->Size(); + int64_t file_size = source_->Size(); - if (filesize < FOOTER_SIZE) { + if (file_size < FOOTER_SIZE) { throw 
ParquetException("Corrupted file, smaller than file footer"); } uint8_t footer_buffer[FOOTER_SIZE]; - source_->Seek(filesize - FOOTER_SIZE); + source_->Seek(file_size - FOOTER_SIZE); int64_t bytes_read = source_->Read(FOOTER_SIZE, footer_buffer); if (bytes_read != FOOTER_SIZE || memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) { throw ParquetException("Invalid parquet file. Corrupt footer."); } uint32_t metadata_len = *reinterpret_cast<uint32_t*>(footer_buffer); - int64_t metadata_start = filesize - FOOTER_SIZE - metadata_len; - if (FOOTER_SIZE + metadata_len > filesize) { + int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len; + if (FOOTER_SIZE + metadata_len > file_size) { throw ParquetException( "Invalid parquet file. File is less than " "file metadata size."); } source_->Seek(metadata_start); - OwnedMutableBuffer metadata_buffer(metadata_len, properties_.allocator()); - bytes_read = source_->Read(metadata_len, &metadata_buffer[0]); + std::shared_ptr<PoolBuffer> metadata_buffer = + AllocateBuffer(properties_.allocator(), metadata_len); + bytes_read = source_->Read(metadata_len, metadata_buffer->mutable_data()); if (bytes_read != metadata_len) { throw ParquetException("Invalid parquet file. 
Could not read metadata bytes."); } - file_metadata_ = FileMetaData::Make(&metadata_buffer[0], &metadata_len); + file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len); } } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/reader-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h index 582ab35..aa9b75e 100644 --- a/src/parquet/file/reader-internal.h +++ b/src/parquet/file/reader-internal.h @@ -29,7 +29,7 @@ #include "parquet/file/reader.h" #include "parquet/thrift/parquet_types.h" #include "parquet/types.h" -#include "parquet/util/input.h" +#include "parquet/util/memory.h" namespace parquet { @@ -62,7 +62,7 @@ class SerializedPageReader : public PageReader { // Compression codec to use. std::unique_ptr<Codec> decompressor_; - OwnedMutableBuffer decompression_buffer_; + std::shared_ptr<PoolBuffer> decompression_buffer_; // Maximum allowed page size uint32_t max_page_header_size_; @@ -104,7 +104,7 @@ class SerializedFile : public ParquetFileReader::Contents { // lifetime separately static std::unique_ptr<ParquetFileReader::Contents> Open( std::unique_ptr<RandomAccessSource> source, - ReaderProperties props = default_reader_properties()); + const ReaderProperties& props = default_reader_properties()); virtual void Close(); virtual std::shared_ptr<RowGroupReader> GetRowGroup(int i); virtual const FileMetaData* metadata() const; @@ -113,7 +113,7 @@ class SerializedFile : public ParquetFileReader::Contents { private: // This class takes ownership of the provided data source explicit SerializedFile( - std::unique_ptr<RandomAccessSource> source, ReaderProperties props); + std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props); std::unique_ptr<RandomAccessSource> source_; std::unique_ptr<FileMetaData> file_metadata_; 
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc index 06d2d8e..52fe57a 100644 --- a/src/parquet/file/reader.cc +++ b/src/parquet/file/reader.cc @@ -24,14 +24,16 @@ #include <utility> #include <vector> +#include "arrow/io/file.h" + #include "parquet/column/page.h" #include "parquet/column/reader.h" #include "parquet/column/scanner.h" #include "parquet/exception.h" #include "parquet/file/reader-internal.h" #include "parquet/types.h" -#include "parquet/util/input.h" #include "parquet/util/logging.h" +#include "parquet/util/memory.h" using std::string; using std::vector; @@ -69,26 +71,36 @@ ParquetFileReader::~ParquetFileReader() { } std::unique_ptr<ParquetFileReader> ParquetFileReader::Open( - std::unique_ptr<RandomAccessSource> source, ReaderProperties props) { - auto contents = SerializedFile::Open(std::move(source), props); + const std::shared_ptr<::arrow::io::ReadableFileInterface>& source, + const ReaderProperties& props) { + std::unique_ptr<RandomAccessSource> io_wrapper(new ArrowInputFile(source)); + return Open(std::move(io_wrapper), props); +} +std::unique_ptr<ParquetFileReader> ParquetFileReader::Open( + std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props) { + auto contents = SerializedFile::Open(std::move(source), props); std::unique_ptr<ParquetFileReader> result(new ParquetFileReader()); result->Open(std::move(contents)); - return result; } std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile( - const std::string& path, bool memory_map, ReaderProperties props) { - std::unique_ptr<LocalFileSource> file; + const std::string& path, bool memory_map, const ReaderProperties& props) { + std::shared_ptr<::arrow::io::ReadableFileInterface> source; if (memory_map) { - file.reset(new MemoryMapSource(props.allocator())); + 
std::shared_ptr<::arrow::io::MemoryMappedFile> handle;  // memory_map == true: memory-mapped source (as the removed MemoryMapSource was) + PARQUET_THROW_NOT_OK( + ::arrow::io::MemoryMappedFile::Open(path, ::arrow::io::FileMode::READ, &handle)); + source = handle; } else { - file.reset(new LocalFileSource(props.allocator())); + std::shared_ptr<::arrow::io::ReadableFile> handle;  // memory_map == false: plain file reads (as the removed LocalFileSource was) + PARQUET_THROW_NOT_OK( + ::arrow::io::ReadableFile::Open(path, props.allocator(), &handle)); + source = handle; } - file->Open(path); - return Open(std::move(file), props); + return Open(source, props); } void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents) { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h index ca28f67..1c24506 100644 --- a/src/parquet/file/reader.h +++ b/src/parquet/file/reader.h @@ -30,12 +30,12 @@ #include "parquet/column/statistics.h" #include "parquet/file/metadata.h" #include "parquet/schema/descriptor.h" +#include "parquet/util/memory.h" #include "parquet/util/visibility.h" namespace parquet { class ColumnReader; -class RandomAccessSource; class PARQUET_EXPORT RowGroupReader { public: @@ -79,15 +79,27 @@ class PARQUET_EXPORT ParquetFileReader { ParquetFileReader(); ~ParquetFileReader(); + // Create a reader from some implementation of parquet-cpp's generic file + // input interface + // + // If you cannot provide exclusive access to your file resource, create a + // subclass of RandomAccessSource that wraps the shared resource + static std::unique_ptr<ParquetFileReader> Open( + std::unique_ptr<RandomAccessSource> source, + const ReaderProperties& props = default_reader_properties()); + + // Create a file reader instance from an Arrow file object.
Thread-safety is + // the responsibility of the file implementation + static std::unique_ptr<ParquetFileReader> Open( + const std::shared_ptr<::arrow::io::ReadableFileInterface>& source, + const ReaderProperties& props = default_reader_properties()); + // API Convenience to open a serialized Parquet file on disk, using built-in IO // interface implementations that were created for testing, and may not be robust for // all use cases. static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path, - bool memory_map = true, ReaderProperties props = default_reader_properties()); - - static std::unique_ptr<ParquetFileReader> Open( - std::unique_ptr<RandomAccessSource> source, - ReaderProperties props = default_reader_properties()); + bool memory_map = true, + const ReaderProperties& props = default_reader_properties()); void Open(std::unique_ptr<Contents> contents); void Close(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/writer-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc index c4681bd..48884ad 100644 --- a/src/parquet/file/writer-internal.cc +++ b/src/parquet/file/writer-internal.cc @@ -20,7 +20,7 @@ #include "parquet/column/writer.h" #include "parquet/schema/converter.h" #include "parquet/thrift/util.h" -#include "parquet/util/output.h" +#include "parquet/util/memory.h" using parquet::schema::GroupNode; using parquet::schema::SchemaFlattener; @@ -37,6 +37,7 @@ SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type ColumnChunkMetaDataBuilder* metadata, MemoryAllocator* allocator) : sink_(sink), metadata_(metadata), + allocator_(allocator), num_values_(0), dictionary_page_offset_(0), data_page_offset_(0), @@ -71,10 +72,13 @@ std::shared_ptr<Buffer> SerializedPageWriter::Compress( // Compress the data int64_t max_compressed_size = 
compressor_->MaxCompressedLen(buffer->size(), buffer->data()); - auto compression_buffer = std::make_shared<OwnedMutableBuffer>(max_compressed_size); + + std::shared_ptr<PoolBuffer> compression_buffer = + AllocateBuffer(allocator_, max_compressed_size); + int64_t compressed_size = compressor_->Compress(buffer->size(), buffer->data(), max_compressed_size, compression_buffer->mutable_data()); - compression_buffer->Resize(compressed_size); + PARQUET_THROW_NOT_OK(compression_buffer->Resize(compressed_size)); return compression_buffer; } @@ -182,7 +186,7 @@ void RowGroupSerializer::Close() { // FileSerializer std::unique_ptr<ParquetFileWriter::Contents> FileSerializer::Open( - std::shared_ptr<OutputStream> sink, const std::shared_ptr<GroupNode>& schema, + const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema, const std::shared_ptr<WriterProperties>& properties) { std::unique_ptr<ParquetFileWriter::Contents> result( new FileSerializer(sink, schema, properties)); @@ -248,7 +252,7 @@ void FileSerializer::WriteMetaData() { sink_->Write(PARQUET_MAGIC, 4); } -FileSerializer::FileSerializer(std::shared_ptr<OutputStream> sink, +FileSerializer::FileSerializer(const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema, const std::shared_ptr<WriterProperties>& properties) : sink_(sink), http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/writer-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h index f1f76ab..81a0837 100644 --- a/src/parquet/file/writer-internal.h +++ b/src/parquet/file/writer-internal.h @@ -26,6 +26,7 @@ #include "parquet/file/metadata.h" #include "parquet/file/writer.h" #include "parquet/thrift/parquet_types.h" +#include "parquet/util/memory.h" namespace parquet { @@ -54,6 +55,7 @@ class SerializedPageWriter : public PageWriter { private: OutputStream* sink_; 
ColumnChunkMetaDataBuilder* metadata_; + MemoryAllocator* allocator_; int64_t num_values_; int64_t dictionary_page_offset_; int64_t data_page_offset_; @@ -102,7 +104,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents { class FileSerializer : public ParquetFileWriter::Contents { public: static std::unique_ptr<ParquetFileWriter::Contents> Open( - std::shared_ptr<OutputStream> sink, + const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<schema::GroupNode>& schema, const std::shared_ptr<WriterProperties>& properties = default_writer_properties()); @@ -119,7 +121,7 @@ class FileSerializer : public ParquetFileWriter::Contents { virtual ~FileSerializer(); private: - explicit FileSerializer(std::shared_ptr<OutputStream> sink, + explicit FileSerializer(const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<schema::GroupNode>& schema, const std::shared_ptr<WriterProperties>& properties); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc index 8c9f52f..a381c22 100644 --- a/src/parquet/file/writer.cc +++ b/src/parquet/file/writer.cc @@ -18,7 +18,7 @@ #include "parquet/file/writer.h" #include "parquet/file/writer-internal.h" -#include "parquet/util/output.h" +#include "parquet/util/memory.h" using parquet::schema::GroupNode; @@ -51,13 +51,19 @@ ParquetFileWriter::~ParquetFileWriter() { } std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open( - std::shared_ptr<OutputStream> sink, const std::shared_ptr<GroupNode>& schema, + const std::shared_ptr<::arrow::io::OutputStream>& sink, + const std::shared_ptr<GroupNode>& schema, const std::shared_ptr<WriterProperties>& properties) { - auto contents = FileSerializer::Open(sink, schema, properties); + return Open(std::make_shared<ArrowOutputStream>(sink), schema, properties); +} +std::unique_ptr<ParquetFileWriter> 
ParquetFileWriter::Open( + const std::shared_ptr<OutputStream>& sink, + const std::shared_ptr<schema::GroupNode>& schema, + const std::shared_ptr<WriterProperties>& properties) { + auto contents = FileSerializer::Open(sink, schema, properties); std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter()); result->Open(std::move(contents)); - return result; } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h index e82f016..6d7161b 100644 --- a/src/parquet/file/writer.h +++ b/src/parquet/file/writer.h @@ -24,7 +24,7 @@ #include "parquet/column/properties.h" #include "parquet/schema/descriptor.h" #include "parquet/schema/types.h" -#include "parquet/util/mem-allocator.h" +#include "parquet/util/memory.h" #include "parquet/util/visibility.h" namespace parquet { @@ -97,7 +97,13 @@ class PARQUET_EXPORT ParquetFileWriter { ParquetFileWriter(); ~ParquetFileWriter(); - static std::unique_ptr<ParquetFileWriter> Open(std::shared_ptr<OutputStream> sink, + static std::unique_ptr<ParquetFileWriter> Open( + const std::shared_ptr<::arrow::io::OutputStream>& sink, + const std::shared_ptr<schema::GroupNode>& schema, + const std::shared_ptr<WriterProperties>& properties = default_writer_properties()); + + static std::unique_ptr<ParquetFileWriter> Open( + const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<schema::GroupNode>& schema, const std::shared_ptr<WriterProperties>& properties = default_writer_properties()); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/reader-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc index d21a809..e3be9b0 100644 --- a/src/parquet/reader-test.cc +++ b/src/parquet/reader-test.cc @@ -23,17 +23,20 @@ #include <memory> #include 
<string> +#include "arrow/io/file.h" + #include "parquet/column/reader.h" #include "parquet/column/scanner.h" #include "parquet/file/reader-internal.h" #include "parquet/file/reader.h" -#include "parquet/util/input.h" -#include "parquet/util/mem-allocator.h" +#include "parquet/util/memory.h" using std::string; namespace parquet { +using ReadableFile = ::arrow::io::ReadableFile; + const char* data_dir = std::getenv("PARQUET_TEST_DATA"); std::string alltypes_plain() { @@ -159,7 +162,7 @@ TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) { ASSERT_THROW(reader_->DebugPrint(ss, columns), ParquetException); } -class TestLocalFileSource : public ::testing::Test { +class TestLocalFile : public ::testing::Test { public: void SetUp() { std::string dir_string(data_dir); @@ -168,24 +171,25 @@ class TestLocalFileSource : public ::testing::Test { ss << dir_string << "/" << "alltypes_plain.parquet"; - file.reset(new LocalFileSource()); - file->Open(ss.str()); + PARQUET_THROW_NOT_OK(ReadableFile::Open(ss.str(), &handle)); + fileno = handle->file_descriptor(); } void TearDown() {} protected: - std::unique_ptr<LocalFileSource> file; + int fileno; + std::shared_ptr<::arrow::io::ReadableFile> handle; }; -TEST_F(TestLocalFileSource, FileClosedOnDestruction) { - int file_desc = file->file_descriptor(); +TEST_F(TestLocalFile, FileClosedOnDestruction) { { - auto contents = SerializedFile::Open(std::move(file)); + auto contents = SerializedFile::Open( + std::unique_ptr<RandomAccessSource>(new ArrowInputFile(handle))); std::unique_ptr<ParquetFileReader> result(new ParquetFileReader()); result->Open(std::move(contents)); } - ASSERT_EQ(-1, fcntl(file_desc, F_GETFD)); + ASSERT_EQ(-1, fcntl(fileno, F_GETFD)); ASSERT_EQ(EBADF, errno); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/thrift/util.h ---------------------------------------------------------------------- diff --git a/src/parquet/thrift/util.h b/src/parquet/thrift/util.h index 1800435..9d2b66f 100644 
--- a/src/parquet/thrift/util.h +++ b/src/parquet/thrift/util.h @@ -37,7 +37,7 @@ #include "parquet/exception.h" #include "parquet/thrift/parquet_types.h" #include "parquet/util/logging.h" -#include "parquet/util/output.h" +#include "parquet/util/memory.h" namespace parquet { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt index 3a4b1c9..7a9ccba 100644 --- a/src/parquet/util/CMakeLists.txt +++ b/src/parquet/util/CMakeLists.txt @@ -20,17 +20,13 @@ install(FILES bit-stream-utils.h bit-stream-utils.inline.h bit-util.h - buffer.h buffer-builder.h compiler-util.h cpu-info.h hash-util.h - input.h logging.h macros.h - mem-allocator.h - mem-pool.h - output.h + memory.h rle-encoding.h stopwatch.h sse-util.h @@ -70,9 +66,6 @@ if (PARQUET_BUILD_BENCHMARKS) endif() ADD_PARQUET_TEST(bit-util-test) -ADD_PARQUET_TEST(buffer-test) ADD_PARQUET_TEST(comparison-test) -ADD_PARQUET_TEST(input-output-test) -ADD_PARQUET_TEST(mem-allocator-test) -ADD_PARQUET_TEST(mem-pool-test) +ADD_PARQUET_TEST(memory-test) ADD_PARQUET_TEST(rle-test) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/buffer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/buffer-test.cc b/src/parquet/util/buffer-test.cc deleted file mode 100644 index ee5b000..0000000 --- a/src/parquet/util/buffer-test.cc +++ /dev/null @@ -1,65 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <cstdint> -#include <cstdlib> -#include <exception> -#include <gtest/gtest.h> -#include <limits> -#include <memory> -#include <string> - -#include "parquet/exception.h" -#include "parquet/util/buffer.h" - -using std::string; - -namespace parquet { - -class TestBuffer : public ::testing::Test {}; - -TEST_F(TestBuffer, Resize) { - OwnedMutableBuffer buf; - - ASSERT_EQ(0, buf.size()); - ASSERT_NO_THROW(buf.Resize(100)); - ASSERT_EQ(100, buf.size()); - ASSERT_NO_THROW(buf.Resize(200)); - ASSERT_EQ(200, buf.size()); - - // Make it smaller, too - ASSERT_NO_THROW(buf.Resize(50)); - ASSERT_EQ(50, buf.size()); -} - -TEST_F(TestBuffer, ResizeOOM) { -// Tests that deliberately throw Exceptions foul up valgrind and report -// red herring memory leaks -#ifndef PARQUET_VALGRIND - OwnedMutableBuffer buf; - ASSERT_NO_THROW(buf.Resize(100)); - int64_t to_alloc = std::numeric_limits<int64_t>::max(); - try { - buf.Resize(to_alloc); - FAIL() << "Exception not thrown"; - } catch (const ParquetException& e) { - // pass - } catch (const std::exception& e) { FAIL() << "Different exception thrown"; } -#endif -} - -} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/buffer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/buffer.cc b/src/parquet/util/buffer.cc deleted file mode 100644 index 0b7100c..0000000 --- a/src/parquet/util/buffer.cc +++ /dev/null @@ -1,123 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor 
license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "parquet/util/buffer.h" - -#include <algorithm> -#include <cstdint> - -#include "parquet/exception.h" -#include "parquet/types.h" - -namespace parquet { - -Buffer::Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size) { - data_ = parent->data() + offset; - size_ = size; - parent_ = parent; -} - -std::shared_ptr<Buffer> MutableBuffer::GetImmutableView() { - return std::make_shared<Buffer>(this->get_shared_ptr(), 0, size()); -} - -OwnedMutableBuffer::OwnedMutableBuffer(int64_t size, MemoryAllocator* allocator) - : ResizableBuffer(nullptr, 0), allocator_(allocator) { - Resize(size); -} - -OwnedMutableBuffer::~OwnedMutableBuffer() { - if (mutable_data_) { allocator_->Free(mutable_data_, capacity_); } -} - -void OwnedMutableBuffer::Reserve(int64_t new_capacity) { - if (!mutable_data_ || new_capacity > capacity_) { - if (mutable_data_) { - uint8_t* new_data = allocator_->Malloc(new_capacity); - memcpy(new_data, mutable_data_, size_); - allocator_->Free(mutable_data_, capacity_); - mutable_data_ = new_data; - } else { - mutable_data_ = allocator_->Malloc(new_capacity); - } - data_ = mutable_data_; - capacity_ = new_capacity; - } -} - -void OwnedMutableBuffer::Resize(int64_t new_size) { - Reserve(new_size); - size_ = new_size; -} 
- -uint8_t& OwnedMutableBuffer::operator[](int64_t i) { - return mutable_data_[i]; -} - -template <class T> -Vector<T>::Vector(int64_t size, MemoryAllocator* allocator) - : buffer_(new OwnedMutableBuffer(size * sizeof(T), allocator)), - size_(size), - capacity_(size) { - if (size > 0) { - data_ = reinterpret_cast<T*>(buffer_->mutable_data()); - } else { - data_ = nullptr; - } -} - -template <class T> -void Vector<T>::Reserve(int64_t new_capacity) { - if (new_capacity > capacity_) { - buffer_->Resize(new_capacity * sizeof(T)); - data_ = reinterpret_cast<T*>(buffer_->mutable_data()); - capacity_ = new_capacity; - } -} - -template <class T> -void Vector<T>::Resize(int64_t new_size) { - Reserve(new_size); - size_ = new_size; -} - -template <class T> -void Vector<T>::Assign(int64_t size, const T val) { - Resize(size); - for (int64_t i = 0; i < size_; i++) { - data_[i] = val; - } -} - -template <class T> -void Vector<T>::Swap(Vector<T>& v) { - buffer_.swap(v.buffer_); - std::swap(size_, v.size_); - std::swap(capacity_, v.capacity_); - std::swap(data_, v.data_); -} - -template class Vector<int32_t>; -template class Vector<int64_t>; -template class Vector<bool>; -template class Vector<float>; -template class Vector<double>; -template class Vector<Int96>; -template class Vector<ByteArray>; -template class Vector<FixedLenByteArray>; - -} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/buffer.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/buffer.h b/src/parquet/util/buffer.h deleted file mode 100644 index 58a5f5e..0000000 --- a/src/parquet/util/buffer.h +++ /dev/null @@ -1,149 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef PARQUET_UTIL_BUFFER_H -#define PARQUET_UTIL_BUFFER_H - -#include <cstdint> -#include <cstdlib> -#include <cstring> -#include <memory> -#include <vector> - -#include "parquet/util/macros.h" -#include "parquet/util/mem-allocator.h" -#include "parquet/util/visibility.h" - -namespace parquet { - -// ---------------------------------------------------------------------- -// Buffer classes - -// Immutable API for a chunk of bytes which may or may not be owned by the -// class instance -class PARQUET_EXPORT Buffer : public std::enable_shared_from_this<Buffer> { - public: - Buffer(const uint8_t* data, int64_t size) : data_(data), size_(size) {} - - // An offset into data that is owned by another buffer, but we want to be - // able to retain a valid pointer to it even after other shared_ptr's to the - // parent buffer have been destroyed - Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size); - - std::shared_ptr<Buffer> get_shared_ptr() { return shared_from_this(); } - - // Return true if both buffers are the same size and contain the same bytes - // up to the number of compared bytes - bool Equals(const Buffer& other, int64_t nbytes) const { - return this == &other || (size_ >= nbytes && other.size_ >= nbytes && - !memcmp(data_, other.data_, nbytes)); - } - - bool Equals(const Buffer& other) const { - return this == &other || (size_ == other.size_ && 
!memcmp(data_, other.data_, size_)); - } - - const uint8_t* data() const { return data_; } - - int64_t size() const { return size_; } - - // Returns true if this Buffer is referencing memory (possibly) owned by some - // other buffer - bool is_shared() const { return static_cast<bool>(parent_); } - - const std::shared_ptr<Buffer> parent() const { return parent_; } - - protected: - const uint8_t* data_; - int64_t size_; - - // nullptr by default, but may be set - std::shared_ptr<Buffer> parent_; - - private: - DISALLOW_COPY_AND_ASSIGN(Buffer); -}; - -// A Buffer whose contents can be mutated. May or may not own its data. -class PARQUET_EXPORT MutableBuffer : public Buffer { - public: - MutableBuffer(uint8_t* data, int64_t size) : Buffer(data, size) { - mutable_data_ = data; - } - - uint8_t* mutable_data() { return mutable_data_; } - - // Get a read-only view of this buffer - std::shared_ptr<Buffer> GetImmutableView(); - - protected: - MutableBuffer() : Buffer(nullptr, 0), mutable_data_(nullptr) {} - - uint8_t* mutable_data_; -}; - -class PARQUET_EXPORT ResizableBuffer : public MutableBuffer { - public: - virtual void Resize(int64_t new_size) = 0; - - protected: - ResizableBuffer(uint8_t* data, int64_t size) - : MutableBuffer(data, size), capacity_(size) {} - int64_t capacity_; -}; - -// A ResizableBuffer whose memory is owned by the class instance. 
For example, -// for reading data out of files that you want to deallocate when this class is -// garbage-collected -class PARQUET_EXPORT OwnedMutableBuffer : public ResizableBuffer { - public: - explicit OwnedMutableBuffer( - int64_t size = 0, MemoryAllocator* allocator = default_allocator()); - virtual ~OwnedMutableBuffer(); - void Resize(int64_t new_size) override; - void Reserve(int64_t new_capacity); - uint8_t& operator[](int64_t i); - - private: - // TODO: aligned allocations - MemoryAllocator* allocator_; - - DISALLOW_COPY_AND_ASSIGN(OwnedMutableBuffer); -}; - -template <class T> -class Vector { - public: - explicit Vector(int64_t size, MemoryAllocator* allocator); - void Resize(int64_t new_size); - void Reserve(int64_t new_capacity); - void Assign(int64_t size, const T val); - void Swap(Vector<T>& v); - inline T& operator[](int64_t i) const { return data_[i]; } - - private: - std::unique_ptr<OwnedMutableBuffer> buffer_; - int64_t size_; - int64_t capacity_; - T* data_; - - DISALLOW_COPY_AND_ASSIGN(Vector); -}; - -} // namespace parquet - -#endif // PARQUET_UTIL_BUFFER_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/input-output-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/input-output-test.cc b/src/parquet/util/input-output-test.cc deleted file mode 100644 index 72aad9c..0000000 --- a/src/parquet/util/input-output-test.cc +++ /dev/null @@ -1,244 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <gtest/gtest.h> - -#include <cstdint> -#include <cstdio> -#include <fstream> -#include <iostream> -#include <memory> -#include <string> -#include <vector> - -#include "parquet/exception.h" -#include "parquet/util/buffer.h" -#include "parquet/util/input.h" -#include "parquet/util/mem-allocator.h" -#include "parquet/util/output.h" -#include "parquet/util/test-common.h" - -namespace parquet { - -TEST(TestBufferedInputStream, Basics) { - int64_t source_size = 256; - int64_t stream_offset = 10; - int64_t stream_size = source_size - stream_offset; - int64_t chunk_size = 50; - auto buf = std::make_shared<OwnedMutableBuffer>(source_size); - ASSERT_EQ(source_size, buf->size()); - for (int i = 0; i < source_size; i++) { - buf->mutable_data()[i] = i; - } - - std::unique_ptr<BufferReader> source(new BufferReader(buf)); - std::unique_ptr<MemoryAllocator> allocator(new TrackingAllocator()); - std::unique_ptr<BufferedInputStream> stream(new BufferedInputStream( - allocator.get(), chunk_size, source.get(), stream_offset, stream_size)); - - const uint8_t* output; - int64_t bytes_read; - - // source is at offset 10 - output = stream->Peek(10, &bytes_read); - ASSERT_EQ(10, bytes_read); - for (int i = 0; i < 10; i++) { - ASSERT_EQ(10 + i, output[i]) << i; - } - output = stream->Read(10, &bytes_read); - ASSERT_EQ(10, bytes_read); - for (int i = 0; i < 10; i++) { - ASSERT_EQ(10 + i, output[i]) << i; - } - output = stream->Read(10, &bytes_read); - ASSERT_EQ(10, bytes_read); - for (int i = 0; i < 10; i++) { - ASSERT_EQ(20 + i, output[i]) << i; - } - 
stream->Advance(5); - stream->Advance(5); - // source is at offset 40 - // read across buffer boundary. buffer size is 50 - output = stream->Read(20, &bytes_read); - ASSERT_EQ(20, bytes_read); - for (int i = 0; i < 20; i++) { - ASSERT_EQ(40 + i, output[i]) << i; - } - // read more than original chunk_size - output = stream->Read(60, &bytes_read); - ASSERT_EQ(60, bytes_read); - for (int i = 0; i < 60; i++) { - ASSERT_EQ(60 + i, output[i]) << i; - } - - stream->Advance(120); - // source is at offset 240 - // read outside of source boundary. source size is 256 - output = stream->Read(30, &bytes_read); - ASSERT_EQ(16, bytes_read); - for (int i = 0; i < 16; i++) { - ASSERT_EQ(240 + i, output[i]) << i; - } -} - -TEST(TestInMemoryOutputStream, Basics) { - std::unique_ptr<InMemoryOutputStream> stream(new InMemoryOutputStream(8)); - - std::vector<uint8_t> data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}; - - stream->Write(&data[0], 4); - ASSERT_EQ(4, stream->Tell()); - stream->Write(&data[4], data.size() - 4); - - std::shared_ptr<Buffer> buffer = stream->GetBuffer(); - - Buffer data_buf(data.data(), data.size()); - - ASSERT_TRUE(data_buf.Equals(*buffer)); -} - -TEST(TestBufferedReader, Basics) { - std::vector<uint8_t> data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}; - auto buffer = std::make_shared<Buffer>(data.data(), data.size()); - BufferReader reader(buffer); - - uint8_t out[4]; - ASSERT_EQ(4, reader.Read(4, out)); - ASSERT_EQ(4, reader.Tell()); - ASSERT_EQ(0, out[0]); - ASSERT_EQ(1, out[1]); - ASSERT_EQ(2, out[2]); - ASSERT_EQ(3, out[3]); - - reader.Seek(8); - ASSERT_EQ(8, reader.Tell()); - - auto out_buffer = reader.Read(5); - ASSERT_EQ(8, out_buffer->data()[0]); - ASSERT_EQ(9, out_buffer->data()[1]); - ASSERT_EQ(10, out_buffer->data()[2]); - ASSERT_EQ(11, out_buffer->data()[3]); - ASSERT_EQ(12, out_buffer->data()[4]); - - // Read past the end of the buffer - ASSERT_EQ(13, reader.Tell()); - ASSERT_EQ(0, reader.Read(4, out)); - ASSERT_EQ(0, reader.Read(4)->size()); 
- - reader.Close(); -} - -static bool file_exists(const std::string& path) { - return std::ifstream(path.c_str()).good(); -} - -template <typename ReaderType> -class TestFileReaders : public ::testing::Test { - public: - void SetUp() { - test_path_ = "parquet-input-output-test.txt"; - if (file_exists(test_path_)) { std::remove(test_path_.c_str()); } - test_data_ = "testingdata"; - - std::ofstream stream; - stream.open(test_path_.c_str()); - stream << test_data_; - filesize_ = test_data_.size(); - } - - void TearDown() { DeleteTestFile(); } - - void DeleteTestFile() { - if (file_exists(test_path_)) { std::remove(test_path_.c_str()); } - } - - protected: - ReaderType source; - std::string test_path_; - std::string test_data_; - int filesize_; -}; - -typedef ::testing::Types<LocalFileSource, MemoryMapSource> ReaderTypes; - -TYPED_TEST_CASE(TestFileReaders, ReaderTypes); - -TYPED_TEST(TestFileReaders, NonExistentFile) { - ASSERT_THROW(this->source.Open("0xDEADBEEF.txt"), ParquetException); -} - -TYPED_TEST(TestFileReaders, Read) { - this->source.Open(this->test_path_); - - ASSERT_EQ(this->filesize_, this->source.Size()); - - std::shared_ptr<Buffer> buffer = this->source.Read(4); - ASSERT_EQ(4, buffer->size()); - ASSERT_EQ(0, memcmp(this->test_data_.c_str(), buffer->data(), 4)); - - // Read past EOF - buffer = this->source.Read(10); - ASSERT_EQ(7, buffer->size()); - ASSERT_EQ(0, memcmp(this->test_data_.c_str() + 4, buffer->data(), 7)); -} - -TYPED_TEST(TestFileReaders, FileDisappeared) { - this->source.Open(this->test_path_); - this->source.Seek(4); - this->DeleteTestFile(); - this->source.Close(); -} - -TYPED_TEST(TestFileReaders, BadSeek) { - this->source.Open(this->test_path_); - ASSERT_THROW(this->source.Seek(this->filesize_ + 1), ParquetException); -} - -class TestFileWriter : public ::testing::Test { - public: - void SetUp() { - test_path_ = "parquet-input-output-test.txt"; - if (file_exists(test_path_)) { std::remove(test_path_.c_str()); } - } - - void TearDown() 
{ DeleteTestFile(); } - - void DeleteTestFile() { - if (file_exists(test_path_)) { std::remove(test_path_.c_str()); } - } - - protected: - std::string test_path_; - uint8_t test_data_[4] = {1, 2, 3, 4}; -}; - -TEST_F(TestFileWriter, Write) { - LocalFileOutputStream sink(test_path_); - ASSERT_EQ(0, sink.Tell()); - sink.Write(test_data_, 4); - ASSERT_EQ(4, sink.Tell()); - sink.Close(); - - // Check that the correct content was written - LocalFileSource source; - source.Open(test_path_); - std::shared_ptr<Buffer> buffer = source.Read(4); - ASSERT_EQ(4, buffer->size()); - ASSERT_EQ(0, memcmp(test_data_, buffer->data(), 4)); -} - -} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/input.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/input.cc b/src/parquet/util/input.cc deleted file mode 100644 index 127b90c..0000000 --- a/src/parquet/util/input.cc +++ /dev/null @@ -1,285 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "parquet/util/input.h" - -#include <algorithm> -#include <sstream> -#include <string> -#include <sys/mman.h> - -#include "parquet/exception.h" -#include "parquet/util/buffer.h" -#include "parquet/util/logging.h" - -namespace parquet { - -// ---------------------------------------------------------------------- -// RandomAccessSource - -std::shared_ptr<Buffer> RandomAccessSource::ReadAt(int64_t pos, int64_t nbytes) { - Seek(pos); - return Read(nbytes); -} - -int64_t RandomAccessSource::Size() const { - return size_; -} - -// ---------------------------------------------------------------------- -// LocalFileSource - -LocalFileSource::~LocalFileSource() { - CloseFile(); -} - -void LocalFileSource::Open(const std::string& path) { - path_ = path; - file_ = fopen(path_.c_str(), "rb"); - if (file_ == nullptr || ferror(file_)) { - std::stringstream ss; - ss << "Unable to open file: " << path; - throw ParquetException(ss.str()); - } - is_open_ = true; - SeekFile(0, SEEK_END); - size_ = LocalFileSource::Tell(); - Seek(0); -} - -void LocalFileSource::SeekFile(int64_t pos, int origin) { - if (origin == SEEK_SET && (pos < 0 || pos >= size_)) { - std::stringstream ss; - ss << "Position " << pos << " is not in range."; - throw ParquetException(ss.str()); - } - - if (0 != fseek(file_, pos, origin)) { - std::stringstream ss; - ss << "File seek to position " << pos << " failed."; - throw ParquetException(ss.str()); - } -} - -void LocalFileSource::Close() { - // Pure virtual - CloseFile(); -} - -void LocalFileSource::CloseFile() { - if (is_open_) { - fclose(file_); - is_open_ = false; - } -} - -void LocalFileSource::Seek(int64_t pos) { - SeekFile(pos); -} - -int64_t LocalFileSource::Tell() const { - int64_t position = ftell(file_); - if (position < 0) { throw ParquetException("ftell failed, did the file disappear?"); } - return position; -} - -int LocalFileSource::file_descriptor() const { - return fileno(file_); -} - -int64_t LocalFileSource::Read(int64_t nbytes, 
uint8_t* buffer) { - return fread(buffer, 1, nbytes, file_); -} - -std::shared_ptr<Buffer> LocalFileSource::Read(int64_t nbytes) { - auto result = std::make_shared<OwnedMutableBuffer>(0, allocator_); - result->Resize(nbytes); - - int64_t bytes_read = Read(nbytes, result->mutable_data()); - if (bytes_read < nbytes) { result->Resize(bytes_read); } - return result; -} -// ---------------------------------------------------------------------- -// MemoryMapSource methods - -MemoryMapSource::~MemoryMapSource() { - CloseFile(); -} - -void MemoryMapSource::Open(const std::string& path) { - LocalFileSource::Open(path); - data_ = reinterpret_cast<uint8_t*>( - mmap(nullptr, size_, PROT_READ, MAP_SHARED, fileno(file_), 0)); - if (data_ == nullptr) { throw ParquetException("Memory mapping file failed"); } - pos_ = 0; -} - -void MemoryMapSource::Close() { - // Pure virtual - CloseFile(); -} - -void MemoryMapSource::CloseFile() { - if (data_ != nullptr) { - munmap(data_, size_); - data_ = nullptr; - } - - LocalFileSource::CloseFile(); -} - -void MemoryMapSource::Seek(int64_t pos) { - if (pos < 0 || pos >= size_) { - std::stringstream ss; - ss << "Position " << pos << " is not in range."; - throw ParquetException(ss.str()); - } - - pos_ = pos; -} - -int64_t MemoryMapSource::Tell() const { - return pos_; -} - -int64_t MemoryMapSource::Read(int64_t nbytes, uint8_t* buffer) { - int64_t bytes_available = std::min(nbytes, size_ - pos_); - memcpy(buffer, data_ + pos_, bytes_available); - pos_ += bytes_available; - return bytes_available; -} - -std::shared_ptr<Buffer> MemoryMapSource::Read(int64_t nbytes) { - int64_t bytes_available = std::min(nbytes, size_ - pos_); - auto result = std::make_shared<Buffer>(data_ + pos_, bytes_available); - pos_ += bytes_available; - return result; -} - -// ---------------------------------------------------------------------- -// BufferReader - -BufferReader::BufferReader(const std::shared_ptr<Buffer>& buffer) - : buffer_(buffer), data_(buffer->data()), 
pos_(0) { - size_ = buffer->size(); -} - -int64_t BufferReader::Tell() const { - return pos_; -} - -void BufferReader::Seek(int64_t pos) { - if (pos < 0 || pos >= size_) { - std::stringstream ss; - ss << "Cannot seek to " << pos << "File is length " << size_; - throw ParquetException(ss.str()); - } - pos_ = pos; -} - -int64_t BufferReader::Read(int64_t nbytes, uint8_t* out) { - int64_t bytes_available = std::min(nbytes, size_ - pos_); - memcpy(out, Head(), bytes_available); - pos_ += bytes_available; - return bytes_available; -} - -std::shared_ptr<Buffer> BufferReader::Read(int64_t nbytes) { - int64_t bytes_available = std::min(nbytes, size_ - pos_); - auto result = std::make_shared<Buffer>(Head(), bytes_available); - pos_ += bytes_available; - return result; -} - -// ---------------------------------------------------------------------- -// InMemoryInputStream - -InMemoryInputStream::InMemoryInputStream(const std::shared_ptr<Buffer>& buffer) - : buffer_(buffer), offset_(0) { - len_ = buffer_->size(); -} - -InMemoryInputStream::InMemoryInputStream( - RandomAccessSource* source, int64_t start, int64_t num_bytes) - : offset_(0) { - buffer_ = source->ReadAt(start, num_bytes); - if (buffer_->size() < num_bytes) { - throw ParquetException("Unable to read column chunk data"); - } - len_ = buffer_->size(); -} - -const uint8_t* InMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) { - *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_); - return buffer_->data() + offset_; -} - -const uint8_t* InMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) { - const uint8_t* result = Peek(num_to_read, num_bytes); - offset_ += *num_bytes; - return result; -} - -void InMemoryInputStream::Advance(int64_t num_bytes) { - offset_ += num_bytes; -} - -// ---------------------------------------------------------------------- -// BufferedInputStream -BufferedInputStream::BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size, - 
RandomAccessSource* source, int64_t start, int64_t num_bytes) - : source_(source), stream_offset_(start), stream_end_(start + num_bytes) { - buffer_ = std::make_shared<OwnedMutableBuffer>(buffer_size, pool); - buffer_size_ = buffer_->size(); - // Required to force a lazy read - buffer_offset_ = buffer_size_; -} - -const uint8_t* BufferedInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) { - *num_bytes = std::min(num_to_peek, stream_end_ - stream_offset_); - // increase the buffer size if needed - if (*num_bytes > buffer_size_) { - buffer_->Resize(*num_bytes); - buffer_size_ = buffer_->size(); - DCHECK(buffer_size_ >= *num_bytes); - } - // Read more data when buffer has insufficient left or when resized - if (*num_bytes > (buffer_size_ - buffer_offset_)) { - source_->Seek(stream_offset_); - buffer_size_ = std::min(buffer_size_, stream_end_ - stream_offset_); - int64_t bytes_read = source_->Read(buffer_size_, buffer_->mutable_data()); - if (bytes_read < *num_bytes) { - throw ParquetException("Failed reading column data from source"); - } - buffer_offset_ = 0; - } - return buffer_->data() + buffer_offset_; -} - -const uint8_t* BufferedInputStream::Read(int64_t num_to_read, int64_t* num_bytes) { - const uint8_t* result = Peek(num_to_read, num_bytes); - stream_offset_ += *num_bytes; - buffer_offset_ += *num_bytes; - return result; -} - -void BufferedInputStream::Advance(int64_t num_bytes) { - stream_offset_ += num_bytes; - buffer_offset_ += num_bytes; -} - -} // namespace parquet
