This is an automated email from the ASF dual-hosted git repository.
todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 0a7787f rpc: allow a sidecar to append multiple slices
0a7787f is described below
commit 0a7787f6563ae316a4a6207a9244c19ba8a7731f
Author: Todd Lipcon <[email protected]>
AuthorDate: Thu Feb 13 00:38:13 2020 -0800
rpc: allow a sidecar to append multiple slices
This adds support for sidecars to use vectors of Slices instead of a
single Slice each. Currently, this isn't used. However, a later patch
will make use of this to support returning scan batches in a columnar
layout. The row-wise layout could also make use of this to improve
allocator efficiency -- a TODO is added for that.
Note: this changes the Transfer code to use a
boost::container::small_vector instead of a hard-coded std::array with
10 elements. This should have the same "embedded in the object"
performance for responses with a small number of slices, while allowing
an unlimited number of slices when needed.
Change-Id: I05b4d5e5243735db943e315268d837f662891b1a
Reviewed-on: http://gerrit.cloudera.org:8080/15221
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <[email protected]>
---
src/kudu/rpc/connection.cc | 8 +++---
src/kudu/rpc/inbound_call.cc | 17 +++++-------
src/kudu/rpc/inbound_call.h | 3 +--
src/kudu/rpc/outbound_call.cc | 17 +++++-------
src/kudu/rpc/outbound_call.h | 4 +--
src/kudu/rpc/rpc-test-base.h | 23 +++++++++++-----
src/kudu/rpc/rpc_controller.cc | 4 +--
src/kudu/rpc/rpc_sidecar.cc | 36 ++++++++++++++++++++-----
src/kudu/rpc/rpc_sidecar.h | 24 ++++++++++++-----
src/kudu/rpc/transfer.cc | 41 ++++++++++++----------------
src/kudu/rpc/transfer.h | 25 ++++++++---------
src/kudu/tserver/tablet_service.cc | 11 +++++---
src/kudu/util/faststring-test.cc | 55 ++++++++++++++++++++++++++++++++++++++
src/kudu/util/faststring.h | 29 ++++++++++++++++++++
14 files changed, 203 insertions(+), 94 deletions(-)
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index 0b78d46..508cb35 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -478,7 +478,7 @@ void Connection::QueueOutboundCall(shared_ptr<OutboundCall>
call) {
// Serialize the actual bytes to be put on the wire.
TransferPayload tmp_slices;
- size_t n_slices = call->SerializeTo(&tmp_slices);
+ call->SerializeTo(&tmp_slices);
call->SetQueued();
@@ -536,7 +536,7 @@ void Connection::QueueOutboundCall(shared_ptr<OutboundCall>
call) {
TransferCallbacks *cb = new CallTransferCallbacks(std::move(call), this);
awaiting_response_[call_id] = car.release();
QueueOutbound(unique_ptr<OutboundTransfer>(
- OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, n_slices,
cb)));
+ OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, cb)));
}
// Callbacks for sending an RPC call response from the server.
@@ -608,14 +608,14 @@ void
Connection::QueueResponseForCall(unique_ptr<InboundCall> call) {
// ResponseTransferCallbacks::NotifyTransferAborted.
TransferPayload tmp_slices;
- size_t n_slices = call->SerializeResponseTo(&tmp_slices);
+ call->SerializeResponseTo(&tmp_slices);
TransferCallbacks *cb = new ResponseTransferCallbacks(std::move(call), this);
// After the response is sent, can delete the InboundCall object.
// We set a dummy call ID and required feature set, since these are not
needed
// when sending responses.
unique_ptr<OutboundTransfer> t(
- OutboundTransfer::CreateForCallResponse(tmp_slices, n_slices, cb));
+ OutboundTransfer::CreateForCallResponse(tmp_slices, cb));
QueueTransferTask *task = new QueueTransferTask(std::move(t), this);
reactor_thread_->reactor()->ScheduleReactorTask(task);
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index 7479020..60fdbb2 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -185,7 +185,7 @@ void InboundCall::SerializeResponseBuffer(const
MessageLite& response,
int32_t sidecar_byte_size = 0;
for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) {
resp_hdr.add_sidecar_offsets(sidecar_byte_size + protobuf_msg_size);
- int32_t sidecar_bytes = car->AsSlice().size();
+ size_t sidecar_bytes = car->TotalSize();
DCHECK_LE(sidecar_byte_size, TransferLimits::kMaxTotalSidecarBytes -
sidecar_bytes);
sidecar_byte_size += sidecar_bytes;
}
@@ -197,20 +197,15 @@ void InboundCall::SerializeResponseBuffer(const
MessageLite& response,
&response_hdr_buf_);
}
-size_t InboundCall::SerializeResponseTo(TransferPayload* slices) const {
+void InboundCall::SerializeResponseTo(TransferPayload* slices) const {
TRACE_EVENT0("rpc", "InboundCall::SerializeResponseTo");
DCHECK_GT(response_hdr_buf_.size(), 0);
DCHECK_GT(response_msg_buf_.size(), 0);
- size_t n_slices = 2 + outbound_sidecars_.size();
- DCHECK_LE(n_slices, slices->size());
- auto slice_iter = slices->begin();
- *slice_iter++ = Slice(response_hdr_buf_);
- *slice_iter++ = Slice(response_msg_buf_);
+ slices->push_back(Slice(response_hdr_buf_));
+ slices->push_back(Slice(response_msg_buf_));
for (auto& sidecar : outbound_sidecars_) {
- *slice_iter++ = sidecar->AsSlice();
+ sidecar->AppendSlices(slices);
}
- DCHECK_EQ(slice_iter - slices->begin(), n_slices);
- return n_slices;
}
Status InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
@@ -220,7 +215,7 @@ Status
InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
if (outbound_sidecars_.size() > TransferLimits::kMaxSidecars) {
return Status::ServiceUnavailable("All available sidecars already used");
}
- int64_t sidecar_bytes = car->AsSlice().size();
+ size_t sidecar_bytes = car->TotalSize();
if (outbound_sidecars_total_bytes_ >
TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes) {
return Status::RuntimeError(Substitute("Total size of sidecars $0 would
exceed limit $1",
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index 892b893..1a9d8fb 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -133,8 +133,7 @@ class InboundCall {
// Serialize the response packet for the finished call into 'slices'.
// The resulting slices refer to memory in this object.
- // Returns the number of slices in the serialized response.
- size_t SerializeResponseTo(TransferPayload* slices) const;
+ void SerializeResponseTo(TransferPayload* slices) const;
// See RpcContext::AddRpcSidecar()
Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc
index 3fe7641..5a0c1da 100644
--- a/src/kudu/rpc/outbound_call.cc
+++ b/src/kudu/rpc/outbound_call.cc
@@ -17,6 +17,7 @@
#include "kudu/rpc/outbound_call.h"
+#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
@@ -109,7 +110,7 @@ OutboundCall::~OutboundCall() {
DVLOG(4) << "OutboundCall " << this << " destroyed with state_: " <<
StateName(state_);
}
-size_t OutboundCall::SerializeTo(TransferPayload* slices) {
+void OutboundCall::SerializeTo(TransferPayload* slices) {
DCHECK_LT(0, request_buf_.size())
<< "Must call SetRequestPayload() before SerializeTo()";
@@ -126,16 +127,12 @@ size_t OutboundCall::SerializeTo(TransferPayload* slices)
{
serialization::SerializeHeader(
header_, sidecar_byte_size_ + request_buf_.size(), &header_buf_);
- size_t n_slices = 2 + sidecars_.size();
- DCHECK_LE(n_slices, slices->size());
- auto slice_iter = slices->begin();
- *slice_iter++ = Slice(header_buf_);
- *slice_iter++ = Slice(request_buf_);
+ slices->clear();
+ slices->push_back(header_buf_);
+ slices->push_back(request_buf_);
for (auto& sidecar : sidecars_) {
- *slice_iter++ = sidecar->AsSlice();
+ sidecar->AppendSlices(slices);
}
- DCHECK_EQ(slice_iter - slices->begin(), n_slices);
- return n_slices;
}
void OutboundCall::SetRequestPayload(const Message& req,
@@ -151,7 +148,7 @@ void OutboundCall::SetRequestPayload(const Message& req,
sidecar_byte_size_ = 0;
for (const unique_ptr<RpcSidecar>& car: sidecars_) {
header_.add_sidecar_offsets(sidecar_byte_size_ + message_size);
- int32_t sidecar_bytes = car->AsSlice().size();
+ size_t sidecar_bytes = car->TotalSize();
DCHECK_LE(sidecar_byte_size_, TransferLimits::kMaxTotalSidecarBytes -
sidecar_bytes);
sidecar_byte_size_ += sidecar_bytes;
}
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index 7cd5f01..eb36608 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -16,7 +16,6 @@
// under the License.
#pragma once
-#include <cstddef>
#include <cstdint>
#include <memory>
#include <ostream>
@@ -104,8 +103,7 @@ class OutboundCall {
// Serialize the call for the wire. Requires that SetRequestPayload()
// is called first. This is called from the Reactor thread.
- // Returns the number of slices in the serialized call.
- size_t SerializeTo(TransferPayload* slices);
+ void SerializeTo(TransferPayload* slices);
// Mark in the call that cancellation has been requested. If the call hasn't
yet
// started sending or has finished sending the RPC request but is waiting
for a
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 56e4c3d..88ea808 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -20,6 +20,7 @@
#include <atomic>
#include <memory>
#include <string>
+#include <vector>
#include "kudu/gutil/walltime.h"
#include "kudu/rpc/acceptor_pool.h"
@@ -147,22 +148,30 @@ class GenericCalculatorService : public ServiceIf {
LOG(FATAL) << "couldn't parse: " << param.ToDebugString();
}
- std::unique_ptr<faststring> first(new faststring);
- std::unique_ptr<faststring> second(new faststring);
+ faststring first;
Random r(req.random_seed());
- first->resize(req.size1());
- RandomString(first->data(), req.size1(), &r);
+ first.resize(req.size1());
+ RandomString(first.data(), req.size1(), &r);
- second->resize(req.size2());
- RandomString(second->data(), req.size2(), &r);
+ // The second string gets sent in two separate buffers, which get
+ // concatenated on the client side.
+ faststring second_data;
+ second_data.resize(req.size2());
+ RandomString(second_data.data(), second_data.size(), &r);
+
+ std::vector<faststring> second(2);
+ second[0].append(second_data.data(), second_data.size() / 3);
+ second[1].append(second_data.data() + second[0].size(),
+ second_data.size() - second[0].size());
SendTwoStringsResponsePB resp;
int idx1, idx2;
CHECK_OK(incoming->AddOutboundSidecar(
RpcSidecar::FromFaststring(std::move(first)), &idx1));
+
CHECK_OK(incoming->AddOutboundSidecar(
- RpcSidecar::FromFaststring(std::move(second)), &idx2));
+ RpcSidecar::FromFaststrings(std::move(second)), &idx2));
resp.set_sidecar1(idx1);
resp.set_sidecar2(idx2);
diff --git a/src/kudu/rpc/rpc_controller.cc b/src/kudu/rpc/rpc_controller.cc
index 821b881..52de3cb 100644
--- a/src/kudu/rpc/rpc_controller.cc
+++ b/src/kudu/rpc/rpc_controller.cc
@@ -30,8 +30,6 @@
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/rpc_sidecar.h"
#include "kudu/rpc/transfer.h"
-#include "kudu/util/slice.h"
-
using std::unique_ptr;
using strings::Substitute;
@@ -146,7 +144,7 @@ Status
RpcController::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
if (outbound_sidecars_.size() >= TransferLimits::kMaxSidecars) {
return Status::RuntimeError("All available sidecars already used");
}
- int64_t sidecar_bytes = car->AsSlice().size();
+ int64_t sidecar_bytes = car->TotalSize();
if (outbound_sidecars_total_bytes_ >
TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes) {
return Status::RuntimeError(Substitute("Total size of sidecars $0 would
exceed limit $1",
diff --git a/src/kudu/rpc/rpc_sidecar.cc b/src/kudu/rpc/rpc_sidecar.cc
index b4de678..dbeb845 100644
--- a/src/kudu/rpc/rpc_sidecar.cc
+++ b/src/kudu/rpc/rpc_sidecar.cc
@@ -20,6 +20,7 @@
#include <cstdint>
#include <memory>
#include <utility>
+#include <vector>
#include <google/protobuf/repeated_field.h>
@@ -29,6 +30,7 @@
#include "kudu/util/status.h"
using std::unique_ptr;
+using std::vector;
namespace kudu {
namespace rpc {
@@ -39,22 +41,44 @@ namespace rpc {
class SliceSidecar : public RpcSidecar {
public:
explicit SliceSidecar(Slice slice) : slice_(slice) { }
- Slice AsSlice() const override { return slice_; }
-
+ void AppendSlices(TransferPayload* payload) const override {
+ payload->push_back(slice_);
+ }
+ size_t TotalSize() const override {
+ return slice_.size();
+ }
private:
const Slice slice_;
};
class FaststringSidecar : public RpcSidecar {
public:
- explicit FaststringSidecar(unique_ptr<faststring> data) :
data_(std::move(data)) { }
- Slice AsSlice() const override { return *data_; }
+ explicit FaststringSidecar(faststring data) {
+ data_.emplace_back(std::move(data));
+ }
+ explicit FaststringSidecar(vector<faststring> data) : data_(std::move(data))
{ }
+ void AppendSlices(TransferPayload* payload) const override {
+ for (const auto& fs : data_) {
+ payload->push_back(Slice(fs));
+ }
+ }
+ size_t TotalSize() const override {
+ size_t ret = 0;
+ for (const auto& fs : data_) {
+ ret += fs.size();
+ }
+ return ret;
+ }
private:
- const unique_ptr<faststring> data_;
+ vector<faststring> data_;
};
-unique_ptr<RpcSidecar> RpcSidecar::FromFaststring(unique_ptr<faststring> data)
{
+unique_ptr<RpcSidecar> RpcSidecar::FromFaststring(faststring data) {
+ return unique_ptr<RpcSidecar>(new FaststringSidecar(std::move(data)));
+}
+
+unique_ptr<RpcSidecar> RpcSidecar::FromFaststrings(vector<faststring> data) {
return unique_ptr<RpcSidecar>(new FaststringSidecar(std::move(data)));
}
diff --git a/src/kudu/rpc/rpc_sidecar.h b/src/kudu/rpc/rpc_sidecar.h
index cf555cb..3fccc5b 100644
--- a/src/kudu/rpc/rpc_sidecar.h
+++ b/src/kudu/rpc/rpc_sidecar.h
@@ -16,11 +16,14 @@
// under the License.
#pragma once
+#include <cstddef>
#include <memory>
+#include <vector>
#include <google/protobuf/repeated_field.h> // IWYU pragma: keep
#include <google/protobuf/stubs/port.h>
+#include "kudu/rpc/transfer.h"
#include "kudu/util/slice.h"
namespace kudu {
@@ -37,9 +40,10 @@ namespace rpc {
// The RpcSidecar saves on an additional copy to/from the protobuf on both the
server and
// client side. Both Inbound- and OutboundCall classes accept sidecars to be
sent to the
// client and server respectively. They are ignorant of the sidecar's format,
requiring
-// only that it can be represented as a Slice. Data is copied from the Slice
returned from
-// AsSlice() to the socket that is responding to the original RPC. The slice
should remain
-// valid for as long as the call it is attached to takes to complete.
+// only that it can be represented as a series of Slices. The Slices produced by
+// AppendSlices() are written back-to-back to the socket that is responding to the
+// original RPC. The slices must remain valid until the call that the sidecar is
+// attached to completes.
//
// In order to distinguish between separate sidecars, whenever a sidecar is
added to the
// RPC response on the server side, an index for that sidecar is returned.
This index must
@@ -49,7 +53,8 @@ namespace rpc {
// sidecar data through the RpcContext or RpcController interfaces
respectively.
class RpcSidecar {
public:
- static std::unique_ptr<RpcSidecar>
FromFaststring(std::unique_ptr<faststring> data);
+ static std::unique_ptr<RpcSidecar> FromFaststring(faststring data);
+ static std::unique_ptr<RpcSidecar> FromFaststrings(std::vector<faststring>
data);
static std::unique_ptr<RpcSidecar> FromSlice(Slice slice);
// Utility method to parse a series of sidecar slices into 'sidecars' from
'buffer' and
@@ -60,9 +65,16 @@ class RpcSidecar {
const ::google::protobuf::RepeatedField<::google::protobuf::uint32>&
offsets,
Slice buffer, Slice* sidecars);
- // Returns a Slice representation of the sidecar's data.
- virtual Slice AsSlice() const = 0;
+ // Append Slice representation of the sidecar's data to the given payload.
+ //
+ // Note that, even if a sidecar appends multiple slices here, the receiver
will
+ // see a single concatenated slice on the other end of this call or response.
+ virtual void AppendSlices(TransferPayload* payload) const = 0;
+
+ // Return the total size of the slices to be appended.
+ virtual size_t TotalSize() const = 0;
virtual ~RpcSidecar() { }
+
};
} // namespace rpc
diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc
index b268e77..fb6d6a2 100644
--- a/src/kudu/rpc/transfer.cc
+++ b/src/kudu/rpc/transfer.cc
@@ -25,6 +25,7 @@
#include <limits>
#include <set>
+#include <boost/container/vector.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
@@ -160,34 +161,26 @@ string InboundTransfer::StatusAsString() const {
}
OutboundTransfer* OutboundTransfer::CreateForCallRequest(int32_t call_id,
- const TransferPayload
&payload,
- size_t
n_payload_slices,
+ TransferPayload
payload,
TransferCallbacks
*callbacks) {
- return new OutboundTransfer(call_id, payload, n_payload_slices, callbacks);
+ return new OutboundTransfer(call_id, std::move(payload), callbacks);
}
-OutboundTransfer* OutboundTransfer::CreateForCallResponse(const
TransferPayload &payload,
- size_t
n_payload_slices,
+OutboundTransfer* OutboundTransfer::CreateForCallResponse(TransferPayload
payload,
TransferCallbacks
*callbacks) {
- return new OutboundTransfer(kInvalidCallId, payload, n_payload_slices,
callbacks);
+ return new OutboundTransfer(kInvalidCallId, std::move(payload), callbacks);
}
OutboundTransfer::OutboundTransfer(int32_t call_id,
- const TransferPayload &payload,
- size_t n_payload_slices,
+ TransferPayload payload,
TransferCallbacks *callbacks)
- : cur_slice_idx_(0),
+ : payload_slices_(std::move(payload)),
+ cur_slice_idx_(0),
cur_offset_in_slice_(0),
callbacks_(callbacks),
call_id_(call_id),
started_(false),
aborted_(false) {
-
- n_payload_slices_ = n_payload_slices;
- CHECK_LE(n_payload_slices_, payload_slices_.size());
- for (int i = 0; i < n_payload_slices; i++) {
- payload_slices_[i] = payload[i];
- }
}
OutboundTransfer::~OutboundTransfer() {
@@ -205,10 +198,10 @@ void OutboundTransfer::Abort(const Status &status) {
}
Status OutboundTransfer::SendBuffer(Socket &socket) {
- CHECK_LT(cur_slice_idx_, n_payload_slices_);
+ CHECK_LT(cur_slice_idx_, payload_slices_.size());
started_ = true;
- int n_iovecs = n_payload_slices_ - cur_slice_idx_;
+ int n_iovecs = payload_slices_.size() - cur_slice_idx_;
struct iovec iovec[n_iovecs];
{
int offset_in_slice = cur_offset_in_slice_;
@@ -226,7 +219,7 @@ Status OutboundTransfer::SendBuffer(Socket &socket) {
RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
// Adjust our accounting of current writer position.
- for (int i = cur_slice_idx_; i < n_payload_slices_; i++) {
+ for (int i = cur_slice_idx_; i < payload_slices_.size(); i++) {
Slice &slice = payload_slices_[i];
int rem_in_slice = slice.size() - cur_offset_in_slice_;
DCHECK_GE(rem_in_slice, 0);
@@ -243,11 +236,11 @@ Status OutboundTransfer::SendBuffer(Socket &socket) {
}
}
- if (cur_slice_idx_ == n_payload_slices_) {
+ if (cur_slice_idx_ == payload_slices_.size()) {
callbacks_->NotifyTransferFinished();
DCHECK_EQ(0, cur_offset_in_slice_);
} else {
- DCHECK_LT(cur_slice_idx_, n_payload_slices_);
+ DCHECK_LT(cur_slice_idx_, payload_slices_.size());
DCHECK_LT(cur_offset_in_slice_, payload_slices_[cur_slice_idx_].size());
}
@@ -259,7 +252,7 @@ bool OutboundTransfer::TransferStarted() const {
}
bool OutboundTransfer::TransferFinished() const {
- if (cur_slice_idx_ == n_payload_slices_) {
+ if (cur_slice_idx_ == payload_slices_.size()) {
DCHECK_EQ(0, cur_offset_in_slice_); // sanity check
return true;
}
@@ -272,7 +265,7 @@ string OutboundTransfer::HexDump() const {
}
string ret;
- for (int i = 0; i < n_payload_slices_; i++) {
+ for (int i = 0; i < payload_slices_.size(); i++) {
ret.append(payload_slices_[i].ToDebugString());
}
return ret;
@@ -280,8 +273,8 @@ string OutboundTransfer::HexDump() const {
int32_t OutboundTransfer::TotalLength() const {
int32_t ret = 0;
- for (int i = 0; i < n_payload_slices_; i++) {
- ret += payload_slices_[i].size();
+ for (const auto& s : payload_slices_) {
+ ret += s.size();
}
return ret;
}
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index 3342c93..0628f2c 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -16,12 +16,11 @@
// under the License.
#pragma once
-#include <array>
-#include <cstddef>
+#include <climits>
#include <cstdint>
-#include <limits.h>
#include <string>
+#include <boost/container/small_vector.hpp>
#include <boost/intrusive/list_hook.hpp>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
@@ -46,14 +45,17 @@ class TransferLimits {
public:
enum {
kMaxSidecars = 10,
- kMaxPayloadSlices = kMaxSidecars + 2, // (header + msg)
kMaxTotalSidecarBytes = INT_MAX
};
DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits);
};
-typedef std::array<Slice, TransferLimits::kMaxPayloadSlices> TransferPayload;
+// To avoid heap allocation in the common case, assume that most transfer
+// payloads will contain 4 or fewer slices (header, body protobuf, and maybe
+// two sidecars). For more complex responses with more slices, a heap
allocation
+// is worth the cost.
+typedef boost::container::small_vector<Slice, 4> TransferPayload;
// This class is used internally by the RPC layer to represent an inbound
// transfer in progress.
@@ -118,14 +120,12 @@ class OutboundTransfer : public
boost::intrusive::list_base_hook<> {
// Create an outbound transfer for a call request.
static OutboundTransfer* CreateForCallRequest(int32_t call_id,
- const TransferPayload &payload,
- size_t n_payload_slices,
+ TransferPayload payload,
TransferCallbacks *callbacks);
// Create an outbound transfer for a call response.
// See above for details.
- static OutboundTransfer* CreateForCallResponse(const TransferPayload
&payload,
- size_t n_payload_slices,
+ static OutboundTransfer* CreateForCallResponse(TransferPayload payload,
TransferCallbacks *callbacks);
// Destruct the transfer. A transfer object should never be deallocated
@@ -163,14 +163,11 @@ class OutboundTransfer : public
boost::intrusive::list_base_hook<> {
private:
OutboundTransfer(int32_t call_id,
- const TransferPayload& payload,
- size_t n_payload_slices,
+ TransferPayload payload,
TransferCallbacks *callbacks);
- // Slices to send. Uses an array here instead of a vector to avoid an
expensive
- // vector construction (improved performance a couple percent).
+ // Slices to send.
TransferPayload payload_slices_;
- size_t n_payload_slices_;
// The current slice that is being sent.
int32_t cur_slice_idx_;
diff --git a/src/kudu/tserver/tablet_service.cc
b/src/kudu/tserver/tablet_service.cc
index 8843bad..0892ff9 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1721,10 +1721,13 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
}
size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);
- unique_ptr<faststring> rows_data(new faststring(batch_size_bytes * 11 / 10));
- unique_ptr<faststring> indirect_data(new faststring(batch_size_bytes * 11 /
10));
+ // TODO(todd): use a chain of faststrings instead of a single one to avoid
+ // allocating this large buffer. Large buffer allocations are slow and
+ // potentially wasteful.
+ faststring rows_data(batch_size_bytes * 11 / 10);
+ faststring indirect_data(batch_size_bytes * 11 / 10);
RowwiseRowBlockPB data;
- ScanResultCopier collector(&data, rows_data.get(), indirect_data.get());
+ ScanResultCopier collector(&data, &rows_data, &indirect_data);
bool has_more_results = false;
TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
@@ -1777,7 +1780,7 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
resp->mutable_data()->set_rows_sidecar(rows_idx);
// Add indirect data as a sidecar, if applicable.
- if (indirect_data->size() > 0) {
+ if (indirect_data.size() > 0) {
int indirect_idx;
CHECK_OK(context->AddOutboundSidecar(
RpcSidecar::FromFaststring(std::move(indirect_data)), &indirect_idx));
diff --git a/src/kudu/util/faststring-test.cc b/src/kudu/util/faststring-test.cc
index 79dd6b2..9585e6e 100644
--- a/src/kudu/util/faststring-test.cc
+++ b/src/kudu/util/faststring-test.cc
@@ -123,4 +123,59 @@ TEST_F(FaststringTest, TestAppend_ExponentiallyExpand) {
}
}
+// Move-constructing a faststring must transfer the contents to the new object
+// and leave the source empty, for sizes on both sides of
+// faststring::kInitialCapacity.
+TEST_F(FaststringTest, TestMoveConstruct) {
+  const int len_limit = faststring::kInitialCapacity + 10;
+  for (int len = 0; len < len_limit; ++len) {
+    const string expected(len, 'a');
+    faststring src;
+    src.append(expected);
+    faststring dst(std::move(src));
+    // NOTE: NOLINT here since we are purposefully checking the behavior
+    // of 'src' after it was moved.
+    ASSERT_EQ(0, src.size()); // NOLINT(*)
+    ASSERT_EQ(expected, dst.ToString()); // NOLINT(*)
+    // The two objects must never end up pointing at the same bytes.
+    ASSERT_NE(src.data(), dst.data()); // NOLINT(*)
+    src.CheckInvariants(); // NOLINT(*)
+    dst.CheckInvariants();
+  }
+}
+
+// Move assignment must transfer the contents, leave the source empty, and
+// treat self-assignment as a no-op, for sizes on both sides of
+// faststring::kInitialCapacity.
+TEST_F(FaststringTest, TestMoveAssignment) {
+  const int len_limit = faststring::kInitialCapacity + 10;
+  for (int len = 0; len < len_limit; ++len) {
+    const string expected(len, 'a');
+
+    faststring a;
+    a.append(expected);
+
+    // Hand the contents over to a fresh string 'b' via move assignment.
+    faststring b;
+    b = std::move(a);
+    ASSERT_EQ(expected, b.ToString());
+    ASSERT_EQ(0, a.size()); // NOLINT(*)
+    a.CheckInvariants(); // NOLINT(*)
+    b.CheckInvariants(); // NOLINT(*)
+
+    // And hand them back to 'a'.
+    a = std::move(b);
+    ASSERT_EQ(0, b.size()); // NOLINT(*)
+    ASSERT_EQ(expected, a.ToString());
+    a.CheckInvariants(); // NOLINT(*)
+    b.CheckInvariants(); // NOLINT(*)
+
+    // A self-move must leave both strings exactly as they were.
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wself-move"
+    a = std::move(a); // NOLINT(*)
+#pragma clang diagnostic pop
+    ASSERT_EQ(0, b.size()); // NOLINT(*)
+    ASSERT_EQ(expected, a.ToString());
+    a.CheckInvariants(); // NOLINT(*)
+    b.CheckInvariants(); // NOLINT(*)
+  }
+}
+
+
} // namespace kudu
diff --git a/src/kudu/util/faststring.h b/src/kudu/util/faststring.h
index 3ada37c..5484f9b 100644
--- a/src/kudu/util/faststring.h
+++ b/src/kudu/util/faststring.h
@@ -21,6 +21,8 @@
#include <cstring>
#include <string>
+#include <glog/logging.h>
+
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
@@ -55,6 +57,25 @@ class faststring {
ASAN_POISON_MEMORY_REGION(data_, capacity_);
}
+  // Move constructor. Starts out as a default-constructed string and then
+  // hands off to the move assignment operator to take over 'other'.
+  faststring(faststring&& other) noexcept : faststring() {
+    operator=(std::move(other));
+  }
+
+  // Move assignment: takes over the contents of 'other' and leaves 'other'
+  // empty. Assigning a string to itself is a no-op.
+  //
+  // If 'other' currently stores its bytes in its own initial_data_ array,
+  // they are copied into this string and 'other' is cleared. Otherwise the
+  // buffer is taken from 'other' via release().
+  faststring& operator=(faststring&& other) noexcept {
+    if (this == &other) return *this;
+
+    if (other.data_ == other.initial_data_) {
+      assign_copy(other.data(), other.size());
+      other.clear();
+    } else {
+      // data_ is about to be overwritten. If it points at a buffer other than
+      // our own initial_data_, nothing else refers to it, so it must be freed
+      // here or it leaks.
+      // NOTE(review): delete[] is assumed to match how this class allocates
+      // and frees its out-of-line buffer (see the destructor) -- confirm.
+      if (data_ != initial_data_) {
+        delete[] data_;
+      }
+      len_ = other.len_;
+      capacity_ = other.capacity_;
+      data_ = other.release();
+    }
+    return *this;
+  }
+
~faststring() {
ASAN_UNPOISON_MEMORY_REGION(initial_data_, arraysize(initial_data_));
if (data_ != initial_data_) {
@@ -224,6 +245,14 @@ class faststring {
len_);
}
+  // Check various internal invariants. Used by tests.
+  //
+  // Verifies that the length never exceeds the capacity and that, while
+  // data_ points at initial_data_, the capacity equals kInitialCapacity.
+  // Declared const because it only reads members, so it can also be called
+  // on a const faststring.
+  void CheckInvariants() const {
+    CHECK_LE(len_, capacity_);
+    if (data_ == initial_data_) {
+      CHECK_EQ(capacity_, kInitialCapacity);
+    }
+  }
+
private:
DISALLOW_COPY_AND_ASSIGN(faststring);