[
https://issues.apache.org/jira/browse/ARROW-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16247579#comment-16247579
]
ASF GitHub Bot commented on ARROW-1788:
---------------------------------------
wesm closed pull request #1299: ARROW-1788 Fix Plasma store abort bug on client
disconnection
URL: https://github.com/apache/arrow/pull/1299
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 5dbdebc23..31033ccbb 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -393,16 +393,22 @@ void PlasmaStore::seal_object(const ObjectID& object_id,
unsigned char digest[])
update_object_get_requests(object_id);
}
-void PlasmaStore::abort_object(const ObjectID& object_id) {
+int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) {
auto entry = get_object_table_entry(&store_info_, object_id);
ARROW_CHECK(entry != NULL) << "To abort an object it must be in the object
table.";
ARROW_CHECK(entry->state != PLASMA_SEALED)
<< "To abort an object it must not have been sealed.";
- ARROW_CHECK(entry->clients.size() == 1)
- << "To abort an object, the only client currently using it is the
creator.";
-
- dlfree(entry->pointer);
- store_info_.objects.erase(object_id);
+ auto it = entry->clients.find(client);
+ if (it == entry->clients.end()) {
+ // If the client requesting the abort is not the creator, do not
+ // perform the abort.
+ return 0;
+ } else {
+ // The client requesting the abort is the creator. Free the object.
+ dlfree(entry->pointer);
+ store_info_.objects.erase(object_id);
+ return 1;
+ }
}
void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
@@ -454,11 +460,12 @@ void PlasmaStore::disconnect_client(int client_fd) {
ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd;
// If this client was using any objects, remove it from the appropriate
// lists.
+ auto client = it->second.get();
for (const auto& entry : store_info_.objects) {
if (entry.second->state == PLASMA_SEALED) {
- remove_client_from_object_clients(entry.second.get(), it->second.get());
+ remove_client_from_object_clients(entry.second.get(), client);
} else {
- abort_object(entry.first);
+ abort_object(entry.first, client);
}
}
@@ -600,7 +607,9 @@ Status PlasmaStore::process_message(Client* client) {
} break;
case MessageType_PlasmaAbortRequest: {
RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id));
- abort_object(object_id);
+ ARROW_CHECK(abort_object(object_id, client) == 1) << "To abort an
object, the only "
+ "client currently
using it "
+ "must be the
creator.";
HANDLE_SIGPIPE(SendAbortReply(client->fd, object_id), client->fd);
} break;
case MessageType_PlasmaGetRequest: {
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index 0d08d8a67..a72c6259a 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -74,7 +74,14 @@ class PlasmaStore {
int create_object(const ObjectID& object_id, int64_t data_size, int64_t
metadata_size,
Client* client, PlasmaObject* result);
- void abort_object(const ObjectID& object_id);
+ /// Abort a created but unsealed object. If the client is not the
+ /// creator, then the abort will fail.
+ ///
+ /// @param object_id Object ID of the object to be aborted.
+ /// @param client The client who created the object. If this does not
+ /// match the creator of the object, then the abort will fail.
+ /// @return 1 if the abort succeeds, else 0.
+ int abort_object(const ObjectID& object_id, Client* client);
/// Delete objects that have been created in the hash table. This should only
/// be called on objects that are returned by the eviction policy to evict.
diff --git a/cpp/src/plasma/test/client_tests.cc
b/cpp/src/plasma/test/client_tests.cc
index 5c0cee4c0..d4285f898 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -45,14 +45,17 @@ class TestPlasmaStore : public ::testing::Test {
"/plasma_store -m 1000000000 -s /tmp/store 1> /dev/null 2> /dev/null
&";
system(plasma_command.c_str());
ARROW_CHECK_OK(client_.Connect("/tmp/store", "",
PLASMA_DEFAULT_RELEASE_DELAY));
+ ARROW_CHECK_OK(client2_.Connect("/tmp/store", "",
PLASMA_DEFAULT_RELEASE_DELAY));
}
virtual void Finish() {
ARROW_CHECK_OK(client_.Disconnect());
+ ARROW_CHECK_OK(client2_.Disconnect());
system("killall plasma_store &");
}
protected:
PlasmaClient client_;
+ PlasmaClient client2_;
};
TEST_F(TestPlasmaStore, ContainsTest) {
@@ -171,6 +174,41 @@ TEST_F(TestPlasmaStore, AbortTest) {
}
}
+TEST_F(TestPlasmaStore, MultipleClientTest) {
+ ObjectID object_id = ObjectID::from_random();
+
+ // Test for object non-existence on the first client.
+ bool has_object;
+ ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
+ ASSERT_EQ(has_object, false);
+
+ // Test for the object being in local Plasma store.
+ // First create and seal object on the second client.
+ int64_t data_size = 100;
+ uint8_t metadata[] = {5};
+ int64_t metadata_size = sizeof(metadata);
+ uint8_t* data;
+ ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata,
metadata_size, &data));
+ ARROW_CHECK_OK(client2_.Seal(object_id));
+ // Test that the first client can get the object.
+ ObjectBuffer object_buffer;
+ ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+ ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
+ ASSERT_EQ(has_object, true);
+
+ // Test that one client disconnecting does not interfere with the other.
+ // First create object on the second client.
+ object_id = ObjectID::from_random();
+ ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata,
metadata_size, &data));
+ // Disconnect the first client.
+ ARROW_CHECK_OK(client_.Disconnect());
+ // Test that the second client can seal and get the created object.
+ ARROW_CHECK_OK(client2_.Seal(object_id));
+ ARROW_CHECK_OK(client2_.Get(&object_id, 1, -1, &object_buffer));
+ ARROW_CHECK_OK(client2_.Contains(object_id, &has_object));
+ ASSERT_EQ(has_object, true);
+}
+
} // namespace plasma
int main(int argc, char** argv) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Plasma store crashes when trying to abort objects for disconnected client
> -------------------------------------------------------------------------
>
> Key: ARROW-1788
> URL: https://issues.apache.org/jira/browse/ARROW-1788
> Project: Apache Arrow
> Issue Type: Bug
> Components: Plasma (C++)
> Affects Versions: 0.8.0
> Reporter: Stephanie Wang
> Labels: pull-request-available
> Fix For: 0.8.0
>
>
> ARROW-1775 introduces the ability to abort objects in the Plasma store. When
> a client disconnects, the store iterates through all of the objects and
> either removes the client from the list of object users or aborts the object,
> depending on whether the object was sealed or not.
> Before aborting the object, the store does not first check whether the client
> was the one who created the object. Right now, if a different client created
> the object, the store will crash when aborting it.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)