This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new b916c79 ARROW-2539: [Plasma] Use unique_ptr instead of raw pointer
b916c79 is described below
commit b916c797833541299f9c3adc6e24d109359b7a6c
Author: Zhijun Fu <[email protected]>
AuthorDate: Fri May 4 12:26:29 2018 -0700
ARROW-2539: [Plasma] Use unique_ptr instead of raw pointer
Use unique_ptr in place of raw owning pointers, so that allocated memory is
freed automatically.
Author: Zhijun Fu <[email protected]>
Closes #1993 from zhijunfu/improve-code and squashes the following commits:
3c69ada9 <Zhijun Fu> fix format check
6bfebc2d <Zhijun Fu> fix lint
b5b2fac2 <Zhijun Fu> fix build on travis-ci
d4d64b02 <Zhijun Fu> Merge branch 'master' of
https://github.com/zhijunfu/arrow into improve-code
84b7e371 <Zhijun Fu> Use unique_ptr instead of raw pointer
---
cpp/src/plasma/client.cc | 5 ++---
cpp/src/plasma/io.cc | 8 ++++----
cpp/src/plasma/io.h | 3 ++-
cpp/src/plasma/plasma.cc | 9 +++++----
cpp/src/plasma/plasma.h | 2 +-
cpp/src/plasma/store.cc | 32 +++++++++-----------------------
cpp/src/plasma/store.h | 2 +-
7 files changed, 24 insertions(+), 37 deletions(-)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 2a6f183..b4ee098 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -874,11 +874,11 @@ Status PlasmaClient::Impl::Subscribe(int* fd) {
Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
int64_t* data_size, int64_t*
metadata_size) {
- uint8_t* notification = read_message_async(fd);
+ auto notification = read_message_async(fd);
if (notification == NULL) {
return Status::IOError("Failed to read object notification from Plasma
socket");
}
- auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification);
+ auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification.get());
ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID));
memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID));
if (object_info->is_deletion()) {
@@ -888,7 +888,6 @@ Status PlasmaClient::Impl::GetNotification(int fd,
ObjectID* object_id,
*data_size = object_info->data_size();
*metadata_size = object_info->metadata_size();
}
- delete[] notification;
return Status::OK();
}
diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc
index 2cba897..4142bf9 100644
--- a/cpp/src/plasma/io.cc
+++ b/cpp/src/plasma/io.cc
@@ -18,6 +18,7 @@
#include "plasma/io.h"
#include <cstdint>
+#include <memory>
#include <sstream>
#include "arrow/status.h"
@@ -210,7 +211,7 @@ int AcceptClient(int socket_fd) {
return client_fd;
}
-uint8_t* read_message_async(int sock) {
+std::unique_ptr<uint8_t[]> read_message_async(int sock) {
int64_t size;
Status s = ReadBytes(sock, reinterpret_cast<uint8_t*>(&size),
sizeof(int64_t));
if (!s.ok()) {
@@ -219,10 +220,9 @@ uint8_t* read_message_async(int sock) {
close(sock);
return NULL;
}
- uint8_t* message = reinterpret_cast<uint8_t*>(malloc(size));
- s = ReadBytes(sock, message, size);
+ auto message = std::unique_ptr<uint8_t[]>(new uint8_t[size]);
+ s = ReadBytes(sock, message.get(), size);
if (!s.ok()) {
- free(message);
/* The other side has closed the socket. */
ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has
occurred.";
close(sock);
diff --git a/cpp/src/plasma/io.h b/cpp/src/plasma/io.h
index 4beb134..8869c9b 100644
--- a/cpp/src/plasma/io.h
+++ b/cpp/src/plasma/io.h
@@ -23,6 +23,7 @@
#include <sys/un.h>
#include <unistd.h>
+#include <memory>
#include <string>
#include <vector>
@@ -56,7 +57,7 @@ Status ConnectIpcSocketRetry(const std::string& pathname, int
num_retries,
int AcceptClient(int socket_fd);
-uint8_t* read_message_async(int sock);
+std::unique_ptr<uint8_t[]> read_message_async(int sock);
} // namespace plasma
diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc
index 0a019dd..e57049d 100644
--- a/cpp/src/plasma/plasma.cc
+++ b/cpp/src/plasma/plasma.cc
@@ -51,13 +51,14 @@ int warn_if_sigpipe(int status, int client_sock) {
* @return The object info buffer. It is the caller's responsibility to free
* this buffer with "delete" after it has been used.
*/
-uint8_t* create_object_info_buffer(ObjectInfoT* object_info) {
+std::unique_ptr<uint8_t[]> create_object_info_buffer(ObjectInfoT* object_info)
{
flatbuffers::FlatBufferBuilder fbb;
auto message = CreateObjectInfo(fbb, object_info);
fbb.Finish(message);
- uint8_t* notification = new uint8_t[sizeof(int64_t) + fbb.GetSize()];
- *(reinterpret_cast<int64_t*>(notification)) = fbb.GetSize();
- memcpy(notification + sizeof(int64_t), fbb.GetBufferPointer(),
fbb.GetSize());
+ auto notification =
+ std::unique_ptr<uint8_t[]>(new uint8_t[sizeof(int64_t) + fbb.GetSize()]);
+ *(reinterpret_cast<int64_t*>(notification.get())) = fbb.GetSize();
+ memcpy(notification.get() + sizeof(int64_t), fbb.GetBufferPointer(),
fbb.GetSize());
return notification;
}
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index cfaa927..4b4064c 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -179,7 +179,7 @@ ObjectTableEntry* get_object_table_entry(PlasmaStoreInfo*
store_info,
/// @return The errno set.
int warn_if_sigpipe(int status, int client_sock);
-uint8_t* create_object_info_buffer(ObjectInfoT* object_info);
+std::unique_ptr<uint8_t[]> create_object_info_buffer(ObjectInfoT* object_info);
} // namespace plasma
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 5e7b452..2caa3cb 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -118,17 +118,7 @@ PlasmaStore::PlasmaStore(EventLoop* loop, int64_t
system_memory, std::string dir
}
// TODO(pcm): Get rid of this destructor by using RAII to clean up data.
-PlasmaStore::~PlasmaStore() {
- for (const auto& element : pending_notifications_) {
- auto object_notifications = element.second.object_notifications;
- for (size_t i = 0; i < object_notifications.size(); ++i) {
- uint8_t* notification =
reinterpret_cast<uint8_t*>(object_notifications.at(i));
- uint8_t* data = notification;
- // TODO(pcm): Get rid of this delete.
- delete[] data;
- }
- }
-}
+PlasmaStore::~PlasmaStore() {}
const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return
&store_info_; }
@@ -322,11 +312,11 @@ void PlasmaStore::return_from_get(GetRequest* get_req) {
}
void PlasmaStore::update_object_get_requests(const ObjectID& object_id) {
- std::vector<GetRequest*>& get_requests = object_get_requests_[object_id];
+ auto& get_requests = object_get_requests_[object_id];
size_t index = 0;
size_t num_requests = get_requests.size();
for (size_t i = 0; i < num_requests; ++i) {
- GetRequest* get_req = get_requests[index];
+ auto get_req = get_requests[index];
auto entry = get_object_table_entry(&store_info_, object_id);
ARROW_CHECK(entry != NULL);
@@ -356,7 +346,7 @@ void PlasmaStore::process_get_request(Client* client,
const std::vector<ObjectID>& object_ids,
int64_t timeout_ms) {
// Create a get request for this object.
- GetRequest* get_req = new GetRequest(client, object_ids);
+ auto get_req = new GetRequest(client, object_ids);
for (auto object_id : object_ids) {
// Check if this object is already present locally. If so, record that the
@@ -582,13 +572,12 @@ void PlasmaStore::send_notifications(int client_fd) {
// Loop over the array of pending notifications and send as many of them as
// possible.
for (size_t i = 0; i < it->second.object_notifications.size(); ++i) {
- uint8_t* notification =
- reinterpret_cast<uint8_t*>(it->second.object_notifications.at(i));
+ auto& notification = it->second.object_notifications.at(i);
// Decode the length, which is the first bytes of the message.
- int64_t size = *(reinterpret_cast<int64_t*>(notification));
+ int64_t size = *(reinterpret_cast<int64_t*>(notification.get()));
// Attempt to send a notification about this object ID.
- ssize_t nbytes = send(client_fd, notification, sizeof(int64_t) + size, 0);
+ ssize_t nbytes = send(client_fd, notification.get(), sizeof(int64_t) +
size, 0);
if (nbytes >= 0) {
ARROW_CHECK(nbytes == static_cast<ssize_t>(sizeof(int64_t)) + size);
} else if (nbytes == -1 &&
@@ -613,9 +602,6 @@ void PlasmaStore::send_notifications(int client_fd) {
}
}
num_processed += 1;
- // The corresponding malloc happened in create_object_info_buffer
- // within push_notification.
- delete[] notification;
}
// Remove the sent notifications from the array.
it->second.object_notifications.erase(
@@ -636,8 +622,8 @@ void PlasmaStore::send_notifications(int client_fd) {
void PlasmaStore::push_notification(ObjectInfoT* object_info) {
for (auto& element : pending_notifications_) {
- uint8_t* notification = create_object_info_buffer(object_info);
- element.second.object_notifications.push_back(notification);
+ auto notification = create_object_info_buffer(object_info);
+ element.second.object_notifications.emplace_back(std::move(notification));
send_notifications(element.first);
// The notification gets freed in send_notifications when the notification
// is sent over the socket.
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index d97cdf7..ac6b2c4 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -37,7 +37,7 @@ struct GetRequest;
struct NotificationQueue {
/// The object notifications for clients. We notify the client about the
/// objects in the order that the objects were sealed or deleted.
- std::deque<uint8_t*> object_notifications;
+ std::deque<std::unique_ptr<uint8_t[]>> object_notifications;
};
/// Contains all information that is associated with a Plasma store client.
--
To stop receiving notification emails like this one, please contact
[email protected].