This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new a8bd1c9 ARROW-3548: [Plasma] Add CreateAndSeal object store method
for faster puts for small objects.
a8bd1c9 is described below
commit a8bd1c9ce4273d53eee2c67074b86410d1db4ce1
Author: Robert Nishihara <[email protected]>
AuthorDate: Thu Oct 18 18:15:32 2018 -0700
ARROW-3548: [Plasma] Add CreateAndSeal object store method for faster puts
for small objects.
To benchmark, start a store with `plasma_store_server -s /tmp/store -m 1000000000`.
Then, in Python, run:
```python
import pyarrow.plasma as plasma
client = plasma.connect('/tmp/store', '', 0)
```
```python
def before():
    object_id = plasma.ObjectID.from_random()
    client.create(object_id, 20, b'')
    client.seal(object_id)

def after():
    object_id = plasma.ObjectID.from_random()
    client.create_and_seal(object_id, 20 * b'a', b'')
```
```
%timeit before()
63.4 µs ± 631 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
%timeit after()
30.5 µs ± 669 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
```
More could be done in the future. If the client "reserved" a large chunk of
memory up front, it would not have to wait for the reply IPC from the store on
each put.
Author: Robert Nishihara <[email protected]>
Closes #2783 from robertnishihara/inlinesmallobjects and squashes the
following commits:
98bb37b53 <Robert Nishihara> Improve test.
a76e5052a <Robert Nishihara> Add comment explaining device_num = 0.
62fea3785 <Robert Nishihara> Linting.
ca92be24a <Robert Nishihara> Fix linting.
711d5049f <Robert Nishihara> Add CreateAndSeal method for putting small
objects in the object store more quickly.
---
cpp/src/plasma/client.cc | 65 +++++++++++++++++++++++++++++--------
cpp/src/plasma/client.h | 11 +++++++
cpp/src/plasma/format/plasma.fbs | 18 ++++++++++
cpp/src/plasma/protocol.cc | 39 ++++++++++++++++++++++
cpp/src/plasma/protocol.h | 13 ++++++++
cpp/src/plasma/store.cc | 29 +++++++++++++++++
python/pyarrow/_plasma.pyx | 32 ++++++++++++++++++
python/pyarrow/tests/test_plasma.py | 32 ++++++++++++++++++
8 files changed, 226 insertions(+), 13 deletions(-)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index d88e8b1..7e2ce0f 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -174,6 +174,9 @@ class PlasmaClient::Impl : public
std::enable_shared_from_this<PlasmaClient::Imp
Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t*
metadata,
int64_t metadata_size, std::shared_ptr<Buffer>* data, int
device_num = 0);
+ Status CreateAndSeal(const ObjectID& object_id, const std::string& data,
+ const std::string& metadata);
+
Status Get(const std::vector<ObjectID>& object_ids, int64_t timeout_ms,
std::vector<ObjectBuffer>* object_buffers);
@@ -245,6 +248,10 @@ class PlasmaClient::Impl : public
std::enable_shared_from_this<PlasmaClient::Imp
uint64_t ComputeObjectHash(const ObjectBuffer& obj_buffer);
+ uint64_t ComputeObjectHash(const uint8_t* data, int64_t data_size,
+ const uint8_t* metadata, int64_t metadata_size,
+ int device_num);
+
/// File descriptor of the Unix domain socket that connects to the store.
int store_conn_;
/// File descriptor of the Unix domain socket that connects to the manager.
@@ -432,6 +439,29 @@ Status PlasmaClient::Impl::Create(const ObjectID&
object_id, int64_t data_size,
return Status::OK();
}
+Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,
+ const std::string& data,
+ const std::string& metadata) {
+ ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_;
+
+ // Compute the object hash.
+ static unsigned char digest[kDigestSize];
+ // CreateAndSeal currently only supports device_num = 0, which corresponds to
+ // the host.
+ int device_num = 0;
+ uint64_t hash = ComputeObjectHash(
+ reinterpret_cast<const uint8_t*>(data.data()), data.size(),
+ reinterpret_cast<const uint8_t*>(metadata.data()), metadata.size(),
device_num);
+ memcpy(&digest[0], &hash, sizeof(hash));
+
+ RETURN_NOT_OK(SendCreateAndSealRequest(store_conn_, object_id, data,
metadata, digest));
+ std::vector<uint8_t> buffer;
+ RETURN_NOT_OK(
+ PlasmaReceive(store_conn_, MessageType::PlasmaCreateAndSealReply,
&buffer));
+ RETURN_NOT_OK(ReadCreateAndSealReply(buffer.data(), buffer.size()));
+ return Status::OK();
+}
+
Status PlasmaClient::Impl::GetBuffers(
const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms,
const std::function<std::shared_ptr<Buffer>(
@@ -756,26 +786,30 @@ bool
PlasmaClient::Impl::ComputeObjectHashParallel(XXH64_state_t* hash_state,
}
uint64_t PlasmaClient::Impl::ComputeObjectHash(const ObjectBuffer& obj_buffer)
{
- DCHECK(obj_buffer.metadata);
- DCHECK(obj_buffer.data);
+ return ComputeObjectHash(obj_buffer.data->data(), obj_buffer.data->size(),
+ obj_buffer.metadata->data(),
obj_buffer.metadata->size(),
+ obj_buffer.device_num);
+}
+
+uint64_t PlasmaClient::Impl::ComputeObjectHash(const uint8_t* data, int64_t
data_size,
+ const uint8_t* metadata,
+ int64_t metadata_size, int
device_num) {
+ DCHECK(metadata);
+ DCHECK(data);
XXH64_state_t hash_state;
- if (obj_buffer.device_num != 0) {
+ if (device_num != 0) {
// TODO(wap): Create cuda program to hash data on gpu.
return 0;
}
XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
- if (obj_buffer.data->size() >= kBytesInMB) {
- ComputeObjectHashParallel(
- &hash_state, reinterpret_cast<const unsigned
char*>(obj_buffer.data->data()),
- obj_buffer.data->size());
+ if (data_size >= kBytesInMB) {
+ ComputeObjectHashParallel(&hash_state, reinterpret_cast<const unsigned
char*>(data),
+ data_size);
} else {
- XXH64_update(&hash_state,
- reinterpret_cast<const unsigned
char*>(obj_buffer.data->data()),
- obj_buffer.data->size());
+ XXH64_update(&hash_state, reinterpret_cast<const unsigned char*>(data),
data_size);
}
- XXH64_update(&hash_state,
- reinterpret_cast<const unsigned
char*>(obj_buffer.metadata->data()),
- obj_buffer.metadata->size());
+ XXH64_update(&hash_state, reinterpret_cast<const unsigned char*>(metadata),
+ metadata_size);
return XXH64_digest(&hash_state);
}
@@ -1046,6 +1080,11 @@ Status PlasmaClient::Create(const ObjectID& object_id,
int64_t data_size,
return impl_->Create(object_id, data_size, metadata, metadata_size, data,
device_num);
}
+Status PlasmaClient::CreateAndSeal(const ObjectID& object_id, const
std::string& data,
+ const std::string& metadata) {
+ return impl_->CreateAndSeal(object_id, data, metadata);
+}
+
Status PlasmaClient::Get(const std::vector<ObjectID>& object_ids, int64_t
timeout_ms,
std::vector<ObjectBuffer>* object_buffers) {
return impl_->Get(object_ids, timeout_ms, object_buffers);
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 9d4dbf5..59b001c 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -95,6 +95,17 @@ class ARROW_EXPORT PlasmaClient {
Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t*
metadata,
int64_t metadata_size, std::shared_ptr<Buffer>* data, int
device_num = 0);
+ /// Create and seal an object in the object store. This is an optimization
+ /// which allows small objects to be created quickly with fewer messages to
+ /// the store.
+ ///
+ /// \param object_id The ID of the object to create.
+ /// \param data The data for the object to create.
+ /// \param metadata The metadata for the object to create.
+ /// \return The return status.
+ Status CreateAndSeal(const ObjectID& object_id, const std::string& data,
+ const std::string& metadata);
+
/// Get some objects from the Plasma Store. This function will block until
the
/// objects have all been created and sealed in the Plasma Store or the
/// timeout expires.
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index ded714a..ef934fb 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -26,6 +26,8 @@ enum MessageType:long {
// Create a new object.
PlasmaCreateRequest,
PlasmaCreateReply,
+ PlasmaCreateAndSealRequest,
+ PlasmaCreateAndSealReply,
PlasmaAbortRequest,
PlasmaAbortReply,
// Seal an object.
@@ -141,6 +143,22 @@ table PlasmaCreateReply {
ipc_handle: CudaHandle;
}
+table PlasmaCreateAndSealRequest {
+ // ID of the object to be created.
+ object_id: string;
+ // The object's data.
+ data: string;
+ // The object's metadata.
+ metadata: string;
+ // Hash of the object data.
+ digest: string;
+}
+
+table PlasmaCreateAndSealReply {
+ // Error that occurred for this call.
+ error: PlasmaError;
+}
+
table PlasmaAbortRequest {
// ID of the object to be aborted.
object_id: string;
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index 5b93b65..a74db66 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -180,6 +180,45 @@ Status ReadCreateReply(uint8_t* data, size_t size,
ObjectID* object_id,
return PlasmaErrorStatus(message->error());
}
+Status SendCreateAndSealRequest(int sock, const ObjectID& object_id,
+ const std::string& data, const std::string&
metadata,
+ unsigned char* digest) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto digest_string = fbb.CreateString(reinterpret_cast<char*>(digest),
kDigestSize);
+ auto message = fb::CreatePlasmaCreateAndSealRequest(
+ fbb, fbb.CreateString(object_id.binary()), fbb.CreateString(data),
+ fbb.CreateString(metadata), digest_string);
+ return PlasmaSend(sock, MessageType::PlasmaCreateAndSealRequest, &fbb,
message);
+}
+
+Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID*
object_id,
+ std::string* object_data, std::string*
metadata,
+ unsigned char* digest) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+
+ *object_id = ObjectID::from_binary(message->object_id()->str());
+ *object_data = message->data()->str();
+ *metadata = message->metadata()->str();
+ ARROW_CHECK(message->digest()->size() == kDigestSize);
+ memcpy(digest, message->digest()->data(), kDigestSize);
+ return Status::OK();
+}
+
+Status SendCreateAndSealReply(int sock, PlasmaError error) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaCreateAndSealReply(fbb,
static_cast<PlasmaError>(error));
+ return PlasmaSend(sock, MessageType::PlasmaCreateAndSealReply, &fbb,
message);
+}
+
+Status ReadCreateAndSealReply(uint8_t* data, size_t size) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ return PlasmaErrorStatus(message->error());
+}
+
Status SendAbortRequest(int sock, ObjectID object_id) {
flatbuffers::FlatBufferBuilder fbb;
auto message = fb::CreatePlasmaAbortRequest(fbb,
fbb.CreateString(object_id.binary()));
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index 057ba1c..c820458 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -19,6 +19,7 @@
#define PLASMA_PROTOCOL_H
#include <memory>
+#include <string>
#include <unordered_map>
#include <vector>
@@ -57,6 +58,18 @@ Status SendCreateReply(int sock, ObjectID object_id,
PlasmaObject* object,
Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
PlasmaObject* object, int* store_fd, int64_t*
mmap_size);
+Status SendCreateAndSealRequest(int sock, const ObjectID& object_id,
+ const std::string& data, const std::string&
metadata,
+ unsigned char* digest);
+
+Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID*
object_id,
+ std::string* object_data, std::string*
metadata,
+ unsigned char* digest);
+
+Status SendCreateAndSealReply(int sock, PlasmaError error);
+
+Status ReadCreateAndSealReply(uint8_t* data, size_t size);
+
Status SendAbortRequest(int sock, ObjectID object_id);
Status ReadAbortRequest(uint8_t* data, size_t size, ObjectID* object_id);
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index a01c9a2..54792e9 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -802,6 +802,35 @@ Status PlasmaStore::ProcessMessage(Client* client) {
WarnIfSigpipe(send_fd(client->fd, object.store_fd), client->fd);
}
} break;
+ case fb::MessageType::PlasmaCreateAndSealRequest: {
+ std::string data;
+ std::string metadata;
+ unsigned char digest[kDigestSize];
+ RETURN_NOT_OK(ReadCreateAndSealRequest(input, input_size, &object_id,
&data,
+ &metadata, &digest[0]));
+ // CreateAndSeal currently only supports device_num = 0, which
corresponds
+ // to the host.
+ int device_num = 0;
+ PlasmaError error_code = CreateObject(object_id, data.size(),
metadata.size(),
+ device_num, client, &object);
+ // Reply to the client.
+ HANDLE_SIGPIPE(SendCreateAndSealReply(client->fd, error_code),
client->fd);
+
+ // If the object was successfully created, fill out the object data and
seal it.
+ if (error_code == PlasmaError::OK) {
+ auto entry = GetObjectTableEntry(&store_info_, object_id);
+ ARROW_CHECK(entry != nullptr);
+ // Write the inlined data and metadata into the allocated object.
+ std::memcpy(entry->pointer, data.data(), data.size());
+ std::memcpy(entry->pointer + data.size(), metadata.data(),
metadata.size());
+ SealObject(object_id, &digest[0]);
+ // Remove the client from the object's array of clients because the
+ // object is not being used by any client. The client was added to the
+ // object's array of clients in CreateObject. This is analogous to the
+ // Release call that happens in the client's Seal method.
+ ARROW_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1);
+ }
+ } break;
case fb::MessageType::PlasmaAbortRequest: {
RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id));
ARROW_CHECK(AbortObject(object_id, client) == 1) << "To abort an object,
the only "
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index 2b9f93e..e2cbc18 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -110,6 +110,9 @@ cdef extern from "plasma/client.h" nogil:
const uint8_t* metadata, int64_t metadata_size,
const shared_ptr[CBuffer]* data)
+ CStatus CreateAndSeal(const CUniqueID& object_id, const c_string& data,
+ const c_string& metadata)
+
CStatus Get(const c_vector[CUniqueID] object_ids, int64_t timeout_ms,
c_vector[CObjectBuffer]* object_buffers)
@@ -344,6 +347,35 @@ cdef class PlasmaClient:
data.get().mutable_data(),
data_size)
+ def create_and_seal(self, ObjectID object_id, c_string data,
+ c_string metadata=b""):
+ """
+ Store a new object in the PlasmaStore for a particular object ID.
+
+ Parameters
+ ----------
+ object_id : ObjectID
+ The object ID used to identify an object.
+ data : bytes
+ The object to store.
+ metadata : bytes
+ An optional string of bytes encoding whatever metadata the user
+ wishes to encode.
+
+ Raises
+ ------
+ PlasmaObjectExists
+ This exception is raised if the object could not be created because
+ there already is an object with the same ID in the plasma store.
+
+ PlasmaStoreFull: This exception is raised if the object could
+ not be created because the plasma store is unable to evict
+ enough objects to create room for it.
+ """
+ with nogil:
+ check_status(self.client.get().CreateAndSeal(object_id.data, data,
+ metadata))
+
def get_buffers(self, object_ids, timeout_ms=-1):
"""
Returns data buffer from the PlasmaStore based on object ID.
diff --git a/python/pyarrow/tests/test_plasma.py
b/python/pyarrow/tests/test_plasma.py
index efda2af..9229479 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -210,6 +210,38 @@ class TestPlasmaClient(object):
else:
assert False
+ def test_create_and_seal(self):
+
+ # Create a bunch of objects.
+ object_ids = []
+ for i in range(1000):
+ object_id = random_object_id()
+ object_ids.append(object_id)
+ self.plasma_client.create_and_seal(object_id, i * b'a', i * b'b')
+
+ for i in range(1000):
+ assert self.plasma_client.get_buffer(object_ids[i]) == i * b'a'
+ assert (self.plasma_client.get_metadata(
+ [object_ids[i]])[0].to_pybytes()
+ == i * b'b')
+
+ # Make sure that creating the same object twice raises an exception.
+ object_id = random_object_id()
+ self.plasma_client.create_and_seal(object_id, b'a', b'b')
+ with pytest.raises(pa.PlasmaObjectExists):
+ self.plasma_client.create_and_seal(object_id, b'a', b'b')
+
+ # Make sure that these objects can be evicted.
+ big_object = DEFAULT_PLASMA_STORE_MEMORY // 10 * b'a'
+ object_ids = []
+ for _ in range(20):
+ object_id = random_object_id()
+ object_ids.append(object_id)
+ self.plasma_client.create_and_seal(random_object_id(), big_object,
+ big_object)
+ for i in range(10):
+ assert not self.plasma_client.contains(object_ids[i])
+
def test_get(self):
num_object_ids = 60
# Test timing out of get with various timeouts.