[ 
https://issues.apache.org/jira/browse/ARROW-1775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16244827#comment-16244827
 ] 

ASF GitHub Bot commented on ARROW-1775:
---------------------------------------

pcmoritz closed pull request #1289: ARROW-1775: Ability to abort created but 
unsealed Plasma objects
URL: https://github.com/apache/arrow/pull/1289
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index e57a2a6f3..dd32bdc81 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -278,6 +278,39 @@ Status PlasmaClient::Get(const ObjectID* object_ids, 
int64_t num_objects,
   return Status::OK();
 }
 
+/// Drop this client's local state for an object whose reference count has
+/// reached zero. Called from both PerformRelease and Abort.
+///
+/// Releases the object's share of the memory-mapped store file that backs it
+/// (unmapping the file when no other object in use still needs it), subtracts
+/// the object's size from in_use_object_bytes_, and removes the object from
+/// objects_in_use_.
+///
+/// \param object_id An object present in objects_in_use_ with count == 0.
+/// \return IOError if munmap fails; in that case nothing has been modified.
+Status PlasmaClient::UnmapObject(const ObjectID& object_id) {
+  auto object_entry = objects_in_use_.find(object_id);
+  ARROW_CHECK(object_entry != objects_in_use_.end());
+  ARROW_CHECK(object_entry->second->count == 0);
+
+  // Every object in use holds one reference on the mmap_table_ entry of the
+  // store file it lives in; give that reference back here.
+  int fd = object_entry->second->object.handle.store_fd;
+  auto entry = mmap_table_.find(fd);
+  ARROW_CHECK(entry != mmap_table_.end());
+  ARROW_CHECK(entry->second.count >= 1);
+  if (entry->second.count == 1) {
+    // No other object uses this file any more, so unmap it.
+    int err = munmap(entry->second.pointer, entry->second.length);
+    if (err == -1) {
+      return Status::IOError("Error during munmap");
+    }
+    // Erase through the iterator we already hold instead of re-hashing fd.
+    mmap_table_.erase(entry);
+  } else {
+    // Other objects still use this file; just drop our reference.
+    entry->second.count -= 1;
+  }
+  // Update the in_use_object_bytes_.
+  in_use_object_bytes_ -= (object_entry->second->object.data_size +
+                           object_entry->second->object.metadata_size);
+  DCHECK_GE(in_use_object_bytes_, 0);
+  // Erase last: the entry owns the object metadata read above. Erasing via the
+  // held iterator avoids a second lookup.
+  objects_in_use_.erase(object_entry);
+  return Status::OK();
+}
+
 /// This is a helper method for implementing plasma_release. We maintain a
 /// buffer
 /// of release calls and only perform them once the buffer becomes full (as
@@ -297,28 +330,9 @@ Status PlasmaClient::PerformRelease(const ObjectID& 
object_id) {
   ARROW_CHECK(object_entry->second->count >= 0);
   // Check if the client is no longer using this object.
   if (object_entry->second->count == 0) {
-    // Decrement the count of the number of objects in this memory-mapped file
-    // that the client is using. The corresponding increment should have
-    // happened in plasma_get.
-    int fd = object_entry->second->object.handle.store_fd;
-    auto entry = mmap_table_.find(fd);
-    ARROW_CHECK(entry != mmap_table_.end());
-    entry->second.count -= 1;
-    ARROW_CHECK(entry->second.count >= 0);
-    // If none are being used then unmap the file.
-    if (entry->second.count == 0) {
-      munmap(entry->second.pointer, entry->second.length);
-      // Remove the corresponding entry from the hash table.
-      mmap_table_.erase(fd);
-    }
     // Tell the store that the client no longer needs the object.
+    RETURN_NOT_OK(UnmapObject(object_id));
     RETURN_NOT_OK(SendReleaseRequest(store_conn_, object_id));
-    // Update the in_use_object_bytes_.
-    in_use_object_bytes_ -= (object_entry->second->object.data_size +
-                             object_entry->second->object.metadata_size);
-    DCHECK_GE(in_use_object_bytes_, 0);
-    // Remove the entry from the hash table of objects currently in use.
-    objects_in_use_.erase(object_id);
   }
   return Status::OK();
 }
