This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new b916c79  ARROW-2539: [Plasma] Use unique_ptr instead of raw pointer
b916c79 is described below

commit b916c797833541299f9c3adc6e24d109359b7a6c
Author: Zhijun Fu <[email protected]>
AuthorDate: Fri May 4 12:26:29 2018 -0700

    ARROW-2539: [Plasma] Use unique_ptr instead of raw pointer
    
    use unique_ptr to replace raw pointer, so that allocated memory can be 
freed automatically
    
    Author: Zhijun Fu <[email protected]>
    
    Closes #1993 from zhijunfu/improve-code and squashes the following commits:
    
    3c69ada9 <Zhijun Fu> fix format check
    6bfebc2d <Zhijun Fu> fix lint
    b5b2fac2 <Zhijun Fu> fix build on travis-ci
    d4d64b02 <Zhijun Fu> Merge branch 'master' of 
https://github.com/zhijunfu/arrow into improve-code
    84b7e371 <Zhijun Fu>  Use unique_ptr instead of raw pointer
---
 cpp/src/plasma/client.cc |  5 ++---
 cpp/src/plasma/io.cc     |  8 ++++----
 cpp/src/plasma/io.h      |  3 ++-
 cpp/src/plasma/plasma.cc |  9 +++++----
 cpp/src/plasma/plasma.h  |  2 +-
 cpp/src/plasma/store.cc  | 32 +++++++++-----------------------
 cpp/src/plasma/store.h   |  2 +-
 7 files changed, 24 insertions(+), 37 deletions(-)

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 2a6f183..b4ee098 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -874,11 +874,11 @@ Status PlasmaClient::Impl::Subscribe(int* fd) {
 
 Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
                                            int64_t* data_size, int64_t* 
metadata_size) {
-  uint8_t* notification = read_message_async(fd);
+  auto notification = read_message_async(fd);
   if (notification == NULL) {
     return Status::IOError("Failed to read object notification from Plasma 
socket");
   }
-  auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification);
+  auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification.get());
   ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID));
   memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID));
   if (object_info->is_deletion()) {
@@ -888,7 +888,6 @@ Status PlasmaClient::Impl::GetNotification(int fd, 
ObjectID* object_id,
     *data_size = object_info->data_size();
     *metadata_size = object_info->metadata_size();
   }
-  delete[] notification;
   return Status::OK();
 }
 
diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc
index 2cba897..4142bf9 100644
--- a/cpp/src/plasma/io.cc
+++ b/cpp/src/plasma/io.cc
@@ -18,6 +18,7 @@
 #include "plasma/io.h"
 
 #include <cstdint>
+#include <memory>
 #include <sstream>
 
 #include "arrow/status.h"
@@ -210,7 +211,7 @@ int AcceptClient(int socket_fd) {
   return client_fd;
 }
 
