[ 
https://issues.apache.org/jira/browse/ARROW-1947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16306574#comment-16306574
 ] 

ASF GitHub Bot commented on ARROW-1947:
---------------------------------------

pcmoritz closed pull request #1444: ARROW-1947: [Plasma] Change Client Create 
and Get to use Buffers
URL: https://github.com/apache/arrow/pull/1444
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 9bbafac38..0dd1c44d7 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -40,6 +40,7 @@
 #include <thread>
 #include <vector>
 
+#include "arrow/buffer.h"
 #include "plasma/common.h"
 #include "plasma/fling.h"
 #include "plasma/io.h"
@@ -53,6 +54,8 @@
 
 namespace plasma {
 
+using arrow::MutableBuffer;
+
 // Number of threads used for memcopy and hash computations.
 constexpr int64_t kThreadPoolSize = 8;
 constexpr int64_t kBytesInMB = 1 << 20;
@@ -145,7 +148,8 @@ void PlasmaClient::increment_object_count(const ObjectID& 
object_id, PlasmaObjec
 }
 
 Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
-                            uint8_t* metadata, int64_t metadata_size, 
uint8_t** data) {
+                            uint8_t* metadata, int64_t metadata_size,
+                            std::shared_ptr<Buffer>* data) {
   ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with 
size "
                    << data_size << " and metadata size " << metadata_size;
   RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, data_size, 
metadata_size));
@@ -162,14 +166,16 @@ Status PlasmaClient::Create(const ObjectID& object_id, 
int64_t data_size,
   ARROW_CHECK(object.metadata_size == metadata_size);
   // The metadata should come right after the data.
   ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
-  *data = lookup_or_mmap(fd, object.handle.store_fd, object.handle.mmap_size) +
-          object.data_offset;
+  *data = std::make_shared<MutableBuffer>(
+      lookup_or_mmap(fd, object.handle.store_fd, object.handle.mmap_size) +
+          object.data_offset,
+      data_size);
   // If plasma_create is being called from a transfer, then we will not copy 
the
   // metadata here. The metadata will be written along with the data streamed
   // from the transfer.
   if (metadata != NULL) {
     // Copy the metadata to the buffer.
-    memcpy(*data + object.data_size, metadata, metadata_size);
+    memcpy((*data)->mutable_data() + object.data_size, metadata, 
metadata_size);
   }
   // Increment the count of the number of instances of this object that this
   // client is using. A call to PlasmaClient::Release is required to decrement
@@ -203,10 +209,12 @@ Status PlasmaClient::Get(const ObjectID* object_ids, 
int64_t num_objects,
       ARROW_CHECK(object_entry->second->is_sealed)
           << "Plasma client called get on an unsealed object that it created";
       PlasmaObject* object = &object_entry->second->object;
-      object_buffers[i].data = lookup_mmapped_file(object->handle.store_fd);
-      object_buffers[i].data = object_buffers[i].data + object->data_offset;
+      uint8_t* data = lookup_mmapped_file(object->handle.store_fd);
+      object_buffers[i].data =
+          std::make_shared<Buffer>(data + object->data_offset, 
object->data_size);
+      object_buffers[i].metadata = std::make_shared<Buffer>(
+          data + object->data_offset + object->data_size, 
object->metadata_size);
       object_buffers[i].data_size = object->data_size;
-      object_buffers[i].metadata = object_buffers[i].data + object->data_size;
       object_buffers[i].metadata_size = object->metadata_size;
       // Increment the count of the number of instances of this object that 
this
       // client is using. A call to PlasmaClient::Release is required to
@@ -254,13 +262,15 @@ Status PlasmaClient::Get(const ObjectID* object_ids, 
int64_t num_objects,
       // The object was retrieved. The user will be responsible for releasing
       // this object.
       int fd = recv_fd(store_conn_);
-      ARROW_CHECK(fd >= 0);
-      object_buffers[i].data =
+      uint8_t* data =
           lookup_or_mmap(fd, object->handle.store_fd, 
object->handle.mmap_size);
+      ARROW_CHECK(fd >= 0);
       // Finish filling out the return values.
-      object_buffers[i].data = object_buffers[i].data + object->data_offset;
+      object_buffers[i].data =
+          std::make_shared<Buffer>(data + object->data_offset, 
object->data_size);
+      object_buffers[i].metadata = std::make_shared<Buffer>(
+          data + object->data_offset + object->data_size, 
object->metadata_size);
       object_buffers[i].data_size = object->data_size;
-      object_buffers[i].metadata = object_buffers[i].data + object->data_size;
       object_buffers[i].metadata_size = object->metadata_size;
       // Increment the count of the number of instances of this object that 
this
       // client is using. A call to PlasmaClient::Release is required to
@@ -438,14 +448,16 @@ static uint64_t compute_object_hash(const ObjectBuffer& 
obj_buffer) {
   XXH64_state_t hash_state;
   XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
   if (obj_buffer.data_size >= kBytesInMB) {
-    compute_object_hash_parallel(&hash_state,
-                                 reinterpret_cast<unsigned 
char*>(obj_buffer.data),
-                                 obj_buffer.data_size);
+    compute_object_hash_parallel(
+        &hash_state, reinterpret_cast<const unsigned 
char*>(obj_buffer.data->data()),
+        obj_buffer.data_size);
   } else {
-    XXH64_update(&hash_state, reinterpret_cast<unsigned 
char*>(obj_buffer.data),
+    XXH64_update(&hash_state,
+                 reinterpret_cast<const unsigned 
char*>(obj_buffer.data->data()),
                  obj_buffer.data_size);
   }
-  XXH64_update(&hash_state, reinterpret_cast<unsigned 
char*>(obj_buffer.metadata),
+  XXH64_update(&hash_state,
+               reinterpret_cast<const unsigned 
char*>(obj_buffer.metadata->data()),
                obj_buffer.metadata_size);
   return XXH64_digest(&hash_state);
 }
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index cfd11c16d..78793f1a7 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -26,11 +26,13 @@
 #include <string>
 #include <unordered_map>
 
+#include "arrow/buffer.h"
 #include "arrow/status.h"
 #include "arrow/util/visibility.h"
 #include "plasma/common.h"
 
 using arrow::Status;
+using arrow::Buffer;
 
 namespace plasma {
 
@@ -41,14 +43,16 @@ constexpr int64_t kL3CacheSizeBytes = 100000000;
 
 /// Object buffer data structure.
 struct ObjectBuffer {
+  /// The data buffer.
+  std::shared_ptr<Buffer> data;
   /// The size in bytes of the data object.
   int64_t data_size;
-  /// The address of the data object.
-  uint8_t* data;
+  /// The metadata buffer.
+  std::shared_ptr<Buffer> metadata;
   /// The metadata size in bytes.
   int64_t metadata_size;
-  /// The address of the metadata.
-  uint8_t* metadata;
+  /// The device number.
+  int device_num;
 };
 
 /// Configuration options for the plasma client.
@@ -107,11 +111,11 @@ class ARROW_EXPORT PlasmaClient {
   ///        should be NULL.
   /// \param metadata_size The size in bytes of the metadata. If there is no
   ///        metadata, this should be 0.
-  /// \param data The address of the newly created object will be written here.
+  /// \param data A buffer containing the address of the newly created object
+  ///        will be written here.
   /// \return The return status.
   Status Create(const ObjectID& object_id, int64_t data_size, uint8_t* 
metadata,
-                int64_t metadata_size, uint8_t** data);
-
+                int64_t metadata_size, std::shared_ptr<Buffer>* data);
   /// Get some objects from the Plasma Store. This function will block until 
the
   /// objects have all been created and sealed in the Plasma Store or the
   /// timeout
diff --git a/cpp/src/plasma/test/client_tests.cc 
b/cpp/src/plasma/test/client_tests.cc
index f44ed2510..5cd3063bb 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -71,7 +71,7 @@ TEST_F(TestPlasmaStore, ContainsTest) {
   int64_t data_size = 100;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  uint8_t* data;
+  std::shared_ptr<Buffer> data;
   ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, 
&data));
   ARROW_CHECK_OK(client_.Seal(object_id));
   // Avoid race condition of Plasma Manager waiting for notification.
@@ -94,16 +94,20 @@ TEST_F(TestPlasmaStore, GetTest) {
   int64_t data_size = 4;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
+  std::shared_ptr<Buffer> data_buffer;
   uint8_t* data;
-  ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, 
&data));
+  ARROW_CHECK_OK(
+      client_.Create(object_id, data_size, metadata, metadata_size, 
&data_buffer));
+  data = data_buffer->mutable_data();
   for (int64_t i = 0; i < data_size; i++) {
     data[i] = static_cast<uint8_t>(i % 4);
   }
   ARROW_CHECK_OK(client_.Seal(object_id));
 
   ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+  const uint8_t* object_data = object_buffer.data->data();
   for (int64_t i = 0; i < data_size; i++) {
-    ASSERT_EQ(data[i], object_buffer.data[i]);
+    ASSERT_EQ(data[i], object_data[i]);
   }
 }
 
@@ -116,18 +120,18 @@ TEST_F(TestPlasmaStore, MultipleGetTest) {
   int64_t data_size = 4;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  uint8_t* data;
+  std::shared_ptr<Buffer> data;
   ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, 
metadata_size, &data));
-  data[0] = 1;
+  data->mutable_data()[0] = 1;
   ARROW_CHECK_OK(client_.Seal(object_id1));
 
   ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, 
metadata_size, &data));
-  data[0] = 2;
+  data->mutable_data()[0] = 2;
   ARROW_CHECK_OK(client_.Seal(object_id2));
 
   ARROW_CHECK_OK(client_.Get(object_ids, 2, -1, object_buffer));
