This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new a8bd1c9  ARROW-3548: [Plasma] Add CreateAndSeal object store method 
for faster puts for small objects.
a8bd1c9 is described below

commit a8bd1c9ce4273d53eee2c67074b86410d1db4ce1
Author: Robert Nishihara <[email protected]>
AuthorDate: Thu Oct 18 18:15:32 2018 -0700

    ARROW-3548: [Plasma] Add CreateAndSeal object store method for faster puts 
for small objects.
    
    To benchmark, start a store with `plasma_store_server -s /tmp/store -m 
1000000000`
    
    Then do
    
    ```python
    import pyarrow.plasma as plasma
    client = plasma.connect('/tmp/store', '', 0)
    ```
    
    ```python
    def before():
        object_id = plasma.ObjectID.from_random()
        client.create(object_id, 20, b'')
        client.seal(object_id)
    
    def after():
        object_id = plasma.ObjectID.from_random()
        client.create_and_seal(object_id, 20 * b'a', b'')
    ```
    
    ```
    %timeit before()
    63.4 µs ± 631 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
    
    %timeit after()
    30.5 µs ± 669 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
    ```
    
    There's actually more that could be done in the future in order to not have 
to wait for the return IPC if the client "reserves" a big chunk of memory up 
front.
    
    Author: Robert Nishihara <[email protected]>
    
    Closes #2783 from robertnishihara/inlinesmallobjects and squashes the 
following commits:
    
    98bb37b53 <Robert Nishihara> Improve test.
    a76e5052a <Robert Nishihara> Add comment explaining device_num = 0.
    62fea3785 <Robert Nishihara> Linting.
    ca92be24a <Robert Nishihara> Fix linting.
    711d5049f <Robert Nishihara> Add CreateAndSeal method for putting small 
objects in the object store more quickly.
---
 cpp/src/plasma/client.cc            | 65 +++++++++++++++++++++++++++++--------
 cpp/src/plasma/client.h             | 11 +++++++
 cpp/src/plasma/format/plasma.fbs    | 18 ++++++++++
 cpp/src/plasma/protocol.cc          | 39 ++++++++++++++++++++++
 cpp/src/plasma/protocol.h           | 13 ++++++++
 cpp/src/plasma/store.cc             | 29 +++++++++++++++++
 python/pyarrow/_plasma.pyx          | 32 ++++++++++++++++++
 python/pyarrow/tests/test_plasma.py | 32 ++++++++++++++++++
 8 files changed, 226 insertions(+), 13 deletions(-)

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index d88e8b1..7e2ce0f 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -174,6 +174,9 @@ class PlasmaClient::Impl : public 
std::enable_shared_from_this<PlasmaClient::Imp
   Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* 
metadata,
                 int64_t metadata_size, std::shared_ptr<Buffer>* data, int 
device_num = 0);
 
+  Status CreateAndSeal(const ObjectID& object_id, const std::string& data,
+                       const std::string& metadata);
+
   Status Get(const std::vector<ObjectID>& object_ids, int64_t timeout_ms,
              std::vector<ObjectBuffer>* object_buffers);
 
@@ -245,6 +248,10 @@ class PlasmaClient::Impl : public 
std::enable_shared_from_this<PlasmaClient::Imp
 
   uint64_t ComputeObjectHash(const ObjectBuffer& obj_buffer);
 
+  uint64_t ComputeObjectHash(const uint8_t* data, int64_t data_size,
+                             const uint8_t* metadata, int64_t metadata_size,
+                             int device_num);
+
   /// File descriptor of the Unix domain socket that connects to the store.
   int store_conn_;
   /// File descriptor of the Unix domain socket that connects to the manager.
@@ -432,6 +439,29 @@ Status PlasmaClient::Impl::Create(const ObjectID& 
object_id, int64_t data_size,
   return Status::OK();
 }
 
+Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,
+                                         const std::string& data,
+                                         const std::string& metadata) {
+  ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_;
+
+  // Compute the object hash.
+  static unsigned char digest[kDigestSize];
+  // CreateAndSeal currently only supports device_num = 0, which corresponds to
+  // the host.
+  int device_num = 0;
+  uint64_t hash = ComputeObjectHash(
+      reinterpret_cast<const uint8_t*>(data.data()), data.size(),
+      reinterpret_cast<const uint8_t*>(metadata.data()), metadata.size(), 
device_num);
+  memcpy(&digest[0], &hash, sizeof(hash));
+
+  RETURN_NOT_OK(SendCreateAndSealRequest(store_conn_, object_id, data, 
metadata, digest));
+  std::vector<uint8_t> buffer;
+  RETURN_NOT_OK(
+      PlasmaReceive(store_conn_, MessageType::PlasmaCreateAndSealReply, 
&buffer));
+  RETURN_NOT_OK(ReadCreateAndSealReply(buffer.data(), buffer.size()));
+  return Status::OK();
+}
+
 Status PlasmaClient::Impl::GetBuffers(
     const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms,
     const std::function<std::shared_ptr<Buffer>(
@@ -756,26 +786,30 @@ bool 
PlasmaClient::Impl::ComputeObjectHashParallel(XXH64_state_t* hash_state,
 }
 
 uint64_t PlasmaClient::Impl::ComputeObjectHash(const ObjectBuffer& obj_buffer) 
{
-  DCHECK(obj_buffer.metadata);
-  DCHECK(obj_buffer.data);
+  return ComputeObjectHash(obj_buffer.data->data(), obj_buffer.data->size(),
+                           obj_buffer.metadata->data(), 
obj_buffer.metadata->size(),
+                           obj_buffer.device_num);
+}
+
+uint64_t PlasmaClient::Impl::ComputeObjectHash(const uint8_t* data, int64_t 
data_size,
+                                               const uint8_t* metadata,
+                                               int64_t metadata_size, int 
device_num) {
+  DCHECK(metadata);
+  DCHECK(data);
   XXH64_state_t hash_state;
-  if (obj_buffer.device_num != 0) {
+  if (device_num != 0) {
     // TODO(wap): Create cuda program to hash data on gpu.
     return 0;
   }
   XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
-  if (obj_buffer.data->size() >= kBytesInMB) {
-    ComputeObjectHashParallel(
-        &hash_state, reinterpret_cast<const unsigned 
char*>(obj_buffer.data->data()),
-        obj_buffer.data->size());
+  if (data_size >= kBytesInMB) {
+    ComputeObjectHashParallel(&hash_state, reinterpret_cast<const unsigned 
char*>(data),
+                              data_size);
   } else {
-    XXH64_update(&hash_state,
-                 reinterpret_cast<const unsigned 
char*>(obj_buffer.data->data()),
-                 obj_buffer.data->size());
+    XXH64_update(&hash_state, reinterpret_cast<const unsigned char*>(data), 
data_size);
   }
-  XXH64_update(&hash_state,
-               reinterpret_cast<const unsigned 
char*>(obj_buffer.metadata->data()),
-               obj_buffer.metadata->size());
+  XXH64_update(&hash_state, reinterpret_cast<const unsigned char*>(metadata),
+               metadata_size);
   return XXH64_digest(&hash_state);
 }
 
@@ -1046,6 +1080,11 @@ Status PlasmaClient::Create(const ObjectID& object_id, 
int64_t data_size,
   return impl_->Create(object_id, data_size, metadata, metadata_size, data, 
device_num);
 }
 
+Status PlasmaClient::CreateAndSeal(const ObjectID& object_id, const 
std::string& data,
+                                   const std::string& metadata) {
+  return impl_->CreateAndSeal(object_id, data, metadata);
+}
+
 Status PlasmaClient::Get(const std::vector<ObjectID>& object_ids, int64_t 
timeout_ms,
                          std::vector<ObjectBuffer>* object_buffers) {
   return impl_->Get(object_ids, timeout_ms, object_buffers);
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 9d4dbf5..59b001c 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -95,6 +95,17 @@ class ARROW_EXPORT PlasmaClient {
   Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* 
metadata,
                 int64_t metadata_size, std::shared_ptr<Buffer>* data, int 
device_num = 0);
 
+  /// Create and seal an object in the object store. This is an optimization
+  /// which allows small objects to be created quickly with fewer messages to
+  /// the store.
+  ///
+  /// \param object_id The ID of the object to create.
+  /// \param data The data for the object to create.
+  /// \param metadata The metadata for the object to create.
+  /// \return The return status.
+  Status CreateAndSeal(const ObjectID& object_id, const std::string& data,
+                       const std::string& metadata);
+
   /// Get some objects from the Plasma Store. This function will block until 
the
   /// objects have all been created and sealed in the Plasma Store or the
   /// timeout expires.
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index ded714a..ef934fb 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -26,6 +26,8 @@ enum MessageType:long {
   // Create a new object.
   PlasmaCreateRequest,
   PlasmaCreateReply,
+  PlasmaCreateAndSealRequest,
+  PlasmaCreateAndSealReply,
   PlasmaAbortRequest,
   PlasmaAbortReply,
   // Seal an object.
@@ -141,6 +143,22 @@ table PlasmaCreateReply {
   ipc_handle: CudaHandle;
 }
 
+table PlasmaCreateAndSealRequest {
+  // ID of the object to be created.
+  object_id: string;
+  // The object's data.
+  data: string;
+  // The object's metadata.
+  metadata: string;
+  // Hash of the object data.
+  digest: string;
+}
+
+table PlasmaCreateAndSealReply {
+  // Error that occurred for this call.
+  error: PlasmaError;
+}
+
 table PlasmaAbortRequest {
   // ID of the object to be aborted.
   object_id: string;
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index 5b93b65..a74db66 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -180,6 +180,45 @@ Status ReadCreateReply(uint8_t* data, size_t size, 
ObjectID* object_id,
   return PlasmaErrorStatus(message->error());
 }
 
+Status SendCreateAndSealRequest(int sock, const ObjectID& object_id,
+                                const std::string& data, const std::string& 
metadata,
+                                unsigned char* digest) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto digest_string = fbb.CreateString(reinterpret_cast<char*>(digest), 
kDigestSize);
+  auto message = fb::CreatePlasmaCreateAndSealRequest(
+      fbb, fbb.CreateString(object_id.binary()), fbb.CreateString(data),
+      fbb.CreateString(metadata), digest_string);
+  return PlasmaSend(sock, MessageType::PlasmaCreateAndSealRequest, &fbb, 
message);
+}
+
+Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* 
object_id,
+                                std::string* object_data, std::string* 
metadata,
+                                unsigned char* digest) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealRequest>(data);
+  DCHECK(VerifyFlatbuffer(message, data, size));
+
+  *object_id = ObjectID::from_binary(message->object_id()->str());
+  *object_data = message->data()->str();
+  *metadata = message->metadata()->str();
+  ARROW_CHECK(message->digest()->size() == kDigestSize);
+  memcpy(digest, message->digest()->data(), kDigestSize);
+  return Status::OK();
+}
+
+Status SendCreateAndSealReply(int sock, PlasmaError error) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = fb::CreatePlasmaCreateAndSealReply(fbb, 
static_cast<PlasmaError>(error));
+  return PlasmaSend(sock, MessageType::PlasmaCreateAndSealReply, &fbb, 
message);
+}
+
+Status ReadCreateAndSealReply(uint8_t* data, size_t size) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealReply>(data);
+  DCHECK(VerifyFlatbuffer(message, data, size));
+  return PlasmaErrorStatus(message->error());
+}
+
 Status SendAbortRequest(int sock, ObjectID object_id) {
   flatbuffers::FlatBufferBuilder fbb;
   auto message = fb::CreatePlasmaAbortRequest(fbb, 
fbb.CreateString(object_id.binary()));
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index 057ba1c..c820458 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -19,6 +19,7 @@
 #define PLASMA_PROTOCOL_H
 
 #include <memory>
+#include <string>
 #include <unordered_map>
 #include <vector>
 
@@ -57,6 +58,18 @@ Status SendCreateReply(int sock, ObjectID object_id, 
PlasmaObject* object,
 Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
                        PlasmaObject* object, int* store_fd, int64_t* 
mmap_size);
 
+Status SendCreateAndSealRequest(int sock, const ObjectID& object_id,
+                                const std::string& data, const std::string& 
metadata,
+                                unsigned char* digest);
+
+Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* 
object_id,
+                                std::string* object_data, std::string* 
metadata,
+                                unsigned char* digest);
+
+Status SendCreateAndSealReply(int sock, PlasmaError error);
+
+Status ReadCreateAndSealReply(uint8_t* data, size_t size);
+
 Status SendAbortRequest(int sock, ObjectID object_id);
 
 Status ReadAbortRequest(uint8_t* data, size_t size, ObjectID* object_id);
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index a01c9a2..54792e9 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -802,6 +802,35 @@ Status PlasmaStore::ProcessMessage(Client* client) {
         WarnIfSigpipe(send_fd(client->fd, object.store_fd), client->fd);
       }
     } break;
+    case fb::MessageType::PlasmaCreateAndSealRequest: {
+      std::string data;
+      std::string metadata;
+      unsigned char digest[kDigestSize];
+      RETURN_NOT_OK(ReadCreateAndSealRequest(input, input_size, &object_id, 
&data,
+                                             &metadata, &digest[0]));
+      // CreateAndSeal currently only supports device_num = 0, which 
corresponds
+      // to the host.
+      int device_num = 0;
+      PlasmaError error_code = CreateObject(object_id, data.size(), 
metadata.size(),
+                                            device_num, client, &object);
+      // Reply to the client.
+      HANDLE_SIGPIPE(SendCreateAndSealReply(client->fd, error_code), 
client->fd);
+
+      // If the object was successfully created, fill out the object data and 
seal it.
+      if (error_code == PlasmaError::OK) {
+        auto entry = GetObjectTableEntry(&store_info_, object_id);
+        ARROW_CHECK(entry != nullptr);
+        // Write the inlined data and metadata into the allocated object.
+        std::memcpy(entry->pointer, data.data(), data.size());
+        std::memcpy(entry->pointer + data.size(), metadata.data(), 
metadata.size());
+        SealObject(object_id, &digest[0]);
+        // Remove the client from the object's array of clients because the
+        // object is not being used by any client. The client was added to the
+        // object's array of clients in CreateObject. This is analogous to the
+        // Release call that happens in the client's Seal method.
+        ARROW_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1);
+      }
+    } break;
     case fb::MessageType::PlasmaAbortRequest: {
       RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id));
       ARROW_CHECK(AbortObject(object_id, client) == 1) << "To abort an object, 
