Repository: arrow Updated Branches: refs/heads/master 53a478dfb -> 69cdbd8ce
ARROW-494: [C++] Extend lifetime of memory mapped data if any buffers reference it Before this patch, if you read memory in an IPC scenario and the `arrow::io::MemoryMappedFile` then went out of scope, the memory was unmapped even if `arrow::Buffer` objects still referenced it. Author: Wes McKinney <[email protected]> Closes #298 from wesm/ARROW-494 and squashes the following commits: 60222e3 [Wes McKinney] clang-format 2960d17 [Wes McKinney] Add C++ unit test d7d776a [Wes McKinney] Add Python unit test where memory mapped file is garbage collected edf1295 [Wes McKinney] Reimplement memory map owner as Buffer subclass so that MemoryMappedFile can be safely destructed without invalidating Buffers referencing the mapped data Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/69cdbd8c Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/69cdbd8c Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/69cdbd8c Branch: refs/heads/master Commit: 69cdbd8ce665138ce35bb34d0bbe8087c0e9513e Parents: 53a478d Author: Wes McKinney <[email protected]> Authored: Mon Jan 23 06:43:05 2017 -0500 Committer: Wes McKinney <[email protected]> Committed: Mon Jan 23 06:43:05 2017 -0500 ---------------------------------------------------------------------- cpp/src/arrow/io/file.cc | 94 +++++++++++++++++++---------------- cpp/src/arrow/io/file.h | 7 ++- cpp/src/arrow/io/io-file-test.cc | 31 ++++++++++++ python/pyarrow/tests/test_io.py | 20 +++++++- 4 files changed, 104 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/69cdbd8c/cpp/src/arrow/io/file.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc index 1de6efa..3bf8dfa 100644 --- a/cpp/src/arrow/io/file.cc +++ b/cpp/src/arrow/io/file.cc @@ -372,6 +372,8 @@ class OSFile {
int64_t size() const { return size_; } + FileMode::type mode() const { return mode_; } + protected: std::string path_; @@ -513,14 +515,14 @@ int FileOutputStream::file_descriptor() const { // ---------------------------------------------------------------------- // Implement MemoryMappedFile -class MemoryMappedFile::MemoryMappedFileImpl : public OSFile { +class MemoryMappedFile::MemoryMap : public MutableBuffer { public: - MemoryMappedFileImpl() : OSFile(), data_(nullptr) {} + MemoryMap() : MutableBuffer(nullptr, 0) {} - ~MemoryMappedFileImpl() { - if (is_open_) { - munmap(data_, size_); - OSFile::Close(); + ~MemoryMap() { + if (file_->is_open()) { + munmap(mutable_data_, size_); + file_->Close(); } } @@ -528,27 +530,35 @@ class MemoryMappedFile::MemoryMappedFileImpl : public OSFile { int prot_flags; int map_mode; + file_.reset(new OSFile()); + if (mode != FileMode::READ) { // Memory mapping has permission failures if PROT_READ not set prot_flags = PROT_READ | PROT_WRITE; map_mode = MAP_SHARED; constexpr bool append = true; constexpr bool write_only = false; - RETURN_NOT_OK(OSFile::OpenWriteable(path, append, write_only)); - mode_ = mode; + RETURN_NOT_OK(file_->OpenWriteable(path, append, write_only)); + + is_mutable_ = true; } else { prot_flags = PROT_READ; map_mode = MAP_PRIVATE; // Changes are not to be committed back to the file - RETURN_NOT_OK(OSFile::OpenReadable(path)); + RETURN_NOT_OK(file_->OpenReadable(path)); + + is_mutable_ = false; } - void* result = mmap(nullptr, size_, prot_flags, map_mode, fd(), 0); + void* result = mmap(nullptr, file_->size(), prot_flags, map_mode, file_->fd(), 0); if (result == MAP_FAILED) { std::stringstream ss; ss << "Memory mapping file failed, errno: " << errno; return Status::IOError(ss.str()); } - data_ = reinterpret_cast<uint8_t*>(result); + + data_ = mutable_data_ = reinterpret_cast<uint8_t*>(result); + size_ = file_->size(); + position_ = 0; return Status::OK(); @@ -566,50 +576,45 @@ class 
MemoryMappedFile::MemoryMappedFileImpl : public OSFile { void advance(int64_t nbytes) { position_ = position_ + nbytes; } - uint8_t* data() { return data_; } + uint8_t* head() { return mutable_data_ + position_; } - uint8_t* head() { return data_ + position_; } + bool writable() { return file_->mode() != FileMode::READ; } - bool writable() { return mode_ != FileMode::READ; } + bool opened() { return file_->is_open(); } - bool opened() { return is_open_; } + int fd() const { return file_->fd(); } private: + std::unique_ptr<OSFile> file_; int64_t position_; - - // The memory map - uint8_t* data_; }; -MemoryMappedFile::MemoryMappedFile(FileMode::type mode) { - ReadableFileInterface::set_mode(mode); -} - +MemoryMappedFile::MemoryMappedFile() {} MemoryMappedFile::~MemoryMappedFile() {} Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode, std::shared_ptr<MemoryMappedFile>* out) { - std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile(mode)); + std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile()); - result->impl_.reset(new MemoryMappedFileImpl()); - RETURN_NOT_OK(result->impl_->Open(path, mode)); + result->memory_map_.reset(new MemoryMap()); + RETURN_NOT_OK(result->memory_map_->Open(path, mode)); *out = result; return Status::OK(); } Status MemoryMappedFile::GetSize(int64_t* size) { - *size = impl_->size(); + *size = memory_map_->size(); return Status::OK(); } Status MemoryMappedFile::Tell(int64_t* position) { - *position = impl_->position(); + *position = memory_map_->position(); return Status::OK(); } Status MemoryMappedFile::Seek(int64_t position) { - return impl_->Seek(position); + return memory_map_->Seek(position); } Status MemoryMappedFile::Close() { @@ -618,19 +623,24 @@ Status MemoryMappedFile::Close() { } Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) { - nbytes = std::max<int64_t>(0, std::min(nbytes, impl_->size() - impl_->position())); - if (nbytes > 0) { std::memcpy(out, 
impl_->head(), nbytes); } + nbytes = std::max<int64_t>( + 0, std::min(nbytes, memory_map_->size() - memory_map_->position())); + if (nbytes > 0) { std::memcpy(out, memory_map_->head(), nbytes); } *bytes_read = nbytes; - impl_->advance(nbytes); + memory_map_->advance(nbytes); return Status::OK(); } Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) { - nbytes = std::max<int64_t>(0, std::min(nbytes, impl_->size() - impl_->position())); + nbytes = std::max<int64_t>( + 0, std::min(nbytes, memory_map_->size() - memory_map_->position())); - const uint8_t* data = nbytes > 0 ? impl_->head() : nullptr; - *out = std::make_shared<Buffer>(data, nbytes); - impl_->advance(nbytes); + if (nbytes > 0) { + *out = SliceBuffer(memory_map_, memory_map_->position(), nbytes); + } else { + *out = std::make_shared<Buffer>(nullptr, 0); + } + memory_map_->advance(nbytes); return Status::OK(); } @@ -639,19 +649,19 @@ bool MemoryMappedFile::supports_zero_copy() const { } Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) { - if (!impl_->opened() || !impl_->writable()) { + if (!memory_map_->opened() || !memory_map_->writable()) { return Status::IOError("Unable to write"); } - RETURN_NOT_OK(impl_->Seek(position)); + RETURN_NOT_OK(memory_map_->Seek(position)); return WriteInternal(data, nbytes); } Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) { - if (!impl_->opened() || !impl_->writable()) { + if (!memory_map_->opened() || !memory_map_->writable()) { return Status::IOError("Unable to write"); } - if (nbytes + impl_->position() > impl_->size()) { + if (nbytes + memory_map_->position() > memory_map_->size()) { return Status::Invalid("Cannot write past end of memory map"); } @@ -659,13 +669,13 @@ Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) { } Status MemoryMappedFile::WriteInternal(const uint8_t* data, int64_t nbytes) { - memcpy(impl_->head(), data, nbytes); - impl_->advance(nbytes); + 
memcpy(memory_map_->head(), data, nbytes); + memory_map_->advance(nbytes); return Status::OK(); } int MemoryMappedFile::file_descriptor() const { - return impl_->fd(); + return memory_map_->fd(); } } // namespace io http://git-wip-us.apache.org/repos/asf/arrow/blob/69cdbd8c/cpp/src/arrow/io/file.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h index 2387232..930346b 100644 --- a/cpp/src/arrow/io/file.h +++ b/cpp/src/arrow/io/file.h @@ -130,13 +130,12 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface { int file_descriptor() const; private: - explicit MemoryMappedFile(FileMode::type mode); + MemoryMappedFile(); Status WriteInternal(const uint8_t* data, int64_t nbytes); - // Hide the internal details of this class for now - class ARROW_NO_EXPORT MemoryMappedFileImpl; - std::unique_ptr<MemoryMappedFileImpl> impl_; + class ARROW_NO_EXPORT MemoryMap; + std::shared_ptr<MemoryMap> memory_map_; }; } // namespace io http://git-wip-us.apache.org/repos/asf/arrow/blob/69cdbd8c/cpp/src/arrow/io/io-file-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc index f18f7b6..999b296 100644 --- a/cpp/src/arrow/io/io-file-test.cc +++ b/cpp/src/arrow/io/io-file-test.cc @@ -396,6 +396,37 @@ TEST_F(TestMemoryMappedFile, ReadOnly) { rommap->Close(); } +TEST_F(TestMemoryMappedFile, RetainMemoryMapReference) { + // ARROW-494 + + const int64_t buffer_size = 1024; + std::vector<uint8_t> buffer(buffer_size); + + test::random_bytes(1024, 0, buffer.data()); + + std::string path = "ipc-read-only-test"; + CreateFile(path, buffer_size); + + { + std::shared_ptr<MemoryMappedFile> rwmmap; + ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &rwmmap)); + ASSERT_OK(rwmmap->Write(buffer.data(), buffer_size)); + ASSERT_OK(rwmmap->Close()); + } + + std::shared_ptr<Buffer> out_buffer; + + 
{ + std::shared_ptr<MemoryMappedFile> rommap; + ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap)); + ASSERT_OK(rommap->Read(buffer_size, &out_buffer)); + ASSERT_OK(rommap->Close()); + } + + // valgrind will catch if memory is unmapped + ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size)); +} + TEST_F(TestMemoryMappedFile, InvalidMode) { const int64_t buffer_size = 1024; std::vector<uint8_t> buffer(buffer_size); http://git-wip-us.apache.org/repos/asf/arrow/blob/69cdbd8c/python/pyarrow/tests/test_io.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py index f28d44a..dfa84a2 100644 --- a/python/pyarrow/tests/test_io.py +++ b/python/pyarrow/tests/test_io.py @@ -16,6 +16,7 @@ # under the License. from io import BytesIO +import gc import os import pytest @@ -163,9 +164,8 @@ def test_inmemory_write_after_closed(): # ---------------------------------------------------------------------- # OS files and memory maps -@pytest.fixture(scope='session') +@pytest.fixture def sample_disk_data(request): - SIZE = 4096 arr = np.random.randint(0, 256, size=SIZE).astype('u1') data = arr.tobytes()[:SIZE] @@ -206,6 +206,22 @@ def test_memory_map_reader(sample_disk_data): _check_native_file_reader(io.MemoryMappedFile, sample_disk_data) +def test_memory_map_retain_buffer_reference(sample_disk_data): + path, data = sample_disk_data + + cases = [] + with io.MemoryMappedFile(path, 'rb') as f: + cases.append((f.read_buffer(100), data[:100])) + cases.append((f.read_buffer(100), data[100:200])) + cases.append((f.read_buffer(100), data[200:300])) + + # Call gc.collect() for good measure + gc.collect() + + for buf, expected in cases: + assert buf.to_pybytes() == expected + + def test_os_file_reader(sample_disk_data): _check_native_file_reader(io.OSFile, sample_disk_data)
