KUDU-1866: Add request-side sidecars This patch adds sidecars to client requests. Using the same mechanism as on the response-side, clients may attach slices to outbound requests which do not pass through a serialization or copy before being pushed to the network socket. On the server side, these sidecars may be read directly from the underlying byte stream without the interposition of a Protobuf wrapper.
The sidecars may be added to a request via RpcController and retrieved via RpcContext (i.e. the reverse of the existing response-side interface). This patch adds a few tests to rpc-test, and all rpc-test tests pass. Change-Id: I3d709edb2a22dc983f51b69d7660a39e8d8d6a09 Reviewed-on: http://gerrit.cloudera.org:8080/5908 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/72895966 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/72895966 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/72895966 Branch: refs/heads/master Commit: 72895966c6458d6f33e68d53450d9bd43a2c57b1 Parents: 5566bc9 Author: Henry Robinson <[email protected]> Authored: Thu Feb 2 20:31:33 2017 -0800 Committer: Todd Lipcon <[email protected]> Committed: Wed Mar 8 00:49:34 2017 +0000 ---------------------------------------------------------------------- src/kudu/client/scanner-internal.cc | 14 ++-- src/kudu/rpc/CMakeLists.txt | 2 +- src/kudu/rpc/inbound_call.cc | 40 +++++++--- src/kudu/rpc/inbound_call.h | 14 +++- src/kudu/rpc/outbound_call.cc | 74 ++++++++--------- src/kudu/rpc/outbound_call.h | 22 +++-- src/kudu/rpc/proxy.cc | 2 +- src/kudu/rpc/rpc-test-base.h | 70 ++++++++++++++-- src/kudu/rpc/rpc-test.cc | 45 +++++++++++ src/kudu/rpc/rpc_context.cc | 10 ++- src/kudu/rpc/rpc_context.h | 6 +- src/kudu/rpc/rpc_controller.cc | 20 ++++- src/kudu/rpc/rpc_controller.h | 23 +++++- src/kudu/rpc/rpc_header.proto | 5 ++ src/kudu/rpc/rpc_sidecar.cc | 102 ++++++++++++++++++++++++ src/kudu/rpc/rpc_sidecar.h | 60 +++++++------- src/kudu/rpc/rtest.proto | 13 +++ src/kudu/rpc/transfer.h | 14 +++- src/kudu/tserver/tablet_server-test-base.h | 6 +- src/kudu/tserver/tablet_service.cc | 13 +-- 20 files changed, 429 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/client/scanner-internal.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc index 07f93a7..c804046 100644 --- a/src/kudu/client/scanner-internal.cc +++ b/src/kudu/client/scanner-internal.cc @@ -505,16 +505,16 @@ Status KuduScanBatch::Data::Reset(RpcController* controller, // First, rewrite the relative addresses into absolute ones. if (PREDICT_FALSE(!resp_data_.has_rows_sidecar())) { return Status::Corruption("Server sent invalid response: no row data"); - } else { - Status s = controller_.GetSidecar(resp_data_.rows_sidecar(), &direct_data_); - if (!s.ok()) { - return Status::Corruption("Server sent invalid response: row data " - "sidecar index corrupt", s.ToString()); - } + } + + Status s = controller_.GetInboundSidecar(resp_data_.rows_sidecar(), &direct_data_); + if (!s.ok()) { + return Status::Corruption("Server sent invalid response: row data " + "sidecar index corrupt", s.ToString()); } if (resp_data_.has_indirect_data_sidecar()) { - Status s = controller_.GetSidecar(resp_data_.indirect_data_sidecar(), + Status s = controller_.GetInboundSidecar(resp_data_.indirect_data_sidecar(), &indirect_data_); if (!s.ok()) { return Status::Corruption("Server sent invalid response: indirect data " http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt index 19a7610..0cfe6e9 100644 --- a/src/kudu/rpc/CMakeLists.txt +++ b/src/kudu/rpc/CMakeLists.txt @@ -59,6 +59,7 @@ set(KRPC_SRCS rpc.cc rpc_context.cc rpc_controller.cc + rpc_sidecar.cc rpcz_store.cc sasl_common.cc sasl_helper.cc @@ -125,4 +126,3 @@ ADD_KUDU_TEST(rpc-bench RUN_SERIAL true) ADD_KUDU_TEST(rpc-test) ADD_KUDU_TEST(rpc_stub-test) ADD_KUDU_TEST(service_queue-test) - 
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/inbound_call.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc index 448fd70..03e7da4 100644 --- a/src/kudu/rpc/inbound_call.cc +++ b/src/kudu/rpc/inbound_call.cc @@ -33,9 +33,8 @@ using google::protobuf::FieldDescriptor; using google::protobuf::io::CodedOutputStream; -using google::protobuf::Message; using google::protobuf::MessageLite; -using std::shared_ptr; +using std::unique_ptr; using std::vector; using strings::Substitute; @@ -44,7 +43,6 @@ namespace rpc { InboundCall::InboundCall(Connection* conn) : conn_(conn), - sidecars_deleter_(&sidecars_), trace_(new Trace), method_info_(nullptr) { RecordCallReceived(); @@ -67,6 +65,19 @@ Status InboundCall::ParseFrom(gscoped_ptr<InboundTransfer> transfer) { } remote_method_.FromPB(header_.remote_method()); + if (header_.sidecar_offsets_size() > TransferLimits::kMaxSidecars) { + return Status::Corruption(strings::Substitute( + "Received $0 additional payload slices, expected at most %d", + header_.sidecar_offsets_size(), TransferLimits::kMaxSidecars)); + } + + RETURN_NOT_OK(RpcSidecar::ParseSidecars( + header_.sidecar_offsets(), serialized_request_, inbound_sidecar_slices_)); + if (header_.sidecar_offsets_size() > 0) { + // Trim the request to just the message + serialized_request_ = Slice(serialized_request_.data(), header_.sidecar_offsets(0)); + } + // Retain the buffer that we have a view into. 
transfer_.swap(transfer); return Status::OK(); @@ -151,7 +162,7 @@ void InboundCall::SerializeResponseBuffer(const MessageLite& response, resp_hdr.set_call_id(header_.call_id()); resp_hdr.set_is_error(!is_success); uint32_t absolute_sidecar_offset = protobuf_msg_size; - for (RpcSidecar* car : sidecars_) { + for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) { resp_hdr.add_sidecar_offsets(absolute_sidecar_offset); absolute_sidecar_offset += car->AsSlice().size(); } @@ -168,23 +179,23 @@ void InboundCall::SerializeResponseTo(vector<Slice>* slices) const { TRACE_EVENT0("rpc", "InboundCall::SerializeResponseTo"); CHECK_GT(response_hdr_buf_.size(), 0); CHECK_GT(response_msg_buf_.size(), 0); - slices->reserve(slices->size() + 2 + sidecars_.size()); + slices->reserve(slices->size() + 2 + outbound_sidecars_.size()); slices->push_back(Slice(response_hdr_buf_)); slices->push_back(Slice(response_msg_buf_)); - for (RpcSidecar* car : sidecars_) { + for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) { slices->push_back(car->AsSlice()); } } -Status InboundCall::AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx) { +Status InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) { // Check that the number of sidecars does not exceed the number of payload // slices that are free (two are used up by the header and main message // protobufs). 
- if (sidecars_.size() + 2 > OutboundTransfer::kMaxPayloadSlices) { + if (outbound_sidecars_.size() > TransferLimits::kMaxSidecars) { return Status::ServiceUnavailable("All available sidecars already used"); } - sidecars_.push_back(car.release()); - *idx = sidecars_.size() - 1; + outbound_sidecars_.emplace_back(std::move(car)); + *idx = outbound_sidecars_.size() - 1; return Status::OK(); } @@ -288,5 +299,14 @@ vector<uint32_t> InboundCall::GetRequiredFeatures() const { return features; } +Status InboundCall::GetInboundSidecar(int idx, Slice* sidecar) const { + if (idx < 0 || idx >= header_.sidecar_offsets_size()) { + return Status::InvalidArgument(strings::Substitute( + "Index $0 does not reference a valid sidecar", idx)); + } + *sidecar = inbound_sidecar_slices_[idx]; + return Status::OK(); +} + } // namespace rpc } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/inbound_call.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h index 4f99dee..ea6eade 100644 --- a/src/kudu/rpc/inbound_call.h +++ b/src/kudu/rpc/inbound_call.h @@ -22,7 +22,6 @@ #include <vector> #include "kudu/gutil/gscoped_ptr.h" -#include "kudu/gutil/stl_util.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/ref_counted.h" #include "kudu/rpc/remote_method.h" @@ -124,7 +123,7 @@ class InboundCall { void SerializeResponseTo(std::vector<Slice>* slices) const; // See RpcContext::AddRpcSidecar() - Status AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx); + Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx); std::string ToString() const; @@ -187,6 +186,10 @@ class InboundCall { // the RPC. std::vector<uint32_t> GetRequiredFeatures() const; + // Get a sidecar sent as part of the request. If idx < 0 || idx > num sidecars - 1, + // returns an error. 
+ Status GetInboundSidecar(int idx, Slice* sidecar) const; + private: friend class RpczStore; @@ -227,8 +230,11 @@ class InboundCall { // Vector of additional sidecars that are tacked on to the call's response // after serialization of the protobuf. See rpc/rpc_sidecar.h for more info. - std::vector<RpcSidecar*> sidecars_; - ElementDeleter sidecars_deleter_; + std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_; + + // Inbound sidecars from the request. The slices are views onto transfer_. There are as + // many slices as header_.sidecar_offsets_size(). + Slice inbound_sidecar_slices_[TransferLimits::kMaxSidecars]; // The trace buffer. scoped_refptr<Trace> trace_; http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/outbound_call.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc index 19ec0ec..9b160a1 100644 --- a/src/kudu/rpc/outbound_call.cc +++ b/src/kudu/rpc/outbound_call.cc @@ -18,6 +18,7 @@ #include <algorithm> #include <boost/functional/hash.hpp> #include <gflags/gflags.h> +#include <memory> #include <mutex> #include <string> #include <unordered_set> @@ -29,6 +30,7 @@ #include "kudu/rpc/constants.h" #include "kudu/rpc/rpc_controller.h" #include "kudu/rpc/rpc_introspection.pb.h" +#include "kudu/rpc/rpc_sidecar.h" #include "kudu/rpc/serialization.h" #include "kudu/rpc/transfer.h" #include "kudu/util/flag_tags.h" @@ -44,6 +46,8 @@ DEFINE_int64(rpc_callback_max_cycles, 100 * 1000 * 1000, TAG_FLAG(rpc_callback_max_cycles, advanced); TAG_FLAG(rpc_callback_max_cycles, runtime); +using std::unique_ptr; + namespace kudu { namespace rpc { @@ -88,10 +92,8 @@ OutboundCall::~OutboundCall() { } Status OutboundCall::SerializeTo(vector<Slice>* slices) { - size_t param_len = request_buf_.size(); - if (PREDICT_FALSE(param_len == 0)) { - return Status::InvalidArgument("Must call SetRequestParam() before SerializeTo()"); - } + DCHECK_LT(0, 
request_buf_.size()) + << "Must call SetRequestPayload() before SerializeTo()"; const MonoDelta &timeout = controller_->timeout(); if (timeout.Initialized()) { @@ -102,16 +104,32 @@ Status OutboundCall::SerializeTo(vector<Slice>* slices) { header_.add_required_feature_flags(feature); } - serialization::SerializeHeader(header_, param_len, &header_buf_); + DCHECK_LE(0, sidecar_byte_size_); + serialization::SerializeHeader( + header_, sidecar_byte_size_ + request_buf_.size(), &header_buf_); - // Return the concatenated packet. slices->push_back(Slice(header_buf_)); slices->push_back(Slice(request_buf_)); + for (const unique_ptr<RpcSidecar>& car : sidecars_) slices->push_back(car->AsSlice()); return Status::OK(); } -void OutboundCall::SetRequestParam(const Message& message) { - serialization::SerializeMessage(message, &request_buf_); +void OutboundCall::SetRequestPayload(const Message& req, + vector<unique_ptr<RpcSidecar>>&& sidecars) { + DCHECK_EQ(-1, sidecar_byte_size_); + + sidecars_ = move(sidecars); + + // Compute total size of sidecar payload so that extra space can be reserved as part of + // the request body. + uint32_t message_size = req.ByteSize(); + sidecar_byte_size_ = 0; + for (const unique_ptr<RpcSidecar>& car: sidecars_) { + header_.add_sidecar_offsets(sidecar_byte_size_ + message_size); + sidecar_byte_size_ += car->AsSlice().size(); + } + + serialization::SerializeMessage(req, &request_buf_, sidecar_byte_size_, true); } Status OutboundCall::status() const { @@ -432,44 +450,16 @@ Status CallResponse::GetSidecar(int idx, Slice* sidecar) const { Status CallResponse::ParseFrom(gscoped_ptr<InboundTransfer> transfer) { CHECK(!parsed_); - Slice entire_message; RETURN_NOT_OK(serialization::ParseMessage(transfer->data(), &header_, - &entire_message)); + &serialized_response_)); // Use information from header to extract the payload slices. 
- int last = header_.sidecar_offsets_size() - 1; + RETURN_NOT_OK(RpcSidecar::ParseSidecars(header_.sidecar_offsets(), + serialized_response_, sidecar_slices_)); - if (last >= OutboundTransfer::kMaxPayloadSlices) { - return Status::Corruption(strings::Substitute( - "Received $0 additional payload slices, expected at most %d", - last, OutboundTransfer::kMaxPayloadSlices)); - } - - if (last >= 0) { - serialized_response_ = Slice(entire_message.data(), - header_.sidecar_offsets(0)); - for (int i = 0; i < last; ++i) { - uint32_t next_offset = header_.sidecar_offsets(i); - int32_t len = header_.sidecar_offsets(i + 1) - next_offset; - if (next_offset + len > entire_message.size() || len < 0) { - return Status::Corruption(strings::Substitute( - "Invalid sidecar offsets; sidecar $0 apparently starts at $1," - " has length $2, but the entire message has length $3", - i, next_offset, len, entire_message.size())); - } - sidecar_slices_[i] = Slice(entire_message.data() + next_offset, len); - } - uint32_t next_offset = header_.sidecar_offsets(last); - if (next_offset > entire_message.size()) { - return Status::Corruption(strings::Substitute( - "Invalid sidecar offsets; the last sidecar ($0) apparently starts " - "at $1, but the entire message has length $3", - last, next_offset, entire_message.size())); - } - sidecar_slices_[last] = Slice(entire_message.data() + next_offset, - entire_message.size() - next_offset); - } else { - serialized_response_ = entire_message; + if (header_.sidecar_offsets_size() > 0) { + serialized_response_ = + Slice(serialized_response_.data(), header_.sidecar_offsets(0)); } transfer_.swap(transfer); http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/outbound_call.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h index fa599fd..87ca39a 100644 --- a/src/kudu/rpc/outbound_call.h +++ b/src/kudu/rpc/outbound_call.h @@ -27,6 +27,7 @@ #include 
"kudu/gutil/macros.h" #include "kudu/rpc/constants.h" #include "kudu/rpc/rpc_header.pb.h" +#include "kudu/rpc/rpc_sidecar.h" #include "kudu/rpc/remote_method.h" #include "kudu/rpc/response_callback.h" #include "kudu/rpc/transfer.h" @@ -52,6 +53,7 @@ class DumpRunningRpcsRequestPB; class InboundTransfer; class RpcCallInProgressPB; class RpcController; +class RpcSidecar; // Used to key on Connection information. @@ -124,11 +126,13 @@ class OutboundCall { ~OutboundCall(); - // Serialize the given request PB into this call's internal storage. + // Serialize the given request PB into this call's internal storage, and assume + // ownership of any sidecars that should accompany this request. // - // Because the data is fully serialized by this call, 'req' may be - // subsequently mutated with no ill effects. - void SetRequestParam(const google::protobuf::Message& req); + // Because the request data is fully serialized by this call, 'req' may be subsequently + // mutated with no ill effects. + void SetRequestPayload(const google::protobuf::Message& req, + std::vector<std::unique_ptr<RpcSidecar>>&& sidecars); // Assign the call ID for this call. This is called from the reactor // thread once a connection has been assigned. Must only be called once. @@ -137,7 +141,7 @@ class OutboundCall { header_.set_call_id(call_id); } - // Serialize the call for the wire. Requires that SetRequestParam() + // Serialize the call for the wire. Requires that SetRequestPayload() // is called first. This is called from the Reactor thread. Status SerializeTo(std::vector<Slice>* slices); @@ -269,6 +273,12 @@ class OutboundCall { // Otherwise NULL. gscoped_ptr<CallResponse> call_response_; + // All sidecars to be sent with this call. + std::vector<std::unique_ptr<RpcSidecar>> sidecars_; + + // Total size in bytes of all sidecars in 'sidecars_'. Set in SetRequestPayload(). 
+ int64_t sidecar_byte_size_ = -1; + DISALLOW_COPY_AND_ASSIGN(OutboundCall); }; @@ -322,7 +332,7 @@ class CallResponse { Slice serialized_response_; // Slices of data for rpc sidecars. They point into memory owned by transfer_. - Slice sidecar_slices_[OutboundTransfer::kMaxPayloadSlices]; + Slice sidecar_slices_[TransferLimits::kMaxSidecars]; // The incoming transfer data - retained because serialized_response_ // and sidecar_slices_ refer into its data. http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/proxy.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc index 206aac3..077af58 100644 --- a/src/kudu/rpc/proxy.cc +++ b/src/kudu/rpc/proxy.cc @@ -81,7 +81,7 @@ void Proxy::AsyncRequest(const string& method, RemoteMethod remote_method(service_name_, method); OutboundCall* call = new OutboundCall(conn_id_, remote_method, response, controller, callback); controller->call_.reset(call); - call->SetRequestParam(req); + controller->SetRequestParam(req); // If this fails to queue, the callback will get called immediately // and the controller will be in an ERROR state. 
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc-test-base.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h index 35a19f2..75ef792 100644 --- a/src/kudu/rpc/rpc-test-base.h +++ b/src/kudu/rpc/rpc-test-base.h @@ -65,6 +65,8 @@ using kudu::rpc_test::ExactlyOnceResponsePB; using kudu::rpc_test::FeatureFlags; using kudu::rpc_test::PanicRequestPB; using kudu::rpc_test::PanicResponsePB; +using kudu::rpc_test::PushTwoStringsRequestPB; +using kudu::rpc_test::PushTwoStringsResponsePB; using kudu::rpc_test::SendTwoStringsRequestPB; using kudu::rpc_test::SendTwoStringsResponsePB; using kudu::rpc_test::SleepRequestPB; @@ -83,6 +85,7 @@ class GenericCalculatorService : public ServiceIf { static const char *kFullServiceName; static const char *kAddMethodName; static const char *kSleepMethodName; + static const char *kPushTwoStringsMethodName; static const char *kSendTwoStringsMethodName; static const char *kAddExactlyOnce; @@ -105,6 +108,8 @@ class GenericCalculatorService : public ServiceIf { DoSleep(incoming); } else if (incoming->remote_method().method_name() == kSendTwoStringsMethodName) { DoSendTwoStrings(incoming); + } else if (incoming->remote_method().method_name() == kPushTwoStringsMethodName) { + DoPushTwoStrings(incoming); } else { incoming->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_METHOD, Status::InvalidArgument("bad method")); @@ -134,8 +139,8 @@ class GenericCalculatorService : public ServiceIf { LOG(FATAL) << "couldn't parse: " << param.ToDebugString(); } - gscoped_ptr<faststring> first(new faststring); - gscoped_ptr<faststring> second(new faststring); + std::unique_ptr<faststring> first(new faststring); + std::unique_ptr<faststring> second(new faststring); Random r(req.random_seed()); first->resize(req.size1()); @@ -146,16 +151,42 @@ class GenericCalculatorService : public ServiceIf { SendTwoStringsResponsePB resp; int idx1, idx2; - 
CHECK_OK(incoming->AddRpcSidecar( - make_gscoped_ptr(new RpcSidecar(std::move(first))), &idx1)); - CHECK_OK(incoming->AddRpcSidecar( - make_gscoped_ptr(new RpcSidecar(std::move(second))), &idx2)); + CHECK_OK(incoming->AddOutboundSidecar( + RpcSidecar::FromFaststring(std::move(first)), &idx1)); + CHECK_OK(incoming->AddOutboundSidecar( + RpcSidecar::FromFaststring(std::move(second)), &idx2)); resp.set_sidecar1(idx1); resp.set_sidecar2(idx2); incoming->RespondSuccess(resp); } + void DoPushTwoStrings(InboundCall* incoming) { + Slice param(incoming->serialized_request()); + PushTwoStringsRequestPB req; + if (!req.ParseFromArray(param.data(), param.size())) { + LOG(FATAL) << "couldn't parse: " << param.ToDebugString(); + } + + Slice sidecar1; + CHECK_OK(incoming->GetInboundSidecar(req.sidecar1_idx(), &sidecar1)); + + Slice sidecar2; + CHECK_OK(incoming->GetInboundSidecar(req.sidecar2_idx(), &sidecar2)); + + // Check that reading non-existant sidecars doesn't work. + Slice tmp; + CHECK(!incoming->GetInboundSidecar(req.sidecar2_idx() + 2, &tmp).ok()); + + PushTwoStringsResponsePB resp; + resp.set_size1(sidecar1.size()); + resp.set_data1(reinterpret_cast<const char*>(sidecar1.data()), sidecar1.size()); + resp.set_size2(sidecar2.size()); + resp.set_data2(reinterpret_cast<const char*>(sidecar2.data()), sidecar2.size()); + + incoming->RespondSuccess(resp); + } + void DoSleep(InboundCall *incoming) { Slice param(incoming->serialized_request()); SleepRequestPB req; @@ -326,6 +357,7 @@ class CalculatorService : public CalculatorServiceIf { const char *GenericCalculatorService::kFullServiceName = "kudu.rpc.GenericCalculatorService"; const char *GenericCalculatorService::kAddMethodName = "Add"; const char *GenericCalculatorService::kSleepMethodName = "Sleep"; +const char *GenericCalculatorService::kPushTwoStringsMethodName = "PushTwoStrings"; const char *GenericCalculatorService::kSendTwoStringsMethodName = "SendTwoStrings"; const char *GenericCalculatorService::kAddExactlyOnce = 
"AddExactlyOnce"; @@ -425,6 +457,30 @@ class RpcTestBase : public KuduTest { CHECK_EQ(0, second.compare(Slice(expected))); } + void DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) { + PushTwoStringsRequestPB request; + RpcController controller; + + int idx1; + string s1(size1, 'a'); + CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s1)), &idx1)); + + int idx2; + string s2(size2, 'b'); + CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s2)), &idx2)); + + request.set_sidecar1_idx(idx1); + request.set_sidecar2_idx(idx2); + + PushTwoStringsResponsePB resp; + CHECK_OK(p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName, + request, &resp, &controller)); + CHECK_EQ(size1, resp.size1()); + CHECK_EQ(resp.data1(), s1); + CHECK_EQ(size2, resp.size2()); + CHECK_EQ(resp.data2(), s2); + } + void DoTestExpectTimeout(const Proxy &p, const MonoDelta &timeout) { SleepRequestPB req; SleepResponsePB resp; @@ -476,7 +532,7 @@ class RpcTestBase : public KuduTest { static Slice GetSidecarPointer(const RpcController& controller, int idx, int expected_size) { Slice sidecar; - CHECK_OK(controller.GetSidecar(idx, &sidecar)); + CHECK_OK(controller.GetInboundSidecar(idx, &sidecar)); CHECK_EQ(expected_size, sidecar.size()); return Slice(sidecar.data(), expected_size); } http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc index e18d07c..d707a50 100644 --- a/src/kudu/rpc/rpc-test.cc +++ b/src/kudu/rpc/rpc-test.cc @@ -294,6 +294,51 @@ TEST_P(TestRpc, TestRpcSidecar) { // Test some larger sidecars to verify that we properly handle the case where // we can't write the whole response to the socket in a single call. 
DoTestSidecar(p, 3000 * 1024, 2000 * 1024); + + DoTestOutgoingSidecar(p, 0, 0); + DoTestOutgoingSidecar(p, 123, 456); + DoTestOutgoingSidecar(p, 3000 * 1024, 2000 * 1024); +} + +TEST_P(TestRpc, TestRpcSidecarLimits) { + { + // Test that the limits on the number of sidecars is respected. + RpcController controller; + string s = "foo"; + int idx; + for (int i = 0; i < TransferLimits::kMaxSidecars; ++i) { + CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx)); + } + + CHECK(!controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx).ok()); + } + + { + // Test that the payload may not exceed --rpc_max_message_size. + // Set up server. + Sockaddr server_addr; + bool enable_ssl = GetParam(); + StartTestServer(&server_addr, enable_ssl); + + // Set up client. + shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, GetParam())); + Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); + + RpcController controller; + string s(FLAGS_rpc_max_message_size + 1, 'a'); + int idx; + CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx)); + + PushTwoStringsRequestPB request; + request.set_sidecar1_idx(idx); + request.set_sidecar2_idx(idx); + PushTwoStringsResponsePB resp; + Status status = p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName, + request, &resp, &controller); + ASSERT_TRUE(status.IsNetworkError()) << "Unexpected error: " << status.ToString(); + // Remote responds to extra-large payloads by closing the connection. + ASSERT_STR_CONTAINS(status.ToString(), "Connection reset by peer"); + } } // Test that timeouts are properly handled. 
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_context.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/rpc_context.cc b/src/kudu/rpc/rpc_context.cc index a0e634c..e93e093 100644 --- a/src/kudu/rpc/rpc_context.cc +++ b/src/kudu/rpc/rpc_context.cc @@ -17,6 +17,7 @@ #include "kudu/rpc/rpc_context.h" +#include <memory> #include <ostream> #include <sstream> @@ -32,6 +33,7 @@ #include "kudu/util/trace.h" using google::protobuf::Message; +using std::unique_ptr; namespace kudu { namespace rpc { @@ -141,8 +143,12 @@ const rpc::RequestIdPB* RpcContext::request_id() const { return call_->header().has_request_id() ? &call_->header().request_id() : nullptr; } -Status RpcContext::AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx) { - return call_->AddRpcSidecar(std::move(car), idx); +Status RpcContext::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) { + return call_->AddOutboundSidecar(std::move(car), idx); +} + +Status RpcContext::GetInboundSidecar(int idx, Slice* slice) { + return call_->GetInboundSidecar(idx, slice); } const RemoteUser& RpcContext::remote_user() const { http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_context.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/rpc_context.h b/src/kudu/rpc/rpc_context.h index b95a9ce..12e8907 100644 --- a/src/kudu/rpc/rpc_context.h +++ b/src/kudu/rpc/rpc_context.h @@ -153,7 +153,11 @@ class RpcContext { // Upon success, writes the index of the sidecar (necessary to be retrieved // later) to 'idx'. Call may fail if all sidecars have already been used // by the RPC response. - Status AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx); + Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx); + + // Fills 'sidecar' with a sidecar sent by the client. Returns an error if 'idx' is out + // of bounds. 
+ Status GetInboundSidecar(int idx, Slice* slice); // Return the identity of remote user who made this call. const RemoteUser& remote_user() const; http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_controller.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/rpc_controller.cc b/src/kudu/rpc/rpc_controller.cc index adaf5ce..5e5cbc3 100644 --- a/src/kudu/rpc/rpc_controller.cc +++ b/src/kudu/rpc/rpc_controller.cc @@ -19,11 +19,14 @@ #include <algorithm> #include <glog/logging.h> +#include <memory> #include <mutex> #include "kudu/rpc/rpc_header.pb.h" #include "kudu/rpc/outbound_call.h" +using std::unique_ptr; + namespace kudu { namespace rpc { RpcController::RpcController() { @@ -43,6 +46,7 @@ void RpcController::Swap(RpcController* other) { CHECK(other->finished()); } + std::swap(outbound_sidecars_, other->outbound_sidecars_); std::swap(timeout_, other->timeout_); std::swap(call_, other->call_); } @@ -77,7 +81,7 @@ const ErrorStatusPB* RpcController::error_response() const { return nullptr; } -Status RpcController::GetSidecar(int idx, Slice* sidecar) const { +Status RpcController::GetInboundSidecar(int idx, Slice* sidecar) const { return call_->call_response_->GetSidecar(idx, sidecar); } @@ -114,5 +118,19 @@ MonoDelta RpcController::timeout() const { return timeout_; } +Status RpcController::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) { + if (outbound_sidecars_.size() >= TransferLimits::kMaxSidecars) { + return Status::RuntimeError("All available sidecars already used"); + } + outbound_sidecars_.emplace_back(std::move(car)); + *idx = outbound_sidecars_.size() - 1; + return Status::OK(); +} + +void RpcController::SetRequestParam(const google::protobuf::Message& req) { + DCHECK(call_ != nullptr); + call_->SetRequestPayload(req, std::move(outbound_sidecars_)); +} + } // namespace rpc } // namespace kudu 
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_controller.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/rpc_controller.h b/src/kudu/rpc/rpc_controller.h index cce1ff2..6d521d0 100644 --- a/src/kudu/rpc/rpc_controller.h +++ b/src/kudu/rpc/rpc_controller.h @@ -21,11 +21,20 @@ #include <glog/logging.h> #include <memory> #include <unordered_set> +#include <vector> #include "kudu/gutil/macros.h" +#include "kudu/gutil/stl_util.h" #include "kudu/util/locks.h" #include "kudu/util/monotime.h" #include "kudu/util/status.h" + +namespace google { +namespace protobuf { +class Message; +} // namespace protobuf +} // namespace google + namespace kudu { namespace rpc { @@ -33,6 +42,7 @@ namespace rpc { class ErrorStatusPB; class OutboundCall; class RequestIdPB; +class RpcSidecar; // Controller for managing properties of a single RPC call, on the client side. // @@ -177,12 +187,21 @@ class RpcController { // been Reset(). // // May fail if index is invalid. - Status GetSidecar(int idx, Slice* sidecar) const; + Status GetInboundSidecar(int idx, Slice* sidecar) const; + + // Adds a sidecar to the outbound request. The index of the sidecar is written to + // 'idx'. Returns an error if TransferLimits::kMaxSidecars have already been added + // to this request. + Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx); private: friend class OutboundCall; friend class Proxy; + // Set the outbound call_'s request parameter, and transfer ownership of + // outbound_sidecars_ to call_ in preparation for serialization. + void SetRequestParam(const google::protobuf::Message& req); + MonoDelta timeout_; std::unordered_set<uint32_t> required_server_features_; @@ -195,6 +214,8 @@ class RpcController { // Once the call is sent, it is tracked here. 
std::shared_ptr<OutboundCall> call_; + std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_; + DISALLOW_COPY_AND_ASSIGN(RpcController); }; http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_header.proto ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/rpc_header.proto b/src/kudu/rpc/rpc_header.proto index 6721c44..a6c9728 100644 --- a/src/kudu/rpc/rpc_header.proto +++ b/src/kudu/rpc/rpc_header.proto @@ -257,6 +257,11 @@ message RequestHeader { // Optional for requests that are naturally idempotent or to maintain compatibility with // older clients for requests that are not. optional RequestIdPB request_id = 15; + + // Byte offsets for side cars in the main body of the request message. + // These offsets are counted AFTER the message header, i.e., offset 0 + // is the first byte after the bytes for this protobuf. + repeated uint32 sidecar_offsets = 16; } message ResponseHeader { http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_sidecar.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/rpc_sidecar.cc b/src/kudu/rpc/rpc_sidecar.cc new file mode 100644 index 0000000..580c6eb --- /dev/null +++ b/src/kudu/rpc/rpc_sidecar.cc @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/rpc_sidecar.h" + +#include "kudu/util/status.h" +#include "kudu/rpc/transfer.h" +#include "kudu/gutil/strings/substitute.h" + +using std::unique_ptr; + +namespace kudu { +namespace rpc { + +// Sidecar that simply wraps a Slice. The data associated with the slice is therefore not +// owned by this class, and it's the caller's responsibility to ensure it has a lifetime +// at least as long as this sidecar. +class SliceSidecar : public RpcSidecar { + public: + explicit SliceSidecar(Slice slice) : slice_(slice) { } + Slice AsSlice() const override { return slice_; } + + private: + const Slice slice_; +}; + +class FaststringSidecar : public RpcSidecar { + public: + explicit FaststringSidecar(unique_ptr<faststring> data) : data_(std::move(data)) { } + Slice AsSlice() const override { return *data_; } + + private: + const unique_ptr<faststring> data_; +}; + +unique_ptr<RpcSidecar> RpcSidecar::FromFaststring(unique_ptr<faststring> data) { + return unique_ptr<RpcSidecar>(new FaststringSidecar(std::move(data))); +} + +unique_ptr<RpcSidecar> RpcSidecar::FromSlice(Slice slice) { + return unique_ptr<RpcSidecar>(new SliceSidecar(slice)); +} + + +Status RpcSidecar::ParseSidecars( + const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets, + Slice buffer, Slice* sidecars) { + if (offsets.size() == 0) return Status::OK(); + + int last = offsets.size() - 1; + if (last >= TransferLimits::kMaxSidecars) { + return Status::Corruption(strings::Substitute( + "Received $0 additional payload slices, expected at most $1", + offsets.size(),
TransferLimits::kMaxSidecars)); + } + + for (int i = 0; i < last; ++i) { + int64_t cur_offset = offsets.Get(i); + int64_t next_offset = offsets.Get(i + 1); + if (next_offset > buffer.size()) { + return Status::Corruption(strings::Substitute( + "Invalid sidecar offsets; sidecar $0 apparently starts at $1," + " has length $2, but the entire message has length $3", + i, cur_offset, (next_offset - cur_offset), buffer.size())); + } + if (next_offset < cur_offset) { + return Status::Corruption(strings::Substitute( + "Invalid sidecar offsets; sidecar $0 apparently starts at $1," + " but ends before that at offset $2.", i, cur_offset, next_offset)); + } + + sidecars[i] = Slice(buffer.data() + cur_offset, next_offset - cur_offset); + } + + int64_t cur_offset = offsets.Get(last); + if (cur_offset > buffer.size()) { + return Status::Corruption(strings::Substitute("Invalid sidecar offsets: sidecar $0 " + "starts at offset $1 after message ends (message length $2).", last, + cur_offset, buffer.size())); + } + sidecars[last] = Slice(buffer.data() + cur_offset, buffer.size() - cur_offset); + + return Status::OK(); +} + + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_sidecar.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/rpc_sidecar.h b/src/kudu/rpc/rpc_sidecar.h index da7e00f..00d6e4b 100644 --- a/src/kudu/rpc/rpc_sidecar.h +++ b/src/kudu/rpc/rpc_sidecar.h @@ -17,50 +17,48 @@ #ifndef KUDU_RPC_RPC_SIDECAR_H #define KUDU_RPC_RPC_SIDECAR_H -#include "kudu/gutil/gscoped_ptr.h" +#include <google/protobuf/repeated_field.h> +#include <memory> + #include "kudu/util/faststring.h" #include "kudu/util/slice.h" namespace kudu { namespace rpc { -// An RpcSidecar is a mechanism which allows replies to RPCs -// to reference blocks of data without extra copies.
In other words, -// whenever a protobuf would have a large field where additional copies -// become expensive, one may opt instead to use an RpcSidecar. -// -// The RpcSidecar saves on an additional copy to/from the protobuf on both the -// server and client side. The InboundCall class accepts RpcSidecars, ignorant -// of the form that the sidecar's data is kept in, requiring only that it can -// be represented as a Slice. Data is then immediately copied from the -// Slice returned from AsSlice() to the socket that is responding to the original -// RPC. +// An RpcSidecar is a mechanism which allows replies to RPCs to reference blocks of data +// without extra copies. In other words, whenever a protobuf would have a large field +// where additional copies become expensive, one may opt instead to use an RpcSidecar. // -// In order to distinguish between separate sidecars, whenever a sidecar is -// added to the RPC response on the server side, an index for that sidecar is -// returned. This index must then in some way (i.e., via protobuf) be -// communicated to the client side. +// The RpcSidecar saves on an additional copy to/from the protobuf on both the server and +// client side. Both Inbound- and OutboundCall classes accept sidecars to be sent to the +// client and server respectively. They are ignorant of the sidecar's format, requiring +// only that it can be represented as a Slice. Data is copied from the Slice returned from +// AsSlice() to the socket that is responding to the original RPC. The slice should remain +// valid for as long as the call it is attached to takes to complete. // -// After receiving the RPC response on the client side, OutboundCall decodes -// the original message along with the separate sidecars by using a list -// of sidecar byte offsets that was sent in the message header. 
+// In order to distinguish between separate sidecars, whenever a sidecar is added to the +// RPC response on the server side, an index for that sidecar is returned. This index must +// then in some way (i.e., via protobuf) be communicated to the recipient. // -// After reconstructing the array of sidecars, the OutboundCall (through -// RpcController's interface) is able to offer retrieval of the sidecar data -// through the same indices that were returned by InboundCall (or indirectly -// through the RpcContext wrapper) on the client side. +// After reconstructing the array of sidecars, servers and clients may retrieve the +// sidecar data through the RpcContext or RpcController interfaces respectively. class RpcSidecar { public: - // Generates a sidecar with the parameter faststring as its data. - explicit RpcSidecar(gscoped_ptr<faststring> data) : data_(std::move(data)) {} + static std::unique_ptr<RpcSidecar> FromFaststring(std::unique_ptr<faststring> data); + static std::unique_ptr<RpcSidecar> FromSlice(Slice slice); - // Returns a Slice representation of the sidecar's data. - Slice AsSlice() const { return *data_; } + // Utility method to parse a series of sidecar slices into 'sidecars' from 'buffer' and + // a set of offsets. 'sidecars' must have length >= TransferLimits::kMaxSidecars, and + // will be filled from index 0. + // TODO(henryr): Consider a vector instead here if there's no perf. impact. + static Status ParseSidecars( + const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets, + Slice buffer, Slice* sidecars); - private: - const gscoped_ptr<faststring> data_; - - DISALLOW_COPY_AND_ASSIGN(RpcSidecar); + // Returns a Slice representation of the sidecar's data. 
+ virtual Slice AsSlice() const = 0; + virtual ~RpcSidecar() { } }; } // namespace rpc http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rtest.proto ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/rtest.proto b/src/kudu/rpc/rtest.proto index df307a6..c52b535 100644 --- a/src/kudu/rpc/rtest.proto +++ b/src/kudu/rpc/rtest.proto @@ -65,6 +65,19 @@ message SendTwoStringsResponsePB { required uint32 sidecar2 = 2; } +// Push two strings to the server as part of the request, in sidecars. +message PushTwoStringsRequestPB { + required uint32 sidecar1_idx = 1; + required uint32 sidecar2_idx = 2; +} + +message PushTwoStringsResponsePB { + required uint32 size1 = 1; + required string data1 = 2; + required uint32 size2 = 3; + required string data2 = 4; +} + message EchoRequestPB { required string data = 1; } http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/transfer.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h index 7fb6b10..671347a 100644 --- a/src/kudu/rpc/transfer.h +++ b/src/kudu/rpc/transfer.h @@ -46,6 +46,16 @@ namespace rpc { class Messenger; struct TransferCallbacks; +class TransferLimits { + public: + enum { + kMaxSidecars = 10, + kMaxPayloadSlices = kMaxSidecars + 2 // (header + msg) + }; + + DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits); +}; + // This class is used internally by the RPC layer to represent an inbound // transfer in progress. // @@ -94,8 +104,6 @@ class InboundTransfer { // Upon completion of the transfer, a callback is triggered. class OutboundTransfer : public boost::intrusive::list_base_hook<> { public: - enum { kMaxPayloadSlices = 10 }; - // Factory methods for creating transfers associated with call requests // or responses. The 'payload' slices will be concatenated and // written to the socket. 
When the transfer completes or errors, the @@ -159,7 +167,7 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> { // Slices to send. Uses an array here instead of a vector to avoid an expensive // vector construction (improved performance a couple percent). - Slice payload_slices_[kMaxPayloadSlices]; + Slice payload_slices_[TransferLimits::kMaxPayloadSlices]; size_t n_payload_slices_; // The current slice that is being sent. http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/tserver/tablet_server-test-base.h ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h index 22edce7..6ebf0e3 100644 --- a/src/kudu/tserver/tablet_server-test-base.h +++ b/src/kudu/tserver/tablet_server-test-base.h @@ -312,10 +312,10 @@ class TabletServerTestBase : public KuduTest { vector<string>* results) { RowwiseRowBlockPB* rrpb = resp.mutable_data(); Slice direct, indirect; // sidecar data buffers - ASSERT_OK(rpc.GetSidecar(rrpb->rows_sidecar(), &direct)); + ASSERT_OK(rpc.GetInboundSidecar(rrpb->rows_sidecar(), &direct)); if (rrpb->has_indirect_data_sidecar()) { - ASSERT_OK(rpc.GetSidecar(rrpb->indirect_data_sidecar(), - &indirect)); + ASSERT_OK(rpc.GetInboundSidecar(rrpb->indirect_data_sidecar(), + &indirect)); } vector<const uint8_t*> rows; ASSERT_OK(ExtractRowsFromRowBlockPB(projection, *rrpb, http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/tserver/tablet_service.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index 69ee7ab..c9e7d79 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -118,6 +118,7 @@ using kudu::consensus::StartTabletCopyResponsePB; using kudu::consensus::VoteRequestPB; using kudu::consensus::VoteResponsePB; using kudu::rpc::RpcContext; +using 
kudu::rpc::RpcSidecar; using kudu::server::ServerBase; using kudu::tablet::AlterSchemaTransactionState; using kudu::tablet::Tablet; @@ -1073,8 +1074,8 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req, } size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req); - gscoped_ptr<faststring> rows_data(new faststring(batch_size_bytes * 11 / 10)); - gscoped_ptr<faststring> indirect_data(new faststring(batch_size_bytes * 11 / 10)); + unique_ptr<faststring> rows_data(new faststring(batch_size_bytes * 11 / 10)); + unique_ptr<faststring> indirect_data(new faststring(batch_size_bytes * 11 / 10)); RowwiseRowBlockPB data; ScanResultCopier collector(&data, rows_data.get(), indirect_data.get()); @@ -1123,15 +1124,15 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req, // Add sidecar data to context and record the returned indices. int rows_idx; - CHECK_OK(context->AddRpcSidecar(make_gscoped_ptr( - new rpc::RpcSidecar(std::move(rows_data))), &rows_idx)); + CHECK_OK(context->AddOutboundSidecar(RpcSidecar::FromFaststring((std::move(rows_data))), + &rows_idx)); resp->mutable_data()->set_rows_sidecar(rows_idx); // Add indirect data as a sidecar, if applicable. if (indirect_data->size() > 0) { int indirect_idx; - CHECK_OK(context->AddRpcSidecar(make_gscoped_ptr( - new rpc::RpcSidecar(std::move(indirect_data))), &indirect_idx)); + CHECK_OK(context->AddOutboundSidecar(RpcSidecar::FromFaststring( + std::move(indirect_data)), &indirect_idx)); resp->mutable_data()->set_indirect_data_sidecar(indirect_idx); }