-uint8_t* read_message_async(int sock) {
+std::unique_ptr<uint8_t[]> read_message_async(int sock) {
   int64_t size;
   Status s = ReadBytes(sock, reinterpret_cast<uint8_t*>(&size), 
sizeof(int64_t));
   if (!s.ok()) {
@@ -219,10 +220,9 @@ uint8_t* read_message_async(int sock) {
     close(sock);
     return NULL;
   }
-  uint8_t* message = reinterpret_cast<uint8_t*>(malloc(size));
-  s = ReadBytes(sock, message, size);
+  auto message = std::unique_ptr<uint8_t[]>(new uint8_t[size]);
+  s = ReadBytes(sock, message.get(), size);
   if (!s.ok()) {
-    free(message);
     /* The other side has closed the socket. */
     ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has 
occurred.";
     close(sock);
diff --git a/cpp/src/plasma/io.h b/cpp/src/plasma/io.h
index 4beb134..8869c9b 100644
--- a/cpp/src/plasma/io.h
+++ b/cpp/src/plasma/io.h
@@ -23,6 +23,7 @@
 #include <sys/un.h>
 #include <unistd.h>
 
+#include <memory>
 #include <string>
 #include <vector>
 
@@ -56,7 +57,7 @@ Status ConnectIpcSocketRetry(const std::string& pathname, int 
num_retries,
 
 int AcceptClient(int socket_fd);
 
-uint8_t* read_message_async(int sock);
+std::unique_ptr<uint8_t[]> read_message_async(int sock);
 
 }  // namespace plasma
 
diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc
index 0a019dd..e57049d 100644
--- a/cpp/src/plasma/plasma.cc
+++ b/cpp/src/plasma/plasma.cc
@@ -51,13 +51,14 @@ int warn_if_sigpipe(int status, int client_sock) {
  * @return The object info buffer. It is the caller's responsibility to free
  *         this buffer with "delete" after it has been used.
  */
-uint8_t* create_object_info_buffer(ObjectInfoT* object_info) {
+std::unique_ptr<uint8_t[]> create_object_info_buffer(ObjectInfoT* object_info) 
{
   flatbuffers::FlatBufferBuilder fbb;
   auto message = CreateObjectInfo(fbb, object_info);
   fbb.Finish(message);
-  uint8_t* notification = new uint8_t[sizeof(int64_t) + fbb.GetSize()];
-  *(reinterpret_cast<int64_t*>(notification)) = fbb.GetSize();
-  memcpy(notification + sizeof(int64_t), fbb.GetBufferPointer(), 
fbb.GetSize());
+  auto notification =
+      std::unique_ptr<uint8_t[]>(new uint8_t[sizeof(int64_t) + fbb.GetSize()]);
+  *(reinterpret_cast<int64_t*>(notification.get())) = fbb.GetSize();
+  memcpy(notification.get() + sizeof(int64_t), fbb.GetBufferPointer(), 
fbb.GetSize());
   return notification;
 }
 
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index cfaa927..4b4064c 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -179,7 +179,7 @@ ObjectTableEntry* get_object_table_entry(PlasmaStoreInfo* 
store_info,
 /// @return The errno set.
 int warn_if_sigpipe(int status, int client_sock);
 
-uint8_t* create_object_info_buffer(ObjectInfoT* object_info);
+std::unique_ptr<uint8_t[]> create_object_info_buffer(ObjectInfoT* object_info);
 
 }  // namespace plasma
 
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 5e7b452..2caa3cb 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -118,17 +118,7 @@ PlasmaStore::PlasmaStore(EventLoop* loop, int64_t 
system_memory, std::string dir
 }
 
 // TODO(pcm): Get rid of this destructor by using RAII to clean up data.
-PlasmaStore::~PlasmaStore() {
-  for (const auto& element : pending_notifications_) {
-    auto object_notifications = element.second.object_notifications;
-    for (size_t i = 0; i < object_notifications.size(); ++i) {
-      uint8_t* notification = 
reinterpret_cast<uint8_t*>(object_notifications.at(i));
-      uint8_t* data = notification;
-      // TODO(pcm): Get rid of this delete.
-      delete[] data;
-    }
-  }
-}
+PlasmaStore::~PlasmaStore() {}
 
 const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return 
&store_info_; }
 
@@ -322,11 +312,11 @@ void PlasmaStore::return_from_get(GetRequest* get_req) {
 }
 
 void PlasmaStore::update_object_get_requests(const ObjectID& object_id) {
-  std::vector<GetRequest*>& get_requests = object_get_requests_[object_id];
+  auto& get_requests = object_get_requests_[object_id];
   size_t index = 0;
   size_t num_requests = get_requests.size();
   for (size_t i = 0; i < num_requests; ++i) {
-    GetRequest* get_req = get_requests[index];
+    auto get_req = get_requests[index];
     auto entry = get_object_table_entry(&store_info_, object_id);
     ARROW_CHECK(entry != NULL);
 
@@ -356,7 +346,7 @@ void PlasmaStore::process_get_request(Client* client,
                                       const std::vector<ObjectID>& object_ids,
                                       int64_t timeout_ms) {
   // Create a get request for this object.
-  GetRequest* get_req = new GetRequest(client, object_ids);
+  auto get_req = new GetRequest(client, object_ids);
 
   for (auto object_id : object_ids) {
     // Check if this object is already present locally. If so, record that the
@@ -582,13 +572,12 @@ void PlasmaStore::send_notifications(int client_fd) {
   // Loop over the array of pending notifications and send as many of them as
   // possible.
   for (size_t i = 0; i < it->second.object_notifications.size(); ++i) {
-    uint8_t* notification =
-        reinterpret_cast<uint8_t*>(it->second.object_notifications.at(i));
+    auto& notification = it->second.object_notifications.at(i);
     // Decode the length, which is the first bytes of the message.
-    int64_t size = *(reinterpret_cast<int64_t*>(notification));
+    int64_t size = *(reinterpret_cast<int64_t*>(notification.get()));
 
     // Attempt to send a notification about this object ID.
-    ssize_t nbytes = send(client_fd, notification, sizeof(int64_t) + size, 0);
+    ssize_t nbytes = send(client_fd, notification.get(), sizeof(int64_t) + 
size, 0);
     if (nbytes >= 0) {
       ARROW_CHECK(nbytes == static_cast<ssize_t>(sizeof(int64_t)) + size);
     } else if (nbytes == -1 &&
@@ -613,9 +602,6 @@ void PlasmaStore::send_notifications(int client_fd) {
       }
     }
     num_processed += 1;
-    // The corresponding malloc happened in create_object_info_buffer
-    // within push_notification.
-    delete[] notification;
   }
   // Remove the sent notifications from the array.
   it->second.object_notifications.erase(
@@ -636,8 +622,8 @@ void PlasmaStore::send_notifications(int client_fd) {
 
 void PlasmaStore::push_notification(ObjectInfoT* object_info) {
   for (auto& element : pending_notifications_) {
-    uint8_t* notification = create_object_info_buffer(object_info);
-    element.second.object_notifications.push_back(notification);
+    auto notification = create_object_info_buffer(object_info);
+    element.second.object_notifications.emplace_back(std::move(notification));
     send_notifications(element.first);
     // The notification gets freed in send_notifications when the notification
     // is sent over the socket.
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index d97cdf7..ac6b2c4 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -37,7 +37,7 @@ struct GetRequest;
 struct NotificationQueue {
   /// The object notifications for clients. We notify the client about the
   /// objects in the order that the objects were sealed or deleted.
-  std::deque<uint8_t*> object_notifications;
+  std::deque<std::unique_ptr<uint8_t[]>> object_notifications;
 };
 
 /// Contains all information that is associated with a Plasma store client.

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to