This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 58ee81bcf8d1da1a10918deedf2a14ae9104bed6 Author: Alexey Serbin <[email protected]> AuthorDate: Thu Dec 9 21:17:41 2021 -0800 [rpc] style cleanup on transfer.{cc,h} Since I was taking a look into the code of transfer.{cc,h}, I also took the liberty to update the code to be in-line with the style guide and did other minor updates: * added PREDICT_FALSE() into the hot path where appropriate * removed declared but not defined ProcessInboundHeader() method * corrected the return type of OutboundTransfer::TotalLength() to be more future-proof * other unsorted minor updates Change-Id: Ib0efc5d815fd44b94737a999fee02a0413be62f1 Reviewed-on: http://gerrit.cloudera.org:8080/18082 Tested-by: Alexey Serbin <[email protected]> Reviewed-by: Andrew Wong <[email protected]> --- src/kudu/rpc/connection.cc | 2 +- src/kudu/rpc/transfer.cc | 65 ++++++++++++++++++++++------------------------ src/kudu/rpc/transfer.h | 25 +++++++++--------- 3 files changed, 44 insertions(+), 48 deletions(-) diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc index e211d53..11298d7 100644 --- a/src/kudu/rpc/connection.cc +++ b/src/kudu/rpc/connection.cc @@ -805,7 +805,7 @@ Connection::ProcessOutboundTransfersResult Connection::ProcessOutboundTransfers( } last_activity_time_ = reactor_thread_->cur_time(); - Status status = transfer->SendBuffer(*socket_); + Status status = transfer->SendBuffer(socket_.get()); if (PREDICT_FALSE(!status.ok())) { LOG(WARNING) << ToString() << " send error: " << status.ToString(); reactor_thread_->DestroyConnection(this, status); diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc index 0c4104f..cdb3b5e 100644 --- a/src/kudu/rpc/transfer.cc +++ b/src/kudu/rpc/transfer.cc @@ -85,19 +85,16 @@ using strings::Substitute; } \ } while (0) -TransferCallbacks::~TransferCallbacks() -{} - InboundTransfer::InboundTransfer() - : total_length_(0), - cur_offset_(0) { + : total_length_(0), + cur_offset_(0) { buf_.resize(kMsgLengthPrefixLength); } 
InboundTransfer::InboundTransfer(faststring initial_buf) - : buf_(std::move(initial_buf)), - total_length_(0), - cur_offset_(buf_.size()) { + : buf_(std::move(initial_buf)), + total_length_(0), + cur_offset_(buf_.size()) { buf_.resize(std::max<size_t>(kMsgLengthPrefixLength, buf_.size())); } @@ -129,14 +126,14 @@ Status InboundTransfer::ReceiveBuffer(Socket* socket, faststring* extra_4) { // The length prefix doesn't include its own 4 bytes, so we have to // add that back in. total_length_ = NetworkByteOrder::Load32(&buf_[0]) + kMsgLengthPrefixLength; - if (total_length_ > FLAGS_rpc_max_message_size) { + if (PREDICT_FALSE(total_length_ > FLAGS_rpc_max_message_size)) { return Status::NetworkError(Substitute( "RPC frame had a length of $0, but we only support messages up to $1 bytes " "long.", total_length_, FLAGS_rpc_max_message_size)); } - if (total_length_ <= kMsgLengthPrefixLength) { - return Status::NetworkError(Substitute("RPC frame had invalid length of $0", - total_length_)); + if (PREDICT_FALSE(total_length_ <= kMsgLengthPrefixLength)) { + return Status::NetworkError( + Substitute("RPC frame had invalid length of $0", total_length_)); } buf_.resize(total_length_ + kExtraReadLength); @@ -144,15 +141,15 @@ Status InboundTransfer::ReceiveBuffer(Socket* socket, faststring* extra_4) { // available on the socket. } - // receive message body - int32_t nread; - // Socket::Recv() handles at most INT_MAX at a time, so cap the remainder at // INT_MAX. The message will be split across multiple Recv() calls. // Note that this is only needed when rpc_max_message_size > INT_MAX, which is // currently only used for unit tests. 
int32_t rem = std::min(total_length_ - cur_offset_ + kExtraReadLength, static_cast<uint32_t>(std::numeric_limits<int32_t>::max())); + + // receive message body + int32_t nread; Status status = socket->Recv(&buf_[cur_offset_], rem, &nread); RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status); cur_offset_ += nread; @@ -186,25 +183,25 @@ string InboundTransfer::StatusAsString() const { OutboundTransfer* OutboundTransfer::CreateForCallRequest(int32_t call_id, TransferPayload payload, - TransferCallbacks *callbacks) { + TransferCallbacks* callbacks) { return new OutboundTransfer(call_id, std::move(payload), callbacks); } OutboundTransfer* OutboundTransfer::CreateForCallResponse(TransferPayload payload, - TransferCallbacks *callbacks) { + TransferCallbacks* callbacks) { return new OutboundTransfer(kInvalidCallId, std::move(payload), callbacks); } OutboundTransfer::OutboundTransfer(int32_t call_id, TransferPayload payload, - TransferCallbacks *callbacks) - : payload_slices_(std::move(payload)), - cur_slice_idx_(0), - cur_offset_in_slice_(0), - callbacks_(callbacks), - call_id_(call_id), - started_(false), - aborted_(false) { + TransferCallbacks* callbacks) + : payload_slices_(std::move(payload)), + cur_slice_idx_(0), + cur_offset_in_slice_(0), + callbacks_(callbacks), + call_id_(call_id), + started_(false), + aborted_(false) { } OutboundTransfer::~OutboundTransfer() { @@ -214,14 +211,14 @@ OutboundTransfer::~OutboundTransfer() { } } -void OutboundTransfer::Abort(const Status &status) { +void OutboundTransfer::Abort(const Status& status) { CHECK(!aborted_) << "Already aborted"; CHECK(!TransferFinished()) << "Cannot abort a finished transfer"; callbacks_->NotifyTransferAborted(status); aborted_ = true; } -Status OutboundTransfer::SendBuffer(Socket &socket) { +Status OutboundTransfer::SendBuffer(Socket* socket) { CHECK_LT(cur_slice_idx_, payload_slices_.size()); started_ = true; @@ -230,7 +227,7 @@ Status OutboundTransfer::SendBuffer(Socket &socket) { { int offset_in_slice = 
cur_offset_in_slice_; for (int i = 0; i < n_iovecs; i++) { - Slice &slice = payload_slices_[cur_slice_idx_ + i]; + auto& slice = payload_slices_[cur_slice_idx_ + i]; iovec[i].iov_base = slice.mutable_data() + offset_in_slice; iovec[i].iov_len = slice.size() - offset_in_slice; @@ -239,12 +236,12 @@ Status OutboundTransfer::SendBuffer(Socket &socket) { } int64_t written; - Status status = socket.Writev(iovec, n_iovecs, &written); + Status status = socket->Writev(iovec, n_iovecs, &written); RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status); // Adjust our accounting of current writer position. for (int i = cur_slice_idx_; i < payload_slices_.size(); i++) { - Slice &slice = payload_slices_[i]; + const auto& slice = payload_slices_[i]; int rem_in_slice = slice.size() - cur_offset_in_slice_; DCHECK_GE(rem_in_slice, 0); @@ -289,14 +286,14 @@ string OutboundTransfer::HexDump() const { } string ret; - for (int i = 0; i < payload_slices_.size(); i++) { - ret.append(payload_slices_[i].ToDebugString()); + for (const auto& s : payload_slices_) { + ret.append(s.ToDebugString()); } return ret; } -int32_t OutboundTransfer::TotalLength() const { - int32_t ret = 0; +size_t OutboundTransfer::TotalLength() const { + size_t ret = 0; for (const auto& s : payload_slices_) { ret += s.size(); } diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h index 91135d6..9c0a79a 100644 --- a/src/kudu/rpc/transfer.h +++ b/src/kudu/rpc/transfer.h @@ -17,6 +17,7 @@ #pragma once #include <climits> +#include <cstddef> #include <cstdint> #include <string> @@ -75,7 +76,7 @@ class InboundTransfer { // after this call returns OK), up to 4 extra bytes may have been read // from the socket and stored in 'extra_4'. In that case, any previous content of // 'extra_4' is replaced by this extra bytes. - Status ReceiveBuffer(Socket *socket, faststring* extra_4); + Status ReceiveBuffer(Socket* socket, faststring* extra_4); // Return true if any bytes have yet been sent. 
bool TransferStarted() const; @@ -93,8 +94,6 @@ class InboundTransfer { private: - Status ProcessInboundHeader(); - faststring buf_; // 0 indicates not yet set @@ -128,12 +127,12 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> { // Create an outbound transfer for a call request. static OutboundTransfer* CreateForCallRequest(int32_t call_id, TransferPayload payload, - TransferCallbacks *callbacks); + TransferCallbacks* callbacks); // Create an outbound transfer for a call response. // See above for details. static OutboundTransfer* CreateForCallResponse(TransferPayload payload, - TransferCallbacks *callbacks); + TransferCallbacks* callbacks); // Destruct the transfer. A transfer object should never be deallocated // before it has either (a) finished transferring, or (b) been Abort()ed. @@ -141,10 +140,10 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> { // Abort the current transfer, with the given status. // This triggers TransferCallbacks::NotifyTransferAborted. - void Abort(const Status &status); + void Abort(const Status& status); // send from our buffers into the sock - Status SendBuffer(Socket &socket); + Status SendBuffer(Socket* socket); // Return true if any bytes have yet been sent. bool TransferStarted() const; @@ -152,8 +151,8 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> { // Return true if the entire transfer has been sent. bool TransferFinished() const; - // Return the total number of bytes to be sent (including those already sent) - int32_t TotalLength() const; + // Return the total number of bytes to be sent (including those already sent). + size_t TotalLength() const; std::string HexDump() const; @@ -171,7 +170,7 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> { private: OutboundTransfer(int32_t call_id, TransferPayload payload, - TransferCallbacks *callbacks); + TransferCallbacks* callbacks); // Slices to send. 
TransferPayload payload_slices_; @@ -181,7 +180,7 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> { // The number of bytes in the above slice which has already been sent. int32_t cur_offset_in_slice_; - TransferCallbacks *callbacks_; + TransferCallbacks* callbacks_; // In the case of outbound calls, the associated call ID. // In the case of call responses, kInvalidCallId @@ -200,13 +199,13 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> { // Callbacks made after a transfer completes. struct TransferCallbacks { public: - virtual ~TransferCallbacks(); + virtual ~TransferCallbacks() {} // The transfer finished successfully. virtual void NotifyTransferFinished() = 0; // The transfer was aborted (e.g because the connection died or an error occurred). - virtual void NotifyTransferAborted(const Status &status) = 0; + virtual void NotifyTransferAborted(const Status& status) = 0; }; } // namespace rpc
