This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a7787f  rpc: allow a sidecar to append multiple slices
0a7787f is described below

commit 0a7787f6563ae316a4a6207a9244c19ba8a7731f
Author: Todd Lipcon <[email protected]>
AuthorDate: Thu Feb 13 00:38:13 2020 -0800

    rpc: allow a sidecar to append multiple slices
    
    This adds support for sidecars to use vectors of Slices instead of a
    single Slice each. Currently, this isn't used. However, a later patch
    will make use of this to support returning scan batches in a columnar
    layout. The row-wise layout could also make use of this to improve
    allocator efficiency -- a TODO is added for that.
    
    Note: this changes the Transfer code to use a
    boost::container::small_vector instead of a hard-coded std::array with
    10 elements. This should have the same "embedded in the object"
    performance for responses with a small number of slices, while allowing
    an unlimited number of slices when needed.
    
    Change-Id: I05b4d5e5243735db943e315268d837f662891b1a
    Reviewed-on: http://gerrit.cloudera.org:8080/15221
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <[email protected]>
---
 src/kudu/rpc/connection.cc         |  8 +++---
 src/kudu/rpc/inbound_call.cc       | 17 +++++-------
 src/kudu/rpc/inbound_call.h        |  3 +--
 src/kudu/rpc/outbound_call.cc      | 17 +++++-------
 src/kudu/rpc/outbound_call.h       |  4 +--
 src/kudu/rpc/rpc-test-base.h       | 23 +++++++++++-----
 src/kudu/rpc/rpc_controller.cc     |  4 +--
 src/kudu/rpc/rpc_sidecar.cc        | 36 ++++++++++++++++++++-----
 src/kudu/rpc/rpc_sidecar.h         | 24 ++++++++++++-----
 src/kudu/rpc/transfer.cc           | 41 ++++++++++++----------------
 src/kudu/rpc/transfer.h            | 25 ++++++++---------
 src/kudu/tserver/tablet_service.cc | 11 +++++---
 src/kudu/util/faststring-test.cc   | 55 ++++++++++++++++++++++++++++++++++++++
 src/kudu/util/faststring.h         | 29 ++++++++++++++++++++
 14 files changed, 203 insertions(+), 94 deletions(-)

diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index 0b78d46..508cb35 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -478,7 +478,7 @@ void Connection::QueueOutboundCall(shared_ptr<OutboundCall> call) {
 
   // Serialize the actual bytes to be put on the wire.
   TransferPayload tmp_slices;
-  size_t n_slices = call->SerializeTo(&tmp_slices);
+  call->SerializeTo(&tmp_slices);
 
   call->SetQueued();
 
@@ -536,7 +536,7 @@ void Connection::QueueOutboundCall(shared_ptr<OutboundCall> call) {
   TransferCallbacks *cb = new CallTransferCallbacks(std::move(call), this);
   awaiting_response_[call_id] = car.release();
   QueueOutbound(unique_ptr<OutboundTransfer>(
-      OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, n_slices, cb)));
+      OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, cb)));
 }
 
 // Callbacks for sending an RPC call response from the server.
@@ -608,14 +608,14 @@ void Connection::QueueResponseForCall(unique_ptr<InboundCall> call) {
   // ResponseTransferCallbacks::NotifyTransferAborted.
 
   TransferPayload tmp_slices;
-  size_t n_slices = call->SerializeResponseTo(&tmp_slices);
+  call->SerializeResponseTo(&tmp_slices);
 
   TransferCallbacks *cb = new ResponseTransferCallbacks(std::move(call), this);
   // After the response is sent, can delete the InboundCall object.
   // We set a dummy call ID and required feature set, since these are not needed
   // when sending responses.
   unique_ptr<OutboundTransfer> t(
-      OutboundTransfer::CreateForCallResponse(tmp_slices, n_slices, cb));
+      OutboundTransfer::CreateForCallResponse(tmp_slices, cb));
 
   QueueTransferTask *task = new QueueTransferTask(std::move(t), this);
   reactor_thread_->reactor()->ScheduleReactorTask(task);
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index 7479020..60fdbb2 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -185,7 +185,7 @@ void InboundCall::SerializeResponseBuffer(const MessageLite& response,
   int32_t sidecar_byte_size = 0;
   for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) {
     resp_hdr.add_sidecar_offsets(sidecar_byte_size + protobuf_msg_size);
-    int32_t sidecar_bytes = car->AsSlice().size();
+    size_t sidecar_bytes = car->TotalSize();
     DCHECK_LE(sidecar_byte_size, TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes);
     sidecar_byte_size += sidecar_bytes;
   }
