Repository: arrow Updated Branches: refs/heads/master 204f148bf -> 07b89bf3a
http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/util/rle-encoding.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/rle-encoding.h b/cpp/src/arrow/util/rle-encoding.h index 9ec6235..e690778 100644 --- a/cpp/src/arrow/util/rle-encoding.h +++ b/cpp/src/arrow/util/rle-encoding.h @@ -21,8 +21,8 @@ #ifndef ARROW_UTIL_RLE_ENCODING_H #define ARROW_UTIL_RLE_ENCODING_H -#include <algorithm> #include <math.h> +#include <algorithm> #include "arrow/util/bit-stream-utils.h" #include "arrow/util/bit-util.h" @@ -122,7 +122,8 @@ class RleDecoder { /// Like GetBatchWithDict but add spacing for null entries template <typename T> int GetBatchWithDictSpaced(const T* dictionary, T* values, int batch_size, - int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset); + int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset); protected: BitReader bit_reader_; @@ -289,7 +290,7 @@ inline int RleDecoder::GetBatch(T* values, int batch_size) { int repeat_batch = std::min(batch_size - values_read, static_cast<int>(repeat_count_)); std::fill(values + values_read, values + values_read + repeat_batch, - static_cast<T>(current_value_)); + static_cast<T>(current_value_)); repeat_count_ -= repeat_batch; values_read += repeat_batch; } else if (literal_count_ > 0) { @@ -318,7 +319,7 @@ inline int RleDecoder::GetBatchWithDict(const T* dictionary, T* values, int batc int repeat_batch = std::min(batch_size - values_read, static_cast<int>(repeat_count_)); std::fill(values + values_read, values + values_read + repeat_batch, - dictionary[current_value_]); + dictionary[current_value_]); repeat_count_ -= repeat_batch; values_read += repeat_batch; } else if (literal_count_ > 0) { @@ -345,8 +346,9 @@ inline int RleDecoder::GetBatchWithDict(const T* dictionary, T* values, int batc template <typename T> inline int RleDecoder::GetBatchWithDictSpaced(const T* dictionary, T* values, - int batch_size, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset) { + int batch_size, int null_count, + const uint8_t* valid_bits, + int64_t valid_bits_offset) { DCHECK_GE(bit_width_, 0); int values_read = 0; int remaining_nulls = null_count; @@ -379,8 +381,8 @@ inline int RleDecoder::GetBatchWithDictSpaced(const T* dictionary, T* values, std::fill(values + values_read, values + values_read + repeat_batch, value); values_read += repeat_batch; } else if (literal_count_ > 0) { - int literal_batch = std::min( - batch_size - values_read - remaining_nulls, static_cast<int>(literal_count_)); + int literal_batch = std::min(batch_size - values_read - remaining_nulls, + static_cast<int>(literal_count_)); // Decode the literals constexpr int kBufferSize = 1024; @@ -434,7 +436,7 @@ bool RleDecoder::NextCounts() { repeat_count_ = indicator_value >> 1; bool result = bit_reader_.GetAligned<T>(static_cast<int>(BitUtil::Ceil(bit_width_, 8)), - reinterpret_cast<T*>(¤t_value_)); + reinterpret_cast<T*>(¤t_value_)); DCHECK(result); } return true; @@ -509,8 +511,8 @@ inline void RleEncoder::FlushRepeatedRun() { // The lsb of 0 indicates this is a repeated run int32_t indicator_value = repeat_count_ << 1 | 0; result &= bit_writer_.PutVlqInt(indicator_value); - result &= bit_writer_.PutAligned( - current_value_, static_cast<int>(BitUtil::Ceil(bit_width_, 8))); + result &= bit_writer_.PutAligned(current_value_, + static_cast<int>(BitUtil::Ceil(bit_width_, 8))); DCHECK(result); num_buffered_values_ = 0; repeat_count_ = 0; @@ -552,7 +554,7 @@ inline void RleEncoder::FlushBufferedValues(bool done) { inline int RleEncoder::Flush() { if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { bool all_repeat = literal_count_ == 0 && (repeat_count_ == num_buffered_values_ || - num_buffered_values_ == 0); + num_buffered_values_ == 0); // There is something pending, figure out if it's a repeated or literal run if (repeat_count_ > 0 && all_repeat) { FlushRepeatedRun(); http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/util/sse-util.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/sse-util.h b/cpp/src/arrow/util/sse-util.h index 570c405..a0ec8a2 100644 --- a/cpp/src/arrow/util/sse-util.h +++ b/cpp/src/arrow/util/sse-util.h @@ -53,8 +53,8 @@ static const int STRCMP_MODE = /// Precomputed mask values up to 16 bits. static const int SSE_BITMASK[CHARS_PER_128_BIT_REGISTER] = { - 1 << 0, 1 << 1, 1 << 2, 1 << 3, 1 << 4, 1 << 5, 1 << 6, 1 << 7, 1 << 8, 1 << 9, - 1 << 10, 1 << 11, 1 << 12, 1 << 13, 1 << 14, 1 << 15, + 1 << 0, 1 << 1, 1 << 2, 1 << 3, 1 << 4, 1 << 5, 1 << 6, 1 << 7, + 1 << 8, 1 << 9, 1 << 10, 1 << 11, 1 << 12, 1 << 13, 1 << 14, 1 << 15, }; } // namespace SSEUtil http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/util/stl.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/stl.h b/cpp/src/arrow/util/stl.h index d58689b..4b8916f 100644 --- a/cpp/src/arrow/util/stl.h +++ b/cpp/src/arrow/util/stl.h @@ -40,8 +40,8 @@ inline std::vector<T> DeleteVectorElement(const std::vector<T>& values, size_t i } template <typename T> -inline std::vector<T> AddVectorElement( - const std::vector<T>& values, size_t index, const T& new_element) { +inline std::vector<T> AddVectorElement(const std::vector<T>& values, size_t index, + const T& new_element) { DCHECK_LE(index, values.size()); std::vector<T> out; out.reserve(values.size() + 1); http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/util/string.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/string.h b/cpp/src/arrow/util/string.h index 5d9fdc8..6e70ddc 100644 --- a/cpp/src/arrow/util/string.h +++ b/cpp/src/arrow/util/string.h @@ -46,7 +46,9 @@ static inline Status ParseHexValue(const char* data, uint8_t* out) { const char* pos2 = std::lower_bound(kAsciiTable, kAsciiTable + 16, c2); // Error checking - if (*pos1 != c1 || *pos2 != c2) { return Status::Invalid("Encountered non-hex digit"); } + if (*pos1 != c1 || *pos2 != c2) { + return Status::Invalid("Encountered non-hex digit"); + } *out = static_cast<uint8_t>((pos1 - kAsciiTable) << 4 | (pos2 - kAsciiTable)); return Status::OK(); http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/client.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 62bfbec..bbbeb55 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -88,7 +88,9 @@ uint8_t* PlasmaClient::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size uint8_t* result = reinterpret_cast<uint8_t*>( mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)); // TODO(pcm): Don't fail here, instead return a Status. - if (result == MAP_FAILED) { ARROW_LOG(FATAL) << "mmap failed"; } + if (result == MAP_FAILED) { + ARROW_LOG(FATAL) << "mmap failed"; + } close(fd); ClientMmapTableEntry& entry = mmap_table_[store_fd_val]; entry.pointer = result; @@ -106,8 +108,8 @@ uint8_t* PlasmaClient::lookup_mmapped_file(int store_fd_val) { return entry->second.pointer; } -void PlasmaClient::increment_object_count( - const ObjectID& object_id, PlasmaObject* object, bool is_sealed) { +void PlasmaClient::increment_object_count(const ObjectID& object_id, PlasmaObject* object, + bool is_sealed) { // Increment the count of the object to track the fact that it is being used. // The corresponding decrement should happen in PlasmaClient::Release. auto elem = objects_in_use_.find(object_id); @@ -142,7 +144,7 @@ void PlasmaClient::increment_object_count( } Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size, - uint8_t* metadata, int64_t metadata_size, uint8_t** data) { + uint8_t* metadata, int64_t metadata_size, uint8_t** data) { ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size " << data_size << " and metadata size " << metadata_size; RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, data_size, metadata_size)); @@ -183,7 +185,7 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size, } Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects, - int64_t timeout_ms, ObjectBuffer* object_buffers) { + int64_t timeout_ms, ObjectBuffer* object_buffers) { // Fill out the info for the objects that are already in use locally. bool all_present = true; for (int i = 0; i < num_objects; ++i) { @@ -213,7 +215,9 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects, } } - if (all_present) { return Status::OK(); } + if (all_present) { + return Status::OK(); + } // If we get here, then the objects aren't all currently in use by this // client, so we need to send a request to the plasma store. @@ -223,8 +227,8 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects, std::vector<ObjectID> received_object_ids(num_objects); std::vector<PlasmaObject> object_data(num_objects); PlasmaObject* object; - RETURN_NOT_OK(ReadGetReply( - buffer.data(), received_object_ids.data(), object_data.data(), num_objects)); + RETURN_NOT_OK(ReadGetReply(buffer.data(), received_object_ids.data(), + object_data.data(), num_objects)); for (int i = 0; i < num_objects; ++i) { DCHECK(received_object_ids[i] == object_ids[i]); @@ -330,7 +334,7 @@ Status PlasmaClient::Release(const ObjectID& object_id) { // pending release calls, and there are at least some pending release calls in // the release_history list, then release some objects. while ((in_use_object_bytes_ > std::min(kL3CacheSizeBytes, store_capacity_ / 100) || - release_history_.size() > config_.release_delay) && + release_history_.size() > config_.release_delay) && release_history_.size() > 0) { // Perform a release for the object ID for the first pending release. RETURN_NOT_OK(PerformRelease(release_history_.back())); @@ -364,8 +368,9 @@ static void ComputeBlockHash(const unsigned char* data, int64_t nbytes, uint64_t *hash = XXH64_digest(&hash_state); } -static inline bool compute_object_hash_parallel( - XXH64_state_t* hash_state, const unsigned char* data, int64_t nbytes) { +static inline bool compute_object_hash_parallel(XXH64_state_t* hash_state, + const unsigned char* data, + int64_t nbytes) { // Note that this function will likely be faster if the address of data is // aligned on a 64-byte boundary. const int num_threads = kThreadPoolSize; @@ -380,16 +385,18 @@ static inline bool compute_object_hash_parallel( // Each thread gets a "chunk" of k blocks, except the suffix thread. for (int i = 0; i < num_threads; i++) { - threadpool_[i] = std::thread(ComputeBlockHash, - reinterpret_cast<uint8_t*>(data_address) + i * chunk_size, chunk_size, - &threadhash[i]); + threadpool_[i] = std::thread( + ComputeBlockHash, reinterpret_cast<uint8_t*>(data_address) + i * chunk_size, + chunk_size, &threadhash[i]); } - ComputeBlockHash( - reinterpret_cast<uint8_t*>(right_address), suffix, &threadhash[num_threads]); + ComputeBlockHash(reinterpret_cast<uint8_t*>(right_address), suffix, + &threadhash[num_threads]); // Join the threads. for (auto& t : threadpool_) { - if (t.joinable()) { t.join(); } + if (t.joinable()) { + t.join(); + } } XXH64_update(hash_state, (unsigned char*)threadhash, sizeof(threadhash)); @@ -400,13 +407,13 @@ static uint64_t compute_object_hash(const ObjectBuffer& obj_buffer) { XXH64_state_t hash_state; XXH64_reset(&hash_state, XXH64_DEFAULT_SEED); if (obj_buffer.data_size >= kBytesInMB) { - compute_object_hash_parallel( - &hash_state, (unsigned char*)obj_buffer.data, obj_buffer.data_size); + compute_object_hash_parallel(&hash_state, (unsigned char*)obj_buffer.data, + obj_buffer.data_size); } else { XXH64_update(&hash_state, (unsigned char*)obj_buffer.data, obj_buffer.data_size); } - XXH64_update( - &hash_state, (unsigned char*)obj_buffer.metadata, obj_buffer.metadata_size); + XXH64_update(&hash_state, (unsigned char*)obj_buffer.metadata, + obj_buffer.metadata_size); return XXH64_digest(&hash_state); } @@ -483,8 +490,8 @@ Status PlasmaClient::Subscribe(int* fd) { return Status::OK(); } -Status PlasmaClient::GetNotification( - int fd, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) { +Status PlasmaClient::GetNotification(int fd, ObjectID* object_id, int64_t* data_size, + int64_t* metadata_size) { uint8_t* notification = read_message_async(fd); if (notification == NULL) { return Status::IOError("Failed to read object notification from Plasma socket"); @@ -504,7 +511,7 @@ Status PlasmaClient::GetNotification( } Status PlasmaClient::Connect(const std::string& store_socket_name, - const std::string& manager_socket_name, int release_delay) { + const std::string& manager_socket_name, int release_delay) { store_conn_ = connect_ipc_sock_retry(store_socket_name, -1, -1); if (manager_socket_name != "") { manager_conn_ = connect_ipc_sock_retry(manager_socket_name, -1, -1); @@ -548,9 +555,7 @@ Status PlasmaClient::Fetch(int num_object_ids, const ObjectID* object_ids) { return SendFetchRequest(manager_conn_, object_ids, num_object_ids); } -int PlasmaClient::get_manager_fd() { - return manager_conn_; -} +int PlasmaClient::get_manager_fd() { return manager_conn_; } Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) { ARROW_CHECK(manager_conn_ >= 0); @@ -565,7 +570,8 @@ Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) { } Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_requests, - int num_ready_objects, int64_t timeout_ms, int* num_objects_ready) { + int num_ready_objects, int64_t timeout_ms, + int* num_objects_ready) { ARROW_CHECK(manager_conn_ >= 0); ARROW_CHECK(num_object_requests > 0); ARROW_CHECK(num_ready_objects > 0); @@ -577,7 +583,7 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req } RETURN_NOT_OK(SendWaitRequest(manager_conn_, object_requests, num_object_requests, - num_ready_objects, timeout_ms)); + num_ready_objects, timeout_ms)); std::vector<uint8_t> buffer; RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaWaitReply, &buffer)); RETURN_NOT_OK(ReadWaitReply(buffer.data(), object_requests, &num_ready_objects)); @@ -588,7 +594,9 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req int status = object_requests[i].status; switch (type) { case PLASMA_QUERY_LOCAL: - if (status == ObjectStatus_Local) { *num_objects_ready += 1; } + if (status == ObjectStatus_Local) { + *num_objects_ready += 1; + } break; case PLASMA_QUERY_ANYWHERE: if (status == ObjectStatus_Local || status == ObjectStatus_Remote) { @@ -604,4 +612,4 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req return Status::OK(); } -} // namespace plasma +} // namespace plasma http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/client.h ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index d9ed9f7..cc05a06 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -91,7 +91,7 @@ class ARROW_EXPORT PlasmaClient { /// and not evicted to avoid too many munmaps. /// @return The return status. Status Connect(const std::string& store_socket_name, - const std::string& manager_socket_name, int release_delay); + const std::string& manager_socket_name, int release_delay); /// Create an object in the Plasma Store. Any metadata for this object must be /// be passed in when the object is created. @@ -108,7 +108,7 @@ class ARROW_EXPORT PlasmaClient { /// @param data The address of the newly created object will be written here. /// @return The return status. Status Create(const ObjectID& object_id, int64_t data_size, uint8_t* metadata, - int64_t metadata_size, uint8_t** data); + int64_t metadata_size, uint8_t** data); /// Get some objects from the Plasma Store. This function will block until the /// objects have all been created and sealed in the Plasma Store or the @@ -126,7 +126,7 @@ class ARROW_EXPORT PlasmaClient { /// size field is -1, then the object was not retrieved. /// @return The return status. Status Get(const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms, - ObjectBuffer* object_buffers); + ObjectBuffer* object_buffers); /// Tell Plasma that the client no longer needs the object. This should be /// called @@ -203,8 +203,8 @@ class ARROW_EXPORT PlasmaClient { /// @param data_size Out parameter, the data size of the object that was sealed. /// @param metadata_size Out parameter, the metadata size of the object that was sealed. /// @return The return status. - Status GetNotification( - int fd, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size); + Status GetNotification(int fd, ObjectID* object_id, int64_t* data_size, + int64_t* metadata_size); /// Disconnect from the local plasma instance, including the local store and /// manager. @@ -271,7 +271,7 @@ class ARROW_EXPORT PlasmaClient { /// min_num_ready_objects this means that timeout expired. /// @return The return status. Status Wait(int64_t num_object_requests, ObjectRequest* object_requests, - int num_ready_objects, int64_t timeout_ms, int* num_objects_ready); + int num_ready_objects, int64_t timeout_ms, int* num_objects_ready); /// Transfer local object to a different plasma manager. /// @@ -315,8 +315,8 @@ class ARROW_EXPORT PlasmaClient { uint8_t* lookup_mmapped_file(int store_fd_val); - void increment_object_count( - const ObjectID& object_id, PlasmaObject* object, bool is_sealed); + void increment_object_count(const ObjectID& object_id, PlasmaObject* object, + bool is_sealed); /// File descriptor of the Unix domain socket that connects to the store. int store_conn_; @@ -348,6 +348,6 @@ class ARROW_EXPORT PlasmaClient { int64_t store_capacity_; }; -} // namespace plasma +} // namespace plasma #endif // PLASMA_CLIENT_H http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/common.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc index a5f530e..d7a7965 100644 --- a/cpp/src/plasma/common.cc +++ b/cpp/src/plasma/common.cc @@ -41,13 +41,9 @@ UniqueID UniqueID::from_binary(const std::string& binary) { return id; } -const uint8_t* UniqueID::data() const { - return id_; -} +const uint8_t* UniqueID::data() const { return id_; } -uint8_t* UniqueID::mutable_data() { - return id_; -} +uint8_t* UniqueID::mutable_data() { return id_; } std::string UniqueID::binary() const { return std::string(reinterpret_cast<const char*>(id_), kUniqueIDSize); @@ -87,4 +83,4 @@ Status plasma_error_status(int plasma_error) { ARROW_EXPORT int ObjectStatusLocal = ObjectStatus_Local; ARROW_EXPORT int ObjectStatusRemote = ObjectStatus_Remote; -} // namespace plasma +} // namespace plasma http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/common.h ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h index 6f2d4dd..2b71da6 100644 --- a/cpp/src/plasma/common.h +++ b/cpp/src/plasma/common.h @@ -95,6 +95,6 @@ enum ObjectRequestType { extern int ObjectStatusLocal; extern int ObjectStatusRemote; -} // namespace plasma +} // namespace plasma #endif // PLASMA_COMMON_H http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/events.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/events.cc b/cpp/src/plasma/events.cc index 675424d..f98ced2 100644 --- a/cpp/src/plasma/events.cc +++ b/cpp/src/plasma/events.cc @@ -21,8 +21,8 @@ namespace plasma { -void EventLoop::file_event_callback( - aeEventLoop* loop, int fd, void* context, int events) { +void EventLoop::file_event_callback(aeEventLoop* loop, int fd, void* context, + int events) { FileCallback* callback = reinterpret_cast<FileCallback*>(context); (*callback)(events); } @@ -34,12 +34,12 @@ int EventLoop::timer_event_callback(aeEventLoop* loop, TimerID timer_id, void* c constexpr int kInitialEventLoopSize = 1024; -EventLoop::EventLoop() { - loop_ = aeCreateEventLoop(kInitialEventLoopSize); -} +EventLoop::EventLoop() { loop_ = aeCreateEventLoop(kInitialEventLoopSize); } bool EventLoop::add_file_event(int fd, int events, const FileCallback& callback) { - if (file_callbacks_.find(fd) != file_callbacks_.end()) { return false; } + if (file_callbacks_.find(fd) != file_callbacks_.end()) { + return false; + } auto data = std::unique_ptr<FileCallback>(new FileCallback(callback)); void* context = reinterpret_cast<void*>(data.get()); // Try to add the file descriptor. @@ -47,7 +47,9 @@ bool EventLoop::add_file_event(int fd, int events, const FileCallback& callback) // If it cannot be added, increase the size of the event loop. if (err == AE_ERR && errno == ERANGE) { err = aeResizeSetSize(loop_, 3 * aeGetSetSize(loop_) / 2); - if (err != AE_OK) { return false; } + if (err != AE_OK) { + return false; + } err = aeCreateFileEvent(loop_, fd, events, EventLoop::file_event_callback, context); } // In any case, test if there were errors. @@ -63,9 +65,7 @@ void EventLoop::remove_file_event(int fd) { file_callbacks_.erase(fd); } -void EventLoop::run() { - aeMain(loop_); -} +void EventLoop::run() { aeMain(loop_); } int64_t EventLoop::add_timer(int64_t timeout, const TimerCallback& callback) { auto data = std::unique_ptr<TimerCallback>(new TimerCallback(callback)); @@ -82,4 +82,4 @@ int EventLoop::remove_timer(int64_t timer_id) { return err; } -} // namespace plasma +} // namespace plasma http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/events.h ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/events.h b/cpp/src/plasma/events.h index b989b7f..6cb5b73 100644 --- a/cpp/src/plasma/events.h +++ b/cpp/src/plasma/events.h @@ -98,6 +98,6 @@ class EventLoop { std::unordered_map<int64_t, std::unique_ptr<TimerCallback>> timer_callbacks_; }; -} // namespace plasma +} // namespace plasma #endif // PLASMA_EVENTS http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/eviction_policy.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc index ef18e33..6c2309f 100644 --- a/cpp/src/plasma/eviction_policy.cc +++ b/cpp/src/plasma/eviction_policy.cc @@ -36,8 +36,8 @@ void LRUCache::remove(const ObjectID& key) { item_map_.erase(it); } -int64_t LRUCache::choose_objects_to_evict( - int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict) { +int64_t LRUCache::choose_objects_to_evict(int64_t num_bytes_required, + std::vector<ObjectID>* objects_to_evict) { int64_t bytes_evicted = 0; auto it = item_list_.end(); while (bytes_evicted < num_bytes_required && it != item_list_.begin()) { @@ -51,8 +51,8 @@ int64_t LRUCache::choose_objects_to_evict( EvictionPolicy::EvictionPolicy(PlasmaStoreInfo* store_info) : memory_used_(0), store_info_(store_info) {} -int64_t EvictionPolicy::choose_objects_to_evict( - int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict) { +int64_t EvictionPolicy::choose_objects_to_evict(int64_t num_bytes_required, + std::vector<ObjectID>* objects_to_evict) { int64_t bytes_evicted = cache_.choose_objects_to_evict(num_bytes_required, objects_to_evict); /* Update the LRU cache. */ @@ -69,8 +69,8 @@ void EvictionPolicy::object_created(const ObjectID& object_id) { cache_.add(object_id, entry->info.data_size + entry->info.metadata_size); } -bool EvictionPolicy::require_space( - int64_t size, std::vector<ObjectID>* objects_to_evict) { +bool EvictionPolicy::require_space(int64_t size, + std::vector<ObjectID>* objects_to_evict) { /* Check if there is enough space to create the object. */ int64_t required_space = memory_used_ + size - store_info_->memory_capacity; int64_t num_bytes_evicted; @@ -95,17 +95,17 @@ bool EvictionPolicy::require_space( return num_bytes_evicted >= required_space; } -void EvictionPolicy::begin_object_access( - const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict) { +void EvictionPolicy::begin_object_access(const ObjectID& object_id, + std::vector<ObjectID>* objects_to_evict) { /* If the object is in the LRU cache, remove it. */ cache_.remove(object_id); } -void EvictionPolicy::end_object_access( - const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict) { +void EvictionPolicy::end_object_access(const ObjectID& object_id, + std::vector<ObjectID>* objects_to_evict) { auto entry = store_info_->objects[object_id].get(); /* Add the object to the LRU cache.*/ cache_.add(object_id, entry->info.data_size + entry->info.metadata_size); } -} // namespace plasma +} // namespace plasma http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/eviction_policy.h ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h index c4f2183..dd1c873 100644 --- a/cpp/src/plasma/eviction_policy.h +++ b/cpp/src/plasma/eviction_policy.h @@ -42,8 +42,8 @@ class LRUCache { void remove(const ObjectID& key); - int64_t choose_objects_to_evict( - int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict); + int64_t choose_objects_to_evict(int64_t num_bytes_required, + std::vector<ObjectID>* objects_to_evict); private: /// A doubly-linked list containing the items in the cache and @@ -95,8 +95,8 @@ class EvictionPolicy { /// @param objects_to_evict The object IDs that were chosen for eviction will /// be stored into this vector. /// @return Void. - void begin_object_access( - const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict); + void begin_object_access(const ObjectID& object_id, + std::vector<ObjectID>* objects_to_evict); /// This method will be called whenever an object in the Plasma store that was /// being used is no longer being used. When this method is called, the @@ -107,8 +107,8 @@ class EvictionPolicy { /// @param objects_to_evict The object IDs that were chosen for eviction will /// be stored into this vector. /// @return Void. - void end_object_access( - const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict); + void end_object_access(const ObjectID& object_id, + std::vector<ObjectID>* objects_to_evict); /// Choose some objects to evict from the Plasma store. When this method is /// called, the eviction policy will assume that the objects chosen to be @@ -121,8 +121,8 @@ class EvictionPolicy { /// @param objects_to_evict The object IDs that were chosen for eviction will /// be stored into this vector. /// @return The total number of bytes of space chosen to be evicted. - int64_t choose_objects_to_evict( - int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict); + int64_t choose_objects_to_evict(int64_t num_bytes_required, + std::vector<ObjectID>* objects_to_evict); private: /// The amount of memory (in bytes) currently being used. @@ -133,6 +133,6 @@ class EvictionPolicy { LRUCache cache_; }; -} // namespace plasma +} // namespace plasma #endif // PLASMA_EVICTION_POLICY_H http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/io.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc index 5875ebb..e3b6b61 100644 --- a/cpp/src/plasma/io.cc +++ b/cpp/src/plasma/io.cc @@ -38,7 +38,9 @@ Status WriteBytes(int fd, uint8_t* cursor, size_t length) { * advance the cursor, and decrease the amount left to write. */ nbytes = write(fd, cursor + offset, bytesleft); if (nbytes < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { continue; } + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { + continue; + } return Status::IOError(std::string(strerror(errno))); } else if (nbytes == 0) { return Status::IOError("Encountered unexpected EOF"); @@ -67,7 +69,9 @@ Status ReadBytes(int fd, uint8_t* cursor, size_t length) { while (bytesleft > 0) { nbytes = read(fd, cursor + offset, bytesleft); if (nbytes < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { continue; } + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { + continue; + } return Status::IOError(std::string(strerror(errno))); } else if (0 == nbytes) { return Status::IOError("Encountered unexpected EOF"); @@ -83,14 +87,16 @@ Status ReadBytes(int fd, uint8_t* cursor, size_t length) { Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer) { int64_t version; RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&version), sizeof(version)), - *type = DISCONNECT_CLIENT); + *type = DISCONNECT_CLIENT); ARROW_CHECK(version == PLASMA_PROTOCOL_VERSION) << "version = " << version; size_t length; RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(type), sizeof(*type)), - *type = DISCONNECT_CLIENT); + *type = DISCONNECT_CLIENT); RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&length), sizeof(length)), - *type = DISCONNECT_CLIENT); - if (length > buffer->size()) { buffer->resize(length); } + *type = DISCONNECT_CLIENT); + if (length > buffer->size()) { + buffer->resize(length); + } RETURN_NOT_OK_ELSE(ReadBytes(fd, buffer->data(), length), *type = DISCONNECT_CLIENT); return Status::OK(); } @@ -105,7 +111,7 @@ int bind_ipc_sock(const std::string& pathname, bool shall_listen) { /* Tell the system to allow the port to be reused. */ int on = 1; if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&on), - sizeof(on)) < 0) { + sizeof(on)) < 0) { ARROW_LOG(ERROR) << "setsockopt failed for pathname " << pathname; close(socket_fd); return -1; @@ -134,16 +140,22 @@ int bind_ipc_sock(const std::string& pathname, bool shall_listen) { return socket_fd; } -int connect_ipc_sock_retry( - const std::string& pathname, int num_retries, int64_t timeout) { +int connect_ipc_sock_retry(const std::string& pathname, int num_retries, + int64_t timeout) { /* Pick the default values if the user did not specify. */ - if (num_retries < 0) { num_retries = NUM_CONNECT_ATTEMPTS; } - if (timeout < 0) { timeout = CONNECT_TIMEOUT_MS; } + if (num_retries < 0) { + num_retries = NUM_CONNECT_ATTEMPTS; + } + if (timeout < 0) { + timeout = CONNECT_TIMEOUT_MS; + } int fd = -1; for (int num_attempts = 0; num_attempts < num_retries; ++num_attempts) { fd = connect_ipc_sock(pathname); - if (fd >= 0) { break; } + if (fd >= 0) { + break; + } if (num_attempts == 0) { ARROW_LOG(ERROR) << "Connection to socket failed for pathname " << pathname; } @@ -151,7 +163,9 @@ int connect_ipc_sock_retry( usleep(static_cast<int>(timeout * 1000)); } /* If we could not connect to the socket, exit. */ - if (fd == -1) { ARROW_LOG(FATAL) << "Could not connect to socket " << pathname; } + if (fd == -1) { + ARROW_LOG(FATAL) << "Could not connect to socket " << pathname; + } return fd; } http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/malloc.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc index 97c9a16..77a8afe 100644 --- a/cpp/src/plasma/malloc.cc +++ b/cpp/src/plasma/malloc.cc @@ -69,13 +69,9 @@ std::unordered_map<void*, mmap_record> mmap_records; constexpr int GRANULARITY_MULTIPLIER = 2; -static void* pointer_advance(void* p, ptrdiff_t n) { - return (unsigned char*)p + n; -} +static void* pointer_advance(void* p, ptrdiff_t n) { return (unsigned char*)p + n; } -static void* pointer_retreat(void* p, ptrdiff_t n) { - return (unsigned char*)p - n; -} +static void* pointer_retreat(void* p, ptrdiff_t n) { return (unsigned char*)p - n; } static ptrdiff_t pointer_distance(void const* pfrom, void const* pto) { return (unsigned char const*)pto - (unsigned char const*)pfrom; @@ -87,8 +83,8 @@ int create_buffer(int64_t size) { int fd; #ifdef _WIN32 if (!CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, - (DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))), (DWORD)(uint64_t)size, - NULL)) { + (DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))), + (DWORD)(uint64_t)size, NULL)) { fd = -1; } #else @@ -127,7 +123,9 @@ void* fake_mmap(size_t size) { int fd = create_buffer(size); ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap"; void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - if (pointer == MAP_FAILED) { return pointer; } + if (pointer == MAP_FAILED) { + return pointer; + } /* Increase dlmalloc's allocation granularity directly. */ mparams.granularity *= GRANULARITY_MULTIPLIER; @@ -156,7 +154,9 @@ int fake_munmap(void* addr, int64_t size) { } int r = munmap(addr, size); - if (r == 0) { close(entry->second.fd); } + if (r == 0) { + close(entry->second.fd); + } mmap_records.erase(entry); return r; http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/plasma.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc index bfed500..8708281 100644 --- a/cpp/src/plasma/plasma.cc +++ b/cpp/src/plasma/plasma.cc @@ -27,7 +27,9 @@ namespace plasma { int warn_if_sigpipe(int status, int client_sock) { - if (status >= 0) { return 0; } + if (status >= 0) { + return 0; + } if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) { ARROW_LOG(WARNING) << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when " "sending a message to client on fd " @@ -58,11 +60,13 @@ uint8_t* create_object_info_buffer(ObjectInfoT* object_info) { return notification; } -ObjectTableEntry* get_object_table_entry( - PlasmaStoreInfo* store_info, const ObjectID& object_id) { +ObjectTableEntry* get_object_table_entry(PlasmaStoreInfo* store_info, + const ObjectID& object_id) { auto it = store_info->objects.find(object_id); - if (it == store_info->objects.end()) { return NULL; } + if (it == store_info->objects.end()) { + return NULL; + } return it->second.get(); } -} // namespace plasma +} // namespace plasma http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/plasma.h ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h index db8669f..d60e5a8 100644 --- a/cpp/src/plasma/plasma.h +++ b/cpp/src/plasma/plasma.h @@ -138,8 +138,8 @@ struct PlasmaStoreInfo { /// @param object_id The object_id of the entry we are looking for. /// @return The entry associated with the object_id or NULL if the object_id /// is not present. -ObjectTableEntry* get_object_table_entry( - PlasmaStoreInfo* store_info, const ObjectID& object_id); +ObjectTableEntry* get_object_table_entry(PlasmaStoreInfo* store_info, + const ObjectID& object_id); /// Print a warning if the status is less than zero. This should be used to check /// the success of messages sent to plasma clients. We print a warning instead of @@ -159,6 +159,6 @@ int warn_if_sigpipe(int status, int client_sock); uint8_t* create_object_info_buffer(ObjectInfoT* object_info); -} // namespace plasma +} // namespace plasma #endif // PLASMA_PLASMA_H http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/protocol.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc index 2998c68..19240bb 100644 --- a/cpp/src/plasma/protocol.cc +++ b/cpp/src/plasma/protocol.cc @@ -29,7 +29,7 @@ using flatbuffers::uoffset_t; flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>> to_flatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids, - int64_t num_objects) { + int64_t num_objects) { std::vector<flatbuffers::Offset<flatbuffers::String>> results; for (int64_t i = 0; i < num_objects; i++) { results.push_back(fbb->CreateString(object_ids[i].binary())); @@ -47,23 +47,23 @@ Status PlasmaReceive(int sock, int64_t message_type, std::vector<uint8_t>* buffe template <typename Message> Status PlasmaSend(int sock, int64_t message_type, flatbuffers::FlatBufferBuilder* fbb, - const Message& message) { + const Message& message) { fbb->Finish(message); return WriteMessage(sock, message_type, fbb->GetSize(), fbb->GetBufferPointer()); } // Create messages. -Status SendCreateRequest( - int sock, ObjectID object_id, int64_t data_size, int64_t metadata_size) { +Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size, + int64_t metadata_size) { flatbuffers::FlatBufferBuilder fbb; - auto message = CreatePlasmaCreateRequest( - fbb, fbb.CreateString(object_id.binary()), data_size, metadata_size); + auto message = CreatePlasmaCreateRequest(fbb, fbb.CreateString(object_id.binary()), + data_size, metadata_size); return PlasmaSend(sock, MessageType_PlasmaCreateRequest, &fbb, message); } -Status ReadCreateRequest( - uint8_t* data, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) { +Status ReadCreateRequest(uint8_t* data, ObjectID* object_id, int64_t* data_size, + int64_t* metadata_size) { DCHECK(data); auto message = flatbuffers::GetRoot<PlasmaCreateRequest>(data); *data_size = message->data_size(); @@ -72,14 +72,14 @@ Status ReadCreateRequest( return Status::OK(); } -Status SendCreateReply( - int sock, ObjectID object_id, PlasmaObject* object, int error_code) { +Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, + int error_code) { flatbuffers::FlatBufferBuilder fbb; PlasmaObjectSpec plasma_object(object->handle.store_fd, object->handle.mmap_size, - object->data_offset, object->data_size, object->metadata_offset, - object->metadata_size); - auto message = CreatePlasmaCreateReply( - fbb, fbb.CreateString(object_id.binary()), &plasma_object, (PlasmaError)error_code); + object->data_offset, object->data_size, + object->metadata_offset, object->metadata_size); + auto message = CreatePlasmaCreateReply(fbb, fbb.CreateString(object_id.binary()), + &plasma_object, (PlasmaError)error_code); return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message); } @@ -117,8 +117,8 @@ Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest Status SendSealReply(int sock, ObjectID object_id, int error) { flatbuffers::FlatBufferBuilder fbb; - auto message = CreatePlasmaSealReply( - fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error); + auto message = CreatePlasmaSealReply(fbb, fbb.CreateString(object_id.binary()), + (PlasmaError)error); return PlasmaSend(sock, MessageType_PlasmaSealReply, &fbb, message); } @@ -146,8 +146,8 @@ Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id) { Status SendReleaseReply(int sock, ObjectID object_id, int error) { flatbuffers::FlatBufferBuilder fbb; - auto message = CreatePlasmaReleaseReply( - fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error); + auto message = CreatePlasmaReleaseReply(fbb, fbb.CreateString(object_id.binary()), + (PlasmaError)error); return PlasmaSend(sock, MessageType_PlasmaReleaseReply, &fbb, message); } @@ -175,8 +175,8 @@ Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id) { Status SendDeleteReply(int sock, ObjectID object_id, int error) { flatbuffers::FlatBufferBuilder fbb; - auto message = CreatePlasmaDeleteReply( - fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error); + auto message = CreatePlasmaDeleteReply(fbb, fbb.CreateString(object_id.binary()), + (PlasmaError)error); return PlasmaSend(sock, MessageType_PlasmaDeleteReply, &fbb, message); } @@ -205,12 +205,12 @@ Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objec return Status::OK(); } -Status SendStatusReply( - int sock, ObjectID object_ids[], int object_status[], int64_t num_objects) { +Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[], + int64_t num_objects) { flatbuffers::FlatBufferBuilder fbb; auto message = CreatePlasmaStatusReply(fbb, to_flatbuffer(&fbb, object_ids, num_objects), - fbb.CreateVector(object_status, num_objects)); + fbb.CreateVector(object_status, num_objects)); return PlasmaSend(sock, MessageType_PlasmaStatusReply, &fbb, message); } @@ -220,8 +220,8 @@ int64_t ReadStatusReply_num_objects(uint8_t* data) { return message->object_ids()->size(); } -Status ReadStatusReply( - uint8_t* data, ObjectID object_ids[], int object_status[], int64_t num_objects) { +Status ReadStatusReply(uint8_t* data, ObjectID object_ids[], int object_status[], + int64_t num_objects) { DCHECK(data); auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data); for (uoffset_t i = 0; i < num_objects; ++i) { @@ -271,9 +271,7 @@ Status SendConnectRequest(int sock) { return PlasmaSend(sock, MessageType_PlasmaConnectRequest, &fbb, message); } -Status ReadConnectRequest(uint8_t* data) { - return Status::OK(); -} +Status ReadConnectRequest(uint8_t* data) { return Status::OK(); } Status SendConnectReply(int sock, int64_t memory_capacity) { flatbuffers::FlatBufferBuilder fbb; @@ -318,16 +316,16 @@ Status ReadEvictReply(uint8_t* data, int64_t& num_bytes) { // Get messages. -Status SendGetRequest( - int sock, const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms) { +Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects, + int64_t timeout_ms) { flatbuffers::FlatBufferBuilder fbb; - auto message = CreatePlasmaGetRequest( - fbb, to_flatbuffer(&fbb, object_ids, num_objects), timeout_ms); + auto message = CreatePlasmaGetRequest(fbb, to_flatbuffer(&fbb, object_ids, num_objects), + timeout_ms); return PlasmaSend(sock, MessageType_PlasmaGetRequest, &fbb, message); } -Status ReadGetRequest( - uint8_t* data, std::vector<ObjectID>& object_ids, int64_t* timeout_ms) { +Status ReadGetRequest(uint8_t* data, std::vector<ObjectID>& object_ids, + int64_t* timeout_ms) { DCHECK(data); auto message = flatbuffers::GetRoot<PlasmaGetRequest>(data); for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) { @@ -338,7 +336,8 @@ Status ReadGetRequest( return Status::OK(); } -Status SendGetReply(int sock, ObjectID object_ids[], +Status SendGetReply( + int sock, ObjectID object_ids[], std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects, int64_t num_objects) { flatbuffers::FlatBufferBuilder fbb; @@ -347,16 +346,17 @@ Status SendGetReply(int sock, ObjectID object_ids[], for (int i = 0; i < num_objects; ++i) { const PlasmaObject& object = plasma_objects[object_ids[i]]; objects.push_back(PlasmaObjectSpec(object.handle.store_fd, object.handle.mmap_size, - object.data_offset, object.data_size, object.metadata_offset, - object.metadata_size)); + object.data_offset, object.data_size, + object.metadata_offset, object.metadata_size)); } - auto message = CreatePlasmaGetReply(fbb, to_flatbuffer(&fbb, object_ids, num_objects), - fbb.CreateVectorOfStructs(objects.data(), num_objects)); + auto message = + CreatePlasmaGetReply(fbb, to_flatbuffer(&fbb, object_ids, num_objects), + fbb.CreateVectorOfStructs(objects.data(), num_objects)); return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, message); } Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[], - int64_t num_objects) { + int64_t num_objects) { DCHECK(data); auto message = flatbuffers::GetRoot<PlasmaGetReply>(data); for (uoffset_t i = 0; i < num_objects; ++i) { @@ -395,23 +395,23 @@ Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids) { // Wait messages. Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests, - int num_ready_objects, int64_t timeout_ms) { + int num_ready_objects, int64_t timeout_ms) { flatbuffers::FlatBufferBuilder fbb; std::vector<flatbuffers::Offset<ObjectRequestSpec>> object_request_specs; for (int i = 0; i < num_requests; i++) { - object_request_specs.push_back(CreateObjectRequestSpec(fbb, - fbb.CreateString(object_requests[i].object_id.binary()), + object_request_specs.push_back(CreateObjectRequestSpec( + fbb, fbb.CreateString(object_requests[i].object_id.binary()), object_requests[i].type)); } - auto message = CreatePlasmaWaitRequest( - fbb, fbb.CreateVector(object_request_specs), num_ready_objects, timeout_ms); + auto message = CreatePlasmaWaitRequest(fbb, fbb.CreateVector(object_request_specs), + num_ready_objects, timeout_ms); return PlasmaSend(sock, MessageType_PlasmaWaitRequest, &fbb, message); } Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests, - int64_t* timeout_ms, int* num_ready_objects) { + int64_t* timeout_ms, int* num_ready_objects) { DCHECK(data); auto message = flatbuffers::GetRoot<PlasmaWaitRequest>(data); *num_ready_objects = message->num_ready_objects(); @@ -421,14 +421,14 @@ Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests, ObjectID object_id = ObjectID::from_binary(message->object_requests()->Get(i)->object_id()->str()); ObjectRequest object_request({object_id, message->object_requests()->Get(i)->type(), - ObjectStatus_Nonexistent}); + ObjectStatus_Nonexistent}); object_requests[object_id] = object_request; } return Status::OK(); } -Status SendWaitReply( - int sock, const ObjectRequestMap& object_requests, int num_ready_objects) { +Status SendWaitReply(int sock, const ObjectRequestMap& object_requests, + int num_ready_objects) { flatbuffers::FlatBufferBuilder fbb; std::vector<flatbuffers::Offset<ObjectReply>> object_replies; @@ -443,8 +443,8 @@ Status SendWaitReply( return PlasmaSend(sock, MessageType_PlasmaWaitReply, &fbb, message); } -Status ReadWaitReply( - uint8_t* data, ObjectRequest object_requests[], int* num_ready_objects) { +Status ReadWaitReply(uint8_t* data, ObjectRequest object_requests[], + int* num_ready_objects) { DCHECK(data); auto message = flatbuffers::GetRoot<PlasmaWaitReply>(data); @@ -485,16 +485,16 @@ Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* return Status::OK(); } -Status SendDataReply( - int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size) { +Status SendDataReply(int sock, ObjectID object_id, int64_t object_size, + int64_t metadata_size) { flatbuffers::FlatBufferBuilder fbb; - auto message = CreatePlasmaDataReply( - fbb, fbb.CreateString(object_id.binary()), object_size, metadata_size); + auto message = CreatePlasmaDataReply(fbb, fbb.CreateString(object_id.binary()), + object_size, metadata_size); return PlasmaSend(sock, MessageType_PlasmaDataReply, &fbb, message); } -Status ReadDataReply( - uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size) { +Status ReadDataReply(uint8_t* data, ObjectID* object_id, int64_t* object_size, + int64_t* metadata_size) { DCHECK(data); auto message = flatbuffers::GetRoot<PlasmaDataReply>(data); *object_id = ObjectID::from_binary(message->object_id()->str()); @@ -503,4 +503,4 @@ Status ReadDataReply( return Status::OK(); } -} // namespace plasma +} // namespace plasma http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/protocol.h ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h index 835c5a0..bab08b6 100644 --- a/cpp/src/plasma/protocol.h +++ b/cpp/src/plasma/protocol.h @@ -21,8 +21,8 @@ #include <vector> #include "arrow/status.h" -#include "plasma/plasma_generated.h" #include "plasma/plasma.h" +#include "plasma/plasma_generated.h" namespace plasma { @@ -34,11 +34,11 @@ Status PlasmaReceive(int sock, int64_t message_type, std::vector<uint8_t>* buffe /* Plasma Create message functions. */ -Status SendCreateRequest( - int sock, ObjectID object_id, int64_t data_size, int64_t metadata_size); +Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size, + int64_t metadata_size); -Status ReadCreateRequest( - uint8_t* data, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size); +Status ReadCreateRequest(uint8_t* data, ObjectID* object_id, int64_t* data_size, + int64_t* metadata_size); Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error); @@ -56,18 +56,19 @@ Status ReadSealReply(uint8_t* data, ObjectID* object_id); /* Plasma Get message functions. */ -Status SendGetRequest( - int sock, const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms); +Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects, + int64_t timeout_ms); -Status ReadGetRequest( - uint8_t* data, std::vector<ObjectID>& object_ids, int64_t* timeout_ms); +Status ReadGetRequest(uint8_t* data, std::vector<ObjectID>& object_ids, + int64_t* timeout_ms); -Status SendGetReply(int sock, ObjectID object_ids[], +Status SendGetReply( + int sock, ObjectID object_ids[], std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects, int64_t num_objects); Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[], - int64_t num_objects); + int64_t num_objects); /* Plasma Release message functions. */ @@ -95,13 +96,13 @@ Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objec Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects); -Status SendStatusReply( - int sock, ObjectID object_ids[], int object_status[], int64_t num_objects); +Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[], + int64_t num_objects); int64_t ReadStatusReply_num_objects(uint8_t* data); -Status ReadStatusReply( - uint8_t* data, ObjectID object_ids[], int object_status[], int64_t num_objects); +Status ReadStatusReply(uint8_t* data, ObjectID object_ids[], int object_status[], + int64_t num_objects); /* Plasma Constains message functions. */ @@ -142,16 +143,16 @@ Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids); /* Plasma Wait message functions. */ Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests, - int num_ready_objects, int64_t timeout_ms); + int num_ready_objects, int64_t timeout_ms); Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests, - int64_t* timeout_ms, int* num_ready_objects); + int64_t* timeout_ms, int* num_ready_objects); -Status SendWaitReply( - int sock, const ObjectRequestMap& object_requests, int num_ready_objects); +Status SendWaitReply(int sock, const ObjectRequestMap& object_requests, + int num_ready_objects); -Status ReadWaitReply( - uint8_t* data, ObjectRequest object_requests[], int* num_ready_objects); +Status ReadWaitReply(uint8_t* data, ObjectRequest object_requests[], + int* num_ready_objects); /* Plasma Subscribe message functions. */ @@ -163,12 +164,12 @@ Status SendDataRequest(int sock, ObjectID object_id, const char* address, int po Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port); -Status SendDataReply( - int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size); +Status SendDataReply(int sock, ObjectID object_id, int64_t object_size, + int64_t metadata_size); -Status ReadDataReply( - uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size); +Status ReadDataReply(uint8_t* data, ObjectID* object_id, int64_t* object_size, + int64_t* metadata_size); -} // namespace plasma +} // namespace plasma #endif /* PLASMA_PROTOCOL */ http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/store.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 8d4fb10..9ceecdc 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -49,8 +49,8 @@ #include <unordered_set> #include <vector> -#include "plasma/common_generated.h" #include "plasma/common.h" +#include "plasma/common_generated.h" #include "plasma/fling.h" #include "plasma/io.h" #include "plasma/malloc.h" @@ -89,8 +89,8 @@ GetRequest::GetRequest(Client* client, const std::vector<ObjectID>& object_ids) object_ids(object_ids.begin(), object_ids.end()), objects(object_ids.size()), num_satisfied(0) { - std::unordered_set<ObjectID, UniqueIDHasher> unique_ids( - object_ids.begin(), object_ids.end()); + std::unordered_set<ObjectID, UniqueIDHasher> unique_ids(object_ids.begin(), + object_ids.end()); num_objects_to_wait_for = unique_ids.size(); } @@ -118,7 +118,9 @@ PlasmaStore::~PlasmaStore() { // object's list of clients, otherwise do nothing. void PlasmaStore::add_client_to_object_clients(ObjectTableEntry* entry, Client* client) { // Check if this client is already using the object. - if (entry->clients.find(client) != entry->clients.end()) { return; } + if (entry->clients.find(client) != entry->clients.end()) { + return; + } // If there are no other clients using this object, notify the eviction policy // that the object is being used. if (entry->clients.size() == 0) { @@ -133,7 +135,8 @@ void PlasmaStore::add_client_to_object_clients(ObjectTableEntry* entry, Client* // Create a new object buffer in the hash table. int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size, - int64_t metadata_size, Client* client, PlasmaObject* result) { + int64_t metadata_size, Client* client, + PlasmaObject* result) { ARROW_LOG(DEBUG) << "creating object " << object_id.hex(); if (store_info_.objects.count(object_id) != 0) { // There is already an object with the same ID in the Plasma Store, so @@ -160,7 +163,9 @@ int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size, delete_objects(objects_to_evict); // Return an error to the client if not enough space could be freed to // create the object. - if (!success) { return PlasmaError_OutOfMemory; } + if (!success) { + return PlasmaError_OutOfMemory; + } } } while (pointer == NULL); int fd; @@ -212,7 +217,7 @@ void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) { void PlasmaStore::return_from_get(GetRequest* get_req) { // Send the get reply to the client. Status s = SendGetReply(get_req->client->fd, &get_req->object_ids[0], get_req->objects, - get_req->object_ids.size()); + get_req->object_ids.size()); warn_if_sigpipe(s.ok() ? 0 : -1, get_req->client->fd); // If we successfully sent the get reply message to the client, then also send // the file descriptors. @@ -249,10 +254,14 @@ void PlasmaStore::return_from_get(GetRequest* get_req) { auto& get_requests = object_get_requests_[object_id]; // Erase get_req from the vector. auto it = std::find(get_requests.begin(), get_requests.end(), get_req); - if (it != get_requests.end()) { get_requests.erase(it); } + if (it != get_requests.end()) { + get_requests.erase(it); + } } // Remove the get request. - if (get_req->timer != -1) { ARROW_CHECK(loop_->remove_timer(get_req->timer) == AE_OK); } + if (get_req->timer != -1) { + ARROW_CHECK(loop_->remove_timer(get_req->timer) == AE_OK); + } delete get_req; } @@ -287,8 +296,9 @@ void PlasmaStore::update_object_get_requests(const ObjectID& object_id) { object_get_requests_.erase(object_id); } -void PlasmaStore::process_get_request( - Client* client, const std::vector<ObjectID>& object_ids, int64_t timeout_ms) { +void PlasmaStore::process_get_request(Client* client, + const std::vector<ObjectID>& object_ids, + int64_t timeout_ms) { // Create a get request for this object. GetRequest* get_req = new GetRequest(client, object_ids); @@ -327,8 +337,8 @@ void PlasmaStore::process_get_request( } } -int PlasmaStore::remove_client_from_object_clients( - ObjectTableEntry* entry, Client* client) { +int PlasmaStore::remove_client_from_object_clients(ObjectTableEntry* entry, + Client* client) { auto it = entry->clients.find(client); if (it != entry->clients.end()) { entry->clients.erase(it); @@ -408,7 +418,9 @@ void PlasmaStore::connect_client(int listener_sock) { // TODO(pcm): Check return value. loop_->add_file_event(client_fd, kEventLoopRead, [this, client](int events) { Status s = process_message(client); - if (!s.ok()) { ARROW_LOG(FATAL) << "Failed to process file event: " << s; } + if (!s.ok()) { + ARROW_LOG(FATAL) << "Failed to process file event: " << s; + } }); ARROW_LOG(DEBUG) << "New connection with fd " << client_fd; } @@ -466,8 +478,9 @@ void PlasmaStore::send_notifications(int client_fd) { // at the end of the method. // TODO(pcm): Introduce status codes and check in case the file descriptor // is added twice. - loop_->add_file_event(client_fd, kEventLoopWrite, - [this, client_fd](int events) { send_notifications(client_fd); }); + loop_->add_file_event(client_fd, kEventLoopWrite, [this, client_fd](int events) { + send_notifications(client_fd); + }); break; } else { ARROW_LOG(WARNING) << "Failed to send notification to client on fd " << client_fd; @@ -482,7 +495,8 @@ void PlasmaStore::send_notifications(int client_fd) { delete[] notification; } // Remove the sent notifications from the array. - it->second.object_notifications.erase(it->second.object_notifications.begin(), + it->second.object_notifications.erase( + it->second.object_notifications.begin(), it->second.object_notifications.begin() + num_processed); // Stop sending notifications if the pipe was broken. @@ -492,7 +506,9 @@ void PlasmaStore::send_notifications(int client_fd) { } // If we have sent all notifications, remove the fd from the event loop. - if (it->second.object_notifications.empty()) { loop_->remove_file_event(client_fd); } + if (it->second.object_notifications.empty()) { + loop_->remove_file_event(client_fd); + } } void PlasmaStore::push_notification(ObjectInfoT* object_info) { @@ -550,8 +566,8 @@ Status PlasmaStore::process_message(Client* client) { RETURN_NOT_OK(ReadCreateRequest(input, &object_id, &data_size, &metadata_size)); int error_code = create_object(object_id, data_size, metadata_size, client, &object); - HANDLE_SIGPIPE( - SendCreateReply(client->fd, object_id, &object, error_code), client->fd); + HANDLE_SIGPIPE(SendCreateReply(client->fd, object_id, &object, error_code), + client->fd); if (error_code == PlasmaError_OK) { warn_if_sigpipe(send_fd(client->fd, object.handle.store_fd), client->fd); } @@ -593,8 +609,8 @@ Status PlasmaStore::process_message(Client* client) { subscribe_to_updates(client); break; case MessageType_PlasmaConnectRequest: { - HANDLE_SIGPIPE( - SendConnectReply(client->fd, store_info_.memory_capacity), client->fd); + HANDLE_SIGPIPE(SendConnectReply(client->fd, store_info_.memory_capacity), + client->fd); } break; case DISCONNECT_CLIENT: ARROW_LOG(DEBUG) << "Disconnecting client on fd " << client->fd; @@ -609,7 +625,9 @@ Status PlasmaStore::process_message(Client* client) { // Report "success" to valgrind. void signal_handler(int signal) { - if (signal == SIGTERM) { exit(0); } + if (signal == SIGTERM) { + exit(0); + } } void start_server(char* socket_name, int64_t system_memory) { @@ -623,11 +641,11 @@ void start_server(char* socket_name, int64_t system_memory) { ARROW_CHECK(socket >= 0); // TODO(pcm): Check return value. loop.add_file_event(socket, kEventLoopRead, - [&store, socket](int events) { store.connect_client(socket); }); + [&store, socket](int events) { store.connect_client(socket); }); loop.run(); } -} // namespace plasma +} // namespace plasma int main(int argc, char* argv[]) { signal(SIGTERM, plasma::signal_handler); http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/store.h ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index 27c3813..fec25c1 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -66,7 +66,7 @@ class PlasmaStore { /// cannot create the object. In this case, the client should not call /// plasma_release. int create_object(const ObjectID& object_id, int64_t data_size, int64_t metadata_size, - Client* client, PlasmaObject* result); + Client* client, PlasmaObject* result); /// Delete objects that have been created in the hash table. This should only /// be called on objects that are returned by the eviction policy to evict. @@ -87,8 +87,8 @@ class PlasmaStore { /// @param object_ids Object IDs of the objects to be gotten. /// @param timeout_ms The timeout for the get request in milliseconds. /// @return Void. - void process_get_request( - Client* client, const std::vector<ObjectID>& object_ids, int64_t timeout_ms); + void process_get_request(Client* client, const std::vector<ObjectID>& object_ids, + int64_t timeout_ms); /// Seal an object. The object is now immutable and can be accessed with get. /// @@ -168,6 +168,6 @@ class PlasmaStore { std::unordered_map<int, NotificationQueue> pending_notifications_; }; -} // namespace plasma +} // namespace plasma #endif // PLASMA_STORE_H http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/test/client_tests.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index 6dc558e..02b3832 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -127,7 +127,7 @@ TEST_F(TestPlasmaStore, MultipleGetTest) { ASSERT_EQ(object_buffer[1].data[0], 2); } -} // namespace plasma +} // namespace plasma int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/plasma/test/serialization_tests.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc index 13938cd..aca47d3 100644 --- a/cpp/src/plasma/test/serialization_tests.cc +++ b/cpp/src/plasma/test/serialization_tests.cc @@ -167,11 +167,11 @@ TEST(PlasmaSerialization, GetReply) { ASSERT_EQ(object_ids[0], object_ids_return[0]); ASSERT_EQ(object_ids[1], object_ids_return[1]); ASSERT_EQ(memcmp(&plasma_objects[object_ids[0]], &plasma_objects_return[0], - sizeof(PlasmaObject)), - 0); + sizeof(PlasmaObject)), + 0); ASSERT_EQ(memcmp(&plasma_objects[object_ids[1]], &plasma_objects_return[1], - sizeof(PlasmaObject)), - 0); + sizeof(PlasmaObject)), + 0); close(fd); } @@ -303,15 +303,15 @@ TEST(PlasmaSerialization, WaitRequest) { const int num_ready_objects_in = 1; int64_t timeout_ms = 1000; - ARROW_CHECK_OK(SendWaitRequest( - fd, &object_requests_in[0], num_objects_in, num_ready_objects_in, timeout_ms)); + ARROW_CHECK_OK(SendWaitRequest(fd, &object_requests_in[0], num_objects_in, + num_ready_objects_in, timeout_ms)); /* Read message back. */ std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaWaitRequest); int num_ready_objects_out; int64_t timeout_ms_read; ObjectRequestMap object_requests_out; - ARROW_CHECK_OK(ReadWaitRequest( - data.data(), object_requests_out, &timeout_ms_read, &num_ready_objects_out)); + ARROW_CHECK_OK(ReadWaitRequest(data.data(), object_requests_out, &timeout_ms_read, + &num_ready_objects_out)); ASSERT_EQ(num_objects_in, object_requests_out.size()); ASSERT_EQ(num_ready_objects_out, num_ready_objects_in); for (int i = 0; i < num_objects_in; i++) { @@ -389,4 +389,4 @@ TEST(PlasmaSerialization, DataReply) { ASSERT_EQ(metadata_size1, metadata_size2); } -} // namespace plasma +} // namespace plasma
