This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 2657f9d  ARROW-2701: [C++] Make MemoryMappedFile resizable redux
2657f9d is described below

commit 2657f9d77c4e64a251094af38d83ecfa62d272ea
Author: Dimitri Vorona <[email protected]>
AuthorDate: Tue Jul 24 22:55:00 2018 -0400

    ARROW-2701: [C++] Make MemoryMappedFile resizable redux
    
    This is an adjusted version of the [previous 
PR](https://github.com/apache/arrow/pull/2134).
    
    The following problems were addressed:
    
    * `MemoryMap` doesn't differentiate between `size_` and `capacity_`, or between 
`Resize` and `Reserve`. I still keep them so that I can inherit from 
`ResizableBuffer` to show the intent, but it might as well be changed back to 
`MutableBuffer` with a generic `Resize` method with a cleaner signature.
    
    * Before resizing, I acquire the `resize_lock` and check the `use_count` of 
the `MemoryMap`. If there are active slices, meaning readers holding a 
reference to this map, the resizing fails.
    
    * For this to work, the readers have to acquire the same lock, or else they 
can change the `use_count` while we aren't looking. This means that readers 
can't create slices concurrently. That may not be too bad, since slicing 
doesn't hold the lock for long, but it's not nice, and it's worse for the 
`memcpy`ing `ReadAt`. A `shared_mutex` would solve this problem.
    
    * I removed any automatic resizing
    
    * 1ba892 fixes a bug where you could write past the end using `WriteAt`. I 
can open a separate PR for it.
    
    As I said, I'm not very happy with the implementation, but it works and 
seems safe to me.
    
    Author: Dimitri Vorona <[email protected]>
    
    Closes #2205 from alendit/resizable_mmap_basic and squashes the following 
commits:
    
    86349138 <Dimitri Vorona> Make MemoryMappedFile resizable on linux
---
 cpp/src/arrow/io/file.cc             | 150 ++++++++++++++++++++-------
 cpp/src/arrow/io/file.h              |  15 ++-
 cpp/src/arrow/io/io-file-test.cc     | 193 +++++++++++++++++++++++++++++++++++
 cpp/src/arrow/util/io-util.cc        |  87 +++++++++++++++-
 cpp/src/arrow/util/io-util.h         |   3 +
 python/pyarrow/includes/libarrow.pxd |   2 +
 python/pyarrow/io.pxi                |  13 +++
 python/pyarrow/tests/test_io.py      |  20 ++++
 8 files changed, 440 insertions(+), 43 deletions(-)

diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 12c287b..6cd1546 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -27,6 +27,7 @@
 #undef Free
 #else
 #include <sys/mman.h>
+#include <unistd.h>
 #endif
 
 #include <string.h>
@@ -345,29 +346,28 @@ Status FileOutputStream::Write(const void* data, int64_t 
length) {
 int FileOutputStream::file_descriptor() const { return impl_->fd(); }
 
 // ----------------------------------------------------------------------
-// Implement MemoryMappedFile
-
+// Implement MemoryMappedFile as a buffer subclass
+// The class doesn't differentiate between size and capacity
 class MemoryMappedFile::MemoryMap : public MutableBuffer {
  public:
   MemoryMap() : MutableBuffer(nullptr, 0) {}
 
   ~MemoryMap() {
     if (file_->is_open()) {
-      munmap(mutable_data_, static_cast<size_t>(size_));
+      if (mutable_data_ != nullptr) {
+        // Keep the munmap call outside the DCHECK macro: debug-only check
+        // macros may compile away their arguments in release builds, which
+        // would silently leak the mapping.
+        const int munmap_status = munmap(mutable_data_, static_cast<size_t>(size_));
+        DCHECK_EQ(munmap_status, 0);
+        static_cast<void>(munmap_status);  // avoid unused-variable warnings
+      }
+      // NOTE(review): if DCHECK does not evaluate its argument in release
+      // builds, the Close() below is skipped as well; consider hoisting it.
       DCHECK(file_->Close().ok());
     }
   }
 
   Status Open(const std::string& path, FileMode::type mode) {
-    int prot_flags;
-    int map_mode;
-
     file_.reset(new OSFile());
 
     if (mode != FileMode::READ) {
       // Memory mapping has permission failures if PROT_READ not set
-      prot_flags = PROT_READ | PROT_WRITE;
-      map_mode = MAP_SHARED;
+      prot_flags_ = PROT_READ | PROT_WRITE;
+      map_mode_ = MAP_SHARED;
       constexpr bool append = false;
       constexpr bool truncate = false;
       constexpr bool write_only = false;
@@ -375,35 +375,26 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer {
 
       is_mutable_ = true;
     } else {
-      prot_flags = PROT_READ;
-      map_mode = MAP_PRIVATE;  // Changes are not to be committed back to the 
file
+      prot_flags_ = PROT_READ;
+      map_mode_ = MAP_PRIVATE;  // Changes are not to be committed back to the 
file
       RETURN_NOT_OK(file_->OpenReadable(path));
 
       is_mutable_ = false;
     }
 
-    size_ = file_->size();
-
-    void* result = nullptr;
-
     // Memory mapping fails when file size is 0
-    if (size_ > 0) {
-      result =
-          mmap(nullptr, static_cast<size_t>(size_), prot_flags, map_mode, 
file_->fd(), 0);
-      if (result == MAP_FAILED) {
-        std::stringstream ss;
-        ss << "Memory mapping file failed: " << std::strerror(errno);
-        return Status::IOError(ss.str());
-      }
+    // delay it until the first resize
+    if (file_->size() > 0) {
+      RETURN_NOT_OK(InitMMap(file_->size()));
     }
 
-    data_ = mutable_data_ = reinterpret_cast<uint8_t*>(result);
-
     position_ = 0;
 
     return Status::OK();
   }
 
+  // Public resize hook used by MemoryMappedFile; all work happens in ResizeMap().
+  Status Resize(const int64_t new_size) {
+    return ResizeMap(new_size);
+  }
+
   int64_t size() const { return size_; }
 
   Status Seek(int64_t position) {
@@ -426,11 +417,73 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer {
 
   int fd() const { return file_->fd(); }
 
-  std::mutex& lock() { return file_->lock(); }
+  // Serializes writers (Write / WriteAt / Resize); owned by the underlying OSFile.
+  std::mutex& write_lock() {
+    std::mutex& file_lock = file_->lock();
+    return file_lock;
+  }
+
+  // Guards the mapping against Resize while readers copy from it or slice it.
+  std::mutex& resize_lock() {
+    return resize_lock_;
+  }
 
  private:
+  // Resize the mmap and the underlying file to exactly new_size bytes.
+  // A size of 0 unmaps the region and truncates the file; if no mapping exists
+  // yet (the file was empty at Open time) it is created here. The current
+  // position is clamped so it never points past the new end.
+  Status ResizeMap(int64_t new_size) {
+    if (file_->mode() != FileMode::type::READWRITE &&
+        file_->mode() != FileMode::type::WRITE) {
+      return Status::IOError("Cannot resize a readonly memory map");
+    }
+    if (new_size < 0) {
+      return Status::Invalid("Cannot resize a memory map to a negative size");
+    }
+
+    if (new_size == 0) {
+      if (mutable_data_ != nullptr) {
+        // just unmap the mmap and truncate the file to 0 size
+        if (munmap(mutable_data_, static_cast<size_t>(capacity_)) != 0) {
+          std::stringstream ss;
+          ss << "Cannot unmap the file: " << std::strerror(errno);
+          return Status::IOError(ss.str());
+        }
+        // The mapping is gone: reset the state before truncating so that a
+        // failing truncate cannot leave dangling pointers or a stale position.
+        data_ = mutable_data_ = nullptr;
+        size_ = capacity_ = 0;
+        position_ = 0;
+        RETURN_NOT_OK(internal::FileTruncate(file_->fd(), 0));
+      }
+      position_ = 0;
+      return Status::OK();
+    }
+
+    if (mutable_data_ != nullptr) {
+      void* result = nullptr;
+      RETURN_NOT_OK(internal::MemoryMapRemap(mutable_data_, static_cast<size_t>(size_),
+                                             static_cast<size_t>(new_size),
+                                             file_->fd(), &result));
+      // Defensive: never publish MAP_FAILED as the data pointer.
+      // NOTE(review): on some platforms the old mapping may already be released
+      // at this point; the object state is left unchanged.
+      if (result == MAP_FAILED) {
+        return Status::IOError("Remapping the memory map failed");
+      }
+      size_ = capacity_ = new_size;
+      data_ = mutable_data_ = static_cast<uint8_t*>(result);
+      if (position_ > size_) {
+        position_ = size_;
+      }
+    } else {
+      DCHECK_EQ(position_, 0);
+      // the mmap is not yet initialized, resize the underlying
+      // file, since it might have been 0-sized
+      RETURN_NOT_OK(InitMMap(new_size, /*resize_file=*/true));
+    }
+    return Status::OK();
+  }
+
+  // Map the first initial_size bytes of the file (after optionally truncating
+  // or extending the file to that size) and publish the mapping through
+  // size_, capacity_, data_ and mutable_data_. No mapping may exist yet.
+  Status InitMMap(int64_t initial_size, bool resize_file = false) {
+    if (resize_file) {
+      RETURN_NOT_OK(internal::FileTruncate(file_->fd(), initial_size));
+    }
+    DCHECK(data_ == nullptr && mutable_data_ == nullptr);
+
+    const auto map_length = static_cast<size_t>(initial_size);
+    void* const mapped =
+        mmap(nullptr, map_length, prot_flags_, map_mode_, file_->fd(), 0);
+    if (mapped != MAP_FAILED) {
+      size_ = capacity_ = initial_size;
+      data_ = mutable_data_ = static_cast<uint8_t*>(mapped);
+      return Status::OK();
+    }
+
+    std::stringstream ss;
+    ss << "Memory mapping file failed: " << std::strerror(errno);
+    return Status::IOError(ss.str());
+  }
   std::unique_ptr<OSFile> file_;
+  int prot_flags_;
+  int map_mode_;
   int64_t position_;
+  std::mutex resize_lock_;
 };
 
 MemoryMappedFile::MemoryMappedFile() {}
@@ -475,18 +528,12 @@ Status MemoryMappedFile::Close() {
   return Status::OK();
 }
 
-Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* 
bytes_read,
-                                void* out) {
-  nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->size() - 
position));
-  if (nbytes > 0) {
-    std::memcpy(out, memory_map_->data() + position, 
static_cast<size_t>(nbytes));
-  }
-  *bytes_read = nbytes;
-  return Status::OK();
-}
-
 Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes,
                                 std::shared_ptr<Buffer>* out) {
+  // we acquire the lock before creating any slices in case a resize is
+  // triggered concurrently, otherwise we wouldn't detect a change in the
+  // use count
+  std::lock_guard<std::mutex> guard_resize(memory_map_->resize_lock());
   nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->size() - 
position));
 
   if (nbytes > 0) {
@@ -497,6 +544,17 @@ Status MemoryMappedFile::ReadAt(int64_t position, int64_t 
nbytes,
   return Status::OK();
 }
 
+// Copying read at an absolute offset; the file position is not moved.
+// Holds resize_lock() for the duration of the memcpy so the mapping cannot be
+// resized (or moved by a remap) while bytes are copied out of it.
+// Reads past the end are clamped; *bytes_read reports how much was copied.
+Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
+                                void* out) {
+  if (position < 0) {
+    // data() + position would point before the mapping.
+    return Status::Invalid("Cannot read at a negative position");
+  }
+  std::lock_guard<std::mutex> resize_guard(memory_map_->resize_lock());
+  // Clamp the request to the bytes available after `position` (0 past the end).
+  nbytes = std::max<int64_t>(0, std::min(nbytes, memory_map_->size() - position));
+  if (nbytes > 0) {
+    std::memcpy(out, memory_map_->data() + position, static_cast<size_t>(nbytes));
+  }
+  *bytes_read = nbytes;
+  return Status::OK();
+}
+
 Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, void* out) {
   RETURN_NOT_OK(ReadAt(memory_map_->position(), nbytes, bytes_read, out));
   memory_map_->advance(*bytes_read);
@@ -512,7 +570,7 @@ Status MemoryMappedFile::Read(int64_t nbytes, 
std::shared_ptr<Buffer>* out) {
 bool MemoryMappedFile::supports_zero_copy() const { return true; }
 
 Status MemoryMappedFile::WriteAt(int64_t position, const void* data, int64_t 
nbytes) {
-  std::lock_guard<std::mutex> guard(memory_map_->lock());
+  std::lock_guard<std::mutex> guard(memory_map_->write_lock());
 
   if (!memory_map_->opened() || !memory_map_->writable()) {
     return Status::IOError("Unable to write");
@@ -522,11 +580,15 @@ Status MemoryMappedFile::WriteAt(int64_t position, const 
void* data, int64_t nby
   }
 
   RETURN_NOT_OK(memory_map_->Seek(position));
+  if (nbytes + memory_map_->position() > memory_map_->size()) {
+    return Status::Invalid("Cannot write past end of memory map");
+  }
+
   return WriteInternal(data, nbytes);
 }
 
 Status MemoryMappedFile::Write(const void* data, int64_t nbytes) {
-  std::lock_guard<std::mutex> guard(memory_map_->lock());
+  std::lock_guard<std::mutex> guard(memory_map_->write_lock());
 
   if (!memory_map_->opened() || !memory_map_->writable()) {
     return Status::IOError("Unable to write");
@@ -544,6 +606,20 @@ Status MemoryMappedFile::WriteInternal(const void* data, 
int64_t nbytes) {
   return Status::OK();
 }
 
+// Resize the memory map and the underlying file to new_size bytes.
+//
+// Both the write lock and the resize lock are taken (deadlock-free via
+// std::lock), so no writer or reader can run and no new slice can be created
+// while the mapping is changed. Outstanding slices hold a reference to
+// memory_map_, and a resize could move or shrink the memory under them, so
+// the resize is refused while any exist.
+Status MemoryMappedFile::Resize(int64_t new_size) {
+  std::unique_lock<std::mutex> write_guard(memory_map_->write_lock(), std::defer_lock);
+  std::unique_lock<std::mutex> resize_guard(memory_map_->resize_lock(), std::defer_lock);
+  std::lock(write_guard, resize_guard);
+  // Having both locks, we can check how many times memory_map_ is borrowed
+  // (the number of readers still holding a reference to it, plus 1 for this
+  // object); if it is greater than 1, we fail loudly.
+  if (memory_map_.use_count() > 1) {
+    return Status::IOError("Cannot resize memory map while there are active readers");
+  }
+  return memory_map_->Resize(new_size);
+}
+
 int MemoryMappedFile::file_descriptor() const { return memory_map_->fd(); }
 
 }  // namespace io
diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h
index 7020cab..423d11e 100644
--- a/cpp/src/arrow/io/file.h
+++ b/cpp/src/arrow/io/file.h
@@ -195,20 +195,27 @@ class ARROW_EXPORT MemoryMappedFile : public 
ReadWriteFileInterface {
   // Required by RandomAccessFile, copies memory into out. Not thread-safe
   Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override;
 
-  // Zero copy read. Not thread-safe
+  // Zero copy read, moves position pointer. Not thread-safe
   Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
 
+  // Zero-copy read, leaves position unchanged. Holds the map's resize lock
+  // (an exclusive std::mutex, shared with the copying ReadAt and with Resize)
+  // while the slice is created, which is typically very short. Is thread-safe.
+  // The returned buffer references the map: Resize() fails with IOError while
+  // any such buffer is still alive.
+  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
+  // Raw copy of the memory at specified position. Thread-safe, but holds the
+  // same resize lock for the duration of the memcpy, so it also excludes other
+  // ReadAt calls and Resize while copying. Prefer the zero copy method.
   Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
                 void* out) override;
 
-  /// Default implementation is thread-safe
-  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* 
out) override;
-
   bool supports_zero_copy() const override;
 
   /// Write data at the current position in the file. Thread-safe
   Status Write(const void* data, int64_t nbytes) override;
 
+  /// Resize the memory map and the underlying file to new_size bytes.
+  ///
+  /// Fails with IOError if the map was opened read-only, or while buffers
+  /// returned by the zero-copy ReadAt are still alive. The current position
+  /// is clamped to the new size. Thread-safe: takes the write and resize locks.
+  Status Resize(int64_t new_size);
+
   /// Write data at a particular position in the file. Thread-safe
   Status WriteAt(int64_t position, const void* data, int64_t nbytes) override;
 
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index a06edca..29f3b09 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -614,6 +614,199 @@ TEST_F(TestMemoryMappedFile, WriteRead) {
   }
 }
 
+// Grow the map by one buffer_size chunk per iteration, write the next chunk at
+// the current position and check that it reads back at its offset. No resize
+// is done before the first write (InitMemoryMap(buffer_size, ...) presumably
+// creates a buffer_size map -- TODO confirm).
+TEST_F(TestMemoryMappedFile, WriteResizeRead) {
+  const int64_t buffer_size = 1024;
+  const int reps = 5;
+  std::vector<std::vector<uint8_t>> buffers(reps);
+  for (auto& b : buffers) {
+    b.resize(buffer_size);
+    test::random_bytes(buffer_size, 0, b.data());
+  }
+
+  std::string path = "io-memory-map-write-read-test";
+  std::shared_ptr<MemoryMappedFile> result;
+  ASSERT_OK(InitMemoryMap(buffer_size, path, &result));
+
+  int64_t position = 0;
+  std::shared_ptr<Buffer> out_buffer;
+  for (int i = 0; i < reps; ++i) {
+    if (i != 0) {
+      ASSERT_OK(result->Resize(buffer_size * (i + 1)));
+    }
+    ASSERT_OK(result->Write(buffers[i].data(), buffer_size));
+    ASSERT_OK(result->ReadAt(position, buffer_size, &out_buffer));
+
+    ASSERT_EQ(out_buffer->size(), buffer_size);
+    ASSERT_EQ(0, memcmp(out_buffer->data(), buffers[i].data(), buffer_size));
+    // Release the slice: Resize is refused while a slice is still alive.
+    out_buffer.reset();
+
+    position += buffer_size;
+  }
+}
+
+// Resize must fail with IOError while any zero-copy slice is alive, and
+// succeed -- growing both the map and the file -- once all slices are released.
+TEST_F(TestMemoryMappedFile, ResizeRaisesOnExported) {
+  const int64_t buffer_size = 1024;
+  std::vector<uint8_t> buffer(buffer_size);
+  test::random_bytes(buffer_size, 0, buffer.data());
+
+  std::string path = "io-memory-map-write-read-test";
+  std::shared_ptr<MemoryMappedFile> result;
+  ASSERT_OK(InitMemoryMap(buffer_size, path, &result));
+
+  std::shared_ptr<Buffer> out_buffer1, out_buffer2;
+  ASSERT_OK(result->Write(buffer.data(), buffer_size));
+  ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer1));
+  ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer2));
+  ASSERT_EQ(0, memcmp(out_buffer1->data(), buffer.data(), buffer_size));
+  ASSERT_EQ(0, memcmp(out_buffer2->data(), buffer.data(), buffer_size));
+
+  // attempt resize while both slices are alive: must be refused
+  ASSERT_RAISES(IOError, result->Resize(2 * buffer_size));
+
+  out_buffer1.reset();
+
+  // one slice is still alive: still refused
+  ASSERT_RAISES(IOError, result->Resize(2 * buffer_size));
+
+  out_buffer2.reset();
+
+  // no slices left: the resize goes through
+  ASSERT_OK(result->Resize(2 * buffer_size));
+
+  int64_t map_size;
+  ASSERT_OK(result->GetSize(&map_size));
+  ASSERT_EQ(map_size, 2 * buffer_size);
+
+  int64_t file_size;
+  ASSERT_OK(internal::FileGetSize(result->file_descriptor(), &file_size));
+  ASSERT_EQ(file_size, buffer_size * 2);
+}
+
+// A map opened over an empty file has no mapping until the first Resize
+// (presumably InitMemoryMap(0, ...) creates a 0-byte file -- TODO confirm);
+// after resizing, writes and zero-copy reads must work.
+TEST_F(TestMemoryMappedFile, WriteReadZeroInitSize) {
+  const int64_t buffer_size = 1024;
+  std::vector<uint8_t> buffer(buffer_size);
+  test::random_bytes(buffer_size, 0, buffer.data());
+
+  std::string path = "io-memory-map-write-read-test";
+  std::shared_ptr<MemoryMappedFile> result;
+  ASSERT_OK(InitMemoryMap(0, path, &result));
+
+  std::shared_ptr<Buffer> out_buffer;
+  ASSERT_OK(result->Resize(buffer_size));
+  ASSERT_OK(result->Write(buffer.data(), buffer_size));
+  ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer));
+  ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
+
+  int64_t map_size;
+  ASSERT_OK(result->GetSize(&map_size));
+  ASSERT_EQ(map_size, buffer_size);
+}
+
+// Shrink an initially larger map to the written size and then to half of it:
+// the leading bytes must survive, and both the map and the file must be truncated.
+TEST_F(TestMemoryMappedFile, WriteThenShrink) {
+  const int64_t buffer_size = 1024;
+  std::vector<uint8_t> buffer(buffer_size);
+  test::random_bytes(buffer_size, 0, buffer.data());
+
+  std::string path = "io-memory-map-write-read-test";
+  std::shared_ptr<MemoryMappedFile> result;
+  ASSERT_OK(InitMemoryMap(buffer_size * 2, path, &result));
+
+  std::shared_ptr<Buffer> out_buffer;
+  ASSERT_OK(result->Resize(buffer_size));
+  ASSERT_OK(result->Write(buffer.data(), buffer_size));
+  ASSERT_OK(result->Resize(buffer_size / 2));
+
+  ASSERT_OK(result->ReadAt(0, buffer_size / 2, &out_buffer));
+  ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size / 2));
+
+  int64_t map_size;
+  ASSERT_OK(result->GetSize(&map_size));
+  ASSERT_EQ(map_size, buffer_size / 2);
+
+  int64_t file_size;
+  ASSERT_OK(internal::FileGetSize(result->file_descriptor(), &file_size));
+  ASSERT_EQ(file_size, buffer_size / 2);
+}
+
+// Shrinking below the current position clamps the position to the new end,
+// so growing the map again and writing continues exactly at the seam.
+TEST_F(TestMemoryMappedFile, WriteThenShrinkToHalfThenWrite) {
+  const int64_t buffer_size = 1024;
+  std::vector<uint8_t> buffer(buffer_size);
+  test::random_bytes(buffer_size, 0, buffer.data());
+
+  std::string path = "io-memory-map-write-read-test";
+  std::shared_ptr<MemoryMappedFile> result;
+  ASSERT_OK(InitMemoryMap(buffer_size, path, &result));
+
+  std::shared_ptr<Buffer> out_buffer;
+  ASSERT_OK(result->Write(buffer.data(), buffer_size));
+  ASSERT_OK(result->Resize(buffer_size / 2));
+
+  // the position was clamped to the new (smaller) size
+  int64_t position;
+  ASSERT_OK(result->Tell(&position));
+  ASSERT_EQ(position, buffer_size / 2);
+
+  ASSERT_OK(result->ReadAt(0, buffer_size / 2, &out_buffer));
+  ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size / 2));
+  // release the slice, otherwise the next Resize is refused
+  out_buffer.reset();
+
+  // should resume writing directly at the seam
+  ASSERT_OK(result->Resize(buffer_size));
+  ASSERT_OK(result->Write(buffer.data() + buffer_size / 2, buffer_size / 2));
+
+  ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer));
+  ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
+
+  int64_t map_size;
+  ASSERT_OK(result->GetSize(&map_size));
+  ASSERT_EQ(map_size, buffer_size);
+
+  int64_t file_size;
+  ASSERT_OK(internal::FileGetSize(result->file_descriptor(), &file_size));
+  ASSERT_EQ(file_size, buffer_size);
+}
+
+// Resizing to 0 unmaps the region, truncates the file and resets the position;
+// a copying read then returns 0 bytes, and a later Resize re-creates the
+// mapping so writing works again.
+TEST_F(TestMemoryMappedFile, ResizeToZeroThanWrite) {
+  const int64_t buffer_size = 1024;
+  std::vector<uint8_t> buffer(buffer_size);
+  test::random_bytes(buffer_size, 0, buffer.data());
+
+  std::string path = "io-memory-map-write-read-test";
+  std::shared_ptr<MemoryMappedFile> result;
+  ASSERT_OK(InitMemoryMap(buffer_size, path, &result));
+
+  std::shared_ptr<Buffer> out_buffer;
+  // just a sanity check that writing works ok
+  ASSERT_OK(result->Write(buffer.data(), buffer_size));
+  ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer));
+  ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
+  // release the slice, otherwise Resize is refused
+  out_buffer.reset();
+
+  ASSERT_OK(result->Resize(0));
+  int64_t mapped_size;
+  ASSERT_OK(result->GetSize(&mapped_size));
+  ASSERT_EQ(mapped_size, 0);
+
+  int64_t position;
+  ASSERT_OK(result->Tell(&position));
+  ASSERT_EQ(position, 0);
+
+  int64_t file_size;
+  ASSERT_OK(internal::FileGetSize(result->file_descriptor(), &file_size));
+  ASSERT_EQ(file_size, 0);
+
+  // provision a vector to the buffer size in case ReadAt decides
+  // to read even though it shouldn't
+  std::vector<uint8_t> should_remain_empty(buffer_size);
+  int64_t bytes_read;
+  ASSERT_OK(result->ReadAt(0, 1, &bytes_read,
+                           reinterpret_cast<void*>(should_remain_empty.data())));
+  ASSERT_EQ(bytes_read, 0);
+
+  // just a sanity check that writing works ok after re-growing the map
+  ASSERT_OK(result->Resize(buffer_size));
+  ASSERT_OK(result->Write(buffer.data(), buffer_size));
+  ASSERT_OK(result->ReadAt(0, buffer_size, &out_buffer));
+  ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
+}
+
 TEST_F(TestMemoryMappedFile, WriteAt) {
   const int64_t buffer_size = 1024;
   std::vector<uint8_t> buffer(buffer_size);
diff --git a/cpp/src/arrow/util/io-util.cc b/cpp/src/arrow/util/io-util.cc
index 2071446..c191e52 100644
--- a/cpp/src/arrow/util/io-util.cc
+++ b/cpp/src/arrow/util/io-util.cc
@@ -61,9 +61,14 @@
 // nothing
 #endif
 
-#ifndef _MSC_VER  // POSIX-like platforms
+#ifdef _WIN32  // Windows
+#include "arrow/io/mman.h"
+#undef Realloc
+#undef Free
+#else  // POSIX-like platforms
+#include <sys/mman.h>
 #include <unistd.h>
-#endif  // _MSC_VER
+#endif
 
 // POSIX systems do not have this
 #ifndef O_BINARY
@@ -231,6 +236,84 @@ Status CreatePipe(int fd[2]) {
 }
 
 //
+// Compatible way to remap a memory map
+//
+
+// Resize the file behind `fildes` to `new_size` bytes and remap the existing
+// writable mapping (`addr`, `old_size` bytes) so that it covers the new size.
+// On success `*new_addr` receives the address of the new mapping, which may
+// differ from `addr`. On failure an IOError is returned and `*new_addr` is
+// MAP_FAILED; depending on the platform and on which step failed, the old
+// mapping at `addr` may already have been released.
+Status MemoryMapRemap(void* addr, size_t old_size, size_t new_size, int fildes,
+                      void** new_addr) {
+  // should only be called with writable files
+  *new_addr = MAP_FAILED;
+#ifdef _WIN32
+  // A view cannot be resized in place: drop the old view, resize the file,
+  // then create a fresh mapping object and view over the whole file.
+  if (!UnmapViewOfFile(addr)) {
+    errno = __map_mman_error(GetLastError(), EPERM);
+    std::stringstream ss;
+    ss << "UnmapViewOfFile failed: " << std::strerror(errno);
+    return Status::IOError(ss.str());
+  }
+
+  HANDLE h = reinterpret_cast<HANDLE>(_get_osfhandle(fildes));
+  if (h == INVALID_HANDLE_VALUE) {
+    errno = __map_mman_error(GetLastError(), EPERM);
+    std::stringstream ss;
+    ss << "cannot get file handle: " << std::strerror(errno);
+    return Status::IOError(ss.str());
+  }
+
+  // SetFilePointerEx takes the full 64-bit offset, so no manual low/high split
+  // of new_size is needed (shifting a 32-bit size_t by 32 is undefined).
+  LARGE_INTEGER new_end;
+  new_end.QuadPart = static_cast<LONGLONG>(new_size);
+  if (!SetFilePointerEx(h, new_end, NULL, FILE_BEGIN) || !SetEndOfFile(h)) {
+    errno = __map_mman_error(GetLastError(), EPERM);
+    std::stringstream ss;
+    ss << "cannot resize file: " << std::strerror(errno);
+    return Status::IOError(ss.str());
+  }
+
+  // Size 0/0 maps the whole (just resized) file; NULL name = unnamed mapping.
+  HANDLE fm = CreateFileMapping(h, NULL, PAGE_READWRITE, 0, 0, NULL);
+  if (fm == NULL) {
+    errno = __map_mman_error(GetLastError(), EPERM);
+    std::stringstream ss;
+    ss << "mremap failed: " << std::strerror(errno);
+    return Status::IOError(ss.str());
+  }
+  void* view = MapViewOfFile(fm, FILE_MAP_WRITE, 0, 0, new_size);
+  // Read the error code before CloseHandle gets a chance to overwrite it.
+  const DWORD view_error = GetLastError();
+  // A mapped view keeps its mapping object alive, so the handle can go now.
+  CloseHandle(fm);
+  if (view == NULL) {
+    errno = __map_mman_error(view_error, EPERM);
+    std::stringstream ss;
+    ss << "mremap failed: " << std::strerror(errno);
+    return Status::IOError(ss.str());
+  }
+  *new_addr = view;
+  return Status::OK();
+#else
+  // Resize the file first: if this fails, the existing mapping is untouched.
+  // Shrinking a still-mapped file is fine as long as the truncated tail is not
+  // accessed before the remap below.
+  if (ftruncate(fildes, static_cast<off_t>(new_size)) == -1) {
+    std::stringstream ss;
+    ss << "file truncate failed: " << std::strerror(errno);
+    return Status::IOError(ss.str());
+  }
+#ifdef MREMAP_MAYMOVE
+  // mremap available (Linux): grow or shrink, moving the mapping if needed.
+  void* result = mremap(addr, old_size, new_size, MREMAP_MAYMOVE);
+#else
+  // No mremap (e.g. macOS): drop the old mapping and map the resized file
+  // again. We set READ / WRITE flags on the new map, since we could only have
+  // enlarged a RW map in the first place.
+  if (munmap(addr, old_size) == -1) {
+    std::stringstream ss;
+    ss << "munmap failed: " << std::strerror(errno);
+    return Status::IOError(ss.str());
+  }
+  void* result = mmap(nullptr, new_size, PROT_READ | PROT_WRITE, MAP_SHARED, fildes, 0);
+#endif
+  if (result == MAP_FAILED) {
+    std::stringstream ss;
+    ss << "mremap failed: " << std::strerror(errno);
+    return Status::IOError(ss.str());
+  }
+  *new_addr = result;
+  return Status::OK();
+#endif
+}
+
+//
 // Closing files
 //
 
diff --git a/cpp/src/arrow/util/io-util.h b/cpp/src/arrow/util/io-util.h
index ac85b9b..9fbce39 100644
--- a/cpp/src/arrow/util/io-util.h
+++ b/cpp/src/arrow/util/io-util.h
@@ -164,6 +164,9 @@ Status FileClose(int fd);
 
 Status CreatePipe(int fd[2]);
 
+/// Resize the file referred to by `fildes` to `new_size` bytes and remap the
+/// existing `old_size`-byte mapping at `addr` to cover it. The address of the
+/// new mapping is stored in `*new_addr`; it may differ from `addr`.
+/// Intended for mappings of writable files only.
+Status MemoryMapRemap(void* addr, size_t old_size, size_t new_size, int fildes,
+                      void** new_addr);
+
 Status GetEnvVar(const char* name, std::string* out);
 Status GetEnvVar(const std::string& name, std::string* out);
 Status SetEnvVar(const char* name, const char* value);
diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd
index 657139c..da46075 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -661,6 +661,8 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" 
nogil:
         CStatus Open(const c_string& path, FileMode mode,
                      shared_ptr[CMemoryMappedFile]* file)
 
+        CStatus Resize(int64_t size)
+
         int file_descriptor()
 
     # ----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 8d35d0d..264d1c3 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -548,6 +548,7 @@ cdef class MemoryMappedFile(NativeFile):
     Supports 'r', 'r+w', 'w' modes
     """
     cdef:
+        shared_ptr[CMemoryMappedFile] handle
         object path
 
     @staticmethod
@@ -566,6 +567,7 @@ cdef class MemoryMappedFile(NativeFile):
         result.is_writable = True
         result.wr_file = <shared_ptr[OutputStream]> handle
         result.rd_file = <shared_ptr[RandomAccessFile]> handle
+        result.handle = handle
         result.closed = False
 
         return result
@@ -596,8 +598,19 @@ cdef class MemoryMappedFile(NativeFile):
 
         self.wr_file = <shared_ptr[OutputStream]> handle
         self.rd_file = <shared_ptr[RandomAccessFile]> handle
+        self.handle = handle
         self.closed = False
 
+    def resize(self, new_size):
+        """
+        Resize the map and underlying file.
+
+        Parameters
+        ----------
+        new_size : int
+            New size in bytes.
+
+        Raises
+        ------
+        ArrowIOError
+            If the map is read-only, or if zero-copy buffers read from it are
+            still alive.
+        """
+        cdef int64_t c_new_size
+        self._assert_open()
+        c_new_size = new_size
+        with nogil:
+            check_status(self.handle.get().Resize(c_new_size))
+
     def fileno(self):
         self._assert_open()
         return self.handle.get().file_descriptor()
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index eafa40c..2b32c2b 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -759,6 +759,26 @@ def test_memory_map_writer(tmpdir):
     assert f.read(3) == b'foo'
 
 
+def test_memory_map_resize(tmpdir):
+    SIZE = 4096
+    arr = np.random.randint(0, 256, size=SIZE).astype(np.uint8)
+    data1 = arr.tobytes()[:(SIZE // 2)]
+    data2 = arr.tobytes()[(SIZE // 2):]
+
+    path = os.path.join(str(tmpdir), guid())
+
+    mmap = pa.create_memory_map(path, SIZE / 2)
+    mmap.write(data1)
+
+    mmap.resize(SIZE)
+    mmap.write(data2)
+
+    mmap.close()
+
+    with open(path, 'rb') as f:
+        assert f.read() == arr.tobytes()
+
+
 def test_memory_zero_length(tmpdir):
     path = os.path.join(str(tmpdir), guid())
     f = open(path, 'wb')

Reply via email to