This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bed3bc  ARROW-2864: [Plasma] Add deletion cache to delete objects 
later when they are not in use.
4bed3bc is described below

commit 4bed3bc34f1d274a0ec1e49ca9d04f78e1612795
Author: Yuhong Guo <[email protected]>
AuthorDate: Tue Jul 24 15:58:47 2018 -0700

    ARROW-2864: [Plasma] Add deletion cache to delete objects later when they 
are not in use.
    
    1. When a delete request is sent, some of the objects may still be in use.
    These objects are put into a deletion cache and deleted once they are released.
    2. A Delete call flushes the release history, so after the call there is no
    release history left for the to-be-deleted objects in the history cache.
    3. When Release is called, objects in the deletion cache are handled
    immediately, without waiting. (The remaining objects wait until the handling
    condition is triggered.)
    
    Author: Yuhong Guo <[email protected]>
    Author: Philipp Moritz <[email protected]>
    
    Closes #2273 from guoyuhong/deleteObjectInUse and squashes the following 
commits:
    
    10ee293f <Philipp Moritz> Update client_tests.cc
    af55bae8 <Yuhong Guo> Fix Lint for _plasma.pyx
    59048e8d <Yuhong Guo> Fix building failure while integrating with ray
    e000a800 <Yuhong Guo> Add deletion cache in store server.
    939c45bc <Yuhong Guo> Change comment
    34a630a5 <Yuhong Guo> Add deletion cache to delete objects when they are 
not in use later.
---
 cpp/src/plasma/client.cc            | 16 ++++++++++++++
 cpp/src/plasma/store.cc             | 19 +++++++++++++----
 cpp/src/plasma/store.h              |  2 ++
 cpp/src/plasma/test/client_tests.cc | 42 ++++++++++++++++++++++++++++++-------
 python/pyarrow/_plasma.pyx          | 17 +++++++++++++++
 5 files changed, 85 insertions(+), 11 deletions(-)

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 2d977ec..3c30f3e 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -40,6 +40,7 @@
 #include <deque>
 #include <mutex>
 #include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 #include "arrow/buffer.h"
@@ -269,6 +270,8 @@ class PlasmaClient::Impl : public 
std::enable_shared_from_this<PlasmaClient::Imp
   /// information to make sure that it does not delay in releasing so much
   /// memory that the store is unable to evict enough objects to free up space.
   int64_t store_capacity_;
+  /// A hash set recording the ids that users want to delete but are still in use.
+  std::unordered_set<ObjectID> deletion_cache_;
 
 #ifdef PLASMA_GPU
   /// Cuda Device Manager.
@@ -630,11 +633,22 @@ Status PlasmaClient::Impl::PerformRelease(const ObjectID& 
object_id) {
     // Tell the store that the client no longer needs the object.
     RETURN_NOT_OK(UnmapObject(object_id));
     RETURN_NOT_OK(SendReleaseRequest(store_conn_, object_id));
+    auto iter = deletion_cache_.find(object_id);
+    if (iter != deletion_cache_.end()) {
+      deletion_cache_.erase(object_id);
+      RETURN_NOT_OK(Delete({object_id}));
+    }
   }
   return Status::OK();
 }
 
 Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
