[
https://issues.apache.org/jira/browse/ARROW-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321343#comment-16321343
]
ASF GitHub Bot commented on ARROW-1927:
---------------------------------------
pcmoritz closed pull request #1427: ARROW-1927: [Plasma] Add delete function
URL: https://github.com/apache/arrow/pull/1427
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 0dd1c44d7..d74c0f412 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -513,9 +513,20 @@ Status PlasmaClient::Abort(const ObjectID& object_id) {
}
Status PlasmaClient::Delete(const ObjectID& object_id) {
- // TODO(rkn): In the future, we can use this method to give hints to the
- // eviction policy about when an object will no longer be needed.
- return Status::NotImplemented("PlasmaClient::Delete is not implemented.");
+ RETURN_NOT_OK(FlushReleaseHistory());
+ // If the object is in used, client can't send the remove message.
+ if (objects_in_use_.count(object_id) > 0) {
+ return Status::UnknownError("PlasmaClient::Object is in use.");
+ } else {
+ // If we don't already have a reference to the object, we can try to
remove the object
+ RETURN_NOT_OK(SendDeleteRequest(store_conn_, object_id));
+ std::vector<uint8_t> buffer;
+ RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaDeleteReply,
&buffer));
+ ObjectID object_id2;
+ DCHECK_GT(buffer.size(), 0);
+ RETURN_NOT_OK(ReadDeleteReply(buffer.data(), buffer.size(), &object_id2));
+ return Status::OK();
+ }
}
Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 78793f1a7..35182f840 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -174,7 +174,8 @@ class ARROW_EXPORT PlasmaClient {
Status Seal(const ObjectID& object_id);
/// Delete an object from the object store. This currently assumes that the
- /// object is present and has been sealed.
+ /// object is present, has been sealed and not used by another client.
Otherwise,
+ /// it is a no operation.
///
/// @todo We may want to allow the deletion of objects that are not present
or
/// haven't been sealed.
diff --git a/cpp/src/plasma/eviction_policy.cc
b/cpp/src/plasma/eviction_policy.cc
index a7758fd2c..66a3b2ea2 100644
--- a/cpp/src/plasma/eviction_policy.cc
+++ b/cpp/src/plasma/eviction_policy.cc
@@ -102,4 +102,14 @@ void EvictionPolicy::end_object_access(const ObjectID&
object_id,
cache_.add(object_id, entry->info.data_size + entry->info.metadata_size);
}
+void EvictionPolicy::remove_object(const ObjectID& object_id) {
+ /* If the object is in the LRU cache, remove it. */
+ cache_.remove(object_id);
+
+ auto entry = store_info_->objects[object_id].get();
+ int64_t size = entry->info.data_size + entry->info.metadata_size;
+ ARROW_CHECK(memory_used_ >= size);
+ memory_used_ -= size;
+}
+
} // namespace plasma
diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h
index cebf35b1c..b07630955 100644
--- a/cpp/src/plasma/eviction_policy.h
+++ b/cpp/src/plasma/eviction_policy.h
@@ -120,6 +120,11 @@ class EvictionPolicy {
int64_t choose_objects_to_evict(int64_t num_bytes_required,
std::vector<ObjectID>* objects_to_evict);
+ /// This method will be called when an object is going to be removed
+ ///
+ /// @param object_id The ID of the object that is now being used.
+ void remove_object(const ObjectID& object_id);
+
private:
/// The amount of memory (in bytes) currently being used.
int64_t memory_used_;
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index b6d03b8a3..ea6dc8bb9 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -76,7 +76,11 @@ enum PlasmaError:int {
// Trying to access an object that doesn't exist.
ObjectNonexistent,
// Trying to create an object but there isn't enough space in the store.
- OutOfMemory
+ OutOfMemory,
+ // Trying to delete an object but it's not sealed.
+ ObjectNotSealed,
+ // Trying to delete an object but it's in use.
+ ObjectInUse
}
// Plasma store messages
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index c6a19a547..dde7f9cdf 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -411,6 +411,39 @@ int PlasmaStore::abort_object(const ObjectID& object_id,
Client* client) {
}
}
+int PlasmaStore::delete_object(ObjectID& object_id) {
+ auto entry = get_object_table_entry(&store_info_, object_id);
+ // TODO(rkn): This should probably not fail, but should instead throw an
+ // error. Maybe we should also support deleting objects that have been
+ // created but not sealed.
+ if (entry == NULL) {
+ // To delete an object it must be in the object table.
+ return PlasmaError_ObjectNonexistent;
+ }
+
+ if (entry->state != PLASMA_SEALED) {
+ // To delete an object it must have been sealed.
+ return PlasmaError_ObjectNotSealed;
+ }
+
+ if (entry->clients.size() != 0) {
+ // To delete an object, there must be no clients currently using it.
+ return PlasmaError_ObjectInUse;
+ }
+
+ eviction_policy_.remove_object(object_id);
+
+ dlfree(entry->pointer);
+ store_info_.objects.erase(object_id);
+ // Inform all subscribers that the object has been deleted.
+ ObjectInfoT notification;
+ notification.object_id = object_id.binary();
+ notification.is_deletion = true;
+ push_notification(¬ification);
+
+ return PlasmaError_OK;
+}
+
void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
for (const auto& object_id : object_ids) {
ARROW_LOG(DEBUG) << "deleting object " << object_id.hex();
@@ -626,18 +659,23 @@ Status PlasmaStore::process_message(Client* client) {
RETURN_NOT_OK(ReadGetRequest(input, input_size, object_ids_to_get,
&timeout_ms));
process_get_request(client, object_ids_to_get, timeout_ms);
} break;
- case MessageType_PlasmaReleaseRequest:
+ case MessageType_PlasmaReleaseRequest: {
RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id));
release_object(object_id, client);
- break;
- case MessageType_PlasmaContainsRequest:
+ } break;
+ case MessageType_PlasmaDeleteRequest: {
+ RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_id));
+ int error_code = delete_object(object_id);
+ HANDLE_SIGPIPE(SendDeleteReply(client->fd, object_id, error_code),
client->fd);
+ } break;
+ case MessageType_PlasmaContainsRequest: {
RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id));
if (contains_object(object_id) == OBJECT_FOUND) {
HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 1),
client->fd);
} else {
HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0),
client->fd);
}
- break;
+ } break;
case MessageType_PlasmaSealRequest: {
unsigned char digest[kDigestSize];
RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id,
&digest[0]));
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index a72c6259a..7eada5a12 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -83,6 +83,15 @@ class PlasmaStore {
/// @return 1 if the abort succeeds, else 0.
int abort_object(const ObjectID& object_id, Client* client);
+ /// Delete an specific object by object_id that have been created in the
hash table.
+ ///
+ /// @param object_id Object ID of the object to be deleted.
+ /// @return One of the following error codes:
+ /// - PlasmaError_OK, if the object was delete successfully.
+ /// - PlasmaError_ObjectNonexistent, if ths object isn't existed.
+ /// - PlasmaError_ObjectInUse, if the object is in use.
+ int delete_object(ObjectID& object_id);
+
/// Delete objects that have been created in the hash table. This should only
/// be called on objects that are returned by the eviction policy to evict.
///
diff --git a/cpp/src/plasma/test/client_tests.cc
b/cpp/src/plasma/test/client_tests.cc
index 5cd3063bb..f19c2bfbd 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -58,6 +58,31 @@ class TestPlasmaStore : public ::testing::Test {
PlasmaClient client2_;
};
+TEST_F(TestPlasmaStore, DeleteTest) {
+ ObjectID object_id = ObjectID::from_random();
+
+ // Test for deleting non-existance object.
+ Status result = client_.Delete(object_id);
+ ASSERT_EQ(result.IsPlasmaObjectNonexistent(), true);
+
+ // Test for the object being in local Plasma store.
+ // First create object.
+ int64_t data_size = 100;
+ uint8_t metadata[] = {5};
+ int64_t metadata_size = sizeof(metadata);
+ std::shared_ptr<Buffer> data;
+ ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size,
&data));
+ ARROW_CHECK_OK(client_.Seal(object_id));
+
+ // Object is in use, can't be delete.
+ result = client_.Delete(object_id);
+ ASSERT_EQ(result.IsUnknownError(), true);
+
+ // Avoid race condition of Plasma Manager waiting for notification.
+ ARROW_CHECK_OK(client_.Release(object_id));
+ ARROW_CHECK_OK(client_.Delete(object_id));
+}
+
TEST_F(TestPlasmaStore, ContainsTest) {
ObjectID object_id = ObjectID::from_random();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> [Plasma] Implement delete function
> ----------------------------------
>
> Key: ARROW-1927
> URL: https://issues.apache.org/jira/browse/ARROW-1927
> Project: Apache Arrow
> Issue Type: Improvement
> Components: Plasma (C++), Python
> Reporter: Philipp Moritz
> Labels: pull-request-available
> Fix For: 0.9.0
>
>
> The function should check if the reference count of the object is zero and if
> yes, delete it from the store. If no, it should raise an exception or return
> a status value.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)