Repository: parquet-cpp Updated Branches: refs/heads/master c14706f6e -> 5e59bc5c6
PARQUET-874: Use default memory allocator from Arrow Author: Korn, Uwe <[email protected]> Closes #241 from xhochy/PARQUET-874 and squashes the following commits: 7e7fbfb [Korn, Uwe] Fix benchmark compilation c57b2fa [Korn, Uwe] PARQUET-874: Use default memory allocator from Arrow Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/5e59bc5c Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/5e59bc5c Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/5e59bc5c Branch: refs/heads/master Commit: 5e59bc5c6491a7505585c08fd62aa52f9a6c9afc Parents: c14706f Author: Korn, Uwe <[email protected]> Authored: Sat Feb 11 16:48:24 2017 -0500 Committer: Wes McKinney <[email protected]> Committed: Sat Feb 11 16:48:24 2017 -0500 ---------------------------------------------------------------------- src/parquet/column/properties.h | 28 ++++++------ src/parquet/column/reader.cc | 31 +++++++------ src/parquet/column/reader.h | 16 +++---- src/parquet/column/scanner.cc | 23 +++++----- src/parquet/column/scanner.h | 10 ++-- src/parquet/column/statistics-test.cc | 19 ++++---- src/parquet/column/statistics.cc | 28 ++++++------ src/parquet/column/statistics.h | 8 ++-- src/parquet/column/test-util.h | 2 +- src/parquet/column/writer.cc | 19 ++++---- src/parquet/column/writer.h | 2 +- src/parquet/encoding-benchmark.cc | 5 +- src/parquet/encoding-internal.h | 54 +++++++++++----------- src/parquet/encoding-test.cc | 12 +++-- src/parquet/encoding.h | 8 ++-- src/parquet/file/file-serialize-test.cc | 3 +- src/parquet/file/reader-internal.cc | 10 ++-- src/parquet/file/reader-internal.h | 3 +- src/parquet/file/reader.cc | 4 +- src/parquet/file/writer-internal.cc | 10 ++-- src/parquet/file/writer-internal.h | 4 +- src/parquet/thrift.h | 6 ++- src/parquet/util/memory-test.cc | 50 ++------------------ src/parquet/util/memory.cc | 68 +++++++--------------------- src/parquet/util/memory.h | 35 ++++---------- src/parquet/util/test-common.h | 3 +- 26 files changed, 194 insertions(+), 267 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/properties.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h index b9b0bf3..6165603 100644 --- a/src/parquet/column/properties.h +++ b/src/parquet/column/properties.h @@ -39,20 +39,20 @@ static bool DEFAULT_USE_BUFFERED_STREAM = false; class PARQUET_EXPORT ReaderProperties { public: - explicit ReaderProperties(MemoryAllocator* allocator = default_allocator()) - : allocator_(allocator) { + explicit ReaderProperties(::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : pool_(pool) { buffered_stream_enabled_ = DEFAULT_USE_BUFFERED_STREAM; buffer_size_ = DEFAULT_BUFFER_SIZE; } - MemoryAllocator* allocator() const { return allocator_; } + ::arrow::MemoryPool* memory_pool() const { return pool_; } std::unique_ptr<InputStream> GetStream( RandomAccessSource* source, int64_t start, int64_t num_bytes) { std::unique_ptr<InputStream> stream; if (buffered_stream_enabled_) { stream.reset( - new BufferedInputStream(allocator_, buffer_size_, source, start, num_bytes)); + new BufferedInputStream(pool_, buffer_size_, source, start, num_bytes)); } else { stream.reset(new InMemoryInputStream(source, start, num_bytes)); } @@ -70,7 +70,7 @@ class PARQUET_EXPORT ReaderProperties { int64_t buffer_size() const { return buffer_size_; } private: - MemoryAllocator* allocator_; + ::arrow::MemoryPool* pool_; int64_t buffer_size_; bool buffered_stream_enabled_; }; @@ -110,7 +110,7 @@ class PARQUET_EXPORT WriterProperties { class Builder { public: Builder() - : allocator_(default_allocator()), + : pool_(::arrow::default_memory_pool()), dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT), write_batch_size_(DEFAULT_WRITE_BATCH_SIZE), pagesize_(DEFAULT_PAGE_SIZE), @@ -118,8 +118,8 @@ class PARQUET_EXPORT WriterProperties { created_by_(DEFAULT_CREATED_BY) {} virtual ~Builder() {} - Builder* allocator(MemoryAllocator* allocator) { - allocator_ = allocator; + Builder* memory_pool(::arrow::MemoryPool* pool) { + pool_ = pool; return this; } @@ -281,13 +281,13 @@ class PARQUET_EXPORT WriterProperties { for (const auto& item : statistics_enabled_) get(item.first).statistics_enabled = item.second; - return std::shared_ptr<WriterProperties>(new WriterProperties(allocator_, + return std::shared_ptr<WriterProperties>(new WriterProperties(pool_, dictionary_pagesize_limit_, write_batch_size_, pagesize_, version_, created_by_, default_column_properties_, column_properties)); } private: - MemoryAllocator* allocator_; + ::arrow::MemoryPool* pool_; int64_t dictionary_pagesize_limit_; int64_t write_batch_size_; int64_t pagesize_; @@ -302,7 +302,7 @@ class PARQUET_EXPORT WriterProperties { std::unordered_map<std::string, bool> statistics_enabled_; }; - inline MemoryAllocator* allocator() const { return allocator_; } + inline ::arrow::MemoryPool* memory_pool() const { return pool_; } inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; } @@ -354,11 +354,11 @@ class PARQUET_EXPORT WriterProperties { } private: - explicit WriterProperties(MemoryAllocator* allocator, int64_t dictionary_pagesize_limit, + explicit WriterProperties(::arrow::MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size, int64_t pagesize, ParquetVersion::type version, const std::string& created_by, const ColumnProperties& default_column_properties, const std::unordered_map<std::string, ColumnProperties>& column_properties) - : allocator_(allocator), + : pool_(pool), dictionary_pagesize_limit_(dictionary_pagesize_limit), write_batch_size_(write_batch_size), pagesize_(pagesize), @@ -367,7 +367,7 @@ class PARQUET_EXPORT WriterProperties { default_column_properties_(default_column_properties), column_properties_(column_properties) {} - MemoryAllocator* allocator_; + ::arrow::MemoryPool* pool_; int64_t dictionary_pagesize_limit_; int64_t write_batch_size_; int64_t pagesize_; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc index a547fdc..71bb689 100644 --- a/src/parquet/column/reader.cc +++ b/src/parquet/column/reader.cc @@ -25,6 +25,8 @@ #include "parquet/column/properties.h" #include "parquet/encoding-internal.h" +using arrow::MemoryPool; + namespace parquet { ReaderProperties default_reader_properties() { @@ -32,13 +34,13 @@ ReaderProperties default_reader_properties() { return default_reader_properties; } -ColumnReader::ColumnReader(const ColumnDescriptor* descr, - std::unique_ptr<PageReader> pager, MemoryAllocator* allocator) +ColumnReader::ColumnReader( + const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager, MemoryPool* pool) : descr_(descr), pager_(std::move(pager)), num_buffered_values_(0), num_decoded_values_(0), - allocator_(allocator) {} + pool_(pool) {} ColumnReader::~ColumnReader() {} @@ -66,7 +68,7 @@ void TypedColumnReader<DType>::ConfigureDictionary(const DictionaryPage* page) { // TODO(wesm): investigate whether this all-or-nothing decoding of the // dictionary makes sense and whether performance can be improved - auto decoder = std::make_shared<DictionaryDecoder<DType>>(descr_, allocator_); + auto decoder = std::make_shared<DictionaryDecoder<DType>>(descr_, pool_); decoder->SetDict(&dictionary); decoders_[encoding] = decoder; } else { @@ -194,26 +196,25 @@ int64_t ColumnReader::ReadRepetitionLevels(int64_t batch_size, int16_t* levels) // ---------------------------------------------------------------------- // Dynamic column reader constructor -std::shared_ptr<ColumnReader> ColumnReader::Make(const ColumnDescriptor* descr, - std::unique_ptr<PageReader> pager, MemoryAllocator* allocator) { +std::shared_ptr<ColumnReader> ColumnReader::Make( + const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager, MemoryPool* pool) { switch (descr->physical_type()) { case Type::BOOLEAN: - return std::make_shared<BoolReader>(descr, std::move(pager), allocator); + return std::make_shared<BoolReader>(descr, std::move(pager), pool); case Type::INT32: - return std::make_shared<Int32Reader>(descr, std::move(pager), allocator); + return std::make_shared<Int32Reader>(descr, std::move(pager), pool); case Type::INT64: - return std::make_shared<Int64Reader>(descr, std::move(pager), allocator); + return std::make_shared<Int64Reader>(descr, std::move(pager), pool); case Type::INT96: - return std::make_shared<Int96Reader>(descr, std::move(pager), allocator); + return std::make_shared<Int96Reader>(descr, std::move(pager), pool); case Type::FLOAT: - return std::make_shared<FloatReader>(descr, std::move(pager), allocator); + return std::make_shared<FloatReader>(descr, std::move(pager), pool); case Type::DOUBLE: - return std::make_shared<DoubleReader>(descr, std::move(pager), allocator); + return std::make_shared<DoubleReader>(descr, std::move(pager), pool); case Type::BYTE_ARRAY: - return std::make_shared<ByteArrayReader>(descr, std::move(pager), allocator); + return std::make_shared<ByteArrayReader>(descr, std::move(pager), pool); case Type::FIXED_LEN_BYTE_ARRAY: - return std::make_shared<FixedLenByteArrayReader>( - descr, std::move(pager), allocator); + return std::make_shared<FixedLenByteArrayReader>(descr, std::move(pager), pool); default: ParquetException::NYI("type reader not implemented"); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h index 0c1e25c..e0c6585 100644 --- a/src/parquet/column/reader.h +++ b/src/parquet/column/reader.h @@ -41,12 +41,12 @@ namespace parquet { class PARQUET_EXPORT ColumnReader { public: ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>, - MemoryAllocator* allocator = default_allocator()); + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); virtual ~ColumnReader(); static std::shared_ptr<ColumnReader> Make(const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager, - MemoryAllocator* allocator = default_allocator()); + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); // Returns true if there are still values in this column. bool HasNext() { @@ -97,7 +97,7 @@ class PARQUET_EXPORT ColumnReader { // into memory int num_decoded_values_; - MemoryAllocator* allocator_; + ::arrow::MemoryPool* pool_; }; // API to read values from a single column. This is the main client facing API. @@ -107,8 +107,8 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader { typedef typename DType::c_type T; TypedColumnReader(const ColumnDescriptor* schema, std::unique_ptr<PageReader> pager, - MemoryAllocator* allocator = default_allocator()) - : ColumnReader(schema, std::move(pager), allocator), current_decoder_(NULL) {} + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : ColumnReader(schema, std::move(pager), pool), current_decoder_(NULL) {} virtual ~TypedColumnReader() {} // Read a batch of repetition levels, definition levels, and values from the @@ -374,12 +374,12 @@ inline int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) { int64_t values_read = 0; std::shared_ptr<PoolBuffer> vals = AllocateBuffer( - this->allocator_, batch_size * type_traits<DType::type_num>::value_byte_size); + this->pool_, batch_size * type_traits<DType::type_num>::value_byte_size); std::shared_ptr<PoolBuffer> def_levels = - AllocateBuffer(this->allocator_, batch_size * sizeof(int16_t)); + AllocateBuffer(this->pool_, batch_size * sizeof(int16_t)); std::shared_ptr<PoolBuffer> rep_levels = - AllocateBuffer(this->allocator_, batch_size * sizeof(int16_t)); + AllocateBuffer(this->pool_, batch_size * sizeof(int16_t)); do { batch_size = std::min(batch_size, rows_to_skip); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/scanner.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/scanner.cc b/src/parquet/column/scanner.cc index faf99a0..0295315 100644 --- a/src/parquet/column/scanner.cc +++ b/src/parquet/column/scanner.cc @@ -23,28 +23,29 @@ #include "parquet/column/reader.h" #include "parquet/util/memory.h" +using arrow::MemoryPool; + namespace parquet { -std::shared_ptr<Scanner> Scanner::Make(std::shared_ptr<ColumnReader> col_reader, - int64_t batch_size, MemoryAllocator* allocator) { +std::shared_ptr<Scanner> Scanner::Make( + std::shared_ptr<ColumnReader> col_reader, int64_t batch_size, MemoryPool* pool) { switch (col_reader->type()) { case Type::BOOLEAN: - return std::make_shared<BoolScanner>(col_reader, batch_size, allocator); + return std::make_shared<BoolScanner>(col_reader, batch_size, pool); case Type::INT32: - return std::make_shared<Int32Scanner>(col_reader, batch_size, allocator); + return std::make_shared<Int32Scanner>(col_reader, batch_size, pool); case Type::INT64: - return std::make_shared<Int64Scanner>(col_reader, batch_size, allocator); + return std::make_shared<Int64Scanner>(col_reader, batch_size, pool); case Type::INT96: - return std::make_shared<Int96Scanner>(col_reader, batch_size, allocator); + return std::make_shared<Int96Scanner>(col_reader, batch_size, pool); case Type::FLOAT: - return std::make_shared<FloatScanner>(col_reader, batch_size, allocator); + return std::make_shared<FloatScanner>(col_reader, batch_size, pool); case Type::DOUBLE: - return std::make_shared<DoubleScanner>(col_reader, batch_size, allocator); + return std::make_shared<DoubleScanner>(col_reader, batch_size, pool); case Type::BYTE_ARRAY: - return std::make_shared<ByteArrayScanner>(col_reader, batch_size, allocator); + return std::make_shared<ByteArrayScanner>(col_reader, batch_size, pool); case Type::FIXED_LEN_BYTE_ARRAY: - return std::make_shared<FixedLenByteArrayScanner>( - col_reader, batch_size, allocator); + return std::make_shared<FixedLenByteArrayScanner>(col_reader, batch_size, pool); default: ParquetException::NYI("type reader not implemented"); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/scanner.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h index 17340b7..47daaed 100644 --- a/src/parquet/column/scanner.h +++ b/src/parquet/column/scanner.h @@ -40,11 +40,11 @@ class PARQUET_EXPORT Scanner { public: explicit Scanner(std::shared_ptr<ColumnReader> reader, int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, - MemoryAllocator* allocator = default_allocator()) + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) : batch_size_(batch_size), level_offset_(0), levels_buffered_(0), - value_buffer_(std::make_shared<PoolBuffer>(allocator)), + value_buffer_(std::make_shared<PoolBuffer>(pool)), value_offset_(0), values_buffered_(0), reader_(reader) { @@ -56,7 +56,7 @@ class PARQUET_EXPORT Scanner { static std::shared_ptr<Scanner> Make(std::shared_ptr<ColumnReader> col_reader, int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, - MemoryAllocator* allocator = default_allocator()); + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); virtual void PrintNext(std::ostream& out, int width) = 0; @@ -91,8 +91,8 @@ class PARQUET_EXPORT TypedScanner : public Scanner { explicit TypedScanner(std::shared_ptr<ColumnReader> reader, int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, - MemoryAllocator* allocator = default_allocator()) - : Scanner(reader, batch_size, allocator) { + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : Scanner(reader, batch_size, pool) { typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader.get()); int value_byte_size = type_traits<DType::type_num>::value_byte_size; PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size)); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/statistics-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/statistics-test.cc b/src/parquet/column/statistics-test.cc index 5eb7625..dcaad45 100644 --- a/src/parquet/column/statistics-test.cc +++ b/src/parquet/column/statistics-test.cc @@ -35,6 +35,9 @@ #include "parquet/types.h" #include "parquet/util/memory.h" +using arrow::default_memory_pool; +using arrow::MemoryPool; + namespace parquet { using schema::NodePtr; @@ -199,10 +202,10 @@ template <> std::vector<FLBA> TestRowGroupStatistics<FLBAType>::GetDeepCopy( const std::vector<FLBA>& values) { std::vector<FLBA> copy; - MemoryAllocator* allocator = default_allocator(); + MemoryPool* pool = ::arrow::default_memory_pool(); for (const FLBA& flba : values) { uint8_t* ptr; - PARQUET_THROW_NOT_OK(allocator->Allocate(FLBA_LENGTH, &ptr)); + PARQUET_THROW_NOT_OK(pool->Allocate(FLBA_LENGTH, &ptr)); memcpy(ptr, flba.ptr, FLBA_LENGTH); copy.emplace_back(ptr); } @@ -213,10 +216,10 @@ template <> std::vector<ByteArray> TestRowGroupStatistics<ByteArrayType>::GetDeepCopy( const std::vector<ByteArray>& values) { std::vector<ByteArray> copy; - MemoryAllocator* allocator = default_allocator(); + MemoryPool* pool = default_memory_pool(); for (const ByteArray& ba : values) { uint8_t* ptr; - PARQUET_THROW_NOT_OK(allocator->Allocate(ba.len, &ptr)); + PARQUET_THROW_NOT_OK(pool->Allocate(ba.len, &ptr)); memcpy(ptr, ba.ptr, ba.len); copy.emplace_back(ba.len, ptr); } @@ -229,21 +232,21 @@ void TestRowGroupStatistics<TestType>::DeepFree( template <> void TestRowGroupStatistics<FLBAType>::DeepFree(std::vector<FLBA>& values) { - MemoryAllocator* allocator = default_allocator(); + MemoryPool* pool = default_memory_pool(); for (FLBA& flba : values) { auto ptr = const_cast<uint8_t*>(flba.ptr); memset(ptr, 0, FLBA_LENGTH); - allocator->Free(ptr, FLBA_LENGTH); + pool->Free(ptr, FLBA_LENGTH); } } template <> void TestRowGroupStatistics<ByteArrayType>::DeepFree(std::vector<ByteArray>& values) { - MemoryAllocator* allocator = default_allocator(); + MemoryPool* pool = default_memory_pool(); for (ByteArray& ba : values) { auto ptr = const_cast<uint8_t*>(ba.ptr); memset(ptr, 0, ba.len); - allocator->Free(ptr, ba.len); + pool->Free(ptr, ba.len); } } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/statistics.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/statistics.cc b/src/parquet/column/statistics.cc index 897287e..e67a3d3 100644 --- a/src/parquet/column/statistics.cc +++ b/src/parquet/column/statistics.cc @@ -24,14 +24,17 @@ #include "parquet/util/comparison.h" #include "parquet/util/memory.h" +using arrow::default_memory_pool; +using arrow::MemoryPool; + namespace parquet { template <typename DType> TypedRowGroupStatistics<DType>::TypedRowGroupStatistics( - const ColumnDescriptor* schema, MemoryAllocator* allocator) - : allocator_(allocator), - min_buffer_(AllocateBuffer(allocator_, 0)), - max_buffer_(AllocateBuffer(allocator_, 0)) { + const ColumnDescriptor* schema, MemoryPool* pool) + : pool_(pool), + min_buffer_(AllocateBuffer(pool_, 0)), + max_buffer_(AllocateBuffer(pool_, 0)) { SetDescr(schema); Reset(); } @@ -40,9 +43,9 @@ template <typename DType> TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const typename DType::c_type& min, const typename DType::c_type& max, int64_t num_values, int64_t null_count, int64_t distinct_count) - : allocator_(default_allocator()), - min_buffer_(AllocateBuffer(allocator_, 0)), - max_buffer_(AllocateBuffer(allocator_, 0)) { + : pool_(default_memory_pool()), + min_buffer_(AllocateBuffer(pool_, 0)), + max_buffer_(AllocateBuffer(pool_, 0)) { IncrementNumValues(num_values); IncrementNullCount(null_count); IncrementDistinctCount(distinct_count); @@ -55,11 +58,10 @@ TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const typename DType::c_ template <typename DType> TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const ColumnDescriptor* schema, const std::string& encoded_min, const std::string& encoded_max, int64_t num_values, - int64_t null_count, int64_t distinct_count, bool has_min_max, - MemoryAllocator* allocator) - : allocator_(allocator), - min_buffer_(AllocateBuffer(allocator_, 0)), - max_buffer_(AllocateBuffer(allocator_, 0)) { + int64_t null_count, int64_t distinct_count, bool has_min_max, MemoryPool* pool) + : pool_(pool), + min_buffer_(AllocateBuffer(pool_, 0)), + max_buffer_(AllocateBuffer(pool_, 0)) { IncrementNumValues(num_values); IncrementNullCount(null_count); IncrementDistinctCount(distinct_count); @@ -204,7 +206,7 @@ EncodedStatistics TypedRowGroupStatistics<DType>::Encode() { template <typename DType> void TypedRowGroupStatistics<DType>::PlainEncode(const T& src, std::string* dst) { - PlainEncoder<DType> encoder(descr(), allocator_); + PlainEncoder<DType> encoder(descr(), pool_); encoder.Put(&src, 1); auto buffer = encoder.FlushValues(); auto ptr = reinterpret_cast<const char*>(buffer->data()); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/statistics.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/statistics.h b/src/parquet/column/statistics.h index af5ada6..509e587 100644 --- a/src/parquet/column/statistics.h +++ b/src/parquet/column/statistics.h @@ -133,8 +133,8 @@ class TypedRowGroupStatistics : public RowGroupStatistics { public: using T = typename DType::c_type; - TypedRowGroupStatistics( - const ColumnDescriptor* schema, MemoryAllocator* allocator = default_allocator()); + TypedRowGroupStatistics(const ColumnDescriptor* schema, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); TypedRowGroupStatistics(const T& min, const T& max, int64_t num_values, int64_t null_count, int64_t distinct_count); @@ -142,7 +142,7 @@ class TypedRowGroupStatistics : public RowGroupStatistics { TypedRowGroupStatistics(const ColumnDescriptor* schema, const std::string& encoded_min, const std::string& encoded_max, int64_t num_values, int64_t null_count, int64_t distinct_count, bool has_min_max, - MemoryAllocator* allocator = default_allocator()); + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); bool HasMinMax() const override; void Reset() override; @@ -163,7 +163,7 @@ class TypedRowGroupStatistics : public RowGroupStatistics { bool has_min_max_ = false; T min_; T max_; - MemoryAllocator* allocator_; + ::arrow::MemoryPool* pool_; void PlainEncode(const T& src, std::string* dst); void PlainDecode(const std::string& src, T* dst); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/test-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h index 12b818d..97e936a 100644 --- a/src/parquet/column/test-util.h +++ b/src/parquet/column/test-util.h @@ -253,7 +253,7 @@ class DictionaryPageBuilder { shared_ptr<Buffer> WriteDict() { std::shared_ptr<PoolBuffer> dict_buffer = - AllocateBuffer(default_allocator(), encoder_->dict_encoded_size()); + AllocateBuffer(::arrow::default_memory_pool(), encoder_->dict_encoded_size()); encoder_->WriteDict(dict_buffer->mutable_data()); return dict_buffer; } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc index 7c85905..fc0372f 100644 --- a/src/parquet/column/writer.cc +++ b/src/parquet/column/writer.cc @@ -44,8 +44,8 @@ ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata, has_dictionary_(has_dictionary), encoding_(encoding), properties_(properties), - allocator_(properties->allocator()), - pool_(properties->allocator()), + allocator_(properties->memory_pool()), + pool_(properties->memory_pool()), num_buffered_values_(0), num_buffered_encoded_values_(0), num_rows_(0), @@ -56,8 +56,8 @@ ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata, } void ColumnWriter::InitSinks() { - definition_levels_sink_.reset(new InMemoryOutputStream(properties_->allocator())); - repetition_levels_sink_.reset(new InMemoryOutputStream(properties_->allocator())); + definition_levels_sink_.reset(new InMemoryOutputStream(allocator_)); + repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_)); } void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) { @@ -78,8 +78,7 @@ std::shared_ptr<Buffer> ColumnWriter::RleEncodeLevels( int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, num_buffered_values_) + sizeof(int32_t); - std::shared_ptr<PoolBuffer> buffer_rle = - AllocateBuffer(properties_->allocator(), rle_size); + std::shared_ptr<PoolBuffer> buffer_rle = AllocateBuffer(allocator_, rle_size); level_encoder_.Init(Encoding::RLE, max_level, num_buffered_values_, buffer_rle->mutable_data() + sizeof(int32_t), buffer_rle->size() - sizeof(int32_t)); int encoded = level_encoder_.Encode( @@ -188,12 +187,12 @@ TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata, encoding, properties) { switch (encoding) { case Encoding::PLAIN: - current_encoder_.reset(new PlainEncoder<Type>(descr_, properties->allocator())); + current_encoder_.reset(new PlainEncoder<Type>(descr_, properties->memory_pool())); break; case Encoding::PLAIN_DICTIONARY: case Encoding::RLE_DICTIONARY: current_encoder_.reset( - new DictEncoder<Type>(descr_, &pool_, properties->allocator())); + new DictEncoder<Type>(descr_, &pool_, properties->memory_pool())); break; default: ParquetException::NYI("Selected encoding is not supported"); @@ -216,7 +215,7 @@ void TypedColumnWriter<Type>::CheckDictionarySizeLimit() { FlushBufferedDataPages(); fallback_ = true; // Only PLAIN encoding is supported for fallback in V1 - current_encoder_.reset(new PlainEncoder<Type>(descr_, properties_->allocator())); + current_encoder_.reset(new PlainEncoder<Type>(descr_, properties_->memory_pool())); encoding_ = Encoding::PLAIN; } } @@ -225,7 +224,7 @@ template <typename Type> void TypedColumnWriter<Type>::WriteDictionaryPage() { auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get()); std::shared_ptr<PoolBuffer> buffer = - AllocateBuffer(properties_->allocator(), dict_encoder->dict_encoded_size()); + AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size()); dict_encoder->WriteDict(buffer->mutable_data()); // TODO Get rid of this deep call dict_encoder->mem_pool()->FreeAll(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h index 3ee8fd6..c91f261 100644 --- a/src/parquet/column/writer.h +++ b/src/parquet/column/writer.h @@ -108,7 +108,7 @@ class PARQUET_EXPORT ColumnWriter { LevelEncoder level_encoder_; - MemoryAllocator* allocator_; + ::arrow::MemoryPool* allocator_; ChunkedAllocator pool_; // The total number of values stored in the data page. This is the maximum of http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/encoding-benchmark.cc ---------------------------------------------------------------------- diff --git a/src/parquet/encoding-benchmark.cc b/src/parquet/encoding-benchmark.cc index cf65dd9..8ea684a 100644 --- a/src/parquet/encoding-benchmark.cc +++ b/src/parquet/encoding-benchmark.cc @@ -21,6 +21,9 @@ #include "parquet/file/reader-internal.h" #include "parquet/util/memory.h" +using arrow::default_memory_pool; +using arrow::MemoryPool; + namespace parquet { using format::ColumnChunk; @@ -102,7 +105,7 @@ static void DecodeDict( int num_values = values.size(); ChunkedAllocator pool; - MemoryAllocator* allocator = default_allocator(); + MemoryPool* allocator = default_memory_pool(); std::shared_ptr<ColumnDescriptor> descr = Int64Schema(Repetition::REQUIRED); DictEncoder<Type> encoder(descr.get(), &pool, allocator); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/encoding-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h index 67cd7ba..ffcb531 100644 --- a/src/parquet/encoding-internal.h +++ b/src/parquet/encoding-internal.h @@ -172,10 +172,10 @@ class PlainEncoder : public Encoder<DType> { public: typedef typename DType::c_type T; - explicit PlainEncoder( - const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) - : Encoder<DType>(descr, Encoding::PLAIN, allocator) { - values_sink_.reset(new InMemoryOutputStream(allocator)); + explicit PlainEncoder(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : Encoder<DType>(descr, Encoding::PLAIN, pool) { + values_sink_.reset(new InMemoryOutputStream(pool)); } int64_t EstimatedDataEncodedSize() override { return values_sink_->Tell(); } @@ -190,12 +190,12 @@ class PlainEncoder : public Encoder<DType> { template <> class PlainEncoder<BooleanType> : public Encoder<BooleanType> { public: - explicit PlainEncoder( - const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) - : Encoder<BooleanType>(descr, Encoding::PLAIN, allocator), + explicit PlainEncoder(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : Encoder<BooleanType>(descr, Encoding::PLAIN, pool), bits_available_(kInMemoryDefaultCapacity * 8), - bits_buffer_(AllocateBuffer(allocator, kInMemoryDefaultCapacity)), - values_sink_(new InMemoryOutputStream(allocator)) { + bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)), + values_sink_(new InMemoryOutputStream(pool)) { bit_writer_.reset(new BitWriter(bits_buffer_->mutable_data(), bits_buffer_->size())); } @@ -212,7 +212,7 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> { } std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer(); - values_sink_.reset(new InMemoryOutputStream(this->allocator_)); + values_sink_.reset(new InMemoryOutputStream(this->pool_)); return buffer; } @@ -267,7 +267,7 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> { template <typename DType> inline std::shared_ptr<Buffer> PlainEncoder<DType>::FlushValues() { std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer(); - values_sink_.reset(new InMemoryOutputStream(this->allocator_)); + values_sink_.reset(new InMemoryOutputStream(this->pool_)); return buffer; } @@ -309,11 +309,11 @@ class DictionaryDecoder : public Decoder<Type> { // Initializes the dictionary with values from 'dictionary'. The data in // dictionary is not guaranteed to persist in memory after this call so the // dictionary decoder needs to copy the data out if necessary. - explicit DictionaryDecoder( - const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) + explicit DictionaryDecoder(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) : Decoder<Type>(descr, Encoding::RLE_DICTIONARY), - dictionary_(0, allocator), - byte_array_data_(AllocateBuffer(allocator, 0)) {} + dictionary_(0, pool), + byte_array_data_(AllocateBuffer(pool, 0)) {} // Perform type-specific initiatialization void SetDict(Decoder<Type>* dictionary); @@ -435,7 +435,7 @@ class DictEncoder : public Encoder<DType> { typedef typename DType::c_type T; explicit DictEncoder(const ColumnDescriptor* desc, ChunkedAllocator* pool = nullptr, - MemoryAllocator* allocator = default_allocator()) + ::arrow::MemoryPool* allocator = ::arrow::default_memory_pool()) : Encoder<DType>(desc, Encoding::PLAIN_DICTIONARY, allocator), allocator_(allocator), pool_(pool), @@ -524,7 +524,7 @@ class DictEncoder : public Encoder<DType> { int num_entries() const { return uniques_.size(); } private: - MemoryAllocator* allocator_; + ::arrow::MemoryPool* allocator_; // For ByteArray / FixedLenByteArray data. Not owned ChunkedAllocator* pool_; @@ -742,10 +742,10 @@ class DeltaBitPackDecoder : public Decoder<DType> { public: typedef typename DType::c_type T; - explicit DeltaBitPackDecoder( - const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) + explicit DeltaBitPackDecoder(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) : Decoder<DType>(descr, Encoding::DELTA_BINARY_PACKED), - delta_bit_widths_(new PoolBuffer(allocator)) { + delta_bit_widths_(new PoolBuffer(pool)) { if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) { throw ParquetException("Delta bit pack encoding should only be for integer data."); } @@ -835,10 +835,10 @@ class DeltaBitPackDecoder : public Decoder<DType> { class DeltaLengthByteArrayDecoder : public Decoder<ByteArrayType> { public: - explicit DeltaLengthByteArrayDecoder( - const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) + explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) : Decoder<ByteArrayType>(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY), - len_decoder_(nullptr, allocator) {} + len_decoder_(nullptr, pool) {} virtual void SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; @@ -876,11 +876,11 @@ class DeltaLengthByteArrayDecoder : public Decoder<ByteArrayType> { class DeltaByteArrayDecoder : public Decoder<ByteArrayType> { public: - explicit DeltaByteArrayDecoder( - const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) + explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) : Decoder<ByteArrayType>(descr, Encoding::DELTA_BYTE_ARRAY), - prefix_len_decoder_(nullptr, allocator), - suffix_decoder_(nullptr, allocator) {} + prefix_len_decoder_(nullptr, pool), + suffix_decoder_(nullptr, pool) {} virtual void SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/encoding-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/encoding-test.cc b/src/parquet/encoding-test.cc index 8f266b4..fbf6812 100644 --- a/src/parquet/encoding-test.cc +++ b/src/parquet/encoding-test.cc @@ -29,6 +29,9 @@ #include "parquet/util/memory.h" #include "parquet/util/test-common.h" +using arrow::default_memory_pool; +using arrow::MemoryPool; + using std::string; using std::vector; @@ -146,7 +149,7 @@ class TestEncodingBase : public ::testing::Test { void SetUp() { descr_ = ExampleDescr<Type>(); type_length_ = descr_->type_length(); - allocator_ = default_allocator(); + allocator_ = default_memory_pool(); } void TearDown() { pool_.FreeAll(); } @@ -176,7 +179,7 @@ class TestEncodingBase : public ::testing::Test { protected: ChunkedAllocator pool_; - MemoryAllocator* allocator_; + MemoryPool* allocator_; int num_values_; int type_length_; @@ -235,7 +238,8 @@ TYPED_TEST(TestPlainEncoding, BasicRoundTrip) { // Dictionary encoding tests typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType, - ByteArrayType, FLBAType> DictEncodedTypes; + ByteArrayType, FLBAType> + DictEncodedTypes; template <typename Type> class TestDictionaryEncoding : public TestEncodingBase<Type> { @@ -248,7 +252,7 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> { DictEncoder<Type> encoder(descr_.get(), &pool_); ASSERT_NO_THROW(encoder.Put(draws_, num_values_)); - dict_buffer_ = AllocateBuffer(default_allocator(), encoder.dict_encoded_size()); + dict_buffer_ = AllocateBuffer(default_memory_pool(), encoder.dict_encoded_size()); encoder.WriteDict(dict_buffer_->mutable_data()); std::shared_ptr<Buffer> indices = encoder.FlushValues(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encoding.h b/src/parquet/encoding.h index 2e2bf68..384971a 100644 --- a/src/parquet/encoding.h +++ b/src/parquet/encoding.h @@ -49,7 +49,7 @@ class Encoder { virtual void Put(const T* src, int num_values) = 0; virtual void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) { - PoolBuffer buffer(allocator_); + PoolBuffer buffer(pool_); buffer.Resize(num_values * sizeof(T)); int32_t num_valid_values = 0; INIT_BITSET(valid_bits, valid_bits_offset); @@ -67,13 +67,13 @@ class Encoder { protected: explicit Encoder(const ColumnDescriptor* descr, const Encoding::type& encoding, - MemoryAllocator* allocator) - : descr_(descr), encoding_(encoding), allocator_(allocator) {} + ::arrow::MemoryPool* pool) + : descr_(descr), encoding_(encoding), pool_(pool) {} // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY const ColumnDescriptor* descr_; const Encoding::type encoding_; - MemoryAllocator* allocator_; + ::arrow::MemoryPool* pool_; }; // The Decoder template is parameterized on parquet::DataType subclasses http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/file/file-serialize-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc index 71dd5c4..7a90eeb 100644 --- a/src/parquet/file/file-serialize-test.cc +++ b/src/parquet/file/file-serialize-test.cc @@ -106,7 +106,8 @@ class TestSerialize : public PrimitiveTypedTest<TestType> { }; typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType, - BooleanType, ByteArrayType, FLBAType> TestTypes; + BooleanType, ByteArrayType, FLBAType> + TestTypes; TYPED_TEST_CASE(TestSerialize, TestTypes); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/file/reader-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc index 328197a..4eb40b4 100644 --- a/src/parquet/file/reader-internal.cc +++ b/src/parquet/file/reader-internal.cc @@ -32,6 +32,8 @@ #include "parquet/types.h" #include "parquet/util/memory.h" +using arrow::MemoryPool; + namespace parquet { // ---------------------------------------------------------------------- @@ -39,9 +41,9 @@ namespace parquet { // assembled in a serialized stream for storing in a Parquet files SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream, - int64_t total_num_rows, Compression::type codec_type, MemoryAllocator* allocator) + int64_t total_num_rows, Compression::type codec_type, MemoryPool* pool) : stream_(std::move(stream)), - decompression_buffer_(AllocateBuffer(allocator, 0)), + decompression_buffer_(AllocateBuffer(pool, 0)), seen_num_rows_(0), total_num_rows_(total_num_rows) { max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE; @@ -194,7 +196,7 @@ std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) { stream = properties_.GetStream(source_, col_start, col_length); return std::unique_ptr<PageReader>(new SerializedPageReader(std::move(stream), - row_group_metadata_->num_rows(), col->compression(), properties_.allocator())); + row_group_metadata_->num_rows(), col->compression(), properties_.memory_pool())); } // ---------------------------------------------------------------------- @@ -268,7 +270,7 @@ void SerializedFile::ParseMetaData() { } std::shared_ptr<PoolBuffer> metadata_buffer = - AllocateBuffer(properties_.allocator(), metadata_len); + AllocateBuffer(properties_.memory_pool(), metadata_len); bytes_read = source_->ReadAt(metadata_start, metadata_len, metadata_buffer->mutable_data()); if (bytes_read != metadata_len) { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/file/reader-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h index 9f436ca..0d0fb07 100644 --- a/src/parquet/file/reader-internal.h +++ b/src/parquet/file/reader-internal.h @@ -45,7 +45,8 @@ static constexpr uint32_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024; class SerializedPageReader : public PageReader { public: SerializedPageReader(std::unique_ptr<InputStream> stream, int64_t num_rows, - Compression::type codec, MemoryAllocator* allocator = default_allocator()); + Compression::type codec, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); virtual ~SerializedPageReader() {} http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/file/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc index d5b2dce..4e46b8e 100644 --- a/src/parquet/file/reader.cc +++ b/src/parquet/file/reader.cc @@ -54,7 +54,7 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) { std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i); return ColumnReader::Make(descr, std::move(page_reader), - const_cast<ReaderProperties*>(contents_->properties())->allocator()); + const_cast<ReaderProperties*>(contents_->properties())->memory_pool()); } // Returns the rowgroup metadata @@ -93,7 +93,7 @@ std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(const std::string if (memory_map) { std::shared_ptr<::arrow::io::ReadableFile> handle; PARQUET_THROW_NOT_OK( - ::arrow::io::ReadableFile::Open(path, props.allocator(), &handle)); + ::arrow::io::ReadableFile::Open(path, props.memory_pool(), &handle)); source = handle; } else { std::shared_ptr<::arrow::io::MemoryMappedFile> handle; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/file/writer-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc index 8c1316b..a0a62b9 100644 --- a/src/parquet/file/writer-internal.cc +++ b/src/parquet/file/writer-internal.cc @@ -23,6 +23,8 @@ #include "parquet/thrift.h" #include "parquet/util/memory.h" +using arrow::MemoryPool; + using parquet::schema::GroupNode; using parquet::schema::SchemaFlattener; @@ -35,10 +37,10 @@ static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'}; // SerializedPageWriter SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type codec, - ColumnChunkMetaDataBuilder* metadata, MemoryAllocator* allocator) + ColumnChunkMetaDataBuilder* metadata, MemoryPool* pool) : sink_(sink), metadata_(metadata), - allocator_(allocator), + pool_(pool), num_values_(0), dictionary_page_offset_(0), data_page_offset_(0), @@ -78,7 +80,7 @@ std::shared_ptr<Buffer> SerializedPageWriter::Compress( compressor_->MaxCompressedLen(buffer->size(), buffer->data()); std::shared_ptr<PoolBuffer> compression_buffer = - AllocateBuffer(allocator_, max_compressed_size); + AllocateBuffer(pool_, max_compressed_size); int64_t compressed_size = compressor_->Compress(buffer->size(), buffer->data(), max_compressed_size, compression_buffer->mutable_data()); @@ -168,7 +170,7 @@ ColumnWriter* RowGroupSerializer::NextColumn() { const ColumnDescriptor* column_descr = col_meta->descr(); std::unique_ptr<PageWriter> pager( new SerializedPageWriter(sink_, properties_->compression(column_descr->path()), - col_meta, properties_->allocator())); + col_meta, properties_->memory_pool())); current_column_writer_ = ColumnWriter::Make(col_meta, std::move(pager), num_rows_, properties_); return current_column_writer_.get(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/file/writer-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h index 0140c5b..5bd00be 100644 --- a/src/parquet/file/writer-internal.h +++ b/src/parquet/file/writer-internal.h @@ -37,7 +37,7 @@ class SerializedPageWriter : public PageWriter { public: SerializedPageWriter(OutputStream* sink, Compression::type codec, ColumnChunkMetaDataBuilder* metadata, - MemoryAllocator* allocator = default_allocator()); + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); virtual ~SerializedPageWriter() {} @@ -55,7 +55,7 @@ class SerializedPageWriter : public PageWriter { private: OutputStream* sink_; ColumnChunkMetaDataBuilder* metadata_; - MemoryAllocator* allocator_; + ::arrow::MemoryPool* pool_; int64_t num_values_; int64_t dictionary_page_offset_; int64_t data_page_offset_; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/thrift.h ---------------------------------------------------------------------- diff --git a/src/parquet/thrift.h b/src/parquet/thrift.h index aafd3f5..7fa0de3 100644 --- a/src/parquet/thrift.h +++ b/src/parquet/thrift.h @@ -99,7 +99,8 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> tmem_transport( new apache::thrift::transport::TMemoryBuffer(const_cast<uint8_t*>(buf), *len)); apache::thrift::protocol::TCompactProtocolFactoryT< - apache::thrift::transport::TMemoryBuffer> tproto_factory; + apache::thrift::transport::TMemoryBuffer> + tproto_factory; boost::shared_ptr<apache::thrift::protocol::TProtocol> tproto = tproto_factory.getProtocol(tmem_transport); try { @@ -121,7 +122,8 @@ inline int64_t SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) { boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> mem_buffer( new apache::thrift::transport::TMemoryBuffer(len)); apache::thrift::protocol::TCompactProtocolFactoryT< - apache::thrift::transport::TMemoryBuffer> tproto_factory; + apache::thrift::transport::TMemoryBuffer> + tproto_factory; boost::shared_ptr<apache::thrift::protocol::TProtocol> tproto = tproto_factory.getProtocol(mem_buffer); try { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/util/memory-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/memory-test.cc b/src/parquet/util/memory-test.cc index 5225261..cdb6d21 100644 --- a/src/parquet/util/memory-test.cc +++ b/src/parquet/util/memory-test.cc @@ -27,52 +27,13 @@ #include "parquet/util/memory.h" #include "parquet/util/test-common.h" +using arrow::default_memory_pool; +using arrow::MemoryPool; + namespace parquet { class TestBuffer : public ::testing::Test {}; -TEST(TestAllocator, AllocateFree) { - TrackingAllocator allocator; - - uint8_t* data; - - ASSERT_TRUE(allocator.Allocate(100, &data).ok()); - ASSERT_TRUE(nullptr != data); - data[99] = 55; - allocator.Free(data, 100); - - ASSERT_TRUE(allocator.Allocate(0, &data).ok()); - ASSERT_EQ(nullptr, data); - allocator.Free(data, 0); - - int64_t to_alloc = std::numeric_limits<int64_t>::max(); - ASSERT_FALSE(allocator.Allocate(to_alloc, &data).ok()); -} - -TEST(TestAllocator, TotalMax) { - TrackingAllocator allocator; - ASSERT_EQ(0, allocator.bytes_allocated()); - ASSERT_EQ(0, allocator.max_memory()); - - uint8_t* data; - uint8_t* data2; - ASSERT_TRUE(allocator.Allocate(100, &data).ok()); - ASSERT_EQ(100, allocator.bytes_allocated()); - ASSERT_EQ(100, allocator.max_memory()); - - ASSERT_TRUE(allocator.Allocate(10, &data2).ok()); - ASSERT_EQ(110, allocator.bytes_allocated()); - ASSERT_EQ(110, allocator.max_memory()); - - allocator.Free(data, 100); - ASSERT_EQ(10, allocator.bytes_allocated()); - ASSERT_EQ(110, allocator.max_memory()); - - allocator.Free(data2, 10); - ASSERT_EQ(0, allocator.bytes_allocated()); - ASSERT_EQ(110, allocator.max_memory()); -} - // Utility class to call private functions on MemPool. class ChunkedAllocatorTest { public: @@ -294,7 +255,7 @@ TEST(TestBufferedInputStream, Basics) { int64_t stream_offset = 10; int64_t stream_size = source_size - stream_offset; int64_t chunk_size = 50; - std::shared_ptr<PoolBuffer> buf = AllocateBuffer(default_allocator(), source_size); + std::shared_ptr<PoolBuffer> buf = AllocateBuffer(default_memory_pool(), source_size); ASSERT_EQ(source_size, buf->size()); for (int i = 0; i < source_size; i++) { buf->mutable_data()[i] = i; @@ -303,9 +264,8 @@ TEST(TestBufferedInputStream, Basics) { auto wrapper = std::make_shared<ArrowInputFile>(std::make_shared<::arrow::io::BufferReader>(buf)); - TrackingAllocator allocator; std::unique_ptr<BufferedInputStream> stream(new BufferedInputStream( - &allocator, chunk_size, wrapper.get(), stream_offset, stream_size)); + default_memory_pool(), chunk_size, wrapper.get(), stream_offset, stream_size)); const uint8_t* output; int64_t bytes_read; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/util/memory.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/memory.cc b/src/parquet/util/memory.cc index 744dff9..021a346 100644 --- a/src/parquet/util/memory.cc +++ b/src/parquet/util/memory.cc @@ -29,47 +29,13 @@ #include "parquet/util/bit-util.h" #include "parquet/util/logging.h" -namespace parquet { - -::arrow::Status TrackingAllocator::Allocate(int64_t size, uint8_t** out) { - if (size == 0) { - *out = nullptr; - return ::arrow::Status::OK(); - } - ARROW_RETURN_NOT_OK(allocator_->Allocate(size, out)); - const int64_t total_memory = allocator_->bytes_allocated(); - if (total_memory > max_memory_) { max_memory_ = total_memory; } - return ::arrow::Status::OK(); -} - -::arrow::Status TrackingAllocator::Reallocate( - int64_t old_size, int64_t new_size, uint8_t** out) { - ARROW_RETURN_NOT_OK(allocator_->Reallocate(old_size, new_size, out)); - const int64_t total_memory = allocator_->bytes_allocated(); - if (total_memory > max_memory_) { max_memory_ = total_memory; } - return ::arrow::Status::OK(); -} - -void TrackingAllocator::Free(uint8_t* p, int64_t size) { - allocator_->Free(p, size); -} - -int64_t TrackingAllocator::max_memory() const { - return max_memory_.load(); -} +using arrow::MemoryPool; -int64_t TrackingAllocator::bytes_allocated() const { - return allocator_->bytes_allocated(); -} - -MemoryAllocator* default_allocator() { - static TrackingAllocator allocator; - return &allocator; -} +namespace parquet { template <class T> -Vector<T>::Vector(int64_t size, MemoryAllocator* allocator) - : buffer_(AllocateUniqueBuffer(allocator, size * sizeof(T))), +Vector<T>::Vector(int64_t size, MemoryPool* pool) + : buffer_(AllocateUniqueBuffer(pool, size * sizeof(T))), size_(size), capacity_(size) { if (size > 0) { @@ -122,13 +88,13 @@ template class Vector<FixedLenByteArray>; const int ChunkedAllocator::INITIAL_CHUNK_SIZE; const int ChunkedAllocator::MAX_CHUNK_SIZE; -ChunkedAllocator::ChunkedAllocator(MemoryAllocator* allocator) +ChunkedAllocator::ChunkedAllocator(MemoryPool* pool) : current_chunk_idx_(-1), next_chunk_size_(INITIAL_CHUNK_SIZE), total_allocated_bytes_(0), peak_allocated_bytes_(0), total_reserved_bytes_(0), - allocator_(allocator) {} + pool_(pool) {} ChunkedAllocator::ChunkInfo::ChunkInfo(int64_t size, uint8_t* buf) : data(buf), size(size), allocated_bytes(0) {} @@ -137,7 +103,7 @@ ChunkedAllocator::~ChunkedAllocator() { int64_t total_bytes_released = 0; for (size_t i = 0; i < chunks_.size(); ++i) { total_bytes_released += chunks_[i].size; - allocator_->Free(chunks_[i].data, chunks_[i].size); + pool_->Free(chunks_[i].data, chunks_[i].size); } DCHECK(chunks_.empty()) << "Must call FreeAll() or AcquireData() for this pool"; @@ -190,7 +156,7 @@ void ChunkedAllocator::FreeAll() { int64_t total_bytes_released = 0; for (size_t i = 0; i < chunks_.size(); ++i) { total_bytes_released += chunks_[i].size; - allocator_->Free(chunks_[i].data, chunks_[i].size); + pool_->Free(chunks_[i].data, chunks_[i].size); } chunks_.clear(); next_chunk_size_ = INITIAL_CHUNK_SIZE; @@ -229,7 +195,7 @@ bool ChunkedAllocator::FindChunk(int64_t min_size) { // Allocate a new chunk. Return early if malloc fails. uint8_t* buf = nullptr; - PARQUET_THROW_NOT_OK(allocator_->Allocate(chunk_size, &buf)); + PARQUET_THROW_NOT_OK(pool_->Allocate(chunk_size, &buf)); if (UNLIKELY(buf == NULL)) { DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size())); current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1; @@ -452,11 +418,10 @@ void InMemoryInputStream::Advance(int64_t num_bytes) { // ---------------------------------------------------------------------- // In-memory output stream -InMemoryOutputStream::InMemoryOutputStream( - MemoryAllocator* allocator, int64_t initial_capacity) +InMemoryOutputStream::InMemoryOutputStream(MemoryPool* pool, int64_t initial_capacity) : size_(0), capacity_(initial_capacity) { if (initial_capacity == 0) { initial_capacity = kInMemoryDefaultCapacity; } - buffer_ = AllocateBuffer(allocator, initial_capacity); + buffer_ = AllocateBuffer(pool, initial_capacity); } InMemoryOutputStream::~InMemoryOutputStream() {} @@ -492,7 +457,7 @@ std::shared_ptr<Buffer> InMemoryOutputStream::GetBuffer() { // ---------------------------------------------------------------------- // BufferedInputStream -BufferedInputStream::BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size, +BufferedInputStream::BufferedInputStream(MemoryPool* pool, int64_t buffer_size, RandomAccessSource* source, int64_t start, int64_t num_bytes) : source_(source), stream_offset_(start), stream_end_(start + num_bytes) { buffer_ = AllocateBuffer(pool, buffer_size); @@ -534,15 +499,14 @@ void BufferedInputStream::Advance(int64_t num_bytes) { buffer_offset_ += num_bytes; } -std::shared_ptr<PoolBuffer> AllocateBuffer(MemoryAllocator* allocator, int64_t size) { - auto result = std::make_shared<PoolBuffer>(allocator); +std::shared_ptr<PoolBuffer> AllocateBuffer(MemoryPool* pool, int64_t size) { + auto result = std::make_shared<PoolBuffer>(pool); if (size > 0) { PARQUET_THROW_NOT_OK(result->Resize(size)); } return result; } -std::unique_ptr<PoolBuffer> AllocateUniqueBuffer( - MemoryAllocator* allocator, int64_t size) { - std::unique_ptr<PoolBuffer> result(new PoolBuffer(allocator)); +std::unique_ptr<PoolBuffer> AllocateUniqueBuffer(MemoryPool* pool, int64_t size) { + std::unique_ptr<PoolBuffer> result(new PoolBuffer(pool)); if (size > 0) { PARQUET_THROW_NOT_OK(result->Resize(size)); } return result; } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/util/memory.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h index e82df95..e2e522d 100644 --- a/src/parquet/util/memory.h +++ b/src/parquet/util/memory.h @@ -66,31 +66,11 @@ using Buffer = ::arrow::Buffer; using MutableBuffer = ::arrow::MutableBuffer; using ResizableBuffer = ::arrow::ResizableBuffer; using PoolBuffer = ::arrow::PoolBuffer; -using MemoryAllocator = ::arrow::MemoryPool; - -PARQUET_EXPORT MemoryAllocator* default_allocator(); - -class PARQUET_EXPORT TrackingAllocator : public MemoryAllocator { - public: - explicit TrackingAllocator(MemoryAllocator* allocator = ::arrow::default_memory_pool()) - : allocator_(allocator), max_memory_(0) {} - - ::arrow::Status Allocate(int64_t size, uint8_t** out) override; - ::arrow::Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override; - void Free(uint8_t* p, int64_t size) override; - - int64_t max_memory() const; - int64_t bytes_allocated() const override; - - private: - MemoryAllocator* allocator_; - std::atomic<int64_t> max_memory_; -}; template <class T> class Vector { public: - explicit Vector(int64_t size, MemoryAllocator* allocator); + explicit Vector(int64_t size, ::arrow::MemoryPool* pool); void Resize(int64_t new_size); void Reserve(int64_t new_capacity); void Assign(int64_t size, const T val); @@ -149,7 +129,7 @@ class Vector { class ChunkedAllocator { public: - explicit ChunkedAllocator(MemoryAllocator* allocator = default_allocator()); + explicit ChunkedAllocator(::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); /// Frees all chunks of memory and subtracts the total allocated bytes /// from the registered limits. @@ -227,7 +207,7 @@ class ChunkedAllocator { std::vector<ChunkInfo> chunks_; - MemoryAllocator* allocator_; + ::arrow::MemoryPool* pool_; /// Find or allocated a chunk with at least min_size spare capacity and update /// current_chunk_idx_. Also updates chunks_, chunk_sizes_ and allocated_bytes_ @@ -347,7 +327,8 @@ class PARQUET_EXPORT ArrowOutputStream : public ArrowFileMethods, public OutputS class PARQUET_EXPORT InMemoryOutputStream : public OutputStream { public: - explicit InMemoryOutputStream(MemoryAllocator* allocator = default_allocator(), + explicit InMemoryOutputStream( + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), int64_t initial_capacity = kInMemoryDefaultCapacity); virtual ~InMemoryOutputStream(); @@ -421,7 +402,7 @@ class InMemoryInputStream : public InputStream { // Implementation of an InputStream when only some of the bytes are in memory. class BufferedInputStream : public InputStream { public: - BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size, + BufferedInputStream(::arrow::MemoryPool* pool, int64_t buffer_size, RandomAccessSource* source, int64_t start, int64_t end); virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes); virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes); @@ -437,10 +418,10 @@ class BufferedInputStream : public InputStream { int64_t buffer_size_; }; -std::shared_ptr<PoolBuffer> AllocateBuffer(MemoryAllocator* allocator, int64_t size = 0); +std::shared_ptr<PoolBuffer> AllocateBuffer(::arrow::MemoryPool* pool, int64_t size = 0); std::unique_ptr<PoolBuffer> AllocateUniqueBuffer( - MemoryAllocator* allocator, int64_t size = 0); + ::arrow::MemoryPool* pool, int64_t size = 0); } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/util/test-common.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h index edadb53..2327aeb 100644 --- a/src/parquet/util/test-common.h +++ b/src/parquet/util/test-common.h @@ -32,7 +32,8 @@ namespace parquet { namespace test { typedef ::testing::Types<BooleanType, Int32Type, Int64Type, Int96Type, FloatType, - DoubleType, ByteArrayType, FLBAType> ParquetTypes; + DoubleType, ByteArrayType, FLBAType> + ParquetTypes; template <typename T> static inline void assert_vector_equal(const vector<T>& left, const vector<T>& right) {
