This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 8156e25 ARROW-2551: [Plasma] Improve notification logic
8156e25 is described below
commit 8156e25fcd08f237ea6c55066bd9249cb6e0f296
Author: Zhijun Fu <[email protected]>
AuthorDate: Fri Jun 8 21:01:41 2018 -0400
ARROW-2551: [Plasma] Improve notification logic
This change improves several aspects of the current Plasma notification
code:
1. When a client subscribes to Plasma, the store pushes notifications about
existing objects to ALL subscribers, while it should only push to the new
subscriber.
2. In the above scenario, the store should push only "sealed" objects to the
new subscriber, whereas it currently pushes all objects regardless of their state.
3. When a client disconnects, it can no longer receive notifications, so the
NotificationQueue for that client should be removed from the
global map.
Author: Zhijun Fu <[email protected]>
Author: Zhijun Fu <[email protected]>
Closes #2031 from zhijunfu/refactor-notification and squashes the following
commits:
84f49357 <Zhijun Fu> Trigger
fce35f1c <Zhijun Fu> Trigger
b93ba0e1 <Zhijun Fu> Trigger
f2377f8a <Zhijun Fu> fix code with rebase
d50651ed <Zhijun Fu> Address review comments
6a7f492a <Zhijun Fu> delete object notification queue for a client when it
disconnects with plasma
57bbab32 <Zhijun Fu> plasma shouldn't push notifications to all
subscribers when a new client subscribes
---
cpp/src/plasma/store.cc | 40 ++++++++++++++++++++++++++++++++--------
cpp/src/plasma/store.h | 6 ++++++
2 files changed, 38 insertions(+), 8 deletions(-)
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 171f062..69a02dc 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -103,7 +103,7 @@ GetRequest::GetRequest(Client* client, const
std::vector<ObjectID>& object_ids)
num_objects_to_wait_for = unique_ids.size();
}
-Client::Client(int fd) : fd(fd) {}
+Client::Client(int fd) : fd(fd), notification_fd(-1) {}
PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory, std::string
directory,
bool hugepages_enabled)
@@ -559,10 +559,18 @@ void PlasmaStore::disconnect_client(int client_fd) {
remove_from_client_object_ids(entry, client);
}
- // Note, the store may still attempt to send a message to the disconnected
- // client (for example, when an object ID that the client was waiting for
- // is ready). In these cases, the attempt to send the message will fail, but
- // the store should just ignore the failure.
+ if (client->notification_fd > 0) {
+ // This client has subscribed for notifications.
+ auto notify_fd = client->notification_fd;
+ loop_->RemoveFileEvent(notify_fd);
+ // Close socket.
+ close(notify_fd);
+ // Remove notification queue for this fd from global map.
+ pending_notifications_.erase(notify_fd);
+ // Reset fd.
+ client->notification_fd = -1;
+ }
+
connected_clients_.erase(it);
}
@@ -642,9 +650,23 @@ void PlasmaStore::push_notification(ObjectInfoT*
object_info) {
}
}
+void PlasmaStore::push_notification(ObjectInfoT* object_info, int client_fd) {
+ auto it = pending_notifications_.find(client_fd);
+ if (it != pending_notifications_.end()) {
+ auto notification = create_object_info_buffer(object_info);
+ it->second.object_notifications.emplace_back(std::move(notification));
+ send_notifications(it);
+ }
+}
+
// Subscribe to notifications about sealed objects.
void PlasmaStore::subscribe_to_updates(Client* client) {
ARROW_LOG(DEBUG) << "subscribing to updates on fd " << client->fd;
+ if (client->notification_fd > 0) {
+ // This client has already subscribed. Return.
+ return;
+ }
+
// TODO(rkn): The store could block here if the client doesn't send a file
// descriptor.
int fd = recv_fd(client->fd);
@@ -657,12 +679,14 @@ void PlasmaStore::subscribe_to_updates(Client* client) {
// Add this fd to global map, which is needed for this client to receive
notifications.
pending_notifications_[fd];
+ client->notification_fd = fd;
- // Push notifications to the new subscriber about existing objects.
+ // Push notifications to the new subscriber about existing sealed objects.
for (const auto& entry : store_info_.objects) {
- push_notification(&entry.second->info);
+ if (entry.second->state == PLASMA_SEALED) {
+ push_notification(&entry.second->info, fd);
+ }
}
- send_notifications(pending_notifications_.find(fd));
}
Status PlasmaStore::process_message(Client* client) {
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index 9b3850b..e5ef917 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -50,6 +50,10 @@ struct Client {
/// Object ids that are used by this client.
std::unordered_set<ObjectID> object_ids;
+
+ /// The file descriptor used to push notifications to client. This is only
valid
+ /// if client subscribes to plasma store. -1 indicates invalid.
+ int notification_fd;
};
class PlasmaStore {
@@ -170,6 +174,8 @@ class PlasmaStore {
private:
void push_notification(ObjectInfoT* object_notification);
+ void push_notification(ObjectInfoT* object_notification, int client_fd);
+
void add_to_client_object_ids(ObjectTableEntry* entry, Client* client);
void return_from_get(GetRequest* get_req);
--
To stop receiving notification emails like this one, please contact
[email protected].