the only "
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index 2b9f93e..e2cbc18 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -110,6 +110,9 @@ cdef extern from "plasma/client.h" nogil:
                        const uint8_t* metadata, int64_t metadata_size,
                        const shared_ptr[CBuffer]* data)
 
+        CStatus CreateAndSeal(const CUniqueID& object_id, const c_string& data,
+                              const c_string& metadata)
+
         CStatus Get(const c_vector[CUniqueID] object_ids, int64_t timeout_ms,
                     c_vector[CObjectBuffer]* object_buffers)
 
@@ -344,6 +347,35 @@ cdef class PlasmaClient:
                                                 data.get().mutable_data(),
                                                 data_size)
 
+    def create_and_seal(self, ObjectID object_id, c_string data,
+                        c_string metadata=b""):
+        """
+        Store a new object in the PlasmaStore for a particular object ID.
+
+        Parameters
+        ----------
+        object_id : ObjectID
+            The object ID used to identify an object.
+        data : bytes
+            The object to store.
+        metadata : bytes
+            An optional string of bytes encoding whatever metadata the user
+            wishes to encode.
+
+        Raises
+        ------
+        PlasmaObjectExists
+            This exception is raised if the object could not be created because
+            there already is an object with the same ID in the plasma store.
+
+        PlasmaStoreFull: This exception is raised if the object could
+                not be created because the plasma store is unable to evict
+                enough objects to create room for it.
+        """
+        with nogil:
+            check_status(self.client.get().CreateAndSeal(object_id.data, data,
+                                                         metadata))
+
     def get_buffers(self, object_ids, timeout_ms=-1):
         """
         Returns data buffer from the PlasmaStore based on object ID.
diff --git a/python/pyarrow/tests/test_plasma.py 
b/python/pyarrow/tests/test_plasma.py
index efda2af..9229479 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -210,6 +210,38 @@ class TestPlasmaClient(object):
             else:
                 assert False
 
+    def test_create_and_seal(self):
+
+        # Create a bunch of objects.
+        object_ids = []
+        for i in range(1000):
+            object_id = random_object_id()
+            object_ids.append(object_id)
+            self.plasma_client.create_and_seal(object_id, i * b'a', i * b'b')
+
+        for i in range(1000):
+            assert self.plasma_client.get_buffer(object_ids[i]) == i * b'a'
+            assert (self.plasma_client.get_metadata(
+                        [object_ids[i]])[0].to_pybytes()
+                    == i * b'b')
+
+        # Make sure that creating the same object twice raises an exception.
+        object_id = random_object_id()
+        self.plasma_client.create_and_seal(object_id, b'a', b'b')
+        with pytest.raises(pa.PlasmaObjectExists):
+            self.plasma_client.create_and_seal(object_id, b'a', b'b')
+
+        # Make sure that these objects can be evicted.
+        big_object = DEFAULT_PLASMA_STORE_MEMORY // 10 * b'a'
+        object_ids = []
+        for _ in range(20):
+            object_id = random_object_id()
+            object_ids.append(object_id)
+            self.plasma_client.create_and_seal(random_object_id(), big_object,
+                                               big_object)
+        for i in range(10):
+            assert not self.plasma_client.contains(object_ids[i])
+
     def test_get(self):
         num_object_ids = 60
         # Test timing out of get with various timeouts.

Reply via email to