@@ -197,20 +197,15 @@ void InboundCall::SerializeResponseBuffer(const MessageLite& response,
                                  &response_hdr_buf_);
 }
 
-size_t InboundCall::SerializeResponseTo(TransferPayload* slices) const {
+void InboundCall::SerializeResponseTo(TransferPayload* slices) const {
   TRACE_EVENT0("rpc", "InboundCall::SerializeResponseTo");
   DCHECK_GT(response_hdr_buf_.size(), 0);
   DCHECK_GT(response_msg_buf_.size(), 0);
-  size_t n_slices = 2 + outbound_sidecars_.size();
-  DCHECK_LE(n_slices, slices->size());
-  auto slice_iter = slices->begin();
-  *slice_iter++ = Slice(response_hdr_buf_);
-  *slice_iter++ = Slice(response_msg_buf_);
+  slices->push_back(Slice(response_hdr_buf_));
+  slices->push_back(Slice(response_msg_buf_));
   for (auto& sidecar : outbound_sidecars_) {
-    *slice_iter++ = sidecar->AsSlice();
+    sidecar->AppendSlices(slices);
   }
-  DCHECK_EQ(slice_iter - slices->begin(), n_slices);
-  return n_slices;
 }
 
 Status InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
@@ -220,7 +215,7 @@ Status InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
   if (outbound_sidecars_.size() > TransferLimits::kMaxSidecars) {
     return Status::ServiceUnavailable("All available sidecars already used");
   }
-  int64_t sidecar_bytes = car->AsSlice().size();
+  size_t sidecar_bytes = car->TotalSize();
   if (outbound_sidecars_total_bytes_ >
       TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes) {
     return Status::RuntimeError(Substitute("Total size of sidecars $0 would exceed limit $1",
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index 892b893..1a9d8fb 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -133,8 +133,7 @@ class InboundCall {
 
   // Serialize the response packet for the finished call into 'slices'.
   // The resulting slices refer to memory in this object.
-  // Returns the number of slices in the serialized response.
-  size_t SerializeResponseTo(TransferPayload* slices) const;
+  void SerializeResponseTo(TransferPayload* slices) const;
 
   // See RpcContext::AddRpcSidecar()
   Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc
index 3fe7641..5a0c1da 100644
--- a/src/kudu/rpc/outbound_call.cc
+++ b/src/kudu/rpc/outbound_call.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/rpc/outbound_call.h"
 
+#include <cstddef>
 #include <cstdint>
 #include <memory>
 #include <mutex>
@@ -109,7 +110,7 @@ OutboundCall::~OutboundCall() {
   DVLOG(4) << "OutboundCall " << this << " destroyed with state_: " << StateName(state_);
 }
 
-size_t OutboundCall::SerializeTo(TransferPayload* slices) {
+void OutboundCall::SerializeTo(TransferPayload* slices) {
   DCHECK_LT(0, request_buf_.size())
       << "Must call SetRequestPayload() before SerializeTo()";
 
@@ -126,16 +127,12 @@ size_t OutboundCall::SerializeTo(TransferPayload* slices) {
   serialization::SerializeHeader(
       header_, sidecar_byte_size_ + request_buf_.size(), &header_buf_);
 
-  size_t n_slices = 2 + sidecars_.size();
-  DCHECK_LE(n_slices, slices->size());
-  auto slice_iter = slices->begin();
-  *slice_iter++ = Slice(header_buf_);
-  *slice_iter++ = Slice(request_buf_);
+  slices->clear();
+  slices->push_back(header_buf_);
+  slices->push_back(request_buf_);
   for (auto& sidecar : sidecars_) {
-    *slice_iter++ = sidecar->AsSlice();
+    sidecar->AppendSlices(slices);
   }
-  DCHECK_EQ(slice_iter - slices->begin(), n_slices);
-  return n_slices;
 }
 
 void OutboundCall::SetRequestPayload(const Message& req,
@@ -151,7 +148,7 @@ void OutboundCall::SetRequestPayload(const Message& req,
   sidecar_byte_size_ = 0;
   for (const unique_ptr<RpcSidecar>& car: sidecars_) {
     header_.add_sidecar_offsets(sidecar_byte_size_ + message_size);
-    int32_t sidecar_bytes = car->AsSlice().size();
+    size_t sidecar_bytes = car->TotalSize();
     DCHECK_LE(sidecar_byte_size_, TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes);
     sidecar_byte_size_ += sidecar_bytes;
   }
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index 7cd5f01..eb36608 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -16,7 +16,6 @@
 // under the License.
 #pragma once
 
-#include <cstddef>
 #include <cstdint>
 #include <memory>
 #include <ostream>
@@ -104,8 +103,7 @@ class OutboundCall {
 
   // Serialize the call for the wire. Requires that SetRequestPayload()
   // is called first. This is called from the Reactor thread.
-  // Returns the number of slices in the serialized call.
-  size_t SerializeTo(TransferPayload* slices);
+  void SerializeTo(TransferPayload* slices);
 
   // Mark in the call that cancellation has been requested. If the call hasn't yet
   // started sending or has finished sending the RPC request but is waiting for a
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 56e4c3d..88ea808 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -20,6 +20,7 @@
 #include <atomic>
 #include <memory>
 #include <string>
+#include <vector>
 
 #include "kudu/gutil/walltime.h"
 #include "kudu/rpc/acceptor_pool.h"
@@ -147,22 +148,30 @@ class GenericCalculatorService : public ServiceIf {
       LOG(FATAL) << "couldn't parse: " << param.ToDebugString();
     }
 
-    std::unique_ptr<faststring> first(new faststring);
-    std::unique_ptr<faststring> second(new faststring);
+    faststring first;
 
     Random r(req.random_seed());
-    first->resize(req.size1());
-    RandomString(first->data(), req.size1(), &r);
+    first.resize(req.size1());
+    RandomString(first.data(), req.size1(), &r);
 
-    second->resize(req.size2());
-    RandomString(second->data(), req.size2(), &r);
+    // The second string gets sent in two separate buffers, which get
+    // concatenated on the client side.
+    faststring second_data;
+    second_data.resize(req.size2());
+    RandomString(second_data.data(), second_data.size(), &r);
+
+    std::vector<faststring> second(2);
+    second[0].append(second_data.data(), second_data.size() / 3);
+    second[1].append(second_data.data() + second[0].size(),
+                     second_data.size() - second[0].size());
 
     SendTwoStringsResponsePB resp;
     int idx1, idx2;
     CHECK_OK(incoming->AddOutboundSidecar(
             RpcSidecar::FromFaststring(std::move(first)), &idx1));
+
     CHECK_OK(incoming->AddOutboundSidecar(
-            RpcSidecar::FromFaststring(std::move(second)), &idx2));
+            RpcSidecar::FromFaststrings(std::move(second)), &idx2));
     resp.set_sidecar1(idx1);
     resp.set_sidecar2(idx2);
 
diff --git a/src/kudu/rpc/rpc_controller.cc b/src/kudu/rpc/rpc_controller.cc
index 821b881..52de3cb 100644
--- a/src/kudu/rpc/rpc_controller.cc
+++ b/src/kudu/rpc/rpc_controller.cc
@@ -30,8 +30,6 @@
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/rpc/transfer.h"
-#include "kudu/util/slice.h"
-
 
 using std::unique_ptr;
 using strings::Substitute;
@@ -146,7 +144,7 @@ Status RpcController::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
   if (outbound_sidecars_.size() >= TransferLimits::kMaxSidecars) {
     return Status::RuntimeError("All available sidecars already used");
   }
-  int64_t sidecar_bytes = car->AsSlice().size();
+  int64_t sidecar_bytes = car->TotalSize();
   if (outbound_sidecars_total_bytes_ >
       TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes) {
     return Status::RuntimeError(Substitute("Total size of sidecars $0 would exceed limit $1",
diff --git a/src/kudu/rpc/rpc_sidecar.cc b/src/kudu/rpc/rpc_sidecar.cc
index b4de678..dbeb845 100644
--- a/src/kudu/rpc/rpc_sidecar.cc
+++ b/src/kudu/rpc/rpc_sidecar.cc
@@ -20,6 +20,7 @@
 #include <cstdint>
 #include <memory>
 #include <utility>
+#include <vector>
 
 #include <google/protobuf/repeated_field.h>
 
@@ -29,6 +30,7 @@
 #include "kudu/util/status.h"
 
 using std::unique_ptr;
+using std::vector;
 
 namespace kudu {
 namespace rpc {
@@ -39,22 +41,44 @@ namespace rpc {
 class SliceSidecar : public RpcSidecar {
  public:
   explicit SliceSidecar(Slice slice) : slice_(slice) { }
-  Slice AsSlice() const override { return slice_; }
-
+  void AppendSlices(TransferPayload* payload) const override {
+    payload->push_back(slice_);
+  }
+  size_t TotalSize() const override {
+    return slice_.size();
+  }
  private:
   const Slice slice_;
 };
 
 class FaststringSidecar : public RpcSidecar {
  public:
-  explicit FaststringSidecar(unique_ptr<faststring> data) : data_(std::move(data)) { }
-  Slice AsSlice() const override { return *data_; }
+  explicit FaststringSidecar(faststring data) {
+    data_.emplace_back(std::move(data));
+  }
+  explicit FaststringSidecar(vector<faststring> data) : data_(std::move(data)) { }
 
+  void AppendSlices(TransferPayload* payload) const override {
+    for (const auto& fs : data_) {
+      payload->push_back(Slice(fs));
+    }
+  }
+  size_t TotalSize() const override {
+    size_t ret = 0;
+    for (const auto& fs : data_) {
+      ret += fs.size();
+    }
+    return ret;
+  }
  private:
-  const unique_ptr<faststring> data_;
+  vector<faststring> data_;
 };
 
-unique_ptr<RpcSidecar> RpcSidecar::FromFaststring(unique_ptr<faststring> data) {
+unique_ptr<RpcSidecar> RpcSidecar::FromFaststring(faststring data) {
+  return unique_ptr<RpcSidecar>(new FaststringSidecar(std::move(data)));
+}
+
+unique_ptr<RpcSidecar> RpcSidecar::FromFaststrings(vector<faststring> data) {
   return unique_ptr<RpcSidecar>(new FaststringSidecar(std::move(data)));
 }
 
diff --git a/src/kudu/rpc/rpc_sidecar.h b/src/kudu/rpc/rpc_sidecar.h
index cf555cb..3fccc5b 100644
--- a/src/kudu/rpc/rpc_sidecar.h
+++ b/src/kudu/rpc/rpc_sidecar.h
@@ -16,11 +16,14 @@
 // under the License.
 #pragma once
 
+#include <cstddef>
 #include <memory>
+#include <vector>
 
 #include <google/protobuf/repeated_field.h> // IWYU pragma: keep
 #include <google/protobuf/stubs/port.h>
 
+#include "kudu/rpc/transfer.h"
 #include "kudu/util/slice.h"
 
 namespace kudu {
@@ -37,9 +40,10 @@ namespace rpc {
 // The RpcSidecar saves on an additional copy to/from the protobuf on both the server and
 // client side. Both Inbound- and OutboundCall classes accept sidecars to be sent to the
 // client and server respectively. They are ignorant of the sidecar's format, requiring
-// only that it can be represented as a Slice. Data is copied from the Slice returned from
-// AsSlice() to the socket that is responding to the original RPC. The slice should remain
-// valid for as long as the call it is attached to takes to complete.
+// only that it can be represented as a series of Slices. Data is concatenated from the
+// Slices produced by AppendSlices() to the socket that is responding to the original
+// RPC. The slices should remain valid for as long as the call it is attached to takes to
+// complete.
 //
 // In order to distinguish between separate sidecars, whenever a sidecar is added to the
 // RPC response on the server side, an index for that sidecar is returned. This index must
@@ -49,7 +53,8 @@ namespace rpc {
 // sidecar data through the RpcContext or RpcController interfaces respectively.
 class RpcSidecar {
  public:
-  static std::unique_ptr<RpcSidecar> FromFaststring(std::unique_ptr<faststring> data);
+  static std::unique_ptr<RpcSidecar> FromFaststring(faststring data);
+  static std::unique_ptr<RpcSidecar> FromFaststrings(std::vector<faststring> data);
   static std::unique_ptr<RpcSidecar> FromSlice(Slice slice);
 
   // Utility method to parse a series of sidecar slices into 'sidecars' from 'buffer' and
@@ -60,9 +65,16 @@ class RpcSidecar {
       const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets,
       Slice buffer, Slice* sidecars);
 
-  // Returns a Slice representation of the sidecar's data.
-  virtual Slice AsSlice() const = 0;
+  // Append Slice representation of the sidecar's data to the given payload.
+  //
+  // Note that, even if a sidecar appends multiple slices here, the receiver will
+  // see a single concatenated slice on the other end of this call or response.
+  virtual void AppendSlices(TransferPayload* payload) const = 0;
+
+  // Return the total size of the slices to be appended.
+  virtual size_t TotalSize() const = 0;
   virtual ~RpcSidecar() { }
+
 };
 
 } // namespace rpc
diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc
index b268e77..fb6d6a2 100644
--- a/src/kudu/rpc/transfer.cc
+++ b/src/kudu/rpc/transfer.cc
@@ -25,6 +25,7 @@
 #include <limits>
 #include <set>
 
+#include <boost/container/vector.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
@@ -160,34 +161,26 @@ string InboundTransfer::StatusAsString() const {
 }
 
 OutboundTransfer* OutboundTransfer::CreateForCallRequest(int32_t call_id,
-                                                         const TransferPayload &payload,
-                                                         size_t n_payload_slices,
+                                                         TransferPayload payload,
                                                          TransferCallbacks *callbacks) {
-  return new OutboundTransfer(call_id, payload, n_payload_slices, callbacks);
+  return new OutboundTransfer(call_id, std::move(payload), callbacks);
 }
 
-OutboundTransfer* OutboundTransfer::CreateForCallResponse(const TransferPayload &payload,
-                                                          size_t n_payload_slices,
+OutboundTransfer* OutboundTransfer::CreateForCallResponse(TransferPayload payload,
                                                           TransferCallbacks *callbacks) {
-  return new OutboundTransfer(kInvalidCallId, payload, n_payload_slices, callbacks);
+  return new OutboundTransfer(kInvalidCallId, std::move(payload), callbacks);
 }
 
 OutboundTransfer::OutboundTransfer(int32_t call_id,
-                                   const TransferPayload &payload,
-                                   size_t n_payload_slices,
+                                   TransferPayload payload,
                                    TransferCallbacks *callbacks)
-  : cur_slice_idx_(0),
+  : payload_slices_(std::move(payload)),
+    cur_slice_idx_(0),
     cur_offset_in_slice_(0),
     callbacks_(callbacks),
     call_id_(call_id),
     started_(false),
     aborted_(false) {
-
-  n_payload_slices_ = n_payload_slices;
-  CHECK_LE(n_payload_slices_, payload_slices_.size());
-  for (int i = 0; i < n_payload_slices; i++) {
-    payload_slices_[i] = payload[i];
-  }
 }
 
 OutboundTransfer::~OutboundTransfer() {
@@ -205,10 +198,10 @@ void OutboundTransfer::Abort(const Status &status) {
 }
 
 Status OutboundTransfer::SendBuffer(Socket &socket) {
-  CHECK_LT(cur_slice_idx_, n_payload_slices_);
+  CHECK_LT(cur_slice_idx_, payload_slices_.size());
 
   started_ = true;
-  int n_iovecs = n_payload_slices_ - cur_slice_idx_;
+  int n_iovecs = payload_slices_.size() - cur_slice_idx_;
   struct iovec iovec[n_iovecs];
   {
     int offset_in_slice = cur_offset_in_slice_;
@@ -226,7 +219,7 @@ Status OutboundTransfer::SendBuffer(Socket &socket) {
   RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
 
   // Adjust our accounting of current writer position.
-  for (int i = cur_slice_idx_; i < n_payload_slices_; i++) {
+  for (int i = cur_slice_idx_; i < payload_slices_.size(); i++) {
     Slice &slice = payload_slices_[i];
     int rem_in_slice = slice.size() - cur_offset_in_slice_;
     DCHECK_GE(rem_in_slice, 0);
@@ -243,11 +236,11 @@ Status OutboundTransfer::SendBuffer(Socket &socket) {
     }
   }
 
-  if (cur_slice_idx_ == n_payload_slices_) {
+  if (cur_slice_idx_ == payload_slices_.size()) {
     callbacks_->NotifyTransferFinished();
     DCHECK_EQ(0, cur_offset_in_slice_);
   } else {
-    DCHECK_LT(cur_slice_idx_, n_payload_slices_);
+    DCHECK_LT(cur_slice_idx_, payload_slices_.size());
     DCHECK_LT(cur_offset_in_slice_, payload_slices_[cur_slice_idx_].size());
   }
 
@@ -259,7 +252,7 @@ bool OutboundTransfer::TransferStarted() const {
 }
 
 bool OutboundTransfer::TransferFinished() const {
-  if (cur_slice_idx_ == n_payload_slices_) {
+  if (cur_slice_idx_ == payload_slices_.size()) {
     DCHECK_EQ(0, cur_offset_in_slice_); // sanity check
     return true;
   }
@@ -272,7 +265,7 @@ string OutboundTransfer::HexDump() const {
   }
 
   string ret;
-  for (int i = 0; i < n_payload_slices_; i++) {
+  for (int i = 0; i < payload_slices_.size(); i++) {
     ret.append(payload_slices_[i].ToDebugString());
   }
   return ret;
@@ -280,8 +273,8 @@ string OutboundTransfer::HexDump() const {
 
 int32_t OutboundTransfer::TotalLength() const {
   int32_t ret = 0;
-  for (int i = 0; i < n_payload_slices_; i++) {
-    ret += payload_slices_[i].size();
+  for (const auto& s : payload_slices_) {
+    ret += s.size();
   }
   return ret;
 }
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index 3342c93..0628f2c 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -16,12 +16,11 @@
 // under the License.
 #pragma once
 
-#include <array>
-#include <cstddef>
+#include <climits>
 #include <cstdint>
-#include <limits.h>
 #include <string>
 
+#include <boost/container/small_vector.hpp>
 #include <boost/intrusive/list_hook.hpp>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
@@ -46,14 +45,17 @@ class TransferLimits {
  public:
   enum {
     kMaxSidecars = 10,
-    kMaxPayloadSlices = kMaxSidecars + 2, // (header + msg)
     kMaxTotalSidecarBytes = INT_MAX
   };
 
   DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits);
 };
 
-typedef std::array<Slice, TransferLimits::kMaxPayloadSlices> TransferPayload;
+// To avoid heap allocation in the common case, assume that most transfer
+// payloads will contain 4 or fewer slices (header, body protobuf, and maybe
+// two sidecars). For more complex responses with more slices, a heap allocation
+// is worth the cost.
+typedef boost::container::small_vector<Slice, 4> TransferPayload;
 
 // This class is used internally by the RPC layer to represent an inbound
 // transfer in progress.
@@ -118,14 +120,12 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
 
   // Create an outbound transfer for a call request.
   static OutboundTransfer* CreateForCallRequest(int32_t call_id,
-                                                const TransferPayload &payload,
-                                                size_t n_payload_slices,
+                                                TransferPayload payload,
                                                 TransferCallbacks *callbacks);
 
   // Create an outbound transfer for a call response.
   // See above for details.
-  static OutboundTransfer* CreateForCallResponse(const TransferPayload &payload,
-                                                 size_t n_payload_slices,
+  static OutboundTransfer* CreateForCallResponse(TransferPayload payload,
                                                  TransferCallbacks *callbacks);
 
   // Destruct the transfer. A transfer object should never be deallocated
@@ -163,14 +163,11 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
 
  private:
   OutboundTransfer(int32_t call_id,
-                   const TransferPayload& payload,
-                   size_t n_payload_slices,
+                   TransferPayload payload,
                    TransferCallbacks *callbacks);
 
-  // Slices to send. Uses an array here instead of a vector to avoid an expensive
-  // vector construction (improved performance a couple percent).
+  // Slices to send.
   TransferPayload payload_slices_;
-  size_t n_payload_slices_;
 
   // The current slice that is being sent.
   int32_t cur_slice_idx_;
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 8843bad..0892ff9 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1721,10 +1721,13 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
   }
 
   size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);
-  unique_ptr<faststring> rows_data(new faststring(batch_size_bytes * 11 / 10));
-  unique_ptr<faststring> indirect_data(new faststring(batch_size_bytes * 11 / 10));
+  // TODO(todd): use a chain of faststrings instead of a single one to avoid
+  // allocating this large buffer. Large buffer allocations are slow and
+  // potentially wasteful.
+  faststring rows_data(batch_size_bytes * 11 / 10);
+  faststring indirect_data(batch_size_bytes * 11 / 10);
   RowwiseRowBlockPB data;
-  ScanResultCopier collector(&data, rows_data.get(), indirect_data.get());
+  ScanResultCopier collector(&data, &rows_data, &indirect_data);
 
   bool has_more_results = false;
   TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
@@ -1777,7 +1780,7 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
   resp->mutable_data()->set_rows_sidecar(rows_idx);
 
   // Add indirect data as a sidecar, if applicable.
-  if (indirect_data->size() > 0) {
+  if (indirect_data.size() > 0) {
     int indirect_idx;
     CHECK_OK(context->AddOutboundSidecar(
         RpcSidecar::FromFaststring(std::move(indirect_data)), &indirect_idx));
diff --git a/src/kudu/util/faststring-test.cc b/src/kudu/util/faststring-test.cc
index 79dd6b2..9585e6e 100644
--- a/src/kudu/util/faststring-test.cc
+++ b/src/kudu/util/faststring-test.cc
@@ -123,4 +123,59 @@ TEST_F(FaststringTest, TestAppend_ExponentiallyExpand) {
   }
 }
 
+TEST_F(FaststringTest, TestMoveConstruct) {
+  for (int string_size = 0;
+       string_size < faststring::kInitialCapacity + 10;
+       string_size++) {
+    string test_str(string_size, 'a');
+    faststring f1;
+    f1.append(test_str);
+    faststring f2 = std::move(f1);
+    // NOTE: NOLINT here since we are purposefully checking the behavior
+    // of 'f1' after it was moved.
+    ASSERT_EQ(0, f1.size()); // NOLINT(*)
+    ASSERT_EQ(test_str, f2.ToString()); // NOLINT(*)
+    ASSERT_NE(f1.data(), f2.data()); // NOLINT(*)
+    f1.CheckInvariants(); // NOLINT(*)
+    f2.CheckInvariants();
+  }
+}
+
+TEST_F(FaststringTest, TestMoveAssignment) {
+  for (int string_size = 0;
+       string_size < faststring::kInitialCapacity + 10;
+       string_size++) {
+    string test_str(string_size, 'a');
+
+    faststring f1;
+    f1.append(test_str);
+
+    // Move to 'f2' by assignment operator.
+    faststring f2;
+    f2 = std::move(f1);
+    ASSERT_EQ(test_str, f2.ToString());
+    ASSERT_EQ(0, f1.size()); // NOLINT(*)
+    f1.CheckInvariants(); // NOLINT(*)
+    f2.CheckInvariants(); // NOLINT(*)
+
+    // Move back to f1.
+    f1 = std::move(f2);
+    ASSERT_EQ(0, f2.size()); // NOLINT(*)
+    ASSERT_EQ(test_str, f1.ToString());
+    f1.CheckInvariants(); // NOLINT(*)
+    f2.CheckInvariants(); // NOLINT(*)
+
+    // Check self-move doesn't have any effect.
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wself-move"
+    f1 = std::move(f1); // NOLINT(*)
+#pragma clang diagnostic pop
+    ASSERT_EQ(0, f2.size()); // NOLINT(*)
+    ASSERT_EQ(test_str, f1.ToString());
+    f1.CheckInvariants(); // NOLINT(*)
+    f2.CheckInvariants(); // NOLINT(*)
+  }
+}
+
+
 } // namespace kudu
diff --git a/src/kudu/util/faststring.h b/src/kudu/util/faststring.h
index 3ada37c..5484f9b 100644
--- a/src/kudu/util/faststring.h
+++ b/src/kudu/util/faststring.h
@@ -21,6 +21,8 @@
 #include <cstring>
 #include <string>
 
+#include <glog/logging.h>
+
 #include "kudu/gutil/dynamic_annotations.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
@@ -55,6 +57,25 @@ class faststring {
     ASAN_POISON_MEMORY_REGION(data_, capacity_);
   }
 
+  faststring(faststring&& other) noexcept
+      : faststring() {
+    *this = std::move(other);
+  }
+
+  faststring& operator=(faststring&& other) noexcept {
+    if (this == &other) return *this;
+
+    if (other.data_ == other.initial_data_) {
+      assign_copy(other.data(), other.size());
+      other.clear();
+    } else {
+      len_ = other.len_;
+      capacity_ = other.capacity_;
+      data_ = other.release();
+    }
+    return *this;
+  }
+
   ~faststring() {
     ASAN_UNPOISON_MEMORY_REGION(initial_data_, arraysize(initial_data_));
     if (data_ != initial_data_) {
@@ -224,6 +245,14 @@ class faststring {
                        len_);
   }
 
+  // Check various internal invariants. Used by tests.
+  void CheckInvariants() {
+    CHECK_LE(len_, capacity_);
+    if (data_ == initial_data_) {
+      CHECK_EQ(capacity_, kInitialCapacity);
+    }
+  }
+
  private:
   DISALLOW_COPY_AND_ASSIGN(faststring);
 

Reply via email to