@@ -344,6 +358,20 @@ Status PlasmaClient::Release(const ObjectID& object_id) {
   return Status::OK();
 }
 
+/// Perform every release call that Release() deferred into release_history_,
+/// so the store and the local bookkeeping reflect all of them. Nothing is
+/// done when the client is already disconnected from the store.
+Status PlasmaClient::FlushReleaseHistory() {
+  if (store_conn_ >= 0) {
+    while (!release_history_.empty()) {
+      // Entries are processed from the back. An entry is popped only after
+      // its release succeeded, so a failed release remains queued.
+      RETURN_NOT_OK(PerformRelease(release_history_.back()));
+      release_history_.pop_back();
+    }
+  }
+  return Status::OK();
+}
+
 // This method is used to query whether the plasma store contains an object.
 Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
   // Check if we already have a reference to the object.
@@ -443,6 +471,35 @@ Status PlasmaClient::Seal(const ObjectID& object_id) {
   return Release(object_id);
 }
 
+/// Abort an unsealed object that this client created, so that it is as if the
+/// object had never been created. Deferred releases are flushed first so the
+/// reference count reflects every Release() the caller has already made.
+///
+/// \param object_id The ID of the unsealed object to abort.
+/// \return Invalid if the client still holds more than the single reference
+///         left by Create (i.e. it has not released its buffer yet), or if
+///         flushing deferred releases dropped the last reference; otherwise
+///         the status of the abort round-trip with the store.
+Status PlasmaClient::Abort(const ObjectID& object_id) {
+  auto object_entry = objects_in_use_.find(object_id);
+  ARROW_CHECK(object_entry != objects_in_use_.end())
+      << "Plasma client called abort on an object without a reference to it";
+  ARROW_CHECK(!object_entry->second->is_sealed)
+      << "Plasma client called abort on a sealed object";
+
+  // Flush the release history. This can erase entries from objects_in_use_
+  // (including this object, if its count drops to zero), so the iterator
+  // found above must not be reused; look the object up again.
+  RETURN_NOT_OK(FlushReleaseHistory());
+  object_entry = objects_in_use_.find(object_id);
+  if (object_entry == objects_in_use_.end()) {
+    return Status::Invalid(
+        "Plasma client released all references to the object before aborting it.");
+  }
+  // Make sure that the Plasma client only has one reference to the object. If
+  // it has more, then the client needs to release the buffer before calling
+  // abort.
+  if (object_entry->second->count > 1) {
+    return Status::Invalid("Plasma client cannot have a reference to the buffer.");
+  }
+
+  // Send the abort request and consume the store's reply before touching local
+  // state. This way a local unmap failure cannot leave an unread reply on
+  // store_conn_ that a later request would mistake for its own answer.
+  RETURN_NOT_OK(SendAbortRequest(store_conn_, object_id));
+  std::vector<uint8_t> buffer;
+  int64_t type;
+  RETURN_NOT_OK(ReadMessage(store_conn_, &type, &buffer));
+  if (type != MessageType_PlasmaAbortReply) {
+    return Status::IOError("Unexpected reply type to a Plasma abort request");
+  }
+  ObjectID id;
+  RETURN_NOT_OK(ReadAbortReply(buffer.data(), buffer.size(), &id));
+
+  // Decrease the reference count to zero, then remove the object locally.
+  object_entry->second->count--;
+  return UnmapObject(object_id);
+}
+
 Status PlasmaClient::Delete(const ObjectID& object_id) {
   // TODO(rkn): In the future, we can use this method to give hints to the
   // eviction policy about when an object will no longer be needed.
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 145942441..89df2b0b0 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -152,6 +152,15 @@ class ARROW_EXPORT PlasmaClient {
   /// \return The return status.
   Status Contains(const ObjectID& object_id, bool* has_object);
 
+  /// Abort an unsealed object in the object store. If the abort succeeds, then
+  /// it will be as if the object was never created at all. The unsealed object
+  /// must have only a single reference (the one that would have been removed by
+  /// calling Seal), so the buffer obtained from Create must be released before
+  /// calling Abort. Pending deferred releases are flushed before the reference
+  /// count is checked.
+  ///
+  /// \param object_id The ID of the object to abort. The client must hold a
+  ///        reference to it and it must not be sealed; both conditions are
+  ///        enforced with ARROW_CHECK rather than reported through the Status.
+  /// \return Status::Invalid if the client still holds more than one reference
+  ///         to the object; otherwise the status of the request to the store.
+  Status Abort(const ObjectID& object_id);
+
   /// Seal an object in the object store. The object will be immutable after
   /// this
   /// call.
@@ -307,6 +316,16 @@ class ARROW_EXPORT PlasmaClient {
   int get_manager_fd();
 
  private:
+  /// This is a helper method for unmapping objects for which all references have
+  /// gone out of scope, either by calling Release or Abort. It drops the
+  /// object's share of its memory-mapped store file (unmapping the file when no
+  /// other object in use needs it) and removes the object from objects_in_use_.
+  ///
+  /// \param object_id The object ID whose data we should unmap.
+  /// \return IOError if munmap fails.
+  Status UnmapObject(const ObjectID& object_id);
+
+  /// This is a helper method that performs all release calls deferred in the
+  /// release history, sending them to the store. It does nothing if the client
+  /// is already disconnected.
+  Status FlushReleaseHistory();
+
   Status PerformRelease(const ObjectID& object_id);
 
   uint8_t* lookup_or_mmap(int fd, int store_fd_val, int64_t map_size);
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index 23782ade5..b6d03b8a3 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -21,6 +21,8 @@ enum MessageType:int {
   // Create a new object.
   PlasmaCreateRequest = 1,
   PlasmaCreateReply,
+  PlasmaAbortRequest,
+  PlasmaAbortReply,
   // Seal an object.
   PlasmaSealRequest,
   PlasmaSealReply,
@@ -113,6 +115,16 @@ table PlasmaCreateReply {
   error: PlasmaError;
 }
 
+// Sent by a client to discard an object it created but has not sealed.
+// NOTE(review): the PlasmaAbortRequest/PlasmaAbortReply values were inserted
+// into the middle of the MessageType enum, which renumbers every message type
+// after them (e.g. PlasmaSealRequest). Clients and stores built before and
+// after this change will misinterpret each other's messages.
+table PlasmaAbortRequest {
+  // ID of the object to be aborted.
+  object_id: string;
+}
+
+// Store's acknowledgement that the abort was performed.
+table PlasmaAbortReply {
+  // ID of the object that was aborted.
+  object_id: string;
+}
+
 table PlasmaSealRequest {
   // ID of the object to be sealed.
   object_id: string;
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index 2261b6a62..c0ebb88fe 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -100,6 +100,34 @@ Status ReadCreateReply(uint8_t* data, size_t size, 
ObjectID* object_id,
   return plasma_error_status(message->error());
 }
 
+// Abort messages.
+
+/// Serialize a PlasmaAbortRequest for object_id and send it on sock.
+Status SendAbortRequest(int sock, ObjectID object_id) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto id_string = builder.CreateString(object_id.binary());
+  auto request = CreatePlasmaAbortRequest(builder, id_string);
+  return PlasmaSend(sock, MessageType_PlasmaAbortRequest, &builder, request);
+}
+
+/// Parse a PlasmaAbortRequest from the size bytes at data into *object_id.
+Status ReadAbortRequest(uint8_t* data, size_t size, ObjectID* object_id) {
+  DCHECK(data);
+  auto request = flatbuffers::GetRoot<PlasmaAbortRequest>(data);
+  // NOTE(review): being a DCHECK, this verification presumably only runs in
+  // debug builds.
+  DCHECK(verify_flatbuffer(request, data, size));
+  *object_id = ObjectID::from_binary(request->object_id()->str());
+  return Status::OK();
+}
+
+/// Serialize a PlasmaAbortReply for object_id and send it on sock.
+Status SendAbortReply(int sock, ObjectID object_id) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto id_string = builder.CreateString(object_id.binary());
+  auto reply = CreatePlasmaAbortReply(builder, id_string);
+  return PlasmaSend(sock, MessageType_PlasmaAbortReply, &builder, reply);
+}
+
+/// Parse a PlasmaAbortReply from the size bytes at data into *object_id.
+Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id) {
+  DCHECK(data);
+  auto reply = flatbuffers::GetRoot<PlasmaAbortReply>(data);
+  DCHECK(verify_flatbuffer(reply, data, size));
+  *object_id = ObjectID::from_binary(reply->object_id()->str());
+  return Status::OK();
+}
+
 // Seal messages.
 
 Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) {
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index af4b13978..e8c334f91 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -51,6 +51,14 @@ Status SendCreateReply(int sock, ObjectID object_id, 
PlasmaObject* object, int e
 Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
                        PlasmaObject* object);
 
+/* Plasma Abort message functions. */
+
+Status SendAbortRequest(int sock, ObjectID object_id);
+
+Status ReadAbortRequest(uint8_t* data, size_t size, ObjectID* object_id);
+
+Status SendAbortReply(int sock, ObjectID object_id);
+
+Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id);
+
 /* Plasma Seal message functions. */
 
 Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest);
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 210cce162..5dbdebc23 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -393,6 +393,18 @@ void PlasmaStore::seal_object(const ObjectID& object_id, 
unsigned char digest[])
   update_object_get_requests(object_id);
 }
 