+  // If an object is in the deletion cache, handle it directly without waiting.
+  auto iter = deletion_cache_.find(object_id);
+  if (iter != deletion_cache_.end()) {
+    RETURN_NOT_OK(PerformRelease(object_id));
+    return Status::OK();
+  }
   // If the client is already disconnected, ignore release requests.
   if (store_conn_ < 0) {
     return Status::OK();
@@ -820,6 +834,8 @@ Status PlasmaClient::Impl::Delete(const 
std::vector<ObjectID>& object_ids) {
     // If the object is in used, skip it.
     if (objects_in_use_.count(object_id) == 0) {
       not_in_use_ids.push_back(object_id);
+    } else {
+      deletion_cache_.emplace(object_id);
     }
   }
   if (not_in_use_ids.size() > 0) {
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 58ea7a4..f55f3c9 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -400,10 +400,17 @@ int 
PlasmaStore::RemoveFromClientObjectIds(ObjectTableEntry* entry, Client* clie
     // If no more clients are using this object, notify the eviction policy
     // that the object is no longer being used.
     if (entry->ref_count == 0) {
-      // Tell the eviction policy that this object is no longer being used.
-      std::vector<ObjectID> objects_to_evict;
-      eviction_policy_.EndObjectAccess(entry->object_id, &objects_to_evict);
-      DeleteObjects(objects_to_evict);
+      if (deletion_cache_.count(entry->object_id) == 0) {
+        // Tell the eviction policy that this object is no longer being used.
+        std::vector<ObjectID> objects_to_evict;
+        eviction_policy_.EndObjectAccess(entry->object_id, &objects_to_evict);
+        DeleteObjects(objects_to_evict);
+      } else {
+        // EndObjectAccess above would only hand the object to the LRU policy, which
+        // evicts it when memory runs low; a pending delete must remove it now.
+        deletion_cache_.erase(entry->object_id);
+        DeleteObjects({entry->object_id});
+      }
     }
     // Return 1 to indicate that the client was removed.
     return 1;
@@ -474,11 +481,15 @@ PlasmaError PlasmaStore::DeleteObject(ObjectID& 
object_id) {
 
   if (entry->state != ObjectState::PLASMA_SEALED) {
     // To delete an object it must have been sealed.
+    // Put it into the deletion cache so that it is deleted later.
+    deletion_cache_.emplace(object_id);
     return PlasmaError::ObjectNotSealed;
   }
 
   if (entry->ref_count != 0) {
     // To delete an object, there must be no clients currently using it.
+    // Put it into the deletion cache so that it is deleted later.
+    deletion_cache_.emplace(object_id);
     return PlasmaError::ObjectInUse;
   }
 
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index 782e234..40412a8 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -209,6 +209,8 @@ class PlasmaStore {
   NotificationMap pending_notifications_;
 
   std::unordered_map<int, std::unique_ptr<Client>> connected_clients_;
+
+  std::unordered_set<ObjectID> deletion_cache_;
 #ifdef PLASMA_GPU
   arrow::gpu::CudaDeviceManager* manager_;
 #endif
diff --git a/cpp/src/plasma/test/client_tests.cc 
b/cpp/src/plasma/test/client_tests.cc
index 4d75443..a945ce6 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -177,11 +177,16 @@ TEST_F(TestPlasmaStore, DeleteTest) {
   ARROW_CHECK_OK(client_.Seal(object_id));
 
   result = client_.Delete(object_id);
-  // TODO: Guarantee that the in-use object will be deleted when it is 
released.
   ARROW_CHECK_OK(result);
+  bool has_object = false;
+  ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
+  ASSERT_TRUE(has_object);
 
   // Avoid race condition of Plasma Manager waiting for notification.
   ARROW_CHECK_OK(client_.Release(object_id));
+  // object_id is marked for deletion, so it is deleted once no longer in use.
+  ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
+  ASSERT_FALSE(has_object);
   ARROW_CHECK_OK(client_.Delete(object_id));
 }
 
@@ -202,14 +207,37 @@ TEST_F(TestPlasmaStore, DeleteObjectsTest) {
   ARROW_CHECK_OK(client_.Seal(object_id1));
   ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, 
metadata_size, &data));
   ARROW_CHECK_OK(client_.Seal(object_id2));
-  // Objects are in use.
-  result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
-  // TODO: Guarantee that the in-use object will be deleted when it is 
released.
-  ARROW_CHECK_OK(result);
-  // Avoid race condition of Plasma Manager waiting for notification.
+  // Release the ref count of Create function.
   ARROW_CHECK_OK(client_.Release(object_id1));
   ARROW_CHECK_OK(client_.Release(object_id2));
-  ARROW_CHECK_OK(client_.Delete(std::vector<ObjectID>{object_id1, 
object_id2}));
+  // Increase the ref count by calling Get using client2_.
+  std::vector<ObjectBuffer> object_buffers;
+  ARROW_CHECK_OK(client2_.Get({object_id1, object_id2}, 0, &object_buffers));
+  // Objects are still used by client2_.
+  result = client_.Delete(std::vector<ObjectID>{object_id1, object_id2});
+  ARROW_CHECK_OK(result);
+  // The object is used and it should not be deleted right now.
+  bool has_object = false;
+  ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
+  ASSERT_TRUE(has_object);
+  ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
+  ASSERT_TRUE(has_object);
+  // Decrease the ref count by deleting the PlasmaBuffer (in ObjectBuffer).
+  // client2_ won't send the release request immediately because the trigger
+  // condition is not reached. The release is only added to release cache.
+  object_buffers.clear();
+  // The reference count went to zero, but the objects are still in the release
+  // cache.
+  ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
+  ASSERT_TRUE(has_object);
+  ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
+  ASSERT_TRUE(has_object);
+  // The Delete call will flush release cache and send the Delete request.
+  result = client2_.Delete(std::vector<ObjectID>{object_id1, object_id2});
+  ARROW_CHECK_OK(client_.Contains(object_id1, &has_object));
+  ASSERT_FALSE(has_object);
+  ARROW_CHECK_OK(client_.Contains(object_id2, &has_object));
+  ASSERT_FALSE(has_object);
 }
 
 TEST_F(TestPlasmaStore, ContainsTest) {
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index ea6a8da..b638c38 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -114,6 +114,7 @@ cdef extern from "plasma/client.h" nogil:
         CStatus Transfer(const char* addr, int port,
                          const CUniqueID& object_id)
 
+        CStatus Delete(const c_vector[CUniqueID] object_ids)
 
 cdef extern from "plasma/client.h" nogil:
 
@@ -647,6 +648,22 @@ cdef class PlasmaClient:
         with nogil:
             check_status(self.client.get().Disconnect())
 
+    def delete(self, object_ids):
+        """
+        Delete the given objects; in-use objects are deleted once released.
+
+        Parameters
+        ----------
+        object_ids : list
+            A list of ObjectIDs identifying the objects to delete.
+        """
+        cdef c_vector[CUniqueID] ids
+        cdef ObjectID object_id
+        for object_id in object_ids:
+            ids.push_back(object_id.data)
+        with nogil:
+            check_status(self.client.get().Delete(ids))
+
 
 def connect(store_socket_name, manager_socket_name, int release_delay,
             int num_retries=-1):

Reply via email to