http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/input.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/input.h b/src/parquet/util/input.h deleted file mode 100644 index 1bb41e3..0000000 --- a/src/parquet/util/input.h +++ /dev/null @@ -1,211 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef PARQUET_UTIL_INPUT_H -#define PARQUET_UTIL_INPUT_H - -#include <cstdint> -#include <cstdio> -#include <memory> -#include <string> -#include <vector> - -#include "parquet/util/mem-allocator.h" -#include "parquet/util/visibility.h" - -namespace parquet { - -class Buffer; -class OwnedMutableBuffer; - -// ---------------------------------------------------------------------- -// Random access input (e.g. 
file-like) - -// Random -class PARQUET_EXPORT RandomAccessSource { - public: - virtual ~RandomAccessSource() {} - - virtual void Close() = 0; - virtual int64_t Tell() const = 0; - virtual void Seek(int64_t pos) = 0; - int64_t Size() const; - - // Returns actual number of bytes read - virtual int64_t Read(int64_t nbytes, uint8_t* out) = 0; - - virtual std::shared_ptr<Buffer> Read(int64_t nbytes) = 0; - std::shared_ptr<Buffer> ReadAt(int64_t pos, int64_t nbytes); - - protected: - int64_t size_; -}; - -// ---------------------------------------------------------------------- -// Implementations of RandomAccessSource used for testing and internal CLI tools. -// May not be sufficiently robust for general production use. - -class PARQUET_EXPORT LocalFileSource : public RandomAccessSource { - public: - explicit LocalFileSource(MemoryAllocator* allocator = default_allocator()) - : file_(nullptr), is_open_(false), allocator_(allocator) {} - - virtual ~LocalFileSource(); - - virtual void Open(const std::string& path); - - virtual void Close(); - virtual int64_t Tell() const; - virtual void Seek(int64_t pos); - - // Returns actual number of bytes read - virtual int64_t Read(int64_t nbytes, uint8_t* out); - - virtual std::shared_ptr<Buffer> Read(int64_t nbytes); - - bool is_open() const { return is_open_; } - const std::string& path() const { return path_; } - - // Return the integer file descriptor - int file_descriptor() const; - - protected: - void CloseFile(); - void SeekFile(int64_t pos, int origin = SEEK_SET); - - std::string path_; - FILE* file_; - bool is_open_; - MemoryAllocator* allocator_; -}; - -class PARQUET_EXPORT MemoryMapSource : public LocalFileSource { - public: - explicit MemoryMapSource(MemoryAllocator* allocator = default_allocator()) - : LocalFileSource(allocator), data_(nullptr), pos_(0) {} - - virtual ~MemoryMapSource(); - - virtual void Close(); - virtual void Open(const std::string& path); - - virtual int64_t Tell() const; - virtual void Seek(int64_t 
pos); - - // Copy data from memory map into out (must be already allocated memory) - // @returns: actual number of bytes read - virtual int64_t Read(int64_t nbytes, uint8_t* out); - - // Return a buffer referencing memory-map (no copy) - virtual std::shared_ptr<Buffer> Read(int64_t nbytes); - - private: - void CloseFile(); - - uint8_t* data_; - int64_t pos_; -}; - -// ---------------------------------------------------------------------- -// A file-like object that reads from virtual address space - -class PARQUET_EXPORT BufferReader : public RandomAccessSource { - public: - explicit BufferReader(const std::shared_ptr<Buffer>& buffer); - virtual void Close() {} - virtual int64_t Tell() const; - virtual void Seek(int64_t pos); - - virtual int64_t Read(int64_t nbytes, uint8_t* out); - - virtual std::shared_ptr<Buffer> Read(int64_t nbytes); - - protected: - const uint8_t* Head() { return data_ + pos_; } - - std::shared_ptr<Buffer> buffer_; - const uint8_t* data_; - int64_t pos_; -}; - -// ---------------------------------------------------------------------- -// Streaming input interfaces - -// Interface for the column reader to get the bytes. The interface is a stream -// interface, meaning the bytes in order and once a byte is read, it does not -// need to be read again. -class InputStream { - public: - // Returns the next 'num_to_peek' without advancing the current position. - // *num_bytes will contain the number of bytes returned which can only be - // less than num_to_peek at end of stream cases. - // Since the position is not advanced, calls to this function are idempotent. - // The buffer returned to the caller is still owned by the input stream and must - // stay valid until the next call to Peek() or Read(). - virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) = 0; - - // Identical to Peek(), except the current position in the stream is advanced by - // *num_bytes. 
- virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) = 0; - - // Advance the stream without reading - virtual void Advance(int64_t num_bytes) = 0; - - virtual ~InputStream() {} - - protected: - InputStream() {} -}; - -// Implementation of an InputStream when all the bytes are in memory. -class InMemoryInputStream : public InputStream { - public: - InMemoryInputStream(RandomAccessSource* source, int64_t start, int64_t end); - explicit InMemoryInputStream(const std::shared_ptr<Buffer>& buffer); - virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes); - virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes); - - virtual void Advance(int64_t num_bytes); - - private: - std::shared_ptr<Buffer> buffer_; - int64_t len_; - int64_t offset_; -}; - -// Implementation of an InputStream when only some of the bytes are in memory. -class BufferedInputStream : public InputStream { - public: - BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size, - RandomAccessSource* source, int64_t start, int64_t end); - virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes); - virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes); - - virtual void Advance(int64_t num_bytes); - - private: - std::shared_ptr<OwnedMutableBuffer> buffer_; - RandomAccessSource* source_; - int64_t stream_offset_; - int64_t stream_end_; - int64_t buffer_offset_; - int64_t buffer_size_; -}; - -} // namespace parquet - -#endif // PARQUET_UTIL_INPUT_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-allocator-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/mem-allocator-test.cc b/src/parquet/util/mem-allocator-test.cc deleted file mode 100644 index 336d3b4..0000000 --- a/src/parquet/util/mem-allocator-test.cc +++ /dev/null @@ -1,67 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include <gtest/gtest.h> - -#include "parquet/exception.h" -#include "parquet/util/mem-allocator.h" - -namespace parquet { - -TEST(TestAllocator, AllocateFree) { - TrackingAllocator allocator; - - uint8_t* data = allocator.Malloc(100); - ASSERT_TRUE(nullptr != data); - data[99] = 55; - allocator.Free(data, 100); - - data = allocator.Malloc(0); - ASSERT_EQ(nullptr, data); - allocator.Free(data, 0); - - data = allocator.Malloc(1); - ASSERT_THROW(allocator.Free(data, 2), ParquetException); - ASSERT_NO_THROW(allocator.Free(data, 1)); - - int64_t to_alloc = std::numeric_limits<int64_t>::max(); - ASSERT_THROW(allocator.Malloc(to_alloc), ParquetException); -} - -TEST(TestAllocator, TotalMax) { - TrackingAllocator allocator; - ASSERT_EQ(0, allocator.TotalMemory()); - ASSERT_EQ(0, allocator.MaxMemory()); - - uint8_t* data = allocator.Malloc(100); - ASSERT_EQ(100, allocator.TotalMemory()); - ASSERT_EQ(100, allocator.MaxMemory()); - - uint8_t* data2 = allocator.Malloc(10); - ASSERT_EQ(110, allocator.TotalMemory()); - ASSERT_EQ(110, allocator.MaxMemory()); - - allocator.Free(data, 100); - ASSERT_EQ(10, allocator.TotalMemory()); - ASSERT_EQ(110, allocator.MaxMemory()); - - allocator.Free(data2, 10); - ASSERT_EQ(0, allocator.TotalMemory()); - ASSERT_EQ(110, allocator.MaxMemory()); -} - -} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-allocator.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/mem-allocator.cc b/src/parquet/util/mem-allocator.cc deleted file mode 100644 index 2b6592d..0000000 --- a/src/parquet/util/mem-allocator.cc +++ /dev/null @@ -1,61 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "parquet/util/mem-allocator.h" - -#include <cstdlib> - -#include "parquet/exception.h" - -namespace parquet { - -MemoryAllocator::~MemoryAllocator() {} - -uint8_t* TrackingAllocator::Malloc(int64_t size) { - if (0 == size) { return nullptr; } - - uint8_t* p = static_cast<uint8_t*>(std::malloc(size)); - if (!p) { throw ParquetException("OOM: memory allocation failed"); } - { - std::lock_guard<std::mutex> lock(stats_mutex_); - total_memory_ += size; - if (total_memory_ > max_memory_) { max_memory_ = total_memory_; } - } - return p; -} - -void TrackingAllocator::Free(uint8_t* p, int64_t size) { - if (nullptr != p && size > 0) { - { - std::lock_guard<std::mutex> lock(stats_mutex_); - if (total_memory_ < size) { - throw ParquetException("Attempting to free too much memory"); - } - total_memory_ -= size; - } - std::free(p); - } -} - -TrackingAllocator::~TrackingAllocator() {} - -MemoryAllocator* default_allocator() { - static TrackingAllocator default_allocator; - return &default_allocator; -} - -} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-allocator.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/mem-allocator.h b/src/parquet/util/mem-allocator.h deleted file mode 100644 index a0f3693..0000000 --- a/src/parquet/util/mem-allocator.h +++ /dev/null @@ -1,59 +0,0 @@ 
-// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef PARQUET_UTIL_MEMORY_POOL_H -#define PARQUET_UTIL_MEMORY_POOL_H - -#include <cstdint> -#include <mutex> - -#include "parquet/util/visibility.h" - -namespace parquet { - -class PARQUET_EXPORT MemoryAllocator { - public: - virtual ~MemoryAllocator(); - - // Returns nullptr if size is 0 - virtual uint8_t* Malloc(int64_t size) = 0; - virtual void Free(uint8_t* p, int64_t size) = 0; -}; - -PARQUET_EXPORT MemoryAllocator* default_allocator(); - -class PARQUET_EXPORT TrackingAllocator : public MemoryAllocator { - public: - TrackingAllocator() : total_memory_(0), max_memory_(0) {} - virtual ~TrackingAllocator(); - - uint8_t* Malloc(int64_t size) override; - void Free(uint8_t* p, int64_t size) override; - - int64_t TotalMemory() { return total_memory_; } - - int64_t MaxMemory() { return max_memory_; } - - private: - std::mutex stats_mutex_; - int64_t total_memory_; - int64_t max_memory_; -}; - -} // namespace parquet - -#endif // PARQUET_UTIL_MEMORY_POOL_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-pool-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/mem-pool-test.cc 
b/src/parquet/util/mem-pool-test.cc deleted file mode 100644 index 3f3424b..0000000 --- a/src/parquet/util/mem-pool-test.cc +++ /dev/null @@ -1,247 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Initially imported from Apache Impala on 2016-02-23, and has been modified -// since for parquet-cpp - -#include <cstdint> -#include <gtest/gtest.h> -#include <limits> -#include <string> - -#include "parquet/util/bit-util.h" -#include "parquet/util/mem-pool.h" - -namespace parquet { - -// Utility class to call private functions on MemPool. 
-class MemPoolTest { - public: - static bool CheckIntegrity(MemPool* pool, bool current_chunk_empty) { - return pool->CheckIntegrity(current_chunk_empty); - } - - static const int INITIAL_CHUNK_SIZE = MemPool::INITIAL_CHUNK_SIZE; - static const int MAX_CHUNK_SIZE = MemPool::MAX_CHUNK_SIZE; -}; - -const int MemPoolTest::INITIAL_CHUNK_SIZE; -const int MemPoolTest::MAX_CHUNK_SIZE; - -TEST(MemPoolTest, Basic) { - MemPool p; - MemPool p2; - MemPool p3; - - for (int iter = 0; iter < 2; ++iter) { - // allocate a total of 24K in 32-byte pieces (for which we only request 25 bytes) - for (int i = 0; i < 768; ++i) { - // pads to 32 bytes - p.Allocate(25); - } - // we handed back 24K - EXPECT_EQ(24 * 1024, p.total_allocated_bytes()); - // .. and allocated 28K of chunks (4, 8, 16) - EXPECT_EQ(28 * 1024, p.GetTotalChunkSizes()); - - // we're passing on the first two chunks, containing 12K of data; we're left with - // one chunk of 16K containing 12K of data - p2.AcquireData(&p, true); - EXPECT_EQ(12 * 1024, p.total_allocated_bytes()); - EXPECT_EQ(16 * 1024, p.GetTotalChunkSizes()); - - // we allocate 8K, for which there isn't enough room in the current chunk, - // so another one is allocated (32K) - p.Allocate(8 * 1024); - EXPECT_EQ((16 + 32) * 1024, p.GetTotalChunkSizes()); - - // we allocate 65K, which doesn't fit into the current chunk or the default - // size of the next allocated chunk (64K) - p.Allocate(65 * 1024); - EXPECT_EQ((12 + 8 + 65) * 1024, p.total_allocated_bytes()); - if (iter == 0) { - EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes()); - } else { - EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes()); - } - EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes()); - - // Clear() resets allocated data, but doesn't remove any chunks - p.Clear(); - EXPECT_EQ(0, p.total_allocated_bytes()); - if (iter == 0) { - EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes()); - } else { - EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes()); - } - 
EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes()); - - // next allocation reuses existing chunks - p.Allocate(1024); - EXPECT_EQ(1024, p.total_allocated_bytes()); - if (iter == 0) { - EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes()); - } else { - EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes()); - } - EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes()); - - // ... unless it doesn't fit into any available chunk - p.Allocate(120 * 1024); - EXPECT_EQ((1 + 120) * 1024, p.total_allocated_bytes()); - if (iter == 0) { - EXPECT_EQ((1 + 120) * 1024, p.peak_allocated_bytes()); - } else { - EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes()); - } - EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes()); - - // ... Try another chunk that fits into an existing chunk - p.Allocate(33 * 1024); - EXPECT_EQ((1 + 120 + 33) * 1024, p.total_allocated_bytes()); - EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes()); - - // we're releasing 3 chunks, which get added to p2 - p2.AcquireData(&p, false); - EXPECT_EQ(0, p.total_allocated_bytes()); - EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes()); - EXPECT_EQ(0, p.GetTotalChunkSizes()); - - p3.AcquireData(&p2, true); // we're keeping the 65k chunk - EXPECT_EQ(33 * 1024, p2.total_allocated_bytes()); - EXPECT_EQ(65 * 1024, p2.GetTotalChunkSizes()); - - p.FreeAll(); - p2.FreeAll(); - p3.FreeAll(); - } -} - -// Test that we can keep an allocated chunk and a free chunk. -// This case verifies that when chunks are acquired by another memory pool the -// remaining chunks are consistent if there were more than one used chunk and some -// free chunks. 
-TEST(MemPoolTest, Keep) { - MemPool p; - p.Allocate(4 * 1024); - p.Allocate(8 * 1024); - p.Allocate(16 * 1024); - EXPECT_EQ((4 + 8 + 16) * 1024, p.total_allocated_bytes()); - EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes()); - p.Clear(); - EXPECT_EQ(0, p.total_allocated_bytes()); - EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes()); - p.Allocate(1 * 1024); - p.Allocate(4 * 1024); - EXPECT_EQ((1 + 4) * 1024, p.total_allocated_bytes()); - EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes()); - - MemPool p2; - p2.AcquireData(&p, true); - EXPECT_EQ(4 * 1024, p.total_allocated_bytes()); - EXPECT_EQ((8 + 16) * 1024, p.GetTotalChunkSizes()); - EXPECT_EQ(1 * 1024, p2.total_allocated_bytes()); - EXPECT_EQ(4 * 1024, p2.GetTotalChunkSizes()); - - p.FreeAll(); - p2.FreeAll(); -} - -// Tests that we can return partial allocations. -TEST(MemPoolTest, ReturnPartial) { - MemPool p; - uint8_t* ptr = p.Allocate(1024); - EXPECT_EQ(1024, p.total_allocated_bytes()); - memset(ptr, 0, 1024); - p.ReturnPartialAllocation(1024); - - uint8_t* ptr2 = p.Allocate(1024); - EXPECT_EQ(1024, p.total_allocated_bytes()); - EXPECT_TRUE(ptr == ptr2); - p.ReturnPartialAllocation(1016); - - ptr2 = p.Allocate(1016); - EXPECT_EQ(1024, p.total_allocated_bytes()); - EXPECT_TRUE(ptr2 == ptr + 8); - p.ReturnPartialAllocation(512); - memset(ptr2, 1, 1016 - 512); - - uint8_t* ptr3 = p.Allocate(512); - EXPECT_EQ(1024, p.total_allocated_bytes()); - EXPECT_TRUE(ptr3 == ptr + 512); - memset(ptr3, 2, 512); - - for (int i = 0; i < 8; ++i) { - EXPECT_EQ(0, ptr[i]); - } - for (int i = 8; i < 512; ++i) { - EXPECT_EQ(1, ptr[i]); - } - for (int i = 512; i < 1024; ++i) { - EXPECT_EQ(2, ptr[i]); - } - - p.FreeAll(); -} - -// Test that the MemPool overhead is bounded when we make allocations of -// INITIAL_CHUNK_SIZE. 
-TEST(MemPoolTest, MemoryOverhead) { - MemPool p; - const int alloc_size = MemPoolTest::INITIAL_CHUNK_SIZE; - const int num_allocs = 1000; - int64_t total_allocated = 0; - - for (int i = 0; i < num_allocs; ++i) { - uint8_t* mem = p.Allocate(alloc_size); - ASSERT_TRUE(mem != NULL); - total_allocated += alloc_size; - - int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated; - // The initial chunk fits evenly into MAX_CHUNK_SIZE, so should have at most - // one empty chunk at the end. - EXPECT_LE(wasted_memory, MemPoolTest::MAX_CHUNK_SIZE); - // The chunk doubling algorithm should not allocate chunks larger than the total - // amount of memory already allocated. - EXPECT_LE(wasted_memory, total_allocated); - } - - p.FreeAll(); -} - -// Test that the MemPool overhead is bounded when we make alternating large and small -// allocations. -TEST(MemPoolTest, FragmentationOverhead) { - MemPool p; - const int num_allocs = 100; - int64_t total_allocated = 0; - - for (int i = 0; i < num_allocs; ++i) { - int alloc_size = i % 2 == 0 ? 1 : MemPoolTest::MAX_CHUNK_SIZE; - uint8_t* mem = p.Allocate(alloc_size); - ASSERT_TRUE(mem != NULL); - total_allocated += alloc_size; - - int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated; - // Fragmentation should not waste more than half of each completed chunk. - EXPECT_LE(wasted_memory, total_allocated + MemPoolTest::MAX_CHUNK_SIZE); - } - - p.FreeAll(); -} - -} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-pool.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/mem-pool.cc b/src/parquet/util/mem-pool.cc deleted file mode 100644 index 1ab40bc..0000000 --- a/src/parquet/util/mem-pool.cc +++ /dev/null @@ -1,264 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Initially imported from Apache Impala on 2016-02-23, and has been modified -// since for parquet-cpp - -#include "parquet/util/mem-pool.h" - -#include <stdio.h> - -#include <algorithm> -#include <cstdint> -#include <sstream> -#include <string> - -#include "parquet/util/bit-util.h" -#include "parquet/util/logging.h" - -namespace parquet { - -const int MemPool::INITIAL_CHUNK_SIZE; -const int MemPool::MAX_CHUNK_SIZE; - -MemPool::MemPool(MemoryAllocator* allocator) - : current_chunk_idx_(-1), - next_chunk_size_(INITIAL_CHUNK_SIZE), - total_allocated_bytes_(0), - peak_allocated_bytes_(0), - total_reserved_bytes_(0), - allocator_(allocator) {} - -MemPool::ChunkInfo::ChunkInfo(int64_t size, uint8_t* buf) - : data(buf), size(size), allocated_bytes(0) {} - -MemPool::~MemPool() { - int64_t total_bytes_released = 0; - for (size_t i = 0; i < chunks_.size(); ++i) { - total_bytes_released += chunks_[i].size; - allocator_->Free(chunks_[i].data, chunks_[i].size); - } - - DCHECK(chunks_.empty()) << "Must call FreeAll() or AcquireData() for this pool"; -} - -void MemPool::ReturnPartialAllocation(int byte_size) { - DCHECK_GE(byte_size, 0); - DCHECK(current_chunk_idx_ != -1); - ChunkInfo& info = chunks_[current_chunk_idx_]; - DCHECK_GE(info.allocated_bytes, byte_size); - 
info.allocated_bytes -= byte_size; - total_allocated_bytes_ -= byte_size; -} - -template <bool CHECK_LIMIT_FIRST> -uint8_t* MemPool::Allocate(int size) { - if (size == 0) return NULL; - - int64_t num_bytes = BitUtil::RoundUp(size, 8); - if (current_chunk_idx_ == -1 || - num_bytes + chunks_[current_chunk_idx_].allocated_bytes > - chunks_[current_chunk_idx_].size) { - // If we couldn't allocate a new chunk, return NULL. - if (UNLIKELY(!FindChunk(num_bytes))) return NULL; - } - ChunkInfo& info = chunks_[current_chunk_idx_]; - uint8_t* result = info.data + info.allocated_bytes; - DCHECK_LE(info.allocated_bytes + num_bytes, info.size); - info.allocated_bytes += num_bytes; - total_allocated_bytes_ += num_bytes; - DCHECK_LE(current_chunk_idx_, static_cast<int>(chunks_.size()) - 1); - peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_); - return result; -} - -uint8_t* MemPool::Allocate(int size) { - return Allocate<false>(size); -} - -void MemPool::Clear() { - current_chunk_idx_ = -1; - for (auto chunk = chunks_.begin(); chunk != chunks_.end(); ++chunk) { - chunk->allocated_bytes = 0; - } - total_allocated_bytes_ = 0; - DCHECK(CheckIntegrity(false)); -} - -void MemPool::FreeAll() { - int64_t total_bytes_released = 0; - for (size_t i = 0; i < chunks_.size(); ++i) { - total_bytes_released += chunks_[i].size; - allocator_->Free(chunks_[i].data, chunks_[i].size); - } - chunks_.clear(); - next_chunk_size_ = INITIAL_CHUNK_SIZE; - current_chunk_idx_ = -1; - total_allocated_bytes_ = 0; - total_reserved_bytes_ = 0; -} - -bool MemPool::FindChunk(int64_t min_size) { - // Try to allocate from a free chunk. The first free chunk, if any, will be immediately - // after the current chunk. 
- int first_free_idx = current_chunk_idx_ + 1; - // (cast size() to signed int in order to avoid everything else being cast to - // unsigned long, in particular -1) - while (++current_chunk_idx_ < static_cast<int>(chunks_.size())) { - // we found a free chunk - DCHECK_EQ(chunks_[current_chunk_idx_].allocated_bytes, 0); - - if (chunks_[current_chunk_idx_].size >= min_size) { - // This chunk is big enough. Move it before the other free chunks. - if (current_chunk_idx_ != first_free_idx) { - std::swap(chunks_[current_chunk_idx_], chunks_[first_free_idx]); - current_chunk_idx_ = first_free_idx; - } - break; - } - } - - if (current_chunk_idx_ == static_cast<int>(chunks_.size())) { - // need to allocate new chunk. - int64_t chunk_size; - DCHECK_GE(next_chunk_size_, INITIAL_CHUNK_SIZE); - DCHECK_LE(next_chunk_size_, MAX_CHUNK_SIZE); - - chunk_size = std::max<int64_t>(min_size, next_chunk_size_); - - // Allocate a new chunk. Return early if malloc fails. - uint8_t* buf = allocator_->Malloc(chunk_size); - if (UNLIKELY(buf == NULL)) { - DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size())); - current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1; - return false; - } - - // If there are no free chunks put it at the end, otherwise before the first free. - if (first_free_idx == static_cast<int>(chunks_.size())) { - chunks_.push_back(ChunkInfo(chunk_size, buf)); - } else { - current_chunk_idx_ = first_free_idx; - auto insert_chunk = chunks_.begin() + current_chunk_idx_; - chunks_.insert(insert_chunk, ChunkInfo(chunk_size, buf)); - } - total_reserved_bytes_ += chunk_size; - // Don't increment the chunk size until the allocation succeeds: if an attempted - // large allocation fails we don't want to increase the chunk size further. 
- next_chunk_size_ = - static_cast<int>(std::min<int64_t>(chunk_size * 2, MAX_CHUNK_SIZE)); - } - - DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size())); - DCHECK(CheckIntegrity(true)); - return true; -} - -void MemPool::AcquireData(MemPool* src, bool keep_current) { - DCHECK(src->CheckIntegrity(false)); - int num_acquired_chunks; - if (keep_current) { - num_acquired_chunks = src->current_chunk_idx_; - } else if (src->GetFreeOffset() == 0) { - // nothing in the last chunk - num_acquired_chunks = src->current_chunk_idx_; - } else { - num_acquired_chunks = src->current_chunk_idx_ + 1; - } - - if (num_acquired_chunks <= 0) { - if (!keep_current) src->FreeAll(); - return; - } - - auto end_chunk = src->chunks_.begin() + num_acquired_chunks; - int64_t total_transfered_bytes = 0; - for (auto i = src->chunks_.begin(); i != end_chunk; ++i) { - total_transfered_bytes += i->size; - } - src->total_reserved_bytes_ -= total_transfered_bytes; - total_reserved_bytes_ += total_transfered_bytes; - - // insert new chunks after current_chunk_idx_ - auto insert_chunk = chunks_.begin() + current_chunk_idx_ + 1; - chunks_.insert(insert_chunk, src->chunks_.begin(), end_chunk); - src->chunks_.erase(src->chunks_.begin(), end_chunk); - current_chunk_idx_ += num_acquired_chunks; - - if (keep_current) { - src->current_chunk_idx_ = 0; - DCHECK(src->chunks_.size() == 1 || src->chunks_[1].allocated_bytes == 0); - total_allocated_bytes_ += src->total_allocated_bytes_ - src->GetFreeOffset(); - src->total_allocated_bytes_ = src->GetFreeOffset(); - } else { - src->current_chunk_idx_ = -1; - total_allocated_bytes_ += src->total_allocated_bytes_; - src->total_allocated_bytes_ = 0; - } - peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_); - - if (!keep_current) src->FreeAll(); - DCHECK(CheckIntegrity(false)); -} - -std::string MemPool::DebugString() { - std::stringstream out; - char str[16]; - out << "MemPool(#chunks=" << chunks_.size() << " ["; - for (size_t i = 
0; i < chunks_.size(); ++i) { - sprintf(str, "0x%lx=", reinterpret_cast<size_t>(chunks_[i].data)); // NOLINT - out << (i > 0 ? " " : "") << str << chunks_[i].size << "/" - << chunks_[i].allocated_bytes; - } - out << "] current_chunk=" << current_chunk_idx_ - << " total_sizes=" << GetTotalChunkSizes() - << " total_alloc=" << total_allocated_bytes_ << ")"; - return out.str(); -} - -int64_t MemPool::GetTotalChunkSizes() const { - int64_t result = 0; - for (size_t i = 0; i < chunks_.size(); ++i) { - result += chunks_[i].size; - } - return result; -} - -bool MemPool::CheckIntegrity(bool current_chunk_empty) { - // check that current_chunk_idx_ points to the last chunk with allocated data - DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size())); - int64_t total_allocated = 0; - for (int i = 0; i < static_cast<int>(chunks_.size()); ++i) { - DCHECK_GT(chunks_[i].size, 0); - if (i < current_chunk_idx_) { - DCHECK_GT(chunks_[i].allocated_bytes, 0); - } else if (i == current_chunk_idx_) { - if (current_chunk_empty) { - DCHECK_EQ(chunks_[i].allocated_bytes, 0); - } else { - DCHECK_GT(chunks_[i].allocated_bytes, 0); - } - } else { - DCHECK_EQ(chunks_[i].allocated_bytes, 0); - } - total_allocated += chunks_[i].allocated_bytes; - } - DCHECK_EQ(total_allocated, total_allocated_bytes_); - return true; -} - -} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-pool.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/mem-pool.h b/src/parquet/util/mem-pool.h deleted file mode 100644 index 5f6afa9..0000000 --- a/src/parquet/util/mem-pool.h +++ /dev/null @@ -1,179 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Initially imported from Apache Impala on 2016-02-23, and has been modified
-// since for parquet-cpp
-
-#ifndef PARQUET_UTIL_MEM_POOL_H
-#define PARQUET_UTIL_MEM_POOL_H
-
-#include <algorithm>
-#include <cstdint>
-#include <stdio.h>
-#include <string>
-#include <vector>
-
-#include "parquet/util/mem-allocator.h"
-
-namespace parquet {
-
-/// A MemPool maintains a list of memory chunks from which it allocates memory
-/// in response to Allocate() calls;
-/// Chunks stay around for the lifetime of the mempool or until they are passed on to
-/// another mempool.
-//
-/// An Allocate() call will attempt to allocate memory from the chunk that was most
-/// recently added; if that chunk doesn't have enough memory to
-/// satisfy the allocation request, the free chunks are searched for one that is
-/// big enough, otherwise a new chunk is added to the list.
-/// The current_chunk_idx_ always points to the last chunk with allocated memory.
-/// In order to keep allocation overhead low, chunk sizes double with each new one
-/// added, until they hit a maximum size.
-//
-/// Example:
-///     MemPool* p = new MemPool();
-///     for (int i = 0; i < 1024; ++i) {
-/// returns 8-byte aligned memory (effectively 24 bytes):
-///       .. = p->Allocate(17);
-///     }
-/// at this point, 17K have been handed out in response to Allocate() calls and
-/// 28K of chunks have been allocated (chunk sizes: 4K, 8K, 16K)
-/// We track total and peak allocated bytes. At this point they would be the same:
-/// 28k bytes. A call to Clear will return the allocated memory so
-/// total_allocated_bytes_
-/// becomes 0 while peak_allocated_bytes_ remains at 28k.
-///     p->Clear();
-/// the entire 1st chunk is returned:
-///       .. = p->Allocate(4 * 1024);
-/// 4K of the 2nd chunk are returned:
-///       .. = p->Allocate(4 * 1024);
-/// a new 20K chunk is created
-///       .. = p->Allocate(20 * 1024);
-//
-///      MemPool* p2 = new MemPool();
-/// the new mempool receives all chunks containing data from p
-///      p2->AcquireData(p, false);
-/// At this point p.total_allocated_bytes_ would be 0 while p.peak_allocated_bytes_
-/// remains unchanged.
-/// The one remaining (empty) chunk is released:
-///    delete p;
-
-class MemPool {
- public:
-  explicit MemPool(MemoryAllocator* allocator = default_allocator());
-
-  /// Frees all chunks of memory and subtracts the total allocated bytes
-  /// from the registered limits.
-  ~MemPool();
-
-  /// Allocates an 8-byte aligned section of memory of 'size' bytes at the end
-  /// of the current chunk. Creates a new chunk if there aren't any chunks
-  /// with enough capacity.
-  uint8_t* Allocate(int size);
-
-  /// Returns 'byte_size' bytes of the current chunk back to the mem pool. This can
-  /// only be used to return either all or part of the previous allocation returned
-  /// by Allocate().
-  void ReturnPartialAllocation(int byte_size);
-
-  /// Makes all allocated chunks available for re-use, but doesn't delete any chunks.
-  void Clear();
-
-  /// Deletes all allocated chunks. FreeAll() or AcquireData() must be called for
-  /// each mem pool
-  void FreeAll();
-
-  /// Absorb all chunks that hold data from src. If keep_current is true, let src hold on
-  /// to its last allocated chunk that contains data.
-  /// All offsets handed out by calls to GetCurrentOffset() for 'src' become invalid.
-  void AcquireData(MemPool* src, bool keep_current);
-
-  std::string DebugString();
-
-  int64_t total_allocated_bytes() const { return total_allocated_bytes_; }
-  int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; }
-  int64_t total_reserved_bytes() const { return total_reserved_bytes_; }
-
-  /// Returns the sum of the sizes of all chunks in chunks_.
-  int64_t GetTotalChunkSizes() const;
-
- private:
-  friend class MemPoolTest;
-  static const int INITIAL_CHUNK_SIZE = 4 * 1024;
-
-  /// The maximum size of chunk that should be allocated. Allocations larger than this
-  /// size will get their own individual chunk.
-  static const int MAX_CHUNK_SIZE = 1024 * 1024;
-
-  struct ChunkInfo {
-    uint8_t* data;  // Owned by the ChunkInfo.
-    int64_t size;   // in bytes
-
-    /// bytes allocated via Allocate() in this chunk
-    int64_t allocated_bytes;
-
-    explicit ChunkInfo(int64_t size, uint8_t* buf);
-
-    ChunkInfo() : data(NULL), size(0), allocated_bytes(0) {}
-  };
-
-  /// chunk from which we served the last Allocate() call;
-  /// always points to the last chunk that contains allocated data;
-  /// chunks 0..current_chunk_idx_ are guaranteed to contain data
-  /// (chunks_[i].allocated_bytes > 0 for i: 0..current_chunk_idx_);
-  /// -1 if no chunks present
-  int current_chunk_idx_;
-
-  /// The size of the next chunk to allocate.
-  int64_t next_chunk_size_;
-
-  /// sum of chunks_[i].allocated_bytes
-  int64_t total_allocated_bytes_;
-
-  /// Maximum number of bytes allocated from this pool at one time.
-  int64_t peak_allocated_bytes_;
-
-  /// sum of all bytes allocated in chunks_
-  int64_t total_reserved_bytes_;
-
-  std::vector<ChunkInfo> chunks_;
-
-  MemoryAllocator* allocator_;
-
-  /// Find or allocate a chunk with at least min_size spare capacity and update
-  /// current_chunk_idx_. Also updates chunks_
-  /// if a new chunk needs to be created.
-  bool FindChunk(int64_t min_size);
-
-  /// Check integrity of the supporting data structures; always returns true but DCHECKs
-  /// all invariants.
-  /// If 'current_chunk_empty' is false, checks that the current chunk contains data.
-  bool CheckIntegrity(bool current_chunk_empty);
-
-  /// Return offset to unoccupied space in current chunk.
-  int GetFreeOffset() const {
-    if (current_chunk_idx_ == -1) return 0;
-    return chunks_[current_chunk_idx_].allocated_bytes;
-  }
-
-  template <bool CHECK_LIMIT_FIRST>
-  uint8_t* Allocate(int size);
-};
-
-}  // namespace parquet
-
-#endif  // PARQUET_UTIL_MEM_POOL_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/memory-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory-test.cc b/src/parquet/util/memory-test.cc
new file mode 100644
index 0000000..45aa819
--- /dev/null
+++ b/src/parquet/util/memory-test.cc
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#include <cstdint> +#include <cstdio> +#include <memory> +#include <string> +#include <vector> + +#include <gtest/gtest.h> + +#include "parquet/exception.h" +#include "parquet/util/memory.h" +#include "parquet/util/test-common.h" + +namespace parquet { + +class TestBuffer : public ::testing::Test {}; + +TEST(TestAllocator, AllocateFree) { + TrackingAllocator allocator; + + uint8_t* data; + + ASSERT_TRUE(allocator.Allocate(100, &data).ok()); + ASSERT_TRUE(nullptr != data); + data[99] = 55; + allocator.Free(data, 100); + + ASSERT_TRUE(allocator.Allocate(0, &data).ok()); + ASSERT_EQ(nullptr, data); + allocator.Free(data, 0); + + int64_t to_alloc = std::numeric_limits<int64_t>::max(); + ASSERT_FALSE(allocator.Allocate(to_alloc, &data).ok()); +} + +TEST(TestAllocator, TotalMax) { + TrackingAllocator allocator; + ASSERT_EQ(0, allocator.bytes_allocated()); + ASSERT_EQ(0, allocator.max_memory()); + + uint8_t* data; + uint8_t* data2; + ASSERT_TRUE(allocator.Allocate(100, &data).ok()); + ASSERT_EQ(100, allocator.bytes_allocated()); + ASSERT_EQ(100, allocator.max_memory()); + + ASSERT_TRUE(allocator.Allocate(10, &data2).ok()); + ASSERT_EQ(110, allocator.bytes_allocated()); + ASSERT_EQ(110, allocator.max_memory()); + + allocator.Free(data, 100); + ASSERT_EQ(10, allocator.bytes_allocated()); + ASSERT_EQ(110, allocator.max_memory()); + + allocator.Free(data2, 10); + ASSERT_EQ(0, allocator.bytes_allocated()); + ASSERT_EQ(110, allocator.max_memory()); +} + +// Utility class to call private functions on MemPool. 
+class ChunkedAllocatorTest {
+ public:
+  // Forwards to the private ChunkedAllocator::CheckIntegrity(); this helper is
+  // presumably declared a friend of ChunkedAllocator (verify in memory.h).
+  static bool CheckIntegrity(ChunkedAllocator* pool, bool current_chunk_empty) {
+    return pool->CheckIntegrity(current_chunk_empty);
+  }
+
+  // Re-exported private chunk-size constants for use by the tests below.
+  static const int INITIAL_CHUNK_SIZE = ChunkedAllocator::INITIAL_CHUNK_SIZE;
+  static const int MAX_CHUNK_SIZE = ChunkedAllocator::MAX_CHUNK_SIZE;
+};
+
+const int ChunkedAllocatorTest::INITIAL_CHUNK_SIZE;
+const int ChunkedAllocatorTest::MAX_CHUNK_SIZE;
+
+TEST(ChunkedAllocatorTest, Basic) {
+  const int64_t kKiB = 1024;
+  // Once the first iteration has reached its 1K + 120K + 33K high-water mark,
+  // the peak never moves again (FreeAll() does not reset it).
+  const int64_t kSteadyStatePeak = (1 + 120 + 33) * kKiB;
+
+  ChunkedAllocator p;
+  ChunkedAllocator p2;
+  ChunkedAllocator p3;
+
+  for (int iter = 0; iter < 2; ++iter) {
+    // Peak expected at a given step: still growing in the first pass, pinned
+    // at kSteadyStatePeak in the second.
+    auto expected_peak = [&](int64_t first_iteration_peak) {
+      return iter == 0 ? first_iteration_peak : kSteadyStatePeak;
+    };
+
+    // 768 requests for 25 bytes, each padded to 32 bytes: 24K handed out ...
+    for (int n = 0; n < 768; ++n) p.Allocate(25);
+    EXPECT_EQ(24 * kKiB, p.total_allocated_bytes());
+    // ... backed by chunks of 4K + 8K + 16K = 28K.
+    EXPECT_EQ(28 * kKiB, p.GetTotalChunkSizes());
+
+    // The two full chunks (12K of data) move to p2; p keeps the 16K chunk,
+    // which holds the remaining 12K.
+    p2.AcquireData(&p, true);
+    EXPECT_EQ(12 * kKiB, p.total_allocated_bytes());
+    EXPECT_EQ(16 * kKiB, p.GetTotalChunkSizes());
+
+    // Only 4K is left in the current chunk, so 8K forces a new 32K chunk.
+    p.Allocate(8 * 1024);
+    EXPECT_EQ((16 + 32) * kKiB, p.GetTotalChunkSizes());
+
+    // 65K exceeds both the current chunk and the next default chunk size
+    // (64K), so it gets a chunk of its own.
+    p.Allocate(65 * 1024);
+    EXPECT_EQ((12 + 8 + 65) * kKiB, p.total_allocated_bytes());
+    EXPECT_EQ(expected_peak((12 + 8 + 65) * kKiB), p.peak_allocated_bytes());
+    EXPECT_EQ((16 + 32 + 65) * kKiB, p.GetTotalChunkSizes());
+
+    // Clear() forgets every allocation but keeps every chunk.
+    p.Clear();
+    EXPECT_EQ(0, p.total_allocated_bytes());
+    EXPECT_EQ(expected_peak((12 + 8 + 65) * kKiB), p.peak_allocated_bytes());
+    EXPECT_EQ((16 + 32 + 65) * kKiB, p.GetTotalChunkSizes());
+
+    // A small request is served from one of the existing chunks.
+    p.Allocate(1024);
+    EXPECT_EQ(1 * kKiB, p.total_allocated_bytes());
+    EXPECT_EQ(expected_peak((12 + 8 + 65) * kKiB), p.peak_allocated_bytes());
+    EXPECT_EQ((16 + 32 + 65) * kKiB, p.GetTotalChunkSizes());
+
+    // 120K fits nowhere, so a new 130K chunk (the doubled next-chunk size) is added.
+    p.Allocate(120 * 1024);
+    EXPECT_EQ((1 + 120) * kKiB, p.total_allocated_bytes());
+    EXPECT_EQ(expected_peak((1 + 120) * kKiB), p.peak_allocated_bytes());
+    EXPECT_EQ((130 + 16 + 32 + 65) * kKiB, p.GetTotalChunkSizes());
+
+    // 33K is too big for the free 32K chunk but fits in the free 65K one.
+    p.Allocate(33 * 1024);
+    EXPECT_EQ((1 + 120 + 33) * kKiB, p.total_allocated_bytes());
+    EXPECT_EQ((130 + 16 + 32 + 65) * kKiB, p.GetTotalChunkSizes());
+
+    // All three chunks that hold data go to p2; p is left with nothing.
+    p2.AcquireData(&p, false);
+    EXPECT_EQ(0, p.total_allocated_bytes());
+    EXPECT_EQ(kSteadyStatePeak, p.peak_allocated_bytes());
+    EXPECT_EQ(0, p.GetTotalChunkSizes());
+
+    // p2 hands everything except its current (65K) chunk to p3.
+    p3.AcquireData(&p2, true);
+    EXPECT_EQ(33 * kKiB, p2.total_allocated_bytes());
+    EXPECT_EQ(65 * kKiB, p2.GetTotalChunkSizes());
+
+    p.FreeAll();
+    p2.FreeAll();
+    p3.FreeAll();
+  }
+}
+
+// Test that we can keep an allocated chunk and a free chunk.
+// This case verifies that when chunks are acquired by another memory pool the
+// remaining chunks are consistent if there were more than one used chunk and some
+// free chunks.
+TEST(ChunkedAllocatorTest, Keep) {
+  ChunkedAllocator p;
+  // Fill the 4K, 8K and 16K chunks exactly.
+  for (int kib : {4, 8, 16}) {
+    p.Allocate(kib * 1024);
+  }
+  const int64_t kAllChunks = (4 + 8 + 16) * 1024;
+  EXPECT_EQ(kAllChunks, p.total_allocated_bytes());
+  EXPECT_EQ(kAllChunks, p.GetTotalChunkSizes());
+
+  // Clear() drops the data but keeps the chunks around.
+  p.Clear();
+  EXPECT_EQ(0, p.total_allocated_bytes());
+  EXPECT_EQ(kAllChunks, p.GetTotalChunkSizes());
+
+  // 1K lands in the 4K chunk; the following 4K no longer fits there and moves
+  // on to the 8K chunk, leaving the 16K chunk free.
+  p.Allocate(1 * 1024);
+  p.Allocate(4 * 1024);
+  EXPECT_EQ((1 + 4) * 1024, p.total_allocated_bytes());
+  EXPECT_EQ(kAllChunks, p.GetTotalChunkSizes());
+
+  // keep_current: p2 receives only the first chunk; p retains its current
+  // chunk plus the trailing free one.
+  ChunkedAllocator p2;
+  p2.AcquireData(&p, true);
+  EXPECT_EQ(4 * 1024, p.total_allocated_bytes());
+  EXPECT_EQ((8 + 16) * 1024, p.GetTotalChunkSizes());
+  EXPECT_EQ(1 * 1024, p2.total_allocated_bytes());
+  EXPECT_EQ(4 * 1024, p2.GetTotalChunkSizes());
+
+  p.FreeAll();
+  p2.FreeAll();
+}
+
+// Tests that we can return partial allocations.
+TEST(ChunkedAllocatorTest, ReturnPartial) {
+  ChunkedAllocator p;
+
+  // Take 1K, zero it, then give all of it back.
+  uint8_t* base = p.Allocate(1024);
+  EXPECT_EQ(1024, p.total_allocated_bytes());
+  memset(base, 0, 1024);
+  p.ReturnPartialAllocation(1024);
+
+  // The next 1K request is served from the very same address.
+  uint8_t* next = p.Allocate(1024);
+  EXPECT_EQ(1024, p.total_allocated_bytes());
+  EXPECT_TRUE(next == base);
+
+  // Keep only the first 8 bytes, then re-grow right behind them.
+  p.ReturnPartialAllocation(1016);
+  next = p.Allocate(1016);
+  EXPECT_EQ(1024, p.total_allocated_bytes());
+  EXPECT_TRUE(next == base + 8);
+  p.ReturnPartialAllocation(512);
+  memset(next, 1, 1016 - 512);
+
+  // The returned upper half is handed out again.
+  uint8_t* tail = p.Allocate(512);
+  EXPECT_EQ(1024, p.total_allocated_bytes());
+  EXPECT_TRUE(tail == base + 512);
+  memset(tail, 2, 512);
+
+  // Expected layout: [0, 8) zeros, [8, 512) ones, [512, 1024) twos.
+  for (int i = 0; i < 1024; ++i) {
+    const int expected = i < 8 ? 0 : (i < 512 ? 1 : 2);
+    EXPECT_EQ(expected, base[i]);
+  }
+
+  p.FreeAll();
+}
+
+// Test that the ChunkedAllocator overhead is bounded when we make allocations of
+// INITIAL_CHUNK_SIZE.
+TEST(ChunkedAllocatorTest, MemoryOverhead) { + ChunkedAllocator p; + const int alloc_size = ChunkedAllocatorTest::INITIAL_CHUNK_SIZE; + const int num_allocs = 1000; + int64_t total_allocated = 0; + + for (int i = 0; i < num_allocs; ++i) { + uint8_t* mem = p.Allocate(alloc_size); + ASSERT_TRUE(mem != NULL); + total_allocated += alloc_size; + + int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated; + // The initial chunk fits evenly into MAX_CHUNK_SIZE, so should have at most + // one empty chunk at the end. + EXPECT_LE(wasted_memory, ChunkedAllocatorTest::MAX_CHUNK_SIZE); + // The chunk doubling algorithm should not allocate chunks larger than the total + // amount of memory already allocated. + EXPECT_LE(wasted_memory, total_allocated); + } + + p.FreeAll(); +} + +// Test that the ChunkedAllocator overhead is bounded when we make alternating +// large and small allocations. +TEST(ChunkedAllocatorTest, FragmentationOverhead) { + ChunkedAllocator p; + const int num_allocs = 100; + int64_t total_allocated = 0; + + for (int i = 0; i < num_allocs; ++i) { + int alloc_size = i % 2 == 0 ? 1 : ChunkedAllocatorTest::MAX_CHUNK_SIZE; + uint8_t* mem = p.Allocate(alloc_size); + ASSERT_TRUE(mem != NULL); + total_allocated += alloc_size; + + int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated; + // Fragmentation should not waste more than half of each completed chunk. 
+ EXPECT_LE(wasted_memory, total_allocated + ChunkedAllocatorTest::MAX_CHUNK_SIZE); + } + + p.FreeAll(); +} + +TEST(TestBufferedInputStream, Basics) { + int64_t source_size = 256; + int64_t stream_offset = 10; + int64_t stream_size = source_size - stream_offset; + int64_t chunk_size = 50; + std::shared_ptr<PoolBuffer> buf = AllocateBuffer(default_allocator(), source_size); + ASSERT_EQ(source_size, buf->size()); + for (int i = 0; i < source_size; i++) { + buf->mutable_data()[i] = i; + } + + auto wrapper = + std::make_shared<ArrowInputFile>(std::make_shared<::arrow::io::BufferReader>(buf)); + + TrackingAllocator allocator; + std::unique_ptr<BufferedInputStream> stream(new BufferedInputStream( + &allocator, chunk_size, wrapper.get(), stream_offset, stream_size)); + + const uint8_t* output; + int64_t bytes_read; + + // source is at offset 10 + output = stream->Peek(10, &bytes_read); + ASSERT_EQ(10, bytes_read); + for (int i = 0; i < 10; i++) { + ASSERT_EQ(10 + i, output[i]) << i; + } + output = stream->Read(10, &bytes_read); + ASSERT_EQ(10, bytes_read); + for (int i = 0; i < 10; i++) { + ASSERT_EQ(10 + i, output[i]) << i; + } + output = stream->Read(10, &bytes_read); + ASSERT_EQ(10, bytes_read); + for (int i = 0; i < 10; i++) { + ASSERT_EQ(20 + i, output[i]) << i; + } + stream->Advance(5); + stream->Advance(5); + // source is at offset 40 + // read across buffer boundary. buffer size is 50 + output = stream->Read(20, &bytes_read); + ASSERT_EQ(20, bytes_read); + for (int i = 0; i < 20; i++) { + ASSERT_EQ(40 + i, output[i]) << i; + } + // read more than original chunk_size + output = stream->Read(60, &bytes_read); + ASSERT_EQ(60, bytes_read); + for (int i = 0; i < 60; i++) { + ASSERT_EQ(60 + i, output[i]) << i; + } + + stream->Advance(120); + // source is at offset 240 + // read outside of source boundary. 
source size is 256 + output = stream->Read(30, &bytes_read); + ASSERT_EQ(16, bytes_read); + for (int i = 0; i < 16; i++) { + ASSERT_EQ(240 + i, output[i]) << i; + } +} + +TEST(TestArrowInputFile, Basics) { + std::string data = "this is the data"; + auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str()); + + auto file = std::make_shared<::arrow::io::BufferReader>(data_buffer, data.size()); + auto source = std::make_shared<ArrowInputFile>(file); + + ASSERT_EQ(0, source->Tell()); + ASSERT_NO_THROW(source->Seek(5)); + ASSERT_EQ(5, source->Tell()); + ASSERT_NO_THROW(source->Seek(0)); + + // Seek out of bounds + ASSERT_THROW(source->Seek(100), ParquetException); + + uint8_t buffer[50]; + + ASSERT_NO_THROW(source->Read(4, buffer)); + ASSERT_EQ(0, std::memcmp(buffer, "this", 4)); + ASSERT_EQ(4, source->Tell()); + + std::shared_ptr<Buffer> pq_buffer; + + ASSERT_NO_THROW(pq_buffer = source->Read(7)); + + auto expected_buffer = std::make_shared<Buffer>(data_buffer + 4, 7); + + ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get())); +} + +} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/memory.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/memory.cc b/src/parquet/util/memory.cc new file mode 100644 index 0000000..9ad0336 --- /dev/null +++ b/src/parquet/util/memory.cc @@ -0,0 +1,543 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/util/memory.h" + +#include <algorithm> +#include <cstdint> +#include <cstdio> +#include <string> + +#include "parquet/exception.h" +#include "parquet/types.h" +#include "parquet/util/bit-util.h" +#include "parquet/util/logging.h" + +namespace parquet { + +::arrow::Status TrackingAllocator::Allocate(int64_t size, uint8_t** out) { + if (0 == size) { + *out = nullptr; + return ::arrow::Status::OK(); + } + + uint8_t* p = static_cast<uint8_t*>(std::malloc(size)); + if (!p) { return ::arrow::Status::OutOfMemory("memory allocation failed"); } + { + std::lock_guard<std::mutex> lock(stats_mutex_); + total_memory_ += size; + if (total_memory_ > max_memory_) { max_memory_ = total_memory_; } + } + *out = p; + return ::arrow::Status::OK(); +} + +void TrackingAllocator::Free(uint8_t* p, int64_t size) { + if (nullptr != p && size > 0) { + { + std::lock_guard<std::mutex> lock(stats_mutex_); + DCHECK_GE(total_memory_, size) << "Attempting to free too much memory"; + total_memory_ -= size; + } + std::free(p); + } +} + +MemoryAllocator* default_allocator() { + static TrackingAllocator allocator; + return &allocator; +} + +template <class T> +Vector<T>::Vector(int64_t size, MemoryAllocator* allocator) + : buffer_(AllocateUniqueBuffer(allocator, size * sizeof(T))), + size_(size), + capacity_(size) { + if (size > 0) { + data_ = reinterpret_cast<T*>(buffer_->mutable_data()); + } else { + data_ = nullptr; + } +} + +template <class T> +void Vector<T>::Reserve(int64_t new_capacity) { + if (new_capacity > capacity_) { + 
PARQUET_THROW_NOT_OK(buffer_->Resize(new_capacity * sizeof(T))); + data_ = reinterpret_cast<T*>(buffer_->mutable_data()); + capacity_ = new_capacity; + } +} + +template <class T> +void Vector<T>::Resize(int64_t new_size) { + Reserve(new_size); + size_ = new_size; +} + +template <class T> +void Vector<T>::Assign(int64_t size, const T val) { + Resize(size); + for (int64_t i = 0; i < size_; i++) { + data_[i] = val; + } +} + +template <class T> +void Vector<T>::Swap(Vector<T>& v) { + buffer_.swap(v.buffer_); + std::swap(size_, v.size_); + std::swap(capacity_, v.capacity_); + std::swap(data_, v.data_); +} + +template class Vector<int32_t>; +template class Vector<int64_t>; +template class Vector<bool>; +template class Vector<float>; +template class Vector<double>; +template class Vector<Int96>; +template class Vector<ByteArray>; +template class Vector<FixedLenByteArray>; + +const int ChunkedAllocator::INITIAL_CHUNK_SIZE; +const int ChunkedAllocator::MAX_CHUNK_SIZE; + +ChunkedAllocator::ChunkedAllocator(MemoryAllocator* allocator) + : current_chunk_idx_(-1), + next_chunk_size_(INITIAL_CHUNK_SIZE), + total_allocated_bytes_(0), + peak_allocated_bytes_(0), + total_reserved_bytes_(0), + allocator_(allocator) {} + +ChunkedAllocator::ChunkInfo::ChunkInfo(int64_t size, uint8_t* buf) + : data(buf), size(size), allocated_bytes(0) {} + +ChunkedAllocator::~ChunkedAllocator() { + int64_t total_bytes_released = 0; + for (size_t i = 0; i < chunks_.size(); ++i) { + total_bytes_released += chunks_[i].size; + allocator_->Free(chunks_[i].data, chunks_[i].size); + } + + DCHECK(chunks_.empty()) << "Must call FreeAll() or AcquireData() for this pool"; +} + +void ChunkedAllocator::ReturnPartialAllocation(int byte_size) { + DCHECK_GE(byte_size, 0); + DCHECK(current_chunk_idx_ != -1); + ChunkInfo& info = chunks_[current_chunk_idx_]; + DCHECK_GE(info.allocated_bytes, byte_size); + info.allocated_bytes -= byte_size; + total_allocated_bytes_ -= byte_size; +} + +template <bool CHECK_LIMIT_FIRST> 
+uint8_t* ChunkedAllocator::Allocate(int size) { + if (size == 0) return NULL; + + int64_t num_bytes = BitUtil::RoundUp(size, 8); + if (current_chunk_idx_ == -1 || + num_bytes + chunks_[current_chunk_idx_].allocated_bytes > + chunks_[current_chunk_idx_].size) { + // If we couldn't allocate a new chunk, return NULL. + if (UNLIKELY(!FindChunk(num_bytes))) return NULL; + } + ChunkInfo& info = chunks_[current_chunk_idx_]; + uint8_t* result = info.data + info.allocated_bytes; + DCHECK_LE(info.allocated_bytes + num_bytes, info.size); + info.allocated_bytes += num_bytes; + total_allocated_bytes_ += num_bytes; + DCHECK_LE(current_chunk_idx_, static_cast<int>(chunks_.size()) - 1); + peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_); + return result; +} + +uint8_t* ChunkedAllocator::Allocate(int size) { + return Allocate<false>(size); +} + +void ChunkedAllocator::Clear() { + current_chunk_idx_ = -1; + for (auto chunk = chunks_.begin(); chunk != chunks_.end(); ++chunk) { + chunk->allocated_bytes = 0; + } + total_allocated_bytes_ = 0; + DCHECK(CheckIntegrity(false)); +} + +void ChunkedAllocator::FreeAll() { + int64_t total_bytes_released = 0; + for (size_t i = 0; i < chunks_.size(); ++i) { + total_bytes_released += chunks_[i].size; + allocator_->Free(chunks_[i].data, chunks_[i].size); + } + chunks_.clear(); + next_chunk_size_ = INITIAL_CHUNK_SIZE; + current_chunk_idx_ = -1; + total_allocated_bytes_ = 0; + total_reserved_bytes_ = 0; +} + +bool ChunkedAllocator::FindChunk(int64_t min_size) { + // Try to allocate from a free chunk. The first free chunk, if any, will be immediately + // after the current chunk. 
+ int first_free_idx = current_chunk_idx_ + 1; + // (cast size() to signed int in order to avoid everything else being cast to + // unsigned long, in particular -1) + while (++current_chunk_idx_ < static_cast<int>(chunks_.size())) { + // we found a free chunk + DCHECK_EQ(chunks_[current_chunk_idx_].allocated_bytes, 0); + + if (chunks_[current_chunk_idx_].size >= min_size) { + // This chunk is big enough. Move it before the other free chunks. + if (current_chunk_idx_ != first_free_idx) { + std::swap(chunks_[current_chunk_idx_], chunks_[first_free_idx]); + current_chunk_idx_ = first_free_idx; + } + break; + } + } + + if (current_chunk_idx_ == static_cast<int>(chunks_.size())) { + // need to allocate new chunk. + int64_t chunk_size; + DCHECK_GE(next_chunk_size_, INITIAL_CHUNK_SIZE); + DCHECK_LE(next_chunk_size_, MAX_CHUNK_SIZE); + + chunk_size = std::max<int64_t>(min_size, next_chunk_size_); + + // Allocate a new chunk. Return early if malloc fails. + uint8_t* buf = nullptr; + PARQUET_THROW_NOT_OK(allocator_->Allocate(chunk_size, &buf)); + if (UNLIKELY(buf == NULL)) { + DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size())); + current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1; + return false; + } + + // If there are no free chunks put it at the end, otherwise before the first free. + if (first_free_idx == static_cast<int>(chunks_.size())) { + chunks_.push_back(ChunkInfo(chunk_size, buf)); + } else { + current_chunk_idx_ = first_free_idx; + auto insert_chunk = chunks_.begin() + current_chunk_idx_; + chunks_.insert(insert_chunk, ChunkInfo(chunk_size, buf)); + } + total_reserved_bytes_ += chunk_size; + // Don't increment the chunk size until the allocation succeeds: if an attempted + // large allocation fails we don't want to increase the chunk size further. 
+ next_chunk_size_ = + static_cast<int>(std::min<int64_t>(chunk_size * 2, MAX_CHUNK_SIZE)); + } + + DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size())); + DCHECK(CheckIntegrity(true)); + return true; +} + +void ChunkedAllocator::AcquireData(ChunkedAllocator* src, bool keep_current) { + DCHECK(src->CheckIntegrity(false)); + int num_acquired_chunks; + if (keep_current) { + num_acquired_chunks = src->current_chunk_idx_; + } else if (src->GetFreeOffset() == 0) { + // nothing in the last chunk + num_acquired_chunks = src->current_chunk_idx_; + } else { + num_acquired_chunks = src->current_chunk_idx_ + 1; + } + + if (num_acquired_chunks <= 0) { + if (!keep_current) src->FreeAll(); + return; + } + + auto end_chunk = src->chunks_.begin() + num_acquired_chunks; + int64_t total_transfered_bytes = 0; + for (auto i = src->chunks_.begin(); i != end_chunk; ++i) { + total_transfered_bytes += i->size; + } + src->total_reserved_bytes_ -= total_transfered_bytes; + total_reserved_bytes_ += total_transfered_bytes; + + // insert new chunks after current_chunk_idx_ + auto insert_chunk = chunks_.begin() + current_chunk_idx_ + 1; + chunks_.insert(insert_chunk, src->chunks_.begin(), end_chunk); + src->chunks_.erase(src->chunks_.begin(), end_chunk); + current_chunk_idx_ += num_acquired_chunks; + + if (keep_current) { + src->current_chunk_idx_ = 0; + DCHECK(src->chunks_.size() == 1 || src->chunks_[1].allocated_bytes == 0); + total_allocated_bytes_ += src->total_allocated_bytes_ - src->GetFreeOffset(); + src->total_allocated_bytes_ = src->GetFreeOffset(); + } else { + src->current_chunk_idx_ = -1; + total_allocated_bytes_ += src->total_allocated_bytes_; + src->total_allocated_bytes_ = 0; + } + peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_); + + if (!keep_current) src->FreeAll(); + DCHECK(CheckIntegrity(false)); +} + +std::string ChunkedAllocator::DebugString() { + std::stringstream out; + char str[16]; + out << "ChunkedAllocator(#chunks=" << 
chunks_.size() << " [";
+  for (size_t i = 0; i < chunks_.size(); ++i) {
+    sprintf(str, "0x%lx=", reinterpret_cast<size_t>(chunks_[i].data));  // NOLINT
+    out << (i > 0 ? " " : "") << str << chunks_[i].size << "/"
+        << chunks_[i].allocated_bytes;
+  }
+  out << "] current_chunk=" << current_chunk_idx_
+      << " total_sizes=" << GetTotalChunkSizes()
+      << " total_alloc=" << total_allocated_bytes_ << ")";
+  return out.str();
+}
+
+int64_t ChunkedAllocator::GetTotalChunkSizes() const {
+  int64_t result = 0;
+  for (size_t i = 0; i < chunks_.size(); ++i) {
+    result += chunks_[i].size;
+  }
+  return result;
+}
+
+bool ChunkedAllocator::CheckIntegrity(bool current_chunk_empty) {
+  // check that current_chunk_idx_ points to the last chunk with allocated data
+  DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
+  int64_t total_allocated = 0;
+  for (int i = 0; i < static_cast<int>(chunks_.size()); ++i) {
+    DCHECK_GT(chunks_[i].size, 0);
+    if (i < current_chunk_idx_) {
+      DCHECK_GT(chunks_[i].allocated_bytes, 0);
+    } else if (i == current_chunk_idx_) {
+      if (current_chunk_empty) {
+        DCHECK_EQ(chunks_[i].allocated_bytes, 0);
+      } else {
+        DCHECK_GT(chunks_[i].allocated_bytes, 0);
+      }
+    } else {
+      DCHECK_EQ(chunks_[i].allocated_bytes, 0);
+    }
+    total_allocated += chunks_[i].allocated_bytes;
+  }
+  DCHECK_EQ(total_allocated, total_allocated_bytes_);
+  return true;
+}
+
+// ----------------------------------------------------------------------
+// Arrow IO wrappers
+
+// Close the wrapped Arrow file (input or output). A non-OK arrow::Status is
+// rethrown as a ParquetException by PARQUET_THROW_NOT_OK.
+void ArrowFileMethods::Close() {
+  PARQUET_THROW_NOT_OK(file_interface()->Close());
+}
+
+// Return the current position in the wrapped Arrow file relative to the start.
+int64_t ArrowFileMethods::Tell() {
+  int64_t position = 0;
+  PARQUET_THROW_NOT_OK(file_interface()->Tell(&position));
+  return position;
+}
+
+ArrowInputFile::ArrowInputFile(
+    const std::shared_ptr<::arrow::io::ReadableFileInterface>& file)
+    : file_(file) {}
+
+::arrow::io::FileInterface* ArrowInputFile::file_interface() {
+  return file_.get();
+}
+
+// Total size of the wrapped file in bytes.
+int64_t ArrowInputFile::Size() const {
+  int64_t size = 0;
+  PARQUET_THROW_NOT_OK(file_->GetSize(&size));
+  return size;
+}
+
+void ArrowInputFile::Seek(int64_t position) {
+  PARQUET_THROW_NOT_OK(file_->Seek(position));
+}
+
+// Returns bytes read
+int64_t ArrowInputFile::Read(int64_t nbytes, uint8_t* out) {
+  int64_t bytes_read = 0;
+  PARQUET_THROW_NOT_OK(file_->Read(nbytes, &bytes_read, out));
+  return bytes_read;
+}
+
+std::shared_ptr<Buffer> ArrowInputFile::Read(int64_t nbytes) {
+  std::shared_ptr<Buffer> out;
+  PARQUET_THROW_NOT_OK(file_->Read(nbytes, &out));
+  return out;
+}
+
+std::shared_ptr<Buffer> ArrowInputFile::ReadAt(int64_t position, int64_t nbytes) {
+  std::shared_ptr<Buffer> out;
+  PARQUET_THROW_NOT_OK(file_->ReadAt(position, nbytes, &out));
+  return out;
+}
+
+ArrowOutputStream::ArrowOutputStream(
+    const std::shared_ptr<::arrow::io::OutputStream> file)
+    : file_(file) {}
+
+::arrow::io::FileInterface* ArrowOutputStream::file_interface() {
+  return file_.get();
+}
+
+// Copy bytes into the output stream
+void ArrowOutputStream::Write(const uint8_t* data, int64_t length) {
+  PARQUET_THROW_NOT_OK(file_->Write(data, length));
+}
+
+// ----------------------------------------------------------------------
+// InMemoryInputStream
+
+InMemoryInputStream::InMemoryInputStream(const std::shared_ptr<Buffer>& buffer)
+    : buffer_(buffer), offset_(0) {
+  len_ = buffer_->size();
+}
+
+// Eagerly reads [start, start + num_bytes) from `source`; throws if the source
+// returns fewer bytes than requested.
+InMemoryInputStream::InMemoryInputStream(
+    RandomAccessSource* source, int64_t start, int64_t num_bytes)
+    : offset_(0) {
+  buffer_ = source->ReadAt(start, num_bytes);
+  if (buffer_->size() < num_bytes) {
+    throw ParquetException("Unable to read column chunk data");
+  }
+  len_ = buffer_->size();
+}
+
+// Returns up to num_to_peek bytes at the current offset without advancing.
+const uint8_t* InMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
+  *num_bytes = std::min(num_to_peek, len_ - offset_);
+  return buffer_->data() + offset_;
+}
+
+const uint8_t* InMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
+  const uint8_t* result = Peek(num_to_read, num_bytes);
+  offset_ += *num_bytes;
+  return result;
+}
+
+void InMemoryInputStream::Advance(int64_t num_bytes) {
+  offset_ += num_bytes;
+}
+
+// ----------------------------------------------------------------------
+// In-memory output stream
+
+InMemoryOutputStream::InMemoryOutputStream(
+    MemoryAllocator* allocator, int64_t initial_capacity)
+    : size_(0), capacity_(initial_capacity) {
+  // capacity_ must start positive: Write() grows it by doubling, so a zero (or
+  // negative) capacity would never grow and its resize loop would never end.
+  if (capacity_ <= 0) { capacity_ = kInMemoryDefaultCapacity; }
+  buffer_ = AllocateBuffer(allocator, capacity_);
+}
+
+InMemoryOutputStream::~InMemoryOutputStream() {}
+
+uint8_t* InMemoryOutputStream::Head() {
+  return buffer_->mutable_data() + size_;
+}
+
+// Appends `length` bytes, doubling the buffer capacity until they fit.
+void InMemoryOutputStream::Write(const uint8_t* data, int64_t length) {
+  if (size_ + length > capacity_) {
+    int64_t new_capacity = capacity_ * 2;
+    while (new_capacity < size_ + length) {
+      new_capacity *= 2;
+    }
+    PARQUET_THROW_NOT_OK(buffer_->Resize(new_capacity));
+    capacity_ = new_capacity;
+  }
+  memcpy(Head(), data, length);
+  size_ += length;
+}
+
+int64_t InMemoryOutputStream::Tell() {
+  return size_;
+}
+
+// Shrinks the buffer to the bytes written and hands it to the caller. buffer_
+// is reset to null, so the stream must not be written to afterwards.
+std::shared_ptr<Buffer> InMemoryOutputStream::GetBuffer() {
+  PARQUET_THROW_NOT_OK(buffer_->Resize(size_));
+  std::shared_ptr<Buffer> result = buffer_;
+  buffer_ = nullptr;
+  return result;
+}
+
+// ----------------------------------------------------------------------
+// BufferedInputStream
+
+BufferedInputStream::BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size,
+    RandomAccessSource* source, int64_t start, int64_t num_bytes)
+    : source_(source), stream_offset_(start), stream_end_(start + num_bytes) {
+  buffer_ = AllocateBuffer(pool, buffer_size);
+  buffer_size_ = buffer_->size();
+  // Required to force a lazy read
+  buffer_offset_ = buffer_size_;
+}
+
+const uint8_t* BufferedInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
+  *num_bytes = std::min(num_to_peek, stream_end_ - stream_offset_);
+  // increase the buffer size if needed
+  bool resized = false;
+  if (*num_bytes > buffer_size_) {
+    PARQUET_THROW_NOT_OK(buffer_->Resize(*num_bytes));
+    buffer_size_ = buffer_->size();
+    DCHECK(buffer_size_ >= *num_bytes);
+    resized = true;
+  }
+  // Read more data when buffer has insufficient left or when resized. The
+  // explicit `resized` test matters when buffer_offset_ == 0: the grown tail of
+  // the buffer holds no data yet, but the size comparison alone would not
+  // trigger a refill and uninitialized bytes would be returned.
+  if (resized || *num_bytes > (buffer_size_ - buffer_offset_)) {
+    source_->Seek(stream_offset_);
+    buffer_size_ = std::min(buffer_size_, stream_end_ - stream_offset_);
+    int64_t bytes_read = source_->Read(buffer_size_, buffer_->mutable_data());
+    if (bytes_read < *num_bytes) {
+      throw ParquetException("Failed reading column data from source");
+    }
+    buffer_offset_ = 0;
+  }
+  return buffer_->data() + buffer_offset_;
+}
+
+const uint8_t* BufferedInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
+  const uint8_t* result = Peek(num_to_read, num_bytes);
+  stream_offset_ += *num_bytes;
+  buffer_offset_ += *num_bytes;
+  return result;
+}
+
+void BufferedInputStream::Advance(int64_t num_bytes) {
+  stream_offset_ += num_bytes;
+  buffer_offset_ += num_bytes;
+}
+
+std::shared_ptr<PoolBuffer> AllocateBuffer(MemoryAllocator* allocator, int64_t size) {
+  auto result = std::make_shared<PoolBuffer>(allocator);
+  if (size > 0) { PARQUET_THROW_NOT_OK(result->Resize(size)); }
+  return result;
+}
+
+std::unique_ptr<PoolBuffer> AllocateUniqueBuffer(
+    MemoryAllocator* allocator, int64_t size) {
+  std::unique_ptr<PoolBuffer> result(new PoolBuffer(allocator));
+  if (size > 0) { PARQUET_THROW_NOT_OK(result->Resize(size)); }
+  return result;
+}
+
+}  // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/memory.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h
new file mode 100644
index 0000000..1ffca35
--- /dev/null
+++ b/src/parquet/util/memory.h
@@ -0,0 +1,440 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license
agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_UTIL_MEMORY_H
+#define PARQUET_UTIL_MEMORY_H
+
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <memory>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
+#include "arrow/memory_pool.h"
+#include "arrow/status.h"
+
+#include "parquet/exception.h"
+#include "parquet/util/macros.h"
+#include "parquet/util/visibility.h"
+
+// Evaluate `s`; if it throws a ParquetException, return that error from the
+// enclosing function as ::arrow::Status::IOError (the enclosing function must
+// therefore return ::arrow::Status).
+#define PARQUET_CATCH_NOT_OK(s)                    \
+  try {                                            \
+    (s);                                           \
+  } catch (const ::parquet::ParquetException& e) { \
+    return ::arrow::Status::IOError(e.what());     \
+  }
+
+// Evaluate `s`, deliberately discarding any ParquetException it throws.
+#define PARQUET_IGNORE_NOT_OK(s)                 \
+  try {                                          \
+    (s);                                         \
+  } catch (const ::parquet::ParquetException&) { \
+  }
+
+// Evaluate `s` (an ::arrow::Status expression) and throw a ParquetException if
+// it is not OK. Expands to exactly one statement, so it must be followed by a
+// semicolon and is safe inside an unbraced if/else.
+#define PARQUET_THROW_NOT_OK(s)                     \
+  do {                                              \
+    ::arrow::Status _s = (s);                       \
+    if (!_s.ok()) {                                 \
+      std::stringstream ss;                         \
+      ss << "Arrow error: " << _s.ToString();       \
+      ::parquet::ParquetException::Throw(ss.str()); \
+    }                                               \
+  } while (0)
+
+namespace parquet {
+
+// Capacity used by InMemoryOutputStream when no positive capacity is given.
+static constexpr int64_t kInMemoryDefaultCapacity = 1024;
+
+using Buffer = ::arrow::Buffer;
+using MutableBuffer = ::arrow::MutableBuffer;
+using ResizableBuffer = ::arrow::ResizableBuffer;
+using PoolBuffer = ::arrow::PoolBuffer;
+using MemoryAllocator = ::arrow::MemoryPool;
+
+PARQUET_EXPORT MemoryAllocator* default_allocator();
+
+// MemoryAllocator that keeps running allocation statistics: bytes_allocated()
+// and max_memory() (presumably the peak of bytes_allocated()).
+// NOTE(review): Allocate/Free are defined elsewhere; stats_mutex_ presumably
+// guards the counters there, while the accessors below read them unlocked.
+class PARQUET_EXPORT TrackingAllocator : public MemoryAllocator {
+ public:
+  TrackingAllocator() : total_memory_(0), max_memory_(0) {}
+
+  ::arrow::Status Allocate(int64_t size, uint8_t** out) override;
+  void Free(uint8_t* p, int64_t size) override;
+
+  int64_t bytes_allocated() const override { return total_memory_; }
+
+  int64_t max_memory() const { return max_memory_; }
+
+ private:
+  std::mutex stats_mutex_;
+  int64_t total_memory_;
+  int64_t max_memory_;
+};
+
+// Array of T whose storage is a PoolBuffer obtained from `allocator`.
+// Copying is disabled.
+// NOTE(review): operator[] is const yet returns a mutable T&.
+template <class T>
+class Vector {
+ public:
+  explicit Vector(int64_t size, MemoryAllocator* allocator);
+  void Resize(int64_t new_size);
+  void Reserve(int64_t new_capacity);
+  void Assign(int64_t size, const T val);
+  void Swap(Vector<T>& v);
+  inline T& operator[](int64_t i) const { return data_[i]; }
+
+ private:
+  std::unique_ptr<PoolBuffer> buffer_;
+  int64_t size_;
+  int64_t capacity_;
+  T* data_;
+
+  DISALLOW_COPY_AND_ASSIGN(Vector);
+};
+
+/// A ChunkedAllocator maintains a list of memory chunks from which it
+/// allocates memory in response to Allocate() calls; chunks stay around for
+/// the lifetime of the allocator or until they are passed on to another
+/// allocator.
+//
+/// An Allocate() call will attempt to allocate memory from the chunk that was most
+/// recently added; if that chunk doesn't have enough memory to
+/// satisfy the allocation request, the free chunks are searched for one that is
+/// big enough; otherwise a new chunk is added to the list.
+/// The current_chunk_idx_ always points to the last chunk with allocated memory.
+/// In order to keep allocation overhead low, chunk sizes double with each new one
+/// added, until they hit a maximum size.
+//
+/// Example:
+/// ChunkedAllocator* p = new ChunkedAllocator();
+/// for (int i = 0; i < 1024; ++i) {
+/// returns 8-byte aligned memory (effectively 24 bytes):
+/// ..
= p->Allocate(17);
+/// }
+/// at this point, 17K have been handed out in response to Allocate() calls and
+/// 28K of chunks have been allocated (chunk sizes: 4K, 8K, 16K)
+/// We track total and peak allocated bytes. At this point they would be the same:
+/// 28k bytes. A call to Clear will return the allocated memory so
+/// total_allocated_bytes_
+/// becomes 0 while peak_allocated_bytes_ remains at 28k.
+/// p->Clear();
+/// the entire 1st chunk is returned:
+/// .. = p->Allocate(4 * 1024);
+/// 4K of the 2nd chunk are returned:
+/// .. = p->Allocate(4 * 1024);
+/// a new 20K chunk is created
+/// .. = p->Allocate(20 * 1024);
+//
+/// ChunkedAllocator* p2 = new ChunkedAllocator();
+/// the new ChunkedAllocator receives all chunks containing data from p
+/// p2->AcquireData(p, false);
+/// At this point p.total_allocated_bytes_ would be 0 while p.peak_allocated_bytes_
+/// remains unchanged.
+/// The one remaining (empty) chunk is released:
+/// delete p;
+//
+/// NOTE(review): ChunkInfo::data is owned and freed by the destructor, yet the
+/// class does not disable copying; a copy would free the same chunks twice.
+
+class ChunkedAllocator {
+ public:
+  explicit ChunkedAllocator(MemoryAllocator* allocator = default_allocator());
+
+  /// Frees all chunks of memory and subtracts the total allocated bytes
+  /// from the registered limits.
+  ~ChunkedAllocator();
+
+  /// Allocates 8-byte aligned section of memory of 'size' bytes at the end
+  /// of the current chunk. Creates a new chunk if there aren't any chunks
+  /// with enough capacity.
+  uint8_t* Allocate(int size);
+
+  /// Gives the last 'byte_size' bytes of the current chunk back for reuse. This
+  /// can only be used to return either all or part of the previous allocation
+  /// returned by Allocate().
+  void ReturnPartialAllocation(int byte_size);
+
+  /// Makes all allocated chunks available for re-use, but doesn't delete any chunks.
+  void Clear();
+
+  /// Deletes all allocated chunks. FreeAll() or AcquireData() must be called for
+  /// each mem pool
+  void FreeAll();
+
+  /// Absorb all chunks that hold data from src. If keep_current is true, let src hold on
+  /// to its last allocated chunk that contains data.
+  /// All offsets handed out by calls to GetCurrentOffset() for 'src' become invalid.
+  /// NOTE(review): GetCurrentOffset() is not declared here; sentence may be stale.
+  void AcquireData(ChunkedAllocator* src, bool keep_current);
+
+  std::string DebugString();
+
+  int64_t total_allocated_bytes() const { return total_allocated_bytes_; }
+  int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; }
+  int64_t total_reserved_bytes() const { return total_reserved_bytes_; }
+
+  /// Return the sum of chunks_[i].size over all chunks.
+  int64_t GetTotalChunkSizes() const;
+
+ private:
+  friend class ChunkedAllocatorTest;
+  static const int INITIAL_CHUNK_SIZE = 4 * 1024;
+
+  /// The maximum size of chunk that should be allocated. Allocations larger than this
+  /// size will get their own individual chunk.
+  static const int MAX_CHUNK_SIZE = 1024 * 1024;
+
+  struct ChunkInfo {
+    uint8_t* data;  // Owned by the ChunkInfo.
+    int64_t size;   // in bytes
+
+    /// bytes allocated via Allocate() in this chunk
+    int64_t allocated_bytes;
+
+    explicit ChunkInfo(int64_t size, uint8_t* buf);
+
+    ChunkInfo() : data(nullptr), size(0), allocated_bytes(0) {}
+  };
+
+  /// chunk from which we served the last Allocate() call;
+  /// always points to the last chunk that contains allocated data;
+  /// chunks 0..current_chunk_idx_ are guaranteed to contain data
+  /// (chunks_[i].allocated_bytes > 0 for i: 0..current_chunk_idx_);
+  /// -1 if no chunks present
+  int current_chunk_idx_;
+
+  /// The size of the next chunk to allocate.
+  int64_t next_chunk_size_;
+
+  /// sum of chunks_[i].allocated_bytes over all chunks
+  int64_t total_allocated_bytes_;
+
+  /// Maximum number of bytes allocated from this pool at one time.
+  int64_t peak_allocated_bytes_;
+
+  /// sum of all bytes allocated in chunks_
+  int64_t total_reserved_bytes_;
+
+  std::vector<ChunkInfo> chunks_;
+
+  MemoryAllocator* allocator_;
+
+  /// Find or allocate a chunk with at least min_size spare capacity and update
+  /// current_chunk_idx_. Also updates chunks_ if a new chunk needs to be created.
+  bool FindChunk(int64_t min_size);
+
+  /// Check integrity of the supporting data structures; always returns true but DCHECKs
+  /// all invariants.
+  /// If 'current_chunk_empty' is false, checks that the current chunk contains data.
+  bool CheckIntegrity(bool current_chunk_empty);
+
+  /// Return offset to unoccupied space in current chunk.
+  int GetFreeOffset() const {
+    if (current_chunk_idx_ == -1) return 0;
+    return chunks_[current_chunk_idx_].allocated_bytes;
+  }
+
+  template <bool CHECK_LIMIT_FIRST>
+  uint8_t* Allocate(int size);
+};
+
+// File input and output interfaces that translate arrow::Status to exceptions
+
+class PARQUET_EXPORT FileInterface {
+ public:
+  // Virtual so implementations can be safely destroyed through a FileInterface*.
+  virtual ~FileInterface() {}
+
+  // Close the file
+  virtual void Close() = 0;
+
+  // Return the current position in the file relative to the start
+  virtual int64_t Tell() = 0;
+};
+
+class PARQUET_EXPORT RandomAccessSource : virtual public FileInterface {
+ public:
+  virtual ~RandomAccessSource() {}
+
+  virtual int64_t Size() const = 0;
+
+  virtual void Seek(int64_t position) = 0;
+
+  // Returns bytes read
+  virtual int64_t Read(int64_t nbytes, uint8_t* out) = 0;
+
+  virtual std::shared_ptr<Buffer> Read(int64_t nbytes) = 0;
+
+  virtual std::shared_ptr<Buffer> ReadAt(int64_t position, int64_t nbytes) = 0;
+};
+
+class PARQUET_EXPORT OutputStream : virtual public FileInterface {
+ public:
+  virtual ~OutputStream() {}
+
+  // Copy bytes into the output stream
+  virtual void Write(const uint8_t* data, int64_t length) = 0;
+};
+
+// Shared Close()/Tell() implementation forwarding to an Arrow file object.
+class PARQUET_EXPORT ArrowFileMethods : virtual public FileInterface {
+ public:
+  void Close() override;
+  int64_t Tell() override;
+
+ protected:
+  virtual ::arrow::io::FileInterface* file_interface() = 0;
+};
+
+class PARQUET_EXPORT ArrowInputFile : public ArrowFileMethods, public RandomAccessSource {
+ public:
+  explicit ArrowInputFile(
+      const std::shared_ptr<::arrow::io::ReadableFileInterface>& file);
+
+  int64_t Size() const override;
+
+  void Seek(int64_t position) override;
+
+  // Returns bytes read
+  int64_t Read(int64_t nbytes, uint8_t* out) override;
+
+  std::shared_ptr<Buffer> Read(int64_t nbytes) override;
+
+  std::shared_ptr<Buffer> ReadAt(int64_t position, int64_t nbytes) override;
+
+  std::shared_ptr<::arrow::io::ReadableFileInterface> file() const { return file_; }
+
+  // Diamond inheritance
+  using ArrowFileMethods::Close;
+  using ArrowFileMethods::Tell;
+
+ private:
+  ::arrow::io::FileInterface* file_interface() override;
+  std::shared_ptr<::arrow::io::ReadableFileInterface> file_;
+};
+
+class PARQUET_EXPORT ArrowOutputStream : public ArrowFileMethods, public OutputStream {
+ public:
+  explicit ArrowOutputStream(const std::shared_ptr<::arrow::io::OutputStream> file);
+
+  // Copy bytes into the output stream
+  void Write(const uint8_t* data, int64_t length) override;
+
+  std::shared_ptr<::arrow::io::OutputStream> file() const { return file_; }
+
+  // Diamond inheritance
+  using ArrowFileMethods::Close;
+  using ArrowFileMethods::Tell;
+
+ private:
+  ::arrow::io::FileInterface* file_interface() override;
+  std::shared_ptr<::arrow::io::OutputStream> file_;
+};
+
+// OutputStream that accumulates all written bytes in a growable buffer.
+class PARQUET_EXPORT InMemoryOutputStream : public OutputStream {
+ public:
+  explicit InMemoryOutputStream(MemoryAllocator* allocator = default_allocator(),
+      int64_t initial_capacity = kInMemoryDefaultCapacity);
+
+  virtual ~InMemoryOutputStream();
+
+  // Close is currently a no-op with the in-memory stream
+  void Close() override {}
+
+  int64_t Tell() override;
+
+  void Write(const uint8_t* data, int64_t length) override;
+
+  // Return complete stream as Buffer; the stream releases its buffer, so it
+  // must not be written to afterwards.
+  std::shared_ptr<Buffer> GetBuffer();
+
+ private:
+  // Mutable pointer to the current write position in the stream
+  uint8_t* Head();
+
+  std::shared_ptr<ResizableBuffer> buffer_;
+  int64_t size_;
+  int64_t capacity_;
+
+  DISALLOW_COPY_AND_ASSIGN(InMemoryOutputStream);
+};
+
+//
----------------------------------------------------------------------
+// Streaming input interfaces
+
+// Interface for the column reader to get the bytes. The interface is a stream
+// interface, meaning the bytes are consumed in order and once a byte is read,
+// it does not need to be read again.
+class InputStream {
+ public:
+  // Returns the next 'num_to_peek' without advancing the current position.
+  // *num_bytes will contain the number of bytes returned which can only be
+  // less than num_to_peek at end of stream cases.
+  // Since the position is not advanced, calls to this function are idempotent.
+  // The buffer returned to the caller is still owned by the input stream and must
+  // stay valid until the next call to Peek() or Read().
+  virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) = 0;
+
+  // Identical to Peek(), except the current position in the stream is advanced by
+  // *num_bytes.
+  virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) = 0;
+
+  // Advance the stream without reading
+  virtual void Advance(int64_t num_bytes) = 0;
+
+  virtual ~InputStream() {}
+
+ protected:
+  InputStream() {}
+};
+
+// Implementation of an InputStream when all the bytes are in memory.
+class InMemoryInputStream : public InputStream {
+ public:
+  // Reads num_bytes bytes starting at offset `start` of `source` up front.
+  InMemoryInputStream(RandomAccessSource* source, int64_t start, int64_t num_bytes);
+  explicit InMemoryInputStream(const std::shared_ptr<Buffer>& buffer);
+  const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) override;
+  const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) override;
+
+  void Advance(int64_t num_bytes) override;
+
+ private:
+  std::shared_ptr<Buffer> buffer_;
+  int64_t len_;     // number of bytes in buffer_
+  int64_t offset_;  // current read position within buffer_
+};
+
+// Implementation of an InputStream when only some of the bytes are in memory.
+class BufferedInputStream : public InputStream {
+ public:
+  // Streams num_bytes bytes starting at offset `start` of `source`, reading
+  // through an internal buffer of buffer_size bytes that grows on demand.
+  BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size,
+      RandomAccessSource* source, int64_t start, int64_t num_bytes);
+  const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) override;
+  const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) override;
+
+  void Advance(int64_t num_bytes) override;
+
+ private:
+  std::shared_ptr<PoolBuffer> buffer_;
+  // Raw, non-owning pointer; presumably the caller keeps it alive — confirm.
+  RandomAccessSource* source_;
+  int64_t stream_offset_;  // absolute source offset of the next unread byte
+  int64_t stream_end_;     // start + num_bytes: one past the last readable byte
+  int64_t buffer_offset_;  // position of the next unread byte within buffer_
+  int64_t buffer_size_;    // number of usable bytes in buffer_
+};
+
+// Create a PoolBuffer backed by `allocator`, resized to `size` bytes if size > 0.
+std::shared_ptr<PoolBuffer> AllocateBuffer(MemoryAllocator* allocator, int64_t size = 0);
+
+std::unique_ptr<PoolBuffer> AllocateUniqueBuffer(
+    MemoryAllocator* allocator, int64_t size = 0);
+
+}  // namespace parquet
+
+#endif  // PARQUET_UTIL_MEMORY_H
