This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 58ee81bcf8d1da1a10918deedf2a14ae9104bed6
Author: Alexey Serbin <[email protected]>
AuthorDate: Thu Dec 9 21:17:41 2021 -0800

    [rpc] style cleanup on transfer.{cc,h}
    
    Since I was taking a look into the code of tranfer.{cc,h}, I also took
    the liberty to update the code to be in-line with the style guide and
    did other minor updates:
      * added PREDICT_FALSE() into the hot path where appropriate
      * removed declared but not defined ProcessInboundHeader() method
      * corrected the return type of OutboundTransfer::TotalLength()
        to be more future-proof
      * other unsorted minor updates
    
    Change-Id: Ib0efc5d815fd44b94737a999fee02a0413be62f1
    Reviewed-on: http://gerrit.cloudera.org:8080/18082
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Andrew Wong <[email protected]>
---
 src/kudu/rpc/connection.cc |  2 +-
 src/kudu/rpc/transfer.cc   | 65 ++++++++++++++++++++++------------------------
 src/kudu/rpc/transfer.h    | 25 +++++++++---------
 3 files changed, 44 insertions(+), 48 deletions(-)

diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index e211d53..11298d7 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -805,7 +805,7 @@ Connection::ProcessOutboundTransfersResult 
Connection::ProcessOutboundTransfers(
     }
 
     last_activity_time_ = reactor_thread_->cur_time();
-    Status status = transfer->SendBuffer(*socket_);
+    Status status = transfer->SendBuffer(socket_.get());
     if (PREDICT_FALSE(!status.ok())) {
       LOG(WARNING) << ToString() << " send error: " << status.ToString();
       reactor_thread_->DestroyConnection(this, status);
diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc
index 0c4104f..cdb3b5e 100644
--- a/src/kudu/rpc/transfer.cc
+++ b/src/kudu/rpc/transfer.cc
@@ -85,19 +85,16 @@ using strings::Substitute;
     }                                                             \
   } while (0)
 
-TransferCallbacks::~TransferCallbacks()
-{}
-
 InboundTransfer::InboundTransfer()