-  ASSERT_EQ(object_buffer[0].data[0], 1);
-  ASSERT_EQ(object_buffer[1].data[0], 2);
+  ASSERT_EQ(object_buffer[0].data->data()[0], 1);
+  ASSERT_EQ(object_buffer[1].data->data()[0], 2);
 }
 
 TEST_F(TestPlasmaStore, AbortTest) {
@@ -143,11 +147,13 @@ TEST_F(TestPlasmaStore, AbortTest) {
   int64_t data_size = 4;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  uint8_t* data;
+  std::shared_ptr<Buffer> data;
+  uint8_t* data_ptr;
   ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, 
&data));
+  data_ptr = data->mutable_data();
   // Write some data.
   for (int64_t i = 0; i < data_size / 2; i++) {
-    data[i] = static_cast<uint8_t>(i % 4);
+    data_ptr[i] = static_cast<uint8_t>(i % 4);
   }
   // Attempt to abort. Test that this fails before the first release.
   Status status = client_.Abort(object_id);
@@ -162,15 +168,17 @@ TEST_F(TestPlasmaStore, AbortTest) {
 
   // Create the object successfully this time.
   ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, 
&data));
+  data_ptr = data->mutable_data();
   for (int64_t i = 0; i < data_size; i++) {
-    data[i] = static_cast<uint8_t>(i % 4);
+    data_ptr[i] = static_cast<uint8_t>(i % 4);
   }
   ARROW_CHECK_OK(client_.Seal(object_id));
 
   // Test that we can get the object.
   ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+  const uint8_t* buffer_ptr = object_buffer.data->data();
   for (int64_t i = 0; i < data_size; i++) {
-    ASSERT_EQ(data[i], object_buffer.data[i]);
+    ASSERT_EQ(data_ptr[i], buffer_ptr[i]);
   }
 }
 
