This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 66b05ab ARROW-7998: [C++][Plasma] Make Seal requests synchronous
66b05ab is described below
commit 66b05abc267661172286b47b9246ad55f1581555
Author: Stephanie Wang <[email protected]>
AuthorDate: Thu Mar 5 11:38:21 2020 -0800
ARROW-7998: [C++][Plasma] Make Seal requests synchronous
When handling a `Seal` request to create an object and make it visible to
other clients, the plasma store does not wait until the seal is complete before
responding to the requesting client. This makes the interface hard to use,
since the client is not guaranteed that the object is visible yet and would
have to use an additional IPC round-trip to determine when the object is ready.
This improvement would require the plasma store to wait until the object
has been created before responding to the client.
Closes #6529 from stephanie-wang/sync-seal and squashes the following
commits:
cb3867e4f <Stephanie Wang> Make Seal synchronous
Authored-by: Stephanie Wang <[email protected]>
Signed-off-by: Philipp Moritz <[email protected]>
---
cpp/src/plasma/client.cc | 5 +++++
cpp/src/plasma/store.cc | 10 ++++++----
2 files changed, 11 insertions(+), 4 deletions(-)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index d6c5d7b..0640091 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -859,6 +859,11 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id)
{
RETURN_NOT_OK(Hash(object_id, &digest[0]));
RETURN_NOT_OK(
SendSealRequest(store_conn_, object_id, std::string(digest.begin(),
digest.end())));
+ std::vector<uint8_t> buffer;
+ RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaSealReply,
&buffer));
+ ObjectID sealed_id;
+ RETURN_NOT_OK(ReadSealReply(buffer.data(), buffer.size(), &sealed_id));
+ ARROW_CHECK(sealed_id == object_id);
// We call PlasmaClient::Release to decrement the number of instances of this
// object
// that are currently being used by this client. The corresponding increment
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 253250e..d02765f 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -959,8 +959,6 @@ Status PlasmaStore::ProcessMessage(Client* client) {
int device_num = 0;
PlasmaError error_code = CreateObject(object_id, data.size(),
metadata.size(),
device_num, client, &object);
- // Reply to the client.
- HANDLE_SIGPIPE(SendCreateAndSealReply(client->fd, error_code),
client->fd);
// If the object was successfully created, fill out the object data and
seal it.
if (error_code == PlasmaError::OK) {
@@ -976,6 +974,9 @@ Status PlasmaStore::ProcessMessage(Client* client) {
// Release call that happens in the client's Seal method.
ARROW_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1);
}
+
+ // Reply to the client.
+ HANDLE_SIGPIPE(SendCreateAndSealReply(client->fd, error_code),
client->fd);
} break;
case fb::MessageType::PlasmaCreateAndSealBatchRequest: {
std::vector<ObjectID> object_ids;
@@ -999,8 +1000,6 @@ Status PlasmaStore::ProcessMessage(Client* client) {
}
}
- HANDLE_SIGPIPE(SendCreateAndSealBatchReply(client->fd, error_code),
client->fd);
-
// if OK, seal all the objects,
// if error, abort the previous i objects immediately
if (error_code == PlasmaError::OK) {
@@ -1027,6 +1026,8 @@ Status PlasmaStore::ProcessMessage(Client* client) {
AbortObject(object_ids[j], client);
}
}
+
+ HANDLE_SIGPIPE(SendCreateAndSealBatchReply(client->fd, error_code),
client->fd);
} break;
case fb::MessageType::PlasmaAbortRequest: {
RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id));
@@ -1071,6 +1072,7 @@ Status PlasmaStore::ProcessMessage(Client* client) {
std::string digest;
RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest));
SealObjects({object_id}, {digest});
+ HANDLE_SIGPIPE(SendSealReply(client->fd, object_id, PlasmaError::OK),
client->fd);
} break;
case fb::MessageType::PlasmaEvictRequest: {
// This code path should only be used for testing.