-  : total_length_(0),
-    cur_offset_(0) {
+    : total_length_(0),
+      cur_offset_(0) {
   buf_.resize(kMsgLengthPrefixLength);
 }
 
 InboundTransfer::InboundTransfer(faststring initial_buf)
-  : buf_(std::move(initial_buf)),
-    total_length_(0),
-    cur_offset_(buf_.size()) {
+    : buf_(std::move(initial_buf)),
+      total_length_(0),
+      cur_offset_(buf_.size()) {
   buf_.resize(std::max<size_t>(kMsgLengthPrefixLength, buf_.size()));
 }
 
@@ -129,14 +126,14 @@ Status InboundTransfer::ReceiveBuffer(Socket* socket, 
faststring* extra_4) {
     // The length prefix doesn't include its own 4 bytes, so we have to
     // add that back in.
     total_length_ = NetworkByteOrder::Load32(&buf_[0]) + 
kMsgLengthPrefixLength;
-    if (total_length_ > FLAGS_rpc_max_message_size) {
+    if (PREDICT_FALSE(total_length_ > FLAGS_rpc_max_message_size)) {
       return Status::NetworkError(Substitute(
           "RPC frame had a length of $0, but we only support messages up to $1 
bytes "
           "long.", total_length_, FLAGS_rpc_max_message_size));
     }
-    if (total_length_ <= kMsgLengthPrefixLength) {
-      return Status::NetworkError(Substitute("RPC frame had invalid length of 
$0",
-                                             total_length_));
+    if (PREDICT_FALSE(total_length_ <= kMsgLengthPrefixLength)) {
+      return Status::NetworkError(
+          Substitute("RPC frame had invalid length of $0", total_length_));
     }
     buf_.resize(total_length_ + kExtraReadLength);
 
@@ -144,15 +141,15 @@ Status InboundTransfer::ReceiveBuffer(Socket* socket, 
faststring* extra_4) {
     // available on the socket.
   }
 
-  // receive message body
-  int32_t nread;
-
   // Socket::Recv() handles at most INT_MAX at a time, so cap the remainder at
   // INT_MAX. The message will be split across multiple Recv() calls.
   // Note that this is only needed when rpc_max_message_size > INT_MAX, which 
is
   // currently only used for unit tests.
   int32_t rem = std::min(total_length_ - cur_offset_ + kExtraReadLength,
       static_cast<uint32_t>(std::numeric_limits<int32_t>::max()));
+
+  // receive message body
+  int32_t nread;
   Status status = socket->Recv(&buf_[cur_offset_], rem, &nread);
   RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
   cur_offset_ += nread;
@@ -186,25 +183,25 @@ string InboundTransfer::StatusAsString() const {
 
 OutboundTransfer* OutboundTransfer::CreateForCallRequest(int32_t call_id,
                                                          TransferPayload 
payload,
-                                                         TransferCallbacks 
*callbacks) {
+                                                         TransferCallbacks* 
callbacks) {
   return new OutboundTransfer(call_id, std::move(payload), callbacks);
 }
 
 OutboundTransfer* OutboundTransfer::CreateForCallResponse(TransferPayload 
payload,
-                                                          TransferCallbacks 
*callbacks) {
+                                                          TransferCallbacks* 
callbacks) {
   return new OutboundTransfer(kInvalidCallId, std::move(payload), callbacks);
 }
 
 OutboundTransfer::OutboundTransfer(int32_t call_id,
                                    TransferPayload payload,
-                                   TransferCallbacks *callbacks)
-  : payload_slices_(std::move(payload)),
-    cur_slice_idx_(0),
-    cur_offset_in_slice_(0),
-    callbacks_(callbacks),
-    call_id_(call_id),
-    started_(false),
-    aborted_(false) {
+                                   TransferCallbacks* callbacks)
+    : payload_slices_(std::move(payload)),
+      cur_slice_idx_(0),
+      cur_offset_in_slice_(0),
+      callbacks_(callbacks),
+      call_id_(call_id),
+      started_(false),
+      aborted_(false) {
 }
 
 OutboundTransfer::~OutboundTransfer() {
@@ -214,14 +211,14 @@ OutboundTransfer::~OutboundTransfer() {
   }
 }
 
-void OutboundTransfer::Abort(const Status &status) {
+void OutboundTransfer::Abort(const Status& status) {
   CHECK(!aborted_) << "Already aborted";
   CHECK(!TransferFinished()) << "Cannot abort a finished transfer";
   callbacks_->NotifyTransferAborted(status);
   aborted_ = true;
 }
 
-Status OutboundTransfer::SendBuffer(Socket &socket) {
+Status OutboundTransfer::SendBuffer(Socket* socket) {
   CHECK_LT(cur_slice_idx_, payload_slices_.size());
 
   started_ = true;
@@ -230,7 +227,7 @@ Status OutboundTransfer::SendBuffer(Socket &socket) {
   {
     int offset_in_slice = cur_offset_in_slice_;
     for (int i = 0; i < n_iovecs; i++) {
-      Slice &slice = payload_slices_[cur_slice_idx_ + i];
+      auto& slice = payload_slices_[cur_slice_idx_ + i];
       iovec[i].iov_base = slice.mutable_data() + offset_in_slice;
       iovec[i].iov_len = slice.size() - offset_in_slice;
 
@@ -239,12 +236,12 @@ Status OutboundTransfer::SendBuffer(Socket &socket) {
   }
 
   int64_t written;
-  Status status = socket.Writev(iovec, n_iovecs, &written);
+  Status status = socket->Writev(iovec, n_iovecs, &written);
   RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
 
   // Adjust our accounting of current writer position.
   for (int i = cur_slice_idx_; i < payload_slices_.size(); i++) {
-    Slice &slice = payload_slices_[i];
+    const auto& slice = payload_slices_[i];
     int rem_in_slice = slice.size() - cur_offset_in_slice_;
     DCHECK_GE(rem_in_slice, 0);
 
@@ -289,14 +286,14 @@ string OutboundTransfer::HexDump() const {
   }
 
   string ret;
-  for (int i = 0; i < payload_slices_.size(); i++) {
-    ret.append(payload_slices_[i].ToDebugString());
+  for (const auto& s : payload_slices_) {
+    ret.append(s.ToDebugString());
   }
   return ret;
 }
 
-int32_t OutboundTransfer::TotalLength() const {
-  int32_t ret = 0;
+size_t OutboundTransfer::TotalLength() const {
+  size_t ret = 0;
   for (const auto& s : payload_slices_) {
     ret += s.size();
   }
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index 91135d6..9c0a79a 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <climits>
+#include <cstddef>
 #include <cstdint>
 #include <string>
 
@@ -75,7 +76,7 @@ class InboundTransfer {
   // after this call returns OK), up to 4 extra bytes may have been read
   // from the socket and stored in 'extra_4'. In that case, any previous 
content of
   // 'extra_4' is replaced by this extra bytes.
-  Status ReceiveBuffer(Socket *socket, faststring* extra_4);
+  Status ReceiveBuffer(Socket* socket, faststring* extra_4);
 
   // Return true if any bytes have yet been sent.
   bool TransferStarted() const;
@@ -93,8 +94,6 @@ class InboundTransfer {
 
  private:
 
-  Status ProcessInboundHeader();
-
   faststring buf_;
 
   // 0 indicates not yet set
@@ -128,12 +127,12 @@ class OutboundTransfer : public 
boost::intrusive::list_base_hook<> {
   // Create an outbound transfer for a call request.
   static OutboundTransfer* CreateForCallRequest(int32_t call_id,
                                                 TransferPayload payload,
-                                                TransferCallbacks *callbacks);
+                                                TransferCallbacks* callbacks);
 
   // Create an outbound transfer for a call response.
   // See above for details.
   static OutboundTransfer* CreateForCallResponse(TransferPayload payload,
-                                                 TransferCallbacks *callbacks);
+                                                 TransferCallbacks* callbacks);
 
   // Destruct the transfer. A transfer object should never be deallocated
   // before it has either (a) finished transferring, or (b) been Abort()ed.
@@ -141,10 +140,10 @@ class OutboundTransfer : public 
boost::intrusive::list_base_hook<> {
 
   // Abort the current transfer, with the given status.
   // This triggers TransferCallbacks::NotifyTransferAborted.
-  void Abort(const Status &status);
+  void Abort(const Status& status);
 
   // send from our buffers into the sock
-  Status SendBuffer(Socket &socket);
+  Status SendBuffer(Socket* socket);
 
   // Return true if any bytes have yet been sent.
   bool TransferStarted() const;
@@ -152,8 +151,8 @@ class OutboundTransfer : public 
boost::intrusive::list_base_hook<> {
   // Return true if the entire transfer has been sent.
   bool TransferFinished() const;
 
-  // Return the total number of bytes to be sent (including those already sent)
-  int32_t TotalLength() const;
+  // Return the total number of bytes to be sent (including those already 
sent).
+  size_t TotalLength() const;
 
   std::string HexDump() const;
 
@@ -171,7 +170,7 @@ class OutboundTransfer : public 
boost::intrusive::list_base_hook<> {
  private:
   OutboundTransfer(int32_t call_id,
                    TransferPayload payload,
-                   TransferCallbacks *callbacks);
+                   TransferCallbacks* callbacks);
 
   // Slices to send.
   TransferPayload payload_slices_;
@@ -181,7 +180,7 @@ class OutboundTransfer : public 
boost::intrusive::list_base_hook<> {
   // The number of bytes in the above slice which has already been sent.
   int32_t cur_offset_in_slice_;
 
-  TransferCallbacks *callbacks_;
+  TransferCallbacks* callbacks_;
 
   // In the case of outbound calls, the associated call ID.
   // In the case of call responses, kInvalidCallId
@@ -200,13 +199,13 @@ class OutboundTransfer : public 
boost::intrusive::list_base_hook<> {
 // Callbacks made after a transfer completes.
 struct TransferCallbacks {
  public:
-  virtual ~TransferCallbacks();
+  virtual ~TransferCallbacks() {}
 
   // The transfer finished successfully.
   virtual void NotifyTransferFinished() = 0;
 
   // The transfer was aborted (e.g because the connection died or an error 
occurred).
-  virtual void NotifyTransferAborted(const Status &status) = 0;
+  virtual void NotifyTransferAborted(const Status& status) = 0;
 };
 
 } // namespace rpc

Reply via email to