@@ -187,7 +195,7 @@ TEST_F(TestPlasmaStore, MultipleClientTest) {
   int64_t data_size = 100;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  uint8_t* data;
+  std::shared_ptr<Buffer> data;
   ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, 
metadata_size, &data));
   ARROW_CHECK_OK(client2_.Seal(object_id));
   // Test that the first client can get the object.
@@ -227,7 +235,7 @@ TEST_F(TestPlasmaStore, ManyObjectTest) {
     int64_t data_size = 100;
     uint8_t metadata[] = {5};
     int64_t metadata_size = sizeof(metadata);
-    uint8_t* data;
+    std::shared_ptr<Buffer> data;
     ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, 
metadata_size, &data));
 
     if (i % 3 == 0) {
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index f2e8653d8..2d42eb0a3 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -81,7 +81,7 @@ cdef extern from "plasma/client.h" nogil:
 
         CStatus Create(const CUniqueID& object_id, int64_t data_size,
                        const uint8_t* metadata, int64_t metadata_size,
-                       uint8_t** data)
+                       const shared_ptr[CBuffer]* data)
 
         CStatus Get(const CUniqueID* object_ids, int64_t num_objects,
                     int64_t timeout_ms, CObjectBuffer* object_buffers)
@@ -118,9 +118,9 @@ cdef extern from "plasma/client.h" nogil:
 
     cdef struct CObjectBuffer" plasma::ObjectBuffer":
         int64_t data_size
-        uint8_t* data
+        shared_ptr[CBuffer] data
         int64_t metadata_size
-        uint8_t* metadata
+        shared_ptr[CBuffer] metadata
 
 
 def make_object_id(object_id):
@@ -245,10 +245,8 @@ cdef class PlasmaClient:
             check_status(self.client.get().Get(ids.data(), ids.size(),
                          timeout_ms, result[0].data()))
 
-    cdef _make_plasma_buffer(self, ObjectID object_id, uint8_t* data,
+    cdef _make_plasma_buffer(self, ObjectID object_id, shared_ptr[CBuffer] 
buffer,
                              int64_t size):
-        cdef shared_ptr[CBuffer] buffer
-        buffer.reset(new CBuffer(data, size))
         result = PlasmaBuffer(object_id, self)
         result.init(buffer)
         return result
@@ -296,12 +294,12 @@ cdef class PlasmaClient:
                 not be created because the plasma store is unable to evict
                 enough objects to create room for it.
         """
-        cdef uint8_t* data
+        cdef shared_ptr[CBuffer] data
         with nogil:
             check_status(self.client.get().Create(object_id.data, data_size,
                                                   <uint8_t*>(metadata.data()),
                                                   metadata.size(), &data))
-        return self._make_mutable_plasma_buffer(object_id, data, data_size)
+        return self._make_mutable_plasma_buffer(object_id, 
data.get().mutable_data(), data_size)
 
     def get_buffers(self, object_ids, timeout_ms=-1):
         """


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> [Plasma] Change Client Create and Get to use Buffers
> ----------------------------------------------------
>
>                 Key: ARROW-1947
>                 URL: https://issues.apache.org/jira/browse/ARROW-1947
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: Plasma (C++)
>            Reporter: William Paul
>              Labels: pull-request-available
>
> This is primarily preparation for ARROW-1394 for returning CudaBuffers, but 
> also allows Arrow buffer tools to be used with Plasma.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to