+// Discard an object that was created but never sealed: free its memory and
+// remove it from the object table, as if it had never been created.
+// NOTE(review): the preconditions below are ARROW_CHECKs (presumably fatal) on
+// state a client request controls, since process_message calls this directly
+// for MessageType_PlasmaAbortRequest. A misbehaving client could therefore
+// take down the store; consider returning an error to the client instead.
+// NOTE(review): disconnect_client calls this from inside a range-for over
+// store_info_.objects. Erasing the current element here invalidates that
+// loop's iterator. That loop also aborts every unsealed object, not only those
+// created by the disconnecting client, and trips the clients.size() check for
+// objects with other users.
+void PlasmaStore::abort_object(const ObjectID& object_id) {
+  auto entry = get_object_table_entry(&store_info_, object_id);
+  ARROW_CHECK(entry != NULL) << "To abort an object it must be in the object 
table.";
+  ARROW_CHECK(entry->state != PLASMA_SEALED)
+      << "To abort an object it must not have been sealed.";
+  ARROW_CHECK(entry->clients.size() == 1)
+      << "To abort an object, the only client currently using it is the 
creator.";
+
+  // Free the object's allocation before erasing the table entry that holds
+  // the pointer to it.
+  dlfree(entry->pointer);
+  store_info_.objects.erase(object_id);
+}
+
 void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
   for (const auto& object_id : object_ids) {
     ARROW_LOG(DEBUG) << "deleting object " << object_id.hex();
@@ -443,7 +455,11 @@ void PlasmaStore::disconnect_client(int client_fd) {
   // If this client was using any objects, remove it from the appropriate
   // lists.
   for (const auto& entry : store_info_.objects) {
-    remove_client_from_object_clients(entry.second.get(), it->second.get());
+    if (entry.second->state == PLASMA_SEALED) {
+      remove_client_from_object_clients(entry.second.get(), it->second.get());
+    } else {
+      abort_object(entry.first);
+    }
   }
 
   // Note, the store may still attempt to send a message to the disconnected
@@ -582,6 +598,11 @@ Status PlasmaStore::process_message(Client* client) {
         warn_if_sigpipe(send_fd(client->fd, object.handle.store_fd), 
client->fd);
       }
     } break;
+    case MessageType_PlasmaAbortRequest: {
+      RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id));
+      abort_object(object_id);
+      HANDLE_SIGPIPE(SendAbortReply(client->fd, object_id), client->fd);
+    } break;
     case MessageType_PlasmaGetRequest: {
       std::vector<ObjectID> object_ids_to_get;
       int64_t timeout_ms;
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index d03d11f4e..0d08d8a67 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -48,6 +48,7 @@ struct Client {
 
 class PlasmaStore {
  public:
+  // TODO: PascalCase PlasmaStore methods.
   PlasmaStore(EventLoop* loop, int64_t system_memory, std::string directory,
               bool hugetlbfs_enabled);
 
@@ -73,6 +74,8 @@ class PlasmaStore {
   int create_object(const ObjectID& object_id, int64_t data_size, int64_t 
metadata_size,
                     Client* client, PlasmaObject* result);
 
+  /// Abort an object that has been created but not sealed, freeing its memory
+  /// and removing it from the object table.
+  ///
+  /// \param object_id The ID of the unsealed object to abort.
+  void abort_object(const ObjectID& object_id);
+
   /// Delete objects that have been created in the hash table. This should only
   /// be called on objects that are returned by the eviction policy to evict.
   ///
diff --git a/cpp/src/plasma/test/client_tests.cc 
b/cpp/src/plasma/test/client_tests.cc
index 02b383214..0b5d0f911 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -127,6 +127,50 @@ TEST_F(TestPlasmaStore, MultipleGetTest) {
   ASSERT_EQ(object_buffer[1].data[0], 2);
 }
 
+TEST_F(TestPlasmaStore, AbortTest) {
+  ObjectID object_id = ObjectID::from_random();
+  ObjectBuffer object_buffer;
+
+  // Before anything is created, Get (timeout 0) reports the object as missing
+  // with data_size == -1.
+  ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
+  ASSERT_EQ(object_buffer.data_size, -1);
+
+  const int64_t data_size = 4;
+  uint8_t metadata[] = {5};
+  const int64_t metadata_size = sizeof(metadata);
+  uint8_t* data;
+
+  // Create the object and fill only half of it, as an interrupted writer
+  // would.
+  ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
+  const int64_t half = data_size / 2;
+  for (int64_t i = 0; i < half; ++i) {
+    data[i] = static_cast<uint8_t>(i % 4);
+  }
+  // Abort is rejected while the buffer returned by Create is still held...
+  ASSERT_TRUE(client_.Abort(object_id).IsInvalid());
+  // ...and succeeds once that reference has been released.
+  ARROW_CHECK_OK(client_.Release(object_id));
+  ARROW_CHECK_OK(client_.Abort(object_id));
+
+  // After the abort the object is missing again.
+  ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
+  ASSERT_EQ(object_buffer.data_size, -1);
+
+  // The same ID can now be created, fully written and sealed.
+  ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
+  for (int64_t i = 0; i < data_size; ++i) {
+    data[i] = static_cast<uint8_t>(i % 4);
+  }
+  ARROW_CHECK_OK(client_.Seal(object_id));
+
+  // Get (timeout -1) returns exactly the bytes that were written.
+  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+  for (int64_t i = 0; i < data_size; ++i) {
+    ASSERT_EQ(data[i], object_buffer.data[i]);
+  }
+}
+
 }  // namespace plasma
 
 int main(int argc, char** argv) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Ability to abort created but unsealed Plasma objects
> ----------------------------------------------------
>
>                 Key: ARROW-1775
>                 URL: https://issues.apache.org/jira/browse/ARROW-1775
>             Project: Apache Arrow
>          Issue Type: New Feature
>          Components: Plasma (C++)
>            Reporter: Stephanie Wang
>              Labels: pull-request-available
>
> It would be useful to allow a Plasma client to abort an object that it 
> created but hasn't yet sealed. After the abort, it should appear as if the 
> object was never created at all. The logic is similar to the delete case, except 
> that the client must release the object atomically with the removal of the 
> object from the cache and store.
> In Ray, for example, we need this for the distributed version of the Plasma 
> store, where many Plasma clients transfer objects to each other. If a sending 
> Plasma client fails during a transfer, we want to make sure that the 
> receiving client can abort the transfer, so that we can later recreate the 
> object successfully. Otherwise, we will fail with an error that the object 
> already exists.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to