[ 
https://issues.apache.org/jira/browse/ARROW-2195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16425923#comment-16425923
 ] 

ASF GitHub Bot commented on ARROW-2195:
---------------------------------------

pcmoritz closed pull request #1807: ARROW-2195: [Plasma] Return auto-releasing 
buffers
URL: https://github.com/apache/arrow/pull/1807
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index ae411c9b8..04e1f929e 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -80,6 +80,49 @@ TEST_F(TestCudaBuffer, CopyFromHost) {
   AssertCudaBufferEquals(*device_buffer, host_buffer->data(), kSize);
 }
 
+TEST_F(TestCudaBuffer, FromBuffer) {
+  const int64_t kSize = 1000;
+  // Initialize device buffer with random data
+  std::shared_ptr<PoolBuffer> host_buffer;
+  std::shared_ptr<CudaBuffer> device_buffer;
+  ASSERT_OK(context_->Allocate(kSize, &device_buffer));
+  ASSERT_OK(test::MakeRandomBytePoolBuffer(kSize, default_memory_pool(), 
&host_buffer));
+  ASSERT_OK(device_buffer->CopyFromHost(0, host_buffer->data(), 1000));
+  // Sanity check
+  AssertCudaBufferEquals(*device_buffer, host_buffer->data(), kSize);
+
+  // Get generic Buffer from device buffer
+  std::shared_ptr<Buffer> buffer;
+  std::shared_ptr<CudaBuffer> result;
+  buffer = std::static_pointer_cast<Buffer>(device_buffer);
+  ASSERT_OK(CudaBuffer::FromBuffer(buffer, &result));
+  ASSERT_EQ(result->size(), kSize);
+  ASSERT_EQ(result->is_mutable(), true);
+  ASSERT_EQ(result->mutable_data(), buffer->mutable_data());
+  AssertCudaBufferEquals(*result, host_buffer->data(), kSize);
+
+  buffer = SliceBuffer(device_buffer, 0, kSize);
+  ASSERT_OK(CudaBuffer::FromBuffer(buffer, &result));
+  ASSERT_EQ(result->size(), kSize);
+  ASSERT_EQ(result->is_mutable(), false);
+  AssertCudaBufferEquals(*result, host_buffer->data(), kSize);
+
+  buffer = SliceMutableBuffer(device_buffer, 0, kSize);
+  ASSERT_OK(CudaBuffer::FromBuffer(buffer, &result));
+  ASSERT_EQ(result->size(), kSize);
+  ASSERT_EQ(result->is_mutable(), true);
+  ASSERT_EQ(result->mutable_data(), buffer->mutable_data());
+  AssertCudaBufferEquals(*result, host_buffer->data(), kSize);
+
+  buffer = SliceMutableBuffer(device_buffer, 3, kSize - 10);
+  buffer = SliceMutableBuffer(buffer, 8, kSize - 20);
+  ASSERT_OK(CudaBuffer::FromBuffer(buffer, &result));
+  ASSERT_EQ(result->size(), kSize - 20);
+  ASSERT_EQ(result->is_mutable(), true);
+  ASSERT_EQ(result->mutable_data(), buffer->mutable_data());
+  AssertCudaBufferEquals(*result, host_buffer->data() + 11, kSize - 20);
+}
+
 // IPC only supported on Linux
 #if defined(__linux)
 
diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc
index 183cbcbc6..a24550947 100644
--- a/cpp/src/arrow/gpu/cuda_memory.cc
+++ b/cpp/src/arrow/gpu/cuda_memory.cc
@@ -98,7 +98,34 @@ CudaBuffer::CudaBuffer(const std::shared_ptr<CudaBuffer>& 
parent, const int64_t
     : Buffer(parent, offset, size),
       context_(parent->context()),
       own_data_(false),
-      is_ipc_(false) {}
+      is_ipc_(false) {
+  if (parent->is_mutable()) {
+    is_mutable_ = true;
+    mutable_data_ = const_cast<uint8_t*>(data_);
+  }
+}
+
+Status CudaBuffer::FromBuffer(std::shared_ptr<Buffer> buffer,
+                              std::shared_ptr<CudaBuffer>* out) {
+  int64_t offset = 0, size = buffer->size();
+  bool is_mutable = buffer->is_mutable();
+  // The original CudaBuffer may have been wrapped in another Buffer
+  // (for example through slicing).
+  while (!(*out = std::dynamic_pointer_cast<CudaBuffer>(buffer))) {
+    const std::shared_ptr<Buffer> parent = buffer->parent();
+    if (!parent) {
+      return Status::TypeError("buffer is not backed by a CudaBuffer");
+    }
+    offset += buffer->data() - parent->data();
+    buffer = parent;
+  }
+  // Re-slice to represent the same memory area
+  if (offset != 0 || (*out)->size() != size || !is_mutable) {
+    *out = std::make_shared<CudaBuffer>(*out, offset, size);
+    (*out)->is_mutable_ = is_mutable;
+  }
+  return Status::OK();
+}
 
 Status CudaBuffer::CopyToHost(const int64_t position, const int64_t nbytes,
                               void* out) const {
@@ -129,8 +156,13 @@ CudaHostBuffer::~CudaHostBuffer() {
 // ----------------------------------------------------------------------
 // CudaBufferReader
 
-CudaBufferReader::CudaBufferReader(const std::shared_ptr<CudaBuffer>& buffer)
-    : io::BufferReader(buffer), cuda_buffer_(buffer), 
context_(buffer->context()) {}
+CudaBufferReader::CudaBufferReader(const std::shared_ptr<Buffer>& buffer)
+    : io::BufferReader(buffer) {
+  if (!CudaBuffer::FromBuffer(buffer, &cuda_buffer_).ok()) {
+    throw std::bad_cast();
+  }
+  context_ = cuda_buffer_->context();
+}
 
 CudaBufferReader::~CudaBufferReader() {}
 
diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h
index 3f3dd2f6c..7eb8b884f 100644
--- a/cpp/src/arrow/gpu/cuda_memory.h
+++ b/cpp/src/arrow/gpu/cuda_memory.h
@@ -46,6 +46,15 @@ class ARROW_EXPORT CudaBuffer : public Buffer {
 
   ~CudaBuffer();
 
+  /// \brief Convert back generic buffer into CudaBuffer
+  /// \param[in] buffer buffer to convert
+  /// \param[out] out conversion result
+  /// \return Status
+  ///
+  /// This function returns an error if the buffer isn't backed by GPU memory
+  static Status FromBuffer(std::shared_ptr<Buffer> buffer,
+                           std::shared_ptr<CudaBuffer>* out);
+
   /// \brief Copy memory from GPU device to CPU host
   /// \param[out] out a pre-allocated output buffer
   /// \return Status
@@ -123,7 +132,7 @@ class ARROW_EXPORT CudaIpcMemHandle {
 /// able to do anything other than pointer arithmetic on the returned buffers
 class ARROW_EXPORT CudaBufferReader : public io::BufferReader {
  public:
-  explicit CudaBufferReader(const std::shared_ptr<CudaBuffer>& buffer);
+  explicit CudaBufferReader(const std::shared_ptr<Buffer>& buffer);
   ~CudaBufferReader();
 
   /// \brief Read bytes into pre-allocated host memory
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index fdd42a67a..58f82b348 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -324,6 +324,14 @@ void AssertChunkedEqual(const ChunkedArray& expected, 
const ChunkedArray& actual
   }
 }
 
+void AssertBufferEqual(const Buffer& buffer, const std::vector<uint8_t>& 
expected) {
+  ASSERT_EQ(buffer.size(), expected.size());
+  const uint8_t* buffer_data = buffer.data();
+  for (size_t i = 0; i < expected.size(); ++i) {
+    ASSERT_EQ(buffer_data[i], expected[i]);
+  }
+}
+
 void PrintColumn(const Column& col, std::stringstream* ss) {
   const ChunkedArray& carr = *col.data();
   for (int i = 0; i < carr.num_chunks(); ++i) {
diff --git a/cpp/src/arrow/util/macros.h b/cpp/src/arrow/util/macros.h
index 8b1125d02..d90025600 100644
--- a/cpp/src/arrow/util/macros.h
+++ b/cpp/src/arrow/util/macros.h
@@ -94,4 +94,29 @@
 #endif
 #endif  // !defined(MANUALLY_ALIGNED_STRUCT)
 
+// ----------------------------------------------------------------------
+// From googletest
+// (also in parquet-cpp)
+
+// When you need to test the private or protected members of a class,
+// use the FRIEND_TEST macro to declare your tests as friends of the
+// class.  For example:
+//
+// class MyClass {
+//  private:
+//   void MyMethod();
+//   FRIEND_TEST(MyClassTest, MyMethod);
+// };
+//
+// class MyClassTest : public testing::Test {
+//   // ...
+// };
+//
+// TEST_F(MyClassTest, MyMethod) {
+//   // Can call MyClass::MyMethod() here.
+// }
+
+#define FRIEND_TEST(test_case_name, test_name) \
+  friend class test_case_name##_##test_name##_Test
+
 #endif  // ARROW_UTIL_MACROS_H
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index a9bbd8cc4..9635e70e4 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -72,6 +72,27 @@ constexpr int64_t kThreadPoolSize = 8;
 constexpr int64_t kBytesInMB = 1 << 20;
 static std::vector<std::thread> threadpool_(kThreadPoolSize);
 
+/// A Buffer class that automatically releases the backing plasma object
+/// when it goes out of scope.
+class PlasmaBuffer : public Buffer {
+ public:
+  ~PlasmaBuffer();
+
+  PlasmaBuffer(PlasmaClient* client, const ObjectID& object_id,
+               const std::shared_ptr<Buffer>& buffer)
+      : Buffer(buffer, 0, buffer->size()), client_(client), 
object_id_(object_id) {
+    if (buffer->is_mutable()) {
+      is_mutable_ = true;
+    }
+  }
+
+ private:
+  PlasmaClient* client_;
+  ObjectID object_id_;
+};
+
+PlasmaBuffer::~PlasmaBuffer() { ARROW_UNUSED(client_->Release(object_id_)); }
+
 struct ObjectInUseEntry {
   /// A count of the number of times this client has called 
PlasmaClient::Create
   /// or
@@ -144,6 +165,11 @@ uint8_t* PlasmaClient::lookup_mmapped_file(int 
store_fd_val) {
   return entry->second.pointer;
 }
 
+bool PlasmaClient::IsInUse(const ObjectID& object_id) {
+  const auto elem = objects_in_use_.find(object_id);
+  return (elem != objects_in_use_.end());
+}
+
 void PlasmaClient::increment_object_count(const ObjectID& object_id, 
PlasmaObject* object,
                                           bool is_sealed) {
   // Increment the count of the object to track the fact that it is being used.
@@ -182,7 +208,7 @@ void PlasmaClient::increment_object_count(const ObjectID& 
object_id, PlasmaObjec
 }
 
 Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
-                            uint8_t* metadata, int64_t metadata_size,
+                            const uint8_t* metadata, int64_t metadata_size,
                             std::shared_ptr<Buffer>* data, int device_num) {
   ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with 
size "
                    << data_size << " and metadata size " << metadata_size;
@@ -247,49 +273,45 @@ Status PlasmaClient::Create(const ObjectID& object_id, 
int64_t data_size,
   return Status::OK();
 }
 
-Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
-                         int64_t timeout_ms, ObjectBuffer* object_buffers) {
+Status PlasmaClient::GetBuffers(
+    const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms,
+    const std::function<std::shared_ptr<Buffer>(
+        const ObjectID&, const std::shared_ptr<Buffer>&)>& wrap_buffer,
+    ObjectBuffer* object_buffers) {
   // Fill out the info for the objects that are already in use locally.
   bool all_present = true;
-  for (int i = 0; i < num_objects; ++i) {
+  for (int64_t i = 0; i < num_objects; ++i) {
     auto object_entry = objects_in_use_.find(object_ids[i]);
     if (object_entry == objects_in_use_.end()) {
       // This object is not currently in use by this client, so we need to send
       // a request to the store.
       all_present = false;
-      // Make a note to ourselves that the object is not present.
-      object_buffers[i].data_size = -1;
     } else {
       // NOTE: If the object is still unsealed, we will deadlock, since we must
       // have been the one who created it.
       ARROW_CHECK(object_entry->second->is_sealed)
           << "Plasma client called get on an unsealed object that it created";
       PlasmaObject* object = &object_entry->second->object;
+      std::shared_ptr<Buffer> physical_buf;
+
       if (object->device_num == 0) {
         uint8_t* data = lookup_mmapped_file(object->store_fd);
-        object_buffers[i].data =
-            std::make_shared<Buffer>(data + object->data_offset, 
object->data_size);
-        object_buffers[i].metadata = std::make_shared<Buffer>(
-            data + object->data_offset + object->data_size, 
object->metadata_size);
+        physical_buf = std::make_shared<Buffer>(
+            data + object->data_offset, object->data_size + 
object->metadata_size);
       } else {
 #ifdef PLASMA_GPU
-        std::shared_ptr<CudaBuffer> gpu_handle =
-            gpu_object_map.find(object_ids[i])->second->ptr;
-        object_buffers[i].data =
-            std::make_shared<CudaBuffer>(gpu_handle, 0, object->data_size);
-        object_buffers[i].metadata = std::make_shared<CudaBuffer>(
-            gpu_handle, object->data_size, object->metadata_size);
+        physical_buf = gpu_object_map.find(object_ids[i])->second->ptr;
 #else
         ARROW_LOG(FATAL) << "Arrow GPU library is not enabled.";
 #endif
       }
-      object_buffers[i].data_size = object->data_size;
-      object_buffers[i].metadata_size = object->metadata_size;
+      physical_buf = wrap_buffer(object_ids[i], physical_buf);
+      object_buffers[i].data = SliceBuffer(physical_buf, 0, object->data_size);
+      object_buffers[i].metadata =
+          SliceBuffer(physical_buf, object->data_size, object->metadata_size);
       object_buffers[i].device_num = object->device_num;
       // Increment the count of the number of instances of this object that 
this
-      // client is using. A call to PlasmaClient::Release is required to
-      // decrement this
-      // count. Cache the reference to the object.
+      // client is using. Cache the reference to the object.
       increment_object_count(object_ids[i], object, true);
     }
   }
@@ -300,7 +322,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, 
int64_t num_objects,
 
   // If we get here, then the objects aren't all currently in use by this
   // client, so we need to send a request to the plasma store.
-  RETURN_NOT_OK(SendGetRequest(store_conn_, object_ids, num_objects, 
timeout_ms));
+  RETURN_NOT_OK(SendGetRequest(store_conn_, &object_ids[0], num_objects, 
timeout_ms));
   std::vector<uint8_t> buffer;
   RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaGetReply, 
&buffer));
   std::vector<ObjectID> received_object_ids(num_objects);
@@ -320,10 +342,10 @@ Status PlasmaClient::Get(const ObjectID* object_ids, 
int64_t num_objects,
     lookup_or_mmap(fd, store_fds[i], mmap_sizes[i]);
   }
 
-  for (int i = 0; i < num_objects; ++i) {
+  for (int64_t i = 0; i < num_objects; ++i) {
     DCHECK(received_object_ids[i] == object_ids[i]);
     object = &object_data[i];
-    if (object_buffers[i].data_size != -1) {
+    if (object_buffers[i].data) {
       // If the object was already in use by the client, then the store should
       // have returned it.
       DCHECK_NE(object->data_size, -1);
@@ -334,56 +356,67 @@ Status PlasmaClient::Get(const ObjectID* object_ids, 
int64_t num_objects,
     // If we are here, the object was not currently in use, so we need to
     // process the reply from the object store.
     if (object->data_size != -1) {
+      std::shared_ptr<Buffer> physical_buf;
       if (object->device_num == 0) {
         uint8_t* data = lookup_mmapped_file(object->store_fd);
-        // Finish filling out the return values.
-        object_buffers[i].data =
-            std::make_shared<Buffer>(data + object->data_offset, 
object->data_size);
-        object_buffers[i].metadata = std::make_shared<Buffer>(
-            data + object->data_offset + object->data_size, 
object->metadata_size);
+        physical_buf = std::make_shared<Buffer>(
+            data + object->data_offset, object->data_size + 
object->metadata_size);
       } else {
 #ifdef PLASMA_GPU
         std::lock_guard<std::mutex> lock(gpu_mutex);
         auto handle = gpu_object_map.find(object_ids[i]);
-        std::shared_ptr<CudaBuffer> gpu_handle;
         if (handle == gpu_object_map.end()) {
           std::shared_ptr<CudaContext> context;
           RETURN_NOT_OK(manager_->GetContext(object->device_num - 1, 
&context));
           GpuProcessHandle* obj_handle = new GpuProcessHandle();
           RETURN_NOT_OK(context->OpenIpcBuffer(*object->ipc_handle, 
&obj_handle->ptr));
           gpu_object_map[object_ids[i]] = obj_handle;
-          gpu_handle = obj_handle->ptr;
+          physical_buf = obj_handle->ptr;
         } else {
           handle->second->client_count += 1;
-          gpu_handle = handle->second->ptr;
+          physical_buf = handle->second->ptr;
         }
-        object_buffers[i].data =
-            std::make_shared<CudaBuffer>(gpu_handle, 0, object->data_size);
-        object_buffers[i].metadata = std::make_shared<CudaBuffer>(
-            gpu_handle, object->data_size, object->metadata_size);
 #else
         ARROW_LOG(FATAL) << "Arrow GPU library is not enabled.";
 #endif
       }
-      object_buffers[i].data_size = object->data_size;
-      object_buffers[i].metadata_size = object->metadata_size;
+      // Finish filling out the return values.
+      physical_buf = wrap_buffer(object_ids[i], physical_buf);
+      object_buffers[i].data = SliceBuffer(physical_buf, 0, object->data_size);
+      object_buffers[i].metadata =
+          SliceBuffer(physical_buf, object->data_size, object->metadata_size);
       object_buffers[i].device_num = object->device_num;
       // Increment the count of the number of instances of this object that 
this
-      // client is using. A call to PlasmaClient::Release is required to
-      // decrement this
-      // count. Cache the reference to the object.
+      // client is using. Cache the reference to the object.
       increment_object_count(received_object_ids[i], object, true);
     } else {
-      // The object was not retrieved. Make sure we already put a -1 here to
-      // indicate that the object was not retrieved. The caller is not
-      // responsible for releasing this object.
-      DCHECK_EQ(object_buffers[i].data_size, -1);
-      object_buffers[i].data_size = -1;
+      // The object was not retrieved.  The caller can detect this condition
+      // by checking the boolean value of the metadata/data buffers.
+      DCHECK(!object_buffers[i].metadata);
+      DCHECK(!object_buffers[i].data);
     }
   }
   return Status::OK();
 }
 
+Status PlasmaClient::Get(const std::vector<ObjectID>& object_ids, int64_t 
timeout_ms,
+                         std::vector<ObjectBuffer>* out) {
+  const auto wrap_buffer = [=](const ObjectID& object_id,
+                               const std::shared_ptr<Buffer>& buffer) {
+    return std::make_shared<PlasmaBuffer>(this, object_id, buffer);
+  };
+  const size_t num_objects = object_ids.size();
+  *out = std::vector<ObjectBuffer>(num_objects);
+  return GetBuffers(&object_ids[0], num_objects, timeout_ms, wrap_buffer, 
&(*out)[0]);
+}
+
+Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
+                         int64_t timeout_ms, ObjectBuffer* out) {
+  const auto wrap_buffer = [](const ObjectID& object_id,
+                              const std::shared_ptr<Buffer>& buffer) { return 
buffer; };
+  return GetBuffers(object_ids, num_objects, timeout_ms, wrap_buffer, out);
+}
+
 Status PlasmaClient::UnmapObject(const ObjectID& object_id) {
   auto object_entry = objects_in_use_.find(object_id);
   ARROW_CHECK(object_entry != objects_in_use_.end());
@@ -546,24 +579,26 @@ static inline bool 
compute_object_hash_parallel(XXH64_state_t* hash_state,
 }
 
 static uint64_t compute_object_hash(const ObjectBuffer& obj_buffer) {
+  DCHECK(obj_buffer.metadata);
+  DCHECK(obj_buffer.data);
   XXH64_state_t hash_state;
   if (obj_buffer.device_num != 0) {
     // TODO(wap): Create cuda program to hash data on gpu.
     return 0;
   }
   XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
-  if (obj_buffer.data_size >= kBytesInMB) {
+  if (obj_buffer.data->size() >= kBytesInMB) {
     compute_object_hash_parallel(
         &hash_state, reinterpret_cast<const unsigned 
char*>(obj_buffer.data->data()),
-        obj_buffer.data_size);
+        obj_buffer.data->size());
   } else {
     XXH64_update(&hash_state,
                  reinterpret_cast<const unsigned 
char*>(obj_buffer.data->data()),
-                 obj_buffer.data_size);
+                 obj_buffer.data->size());
   }
   XXH64_update(&hash_state,
                reinterpret_cast<const unsigned 
char*>(obj_buffer.metadata->data()),
-               obj_buffer.metadata_size);
+               obj_buffer.metadata->size());
   return XXH64_digest(&hash_state);
 }
 
@@ -647,17 +682,16 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& 
num_bytes_evicted) {
 Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) {
   // Get the plasma object data. We pass in a timeout of 0 to indicate that
   // the operation should timeout immediately.
-  ObjectBuffer object_buffer;
-  RETURN_NOT_OK(Get(&object_id, 1, 0, &object_buffer));
+  std::vector<ObjectBuffer> object_buffers;
+  RETURN_NOT_OK(Get({object_id}, 0, &object_buffers));
   // If the object was not retrieved, return false.
-  if (object_buffer.data_size == -1) {
+  if (!object_buffers[0].data) {
     return Status::PlasmaObjectNonexistent("Object not found");
   }
   // Compute the hash.
-  uint64_t hash = compute_object_hash(object_buffer);
+  uint64_t hash = compute_object_hash(object_buffers[0]);
   memcpy(digest, &hash, sizeof(hash));
-  // Release the plasma object.
-  return Release(object_id);
+  return Status::OK();
 }
 
 Status PlasmaClient::Subscribe(int* fd) {
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 7c27c474d..dd8175d48 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -25,9 +25,11 @@
 #include <memory>
 #include <string>
 #include <unordered_map>
+#include <vector>
 
 #include "arrow/buffer.h"
 #include "arrow/status.h"
+#include "arrow/util/macros.h"
 #include "arrow/util/visibility.h"
 #include "plasma/common.h"
 #ifdef PLASMA_GPU
@@ -48,12 +50,8 @@ constexpr int64_t kL3CacheSizeBytes = 100000000;
 struct ObjectBuffer {
   /// The data buffer.
   std::shared_ptr<Buffer> data;
-  /// The size in bytes of the data object.
-  int64_t data_size;
   /// The metadata buffer.
   std::shared_ptr<Buffer> metadata;
-  /// The metadata size in bytes.
-  int64_t metadata_size;
   /// The device number.
   int device_num;
 };
@@ -121,31 +119,47 @@ class ARROW_EXPORT PlasmaClient {
   ///        device_num = 1 corresponds to GPU0,
   ///        device_num = 2 corresponds to GPU1, etc.
   /// \return The return status.
-  Status Create(const ObjectID& object_id, int64_t data_size, uint8_t* 
metadata,
+  ///
+  /// The returned object must be released once it is done with.  It must also
+  /// be either sealed or aborted.
+  Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* 
metadata,
                 int64_t metadata_size, std::shared_ptr<Buffer>* data, int 
device_num = 0);
+
   /// Get some objects from the Plasma Store. This function will block until 
the
   /// objects have all been created and sealed in the Plasma Store or the
-  /// timeout
-  /// expires. The caller is responsible for releasing any retrieved objects,
-  /// but
-  /// the caller should not release objects that were not retrieved.
+  /// timeout expires.
+  ///
+  /// \param object_ids The IDs of the objects to get.
+  /// \param timeout_ms The amount of time in milliseconds to wait before this
+  ///        request times out. If this value is -1, then no timeout is set.
+  /// \param[out] object_buffers The object results.
+  /// \return The return status.
+  ///
+  /// If an object was not retrieved, the corresponding metadata and data
+  /// fields in the ObjectBuffer structure will evaluate to false.
+  /// Objects are automatically released by the client when their buffers
+  /// get out of scope.
+  Status Get(const std::vector<ObjectID>& object_ids, int64_t timeout_ms,
+             std::vector<ObjectBuffer>* object_buffers);
+
+  /// Deprecated variant of Get() that doesn't automatically release buffers
+  /// when they get out of scope.
   ///
   /// \param object_ids The IDs of the objects to get.
   /// \param num_objects The number of object IDs to get.
   /// \param timeout_ms The amount of time in milliseconds to wait before this
   ///        request times out. If this value is -1, then no timeout is set.
-  /// \param object_buffers An array where the results will be stored. If the
-  /// data
-  ///        size field is -1, then the object was not retrieved.
+  /// \param object_buffers An array where the results will be stored.
   /// \return The return status.
+  ///
+  /// The caller is responsible for releasing any retrieved objects, but it
+  /// should not release objects that were not retrieved.
   Status Get(const ObjectID* object_ids, int64_t num_objects, int64_t 
timeout_ms,
              ObjectBuffer* object_buffers);
 
   /// Tell Plasma that the client no longer needs the object. This should be
-  /// called
-  /// after Get when the client is done with the object. After this call,
-  /// the address returned by Get is no longer valid. This should be called
-  /// once for each call to Get (with the same object ID).
+  /// called after Get() or Create() when the client is done with the object.
+  /// After this call, the buffer returned by Get() is no longer valid.
   ///
   /// \param object_id The ID of the object that is no longer needed.
   /// \return The return status.
@@ -328,6 +342,10 @@ class ARROW_EXPORT PlasmaClient {
   int get_manager_fd() const;
 
  private:
+  FRIEND_TEST(TestPlasmaStore, GetTest);
+  FRIEND_TEST(TestPlasmaStore, LegacyGetTest);
+  FRIEND_TEST(TestPlasmaStore, AbortTest);
+
   /// This is a helper method for unmapping objects for which all references 
have
   /// gone out of scope, either by calling Release or Abort.
   ///
@@ -340,6 +358,14 @@ class ARROW_EXPORT PlasmaClient {
 
   Status PerformRelease(const ObjectID& object_id);
 
+  /// Common helper for Get() variants
+  Status GetBuffers(const ObjectID* object_ids, int64_t num_objects, int64_t 
timeout_ms,
+                    const std::function<std::shared_ptr<Buffer>(
+                        const ObjectID&, const std::shared_ptr<Buffer>&)>& 
wrap_buffer,
+                    ObjectBuffer* object_buffers);
+
+  bool IsInUse(const ObjectID& object_id);
+
   uint8_t* lookup_or_mmap(int fd, int store_fd_val, int64_t map_size);
 
   uint8_t* lookup_mmapped_file(int store_fd_val);
diff --git a/cpp/src/plasma/test/client_tests.cc 
b/cpp/src/plasma/test/client_tests.cc
index 07e0f9c6a..10e4e4f64 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -24,6 +24,8 @@
 
 #include <random>
 
+#include "arrow/test-util.h"
+
 #include "plasma/client.h"
 #include "plasma/common.h"
 #include "plasma/plasma.h"
@@ -35,6 +37,13 @@ namespace plasma {
 
 std::string test_executable;  // NOLINT
 
+void AssertObjectBufferEqual(const ObjectBuffer& object_buffer,
+                             const std::vector<uint8_t>& metadata,
+                             const std::vector<uint8_t>& data) {
+  arrow::test::AssertBufferEqual(*object_buffer.metadata, metadata);
+  arrow::test::AssertBufferEqual(*object_buffer.data, data);
+}
+
 class TestPlasmaStore : public ::testing::Test {
  public:
   // TODO(pcm): At the moment, stdout of the test gets mixed up with
@@ -55,10 +64,25 @@ class TestPlasmaStore : public ::testing::Test {
     ARROW_CHECK_OK(
         client2_.Connect("/tmp/store" + store_index, "", 
PLASMA_DEFAULT_RELEASE_DELAY));
   }
-  virtual void Finish() {
+  virtual void TearDown() {
     ARROW_CHECK_OK(client_.Disconnect());
     ARROW_CHECK_OK(client2_.Disconnect());
-    system("killall plasma_store &");
+    // Kill all plasma_store processes
+    // TODO should only kill the processes we launched
+    system("killall -9 plasma_store");
+  }
+
+  void CreateObject(PlasmaClient& client, const ObjectID& object_id,
+                    const std::vector<uint8_t>& metadata,
+                    const std::vector<uint8_t>& data) {
+    std::shared_ptr<Buffer> data_buffer;
+    ARROW_CHECK_OK(client.Create(object_id, data.size(), &metadata[0], 
metadata.size(),
+                                 &data_buffer));
+    for (size_t i = 0; i < data.size(); i++) {
+      data_buffer->mutable_data()[i] = data[i];
+    }
+    ARROW_CHECK_OK(client.Seal(object_id));
+    ARROW_CHECK_OK(client.Release(object_id));
   }
 
  protected:
@@ -101,54 +125,87 @@ TEST_F(TestPlasmaStore, ContainsTest) {
 
   // Test for the object being in local Plasma store.
   // First create object.
-  int64_t data_size = 100;
-  uint8_t metadata[] = {5};
-  int64_t metadata_size = sizeof(metadata);
-  std::shared_ptr<Buffer> data;
-  ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, 
&data));
-  ARROW_CHECK_OK(client_.Seal(object_id));
+  std::vector<uint8_t> data(100, 0);
+  CreateObject(client_, object_id, {42}, data);
   // Avoid race condition of Plasma Manager waiting for notification.
-  ObjectBuffer object_buffer;
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+  std::vector<ObjectBuffer> object_buffers;
+  ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
   ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
   ASSERT_EQ(has_object, true);
 }
 
 TEST_F(TestPlasmaStore, GetTest) {
+  std::vector<ObjectBuffer> object_buffers;
+
   ObjectID object_id = ObjectID::from_random();
-  ObjectBuffer object_buffer;
 
   // Test for object non-existence.
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
-  ASSERT_EQ(object_buffer.data_size, -1);
+  ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
+  ASSERT_EQ(object_buffers.size(), 1);
+  ASSERT_FALSE(object_buffers[0].metadata);
+  ASSERT_FALSE(object_buffers[0].data);
+  EXPECT_FALSE(client_.IsInUse(object_id));
 
   // Test for the object being in local Plasma store.
   // First create object.
-  int64_t data_size = 4;
-  uint8_t metadata[] = {5};
-  int64_t metadata_size = sizeof(metadata);
-  std::shared_ptr<Buffer> data_buffer;
-  uint8_t* data;
-  ARROW_CHECK_OK(
-      client_.Create(object_id, data_size, metadata, metadata_size, 
&data_buffer));
-  data = data_buffer->mutable_data();
-  for (int64_t i = 0; i < data_size; i++) {
-    data[i] = static_cast<uint8_t>(i % 4);
+  std::vector<uint8_t> data = {3, 5, 6, 7, 9};
+  CreateObject(client_, object_id, {42}, data);
+  ARROW_CHECK_OK(client_.FlushReleaseHistory());
+  EXPECT_FALSE(client_.IsInUse(object_id));
+
+  object_buffers.clear();
+  ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
+  ASSERT_EQ(object_buffers.size(), 1);
+  ASSERT_EQ(object_buffers[0].device_num, 0);
+  AssertObjectBufferEqual(object_buffers[0], {42}, {3, 5, 6, 7, 9});
+
+  // Metadata keeps object in use
+  {
+    auto metadata = object_buffers[0].metadata;
+    object_buffers.clear();
+    ::arrow::test::AssertBufferEqual(*metadata, {42});
+    ARROW_CHECK_OK(client_.FlushReleaseHistory());
+    EXPECT_TRUE(client_.IsInUse(object_id));
   }
-  ARROW_CHECK_OK(client_.Seal(object_id));
+  // Object is automatically released
+  ARROW_CHECK_OK(client_.FlushReleaseHistory());
+  EXPECT_FALSE(client_.IsInUse(object_id));
+}
 
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
-  const uint8_t* object_data = object_buffer.data->data();
-  for (int64_t i = 0; i < data_size; i++) {
-    ASSERT_EQ(data[i], object_data[i]);
+TEST_F(TestPlasmaStore, LegacyGetTest) {
+  // Test for old non-releasing Get() variant
+  ObjectID object_id = ObjectID::from_random();
+  {
+    ObjectBuffer object_buffer;
+
+    // Test for object non-existence.
+    ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
+    ASSERT_FALSE(object_buffer.metadata);
+    ASSERT_FALSE(object_buffer.data);
+    EXPECT_FALSE(client_.IsInUse(object_id));
+
+    // First create object.
+    std::vector<uint8_t> data = {3, 5, 6, 7, 9};
+    CreateObject(client_, object_id, {42}, data);
+    ARROW_CHECK_OK(client_.FlushReleaseHistory());
+    EXPECT_FALSE(client_.IsInUse(object_id));
+
+    ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+    AssertObjectBufferEqual(object_buffer, {42}, {3, 5, 6, 7, 9});
   }
+  // Object needs releasing manually
+  ARROW_CHECK_OK(client_.FlushReleaseHistory());
+  EXPECT_TRUE(client_.IsInUse(object_id));
+  ARROW_CHECK_OK(client_.Release(object_id));
+  ARROW_CHECK_OK(client_.FlushReleaseHistory());
+  EXPECT_FALSE(client_.IsInUse(object_id));
 }
 
 TEST_F(TestPlasmaStore, MultipleGetTest) {
   ObjectID object_id1 = ObjectID::from_random();
   ObjectID object_id2 = ObjectID::from_random();
-  ObjectID object_ids[2] = {object_id1, object_id2};
-  ObjectBuffer object_buffer[2];
+  std::vector<ObjectID> object_ids = {object_id1, object_id2};
+  std::vector<ObjectBuffer> object_buffers;
 
   int64_t data_size = 4;
   uint8_t metadata[] = {5};
@@ -162,18 +219,18 @@ TEST_F(TestPlasmaStore, MultipleGetTest) {
   data->mutable_data()[0] = 2;
   ARROW_CHECK_OK(client_.Seal(object_id2));
 
-  ARROW_CHECK_OK(client_.Get(object_ids, 2, -1, object_buffer));
-  ASSERT_EQ(object_buffer[0].data->data()[0], 1);
-  ASSERT_EQ(object_buffer[1].data->data()[0], 2);
+  ARROW_CHECK_OK(client_.Get(object_ids, -1, &object_buffers));
+  ASSERT_EQ(object_buffers[0].data->data()[0], 1);
+  ASSERT_EQ(object_buffers[1].data->data()[0], 2);
 }
 
 TEST_F(TestPlasmaStore, AbortTest) {
   ObjectID object_id = ObjectID::from_random();
-  ObjectBuffer object_buffer;
+  std::vector<ObjectBuffer> object_buffers;
 
   // Test for object non-existence.
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
-  ASSERT_EQ(object_buffer.data_size, -1);
+  ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
+  ASSERT_FALSE(object_buffers[0].data);
 
   // Test object abort.
   // First create object.
@@ -193,30 +250,29 @@ TEST_F(TestPlasmaStore, AbortTest) {
   ASSERT_TRUE(status.IsInvalid());
   // Release, then abort.
   ARROW_CHECK_OK(client_.Release(object_id));
+  ARROW_CHECK_OK(client_.FlushReleaseHistory());
+  EXPECT_TRUE(client_.IsInUse(object_id));
+
   ARROW_CHECK_OK(client_.Abort(object_id));
+  ARROW_CHECK_OK(client_.FlushReleaseHistory());
+  EXPECT_FALSE(client_.IsInUse(object_id));
 
   // Test for object non-existence after the abort.
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
-  ASSERT_EQ(object_buffer.data_size, -1);
+  ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
+  ASSERT_FALSE(object_buffers[0].data);
 
   // Create the object successfully this time.
-  ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, 
&data));
-  data_ptr = data->mutable_data();
-  for (int64_t i = 0; i < data_size; i++) {
-    data_ptr[i] = static_cast<uint8_t>(i % 4);
-  }
-  ARROW_CHECK_OK(client_.Seal(object_id));
+  CreateObject(client_, object_id, {42, 43}, {1, 2, 3, 4, 5});
 
   // Test that we can get the object.
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
-  const uint8_t* buffer_ptr = object_buffer.data->data();
-  for (int64_t i = 0; i < data_size; i++) {
-    ASSERT_EQ(data_ptr[i], buffer_ptr[i]);
-  }
+  ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
+  AssertObjectBufferEqual(object_buffers[0], {42, 43}, {1, 2, 3, 4, 5});
+  ARROW_CHECK_OK(client_.Release(object_id));
 }
 
 TEST_F(TestPlasmaStore, MultipleClientTest) {
   ObjectID object_id = ObjectID::from_random();
+  std::vector<ObjectBuffer> object_buffers;
 
   // Test for object non-existence on the first client.
   bool has_object;
@@ -232,8 +288,8 @@ TEST_F(TestPlasmaStore, MultipleClientTest) {
   ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, 
metadata_size, &data));
   ARROW_CHECK_OK(client2_.Seal(object_id));
   // Test that the first client can get the object.
-  ObjectBuffer object_buffer;
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+  ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
+  ASSERT_TRUE(object_buffers[0].data);
   ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
   ASSERT_EQ(has_object, true);
 
@@ -245,7 +301,8 @@ TEST_F(TestPlasmaStore, MultipleClientTest) {
   ARROW_CHECK_OK(client_.Disconnect());
   // Test that the second client can seal and get the created object.
   ARROW_CHECK_OK(client2_.Seal(object_id));
-  ARROW_CHECK_OK(client2_.Get(&object_id, 1, -1, &object_buffer));
+  ARROW_CHECK_OK(client2_.Get({object_id}, -1, &object_buffers));
+  ASSERT_TRUE(object_buffers[0].data);
   ARROW_CHECK_OK(client2_.Contains(object_id, &has_object));
   ASSERT_EQ(has_object, true);
 }
@@ -308,42 +365,66 @@ using arrow::gpu::CudaBuffer;
 using arrow::gpu::CudaBufferReader;
 using arrow::gpu::CudaBufferWriter;
 
+namespace {
+
+void AssertCudaRead(const std::shared_ptr<Buffer>& buffer,
+                    const std::vector<uint8_t>& expected_data) {
+  std::shared_ptr<CudaBuffer> gpu_buffer;
+  const size_t data_size = expected_data.size();
+
+  ARROW_CHECK_OK(CudaBuffer::FromBuffer(buffer, &gpu_buffer));
+  ASSERT_EQ(gpu_buffer->size(), data_size);
+
+  CudaBufferReader reader(gpu_buffer);
+  uint8_t read_data[data_size];
+  int64_t read_data_size;
+  ARROW_CHECK_OK(reader.Read(data_size, &read_data_size, read_data));
+  ASSERT_EQ(read_data_size, data_size);
+
+  for (size_t i = 0; i < data_size; i++) {
+    ASSERT_EQ(read_data[i], expected_data[i]);
+  }
+}
+
+}  // namespace
+
 TEST_F(TestPlasmaStore, GetGPUTest) {
   ObjectID object_id = ObjectID::from_random();
-  ObjectBuffer object_buffer;
+  std::vector<ObjectBuffer> object_buffers;
 
   // Test for object non-existence.
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
-  ASSERT_EQ(object_buffer.data_size, -1);
+  ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers));
+  ASSERT_EQ(object_buffers.size(), 1);
+  ASSERT_FALSE(object_buffers[0].data);
 
   // Test for the object being in local Plasma store.
   // First create object.
   uint8_t data[] = {4, 5, 3, 1};
   int64_t data_size = sizeof(data);
-  uint8_t metadata[] = {5};
+  uint8_t metadata[] = {42};
   int64_t metadata_size = sizeof(metadata);
   std::shared_ptr<Buffer> data_buffer;
   std::shared_ptr<CudaBuffer> gpu_buffer;
   ARROW_CHECK_OK(
       client_.Create(object_id, data_size, metadata, metadata_size, 
&data_buffer, 1));
-  gpu_buffer = std::dynamic_pointer_cast<CudaBuffer>(data_buffer);
+  ARROW_CHECK_OK(CudaBuffer::FromBuffer(data_buffer, &gpu_buffer));
   CudaBufferWriter writer(gpu_buffer);
-  writer.Write(data, data_size);
+  ARROW_CHECK_OK(writer.Write(data, data_size));
   ARROW_CHECK_OK(client_.Seal(object_id));
 
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
-  gpu_buffer = std::dynamic_pointer_cast<CudaBuffer>(object_buffer.data);
-  CudaBufferReader reader(gpu_buffer);
-  uint8_t read_data[data_size];
-  int64_t read_data_size;
-  reader.Read(data_size, &read_data_size, read_data);
-  for (int64_t i = 0; i < data_size; i++) {
-    ASSERT_EQ(data[i], read_data[i]);
-  }
+  object_buffers.clear();
+  ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
+  ASSERT_EQ(object_buffers.size(), 1);
+  ASSERT_EQ(object_buffers[0].device_num, 1);
+  // Check data
+  AssertCudaRead(object_buffers[0].data, {4, 5, 3, 1});
+  // Check metadata
+  AssertCudaRead(object_buffers[0].metadata, {42});
 }
 
 TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
   ObjectID object_id = ObjectID::from_random();
+  std::vector<ObjectBuffer> object_buffers;
 
   // Test for object non-existence on the first client.
   bool has_object;
@@ -360,8 +441,7 @@ TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
       client2_.Create(object_id, data_size, metadata, metadata_size, &data, 
1));
   ARROW_CHECK_OK(client2_.Seal(object_id));
   // Test that the first client can get the object.
-  ObjectBuffer object_buffer;
-  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+  ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers));
   ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
   ASSERT_EQ(has_object, true);
 
@@ -374,12 +454,16 @@ TEST_F(TestPlasmaStore, MultipleClientGPUTest) {
   ARROW_CHECK_OK(client_.Disconnect());
   // Test that the second client can seal and get the created object.
   ARROW_CHECK_OK(client2_.Seal(object_id));
-  ARROW_CHECK_OK(client2_.Get(&object_id, 1, -1, &object_buffer));
+  object_buffers.clear();
   ARROW_CHECK_OK(client2_.Contains(object_id, &has_object));
   ASSERT_EQ(has_object, true);
+  ARROW_CHECK_OK(client2_.Get({object_id}, -1, &object_buffers));
+  ASSERT_EQ(object_buffers.size(), 1);
+  ASSERT_EQ(object_buffers[0].device_num, 1);
+  AssertCudaRead(object_buffers[0].metadata, {5});
 }
 
-#endif
+#endif  // PLASMA_GPU
 
 }  // namespace plasma
 
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index 32f6d189d..b99e2b0ef 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -29,7 +29,7 @@ from cpython.pycapsule cimport *
 import collections
 import pyarrow
 
-from pyarrow.lib cimport Buffer, NativeFile, check_status
+from pyarrow.lib cimport Buffer, NativeFile, check_status, pyarrow_wrap_buffer
 from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer,
                                         CFixedSizeBufferWriter, CStatus)
 
@@ -83,8 +83,8 @@ cdef extern from "plasma/client.h" nogil:
                        const uint8_t* metadata, int64_t metadata_size,
                        const shared_ptr[CBuffer]* data)
 
-        CStatus Get(const CUniqueID* object_ids, int64_t num_objects,
-                    int64_t timeout_ms, CObjectBuffer* object_buffers)
+        CStatus Get(const c_vector[CUniqueID] object_ids, int64_t timeout_ms,
+                    c_vector[CObjectBuffer]* object_buffers)
 
         CStatus Seal(const CUniqueID& object_id)
 
@@ -117,9 +117,7 @@ cdef extern from "plasma/client.h" nogil:
 cdef extern from "plasma/client.h" nogil:
 
     cdef struct CObjectBuffer" plasma::ObjectBuffer":
-        int64_t data_size
         shared_ptr[CBuffer] data
-        int64_t metadata_size
         shared_ptr[CBuffer] metadata
 
 
@@ -239,21 +237,16 @@ cdef class PlasmaClient:
 
     cdef _get_object_buffers(self, object_ids, int64_t timeout_ms,
                              c_vector[CObjectBuffer]* result):
-        cdef c_vector[CUniqueID] ids
-        cdef ObjectID object_id
+        cdef:
+            c_vector[CUniqueID] ids
+            ObjectID object_id
+
         for object_id in object_ids:
             ids.push_back(object_id.data)
-        result[0].resize(ids.size())
         with nogil:
-            check_status(self.client.get().Get(ids.data(), ids.size(),
-                         timeout_ms, result[0].data()))
-
-    cdef _make_plasma_buffer(self, ObjectID object_id,
-                             shared_ptr[CBuffer] buffer, int64_t size):
-        result = PlasmaBuffer(object_id, self)
-        result.init(buffer)
-        return result
+            check_status(self.client.get().Get(ids, timeout_ms, result))
 
+    # XXX C++ API should instead expose some kind of CreateAuto()
     cdef _make_mutable_plasma_buffer(self, ObjectID object_id, uint8_t* data,
                                      int64_t size):
         cdef shared_ptr[CBuffer] buffer
@@ -332,11 +325,8 @@ cdef class PlasmaClient:
         self._get_object_buffers(object_ids, timeout_ms, &object_buffers)
         result = []
         for i in range(object_buffers.size()):
-            if object_buffers[i].data_size != -1:
-                result.append(
-                    self._make_plasma_buffer(object_ids[i],
-                                             object_buffers[i].data,
-                                             object_buffers[i].data_size))
+            if object_buffers[i].data.get() != nullptr:
+                result.append(pyarrow_wrap_buffer(object_buffers[i].data))
             else:
                 result.append(None)
         return result
@@ -367,10 +357,10 @@ cdef class PlasmaClient:
         self._get_object_buffers(object_ids, timeout_ms, &object_buffers)
         result = []
         for i in range(object_buffers.size()):
-            result.append(
-                self._make_plasma_buffer(object_ids[i],
-                                         object_buffers[i].metadata,
-                                         object_buffers[i].metadata_size))
+            if object_buffers[i].metadata.get() != nullptr:
+                result.append(pyarrow_wrap_buffer(object_buffers[i].metadata))
+            else:
+                result.append(None)
         return result
 
     def put(self, object value, ObjectID object_id=None, int memcopy_threads=6,
diff --git a/python/pyarrow/tests/test_plasma.py 
b/python/pyarrow/tests/test_plasma.py
index 1df213dec..435884199 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -190,7 +190,6 @@ def setup_method(self, test_method):
         plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
         # Connect to Plasma.
         self.plasma_client = plasma.connect(plasma_store_name, "", 64)
-        # For the eviction test
         self.plasma_client2 = plasma.connect(plasma_store_name, "", 0)
 
     def teardown_method(self, test_method):
@@ -311,6 +310,36 @@ def test_get(self):
                 else:
                     assert results[i] is None
 
+    def test_buffer_lifetime(self):
+        # ARROW-2195
+        arr = pa.array([1, 12, 23, 3, 34], pa.int32())
+        batch = pa.RecordBatch.from_arrays([arr], ['field1'])
+
+        # Serialize RecordBatch into Plasma store
+        sink = pa.MockOutputStream()
+        writer = pa.RecordBatchStreamWriter(sink, batch.schema)
+        writer.write_batch(batch)
+        writer.close()
+
+        object_id = random_object_id()
+        data_buffer = self.plasma_client.create(object_id, sink.size())
+        stream = pa.FixedSizeBufferWriter(data_buffer)
+        writer = pa.RecordBatchStreamWriter(stream, batch.schema)
+        writer.write_batch(batch)
+        writer.close()
+        self.plasma_client.seal(object_id)
+        del data_buffer
+
+        # Unserialize RecordBatch from Plasma store
+        [data_buffer] = self.plasma_client2.get_buffers([object_id])
+        reader = pa.RecordBatchStreamReader(data_buffer)
+        read_batch = reader.read_next_batch()
+        # Lose reference to returned buffer.  The RecordBatch must still
+        # be backed by valid memory.
+        del data_buffer, reader
+
+        assert read_batch.equals(batch)
+
     def test_put_and_get(self):
         for value in [["hello", "world", 3, 1.0], None, "hello"]:
             object_id = self.plasma_client.put(value)
@@ -770,15 +799,15 @@ def test_use_one_memory_mapped_file(self):
         # them go out of scope.
         for _ in range(100):
             create_object(
-                self.plasma_client,
+                self.plasma_client2,
                 np.random.randint(1, DEFAULT_PLASMA_STORE_MEMORY // 20), 0)
         # Create large objects that require the full object store size, and
         # verify that they fit.
         for _ in range(2):
-            create_object(self.plasma_client, DEFAULT_PLASMA_STORE_MEMORY, 0)
+            create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
         # Verify that an object that is too large does not fit.
         with pytest.raises(pa.lib.PlasmaStoreFull):
-            create_object(self.plasma_client, DEFAULT_PLASMA_STORE_MEMORY + 1,
+            create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY + 1,
                           0)
 
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> [Plasma] Segfault when retrieving RecordBatch from plasma store
> ---------------------------------------------------------------
>
>                 Key: ARROW-2195
>                 URL: https://issues.apache.org/jira/browse/ARROW-2195
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Plasma (C++), Python
>    Affects Versions: 0.8.0
>            Reporter: Philipp Moritz
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.10.0
>
>
> It can be reproduced with the following script:
> {code:python}
> import pyarrow as pa
> import pyarrow.plasma as plasma
> def retrieve1():
>     client = plasma.connect('test', "", 0)
>     key = "keynumber1keynumber1"
>     pid = plasma.ObjectID(bytearray(key,'UTF-8'))
>     [buff] = client .get_buffers([pid])
>     batch = pa.RecordBatchStreamReader(buff).read_next_batch()
>     print(batch)
>     print(batch.schema)
>     print(batch[0])
>     return batch
> client = plasma.connect('test', "", 0)
> test1 = [1, 12, 23, 3, 21, 34]
> test1 = pa.array(test1, pa.int32())
> batch = pa.RecordBatch.from_arrays([test1], ['FIELD1'])
> key = "keynumber1keynumber1"
> pid = plasma.ObjectID(bytearray(key,'UTF-8'))
> sink = pa.MockOutputStream()
> stream_writer = pa.RecordBatchStreamWriter(sink, batch.schema)
> stream_writer.write_batch(batch)
> stream_writer.close()
> bff = client.create(pid, sink.size())
> stream = pa.FixedSizeBufferWriter(bff)
> writer = pa.RecordBatchStreamWriter(stream, batch.schema)
> writer.write_batch(batch)
> client.seal(pid)
> batch = retrieve1()
> print(batch)
> print(batch.schema)
> print(batch[0])
> {code}
>  
> Preliminary backtrace:
>  
> {code}
> CESS (code=1, address=0x111138158)
>     frame #0: 0x000000010e6457fc 
> lib.so`__pyx_pw_7pyarrow_3lib_10Int32Value_1as_py(_object*, _object*) + 28
> lib.so`__pyx_pw_7pyarrow_3lib_10Int32Value_1as_py:
> ->  0x10e6457fc <+28>: movslq (%rdx,%rcx,4), %rdi
>     0x10e645800 <+32>: callq  0x10e698170               ; symbol stub for: 
> PyInt_FromLong
>     0x10e645805 <+37>: testq  %rax, %rax
>     0x10e645808 <+40>: je     0x10e64580c               ; <+44>
> (lldb) bt
>  * thread #1: tid = 0xf1378e, 0x000000010e6457fc 
> lib.so`__pyx_pw_7pyarrow_3lib_10Int32Value_1as_py(_object*, _object*) + 28, 
> queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=1, 
> address=0x111138158)
>   * frame #0: 0x000000010e6457fc 
> lib.so`__pyx_pw_7pyarrow_3lib_10Int32Value_1as_py(_object*, _object*) + 28
>     frame #1: 0x000000010e5ccd35 lib.so`__Pyx_PyObject_CallNoArg(_object*) + 
> 133
>     frame #2: 0x000000010e613b25 
> lib.so`__pyx_pw_7pyarrow_3lib_10ArrayValue_3__repr__(_object*) + 933
>     frame #3: 0x000000010c2f83bc libpython2.7.dylib`PyObject_Repr + 60
>     frame #4: 0x000000010c35f651 libpython2.7.dylib`PyEval_EvalFrameEx + 22305
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to