Repository: incubator-impala Updated Branches: refs/heads/master 9e03e9332 -> 30129c453
KUDU-1865: Avoid heap allocation for payload slices As shown in KUDU-1865, heap-allocating the temporary vector of slices that holds the serialized payload introduces measurable overhead under heavy load. This change replaces the heap allocation with a stack allocation of an array of size TransferLimits::kMaxPayloadSlices. With this change, we saw a 10-15% improvement under heavy workload. Change-Id: I4470d34ba48db5edaeb66d9e739e0c8942004d86 Reviewed-on: http://gerrit.cloudera.org:8080/7471 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <[email protected]> Reviewed-on: http://gerrit.cloudera.org:8080/7744 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/701cd503 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/701cd503 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/701cd503 Branch: refs/heads/master Commit: 701cd503e749993d65646336ecbc09b8b8df695e Parents: 9e03e93 Author: Michael Ho <[email protected]> Authored: Wed Jul 19 20:15:03 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Wed Aug 30 22:05:47 2017 +0000 ---------------------------------------------------------------------- be/src/kudu/rpc/connection.cc | 18 +++++++----------- be/src/kudu/rpc/connection.h | 4 ---- be/src/kudu/rpc/inbound_call.cc | 20 ++++++++++++-------- be/src/kudu/rpc/inbound_call.h | 5 +++-- be/src/kudu/rpc/outbound_call.cc | 17 ++++++++++++----- be/src/kudu/rpc/outbound_call.h | 3 ++- be/src/kudu/rpc/transfer.cc | 26 +++++++++++++------------- be/src/kudu/rpc/transfer.h | 14 ++++++++++---- 8 files changed, 59 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/701cd503/be/src/kudu/rpc/connection.cc 
---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/connection.cc b/be/src/kudu/rpc/connection.cc index 7a519e3..bc7446e 100644 --- a/be/src/kudu/rpc/connection.cc +++ b/be/src/kudu/rpc/connection.cc @@ -336,13 +336,8 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) { call->set_call_id(call_id); // Serialize the actual bytes to be put on the wire. - slices_tmp_.clear(); - Status s = call->SerializeTo(&slices_tmp_); - if (PREDICT_FALSE(!s.ok())) { - call->SetFailed(s, negotiation_complete_ ? Phase::REMOTE_CALL - : Phase::CONNECTION_NEGOTIATION); - return; - } + TransferPayload tmp_slices; + size_t n_slices = call->SerializeTo(&tmp_slices); call->SetQueued(); @@ -400,7 +395,7 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) { TransferCallbacks *cb = new CallTransferCallbacks(call, this); awaiting_response_[call_id] = car.release(); QueueOutbound(gscoped_ptr<OutboundTransfer>( - OutboundTransfer::CreateForCallRequest(call_id, slices_tmp_, cb))); + OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, n_slices, cb))); } // Callbacks for sending an RPC call response from the server. @@ -471,14 +466,15 @@ void Connection::QueueResponseForCall(gscoped_ptr<InboundCall> call) { // eventually runs in the reactor thread will take care of calling // ResponseTransferCallbacks::NotifyTransferAborted. - std::vector<Slice> slices; - call->SerializeResponseTo(&slices); + TransferPayload tmp_slices; + size_t n_slices = call->SerializeResponseTo(&tmp_slices); TransferCallbacks *cb = new ResponseTransferCallbacks(std::move(call), this); // After the response is sent, can delete the InboundCall object. // We set a dummy call ID and required feature set, since these are not needed // when sending responses. 
- gscoped_ptr<OutboundTransfer> t(OutboundTransfer::CreateForCallResponse(slices, cb)); + gscoped_ptr<OutboundTransfer> t( + OutboundTransfer::CreateForCallResponse(tmp_slices, n_slices, cb)); QueueTransferTask *task = new QueueTransferTask(std::move(t), this); reactor_thread_->reactor()->ScheduleReactorTask(task); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/701cd503/be/src/kudu/rpc/connection.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/connection.h b/be/src/kudu/rpc/connection.h index 3d23826..7f16b7b 100644 --- a/be/src/kudu/rpc/connection.h +++ b/be/src/kudu/rpc/connection.h @@ -330,10 +330,6 @@ class Connection : public RefCountedThreadSafe<Connection> { // Starts as Status::OK, gets set to a shutdown status upon Shutdown(). Status shutdown_status_; - // Temporary vector used when serializing - avoids an allocation - // when serializing calls. - std::vector<Slice> slices_tmp_; - // RPC features supported by the remote end of the connection. 
std::set<RpcFeatureFlag> remote_features_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/701cd503/be/src/kudu/rpc/inbound_call.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/inbound_call.cc b/be/src/kudu/rpc/inbound_call.cc index aba9977..d1c27b7 100644 --- a/be/src/kudu/rpc/inbound_call.cc +++ b/be/src/kudu/rpc/inbound_call.cc @@ -175,16 +175,20 @@ void InboundCall::SerializeResponseBuffer(const MessageLite& response, &response_hdr_buf_); } -void InboundCall::SerializeResponseTo(vector<Slice>* slices) const { +size_t InboundCall::SerializeResponseTo(TransferPayload* slices) const { TRACE_EVENT0("rpc", "InboundCall::SerializeResponseTo"); - CHECK_GT(response_hdr_buf_.size(), 0); - CHECK_GT(response_msg_buf_.size(), 0); - slices->reserve(slices->size() + 2 + outbound_sidecars_.size()); - slices->push_back(Slice(response_hdr_buf_)); - slices->push_back(Slice(response_msg_buf_)); - for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) { - slices->push_back(car->AsSlice()); + DCHECK_GT(response_hdr_buf_.size(), 0); + DCHECK_GT(response_msg_buf_.size(), 0); + size_t n_slices = 2 + outbound_sidecars_.size(); + DCHECK_LE(n_slices, slices->size()); + auto slice_iter = slices->begin(); + *slice_iter++ = Slice(response_hdr_buf_); + *slice_iter++ = Slice(response_msg_buf_); + for (auto& sidecar : outbound_sidecars_) { + *slice_iter++ = sidecar->AsSlice(); } + DCHECK_EQ(slice_iter - slices->begin(), n_slices); + return n_slices; } Status InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/701cd503/be/src/kudu/rpc/inbound_call.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/inbound_call.h b/be/src/kudu/rpc/inbound_call.h index 6bed18f..84e6745 100644 --- a/be/src/kudu/rpc/inbound_call.h +++ b/be/src/kudu/rpc/inbound_call.h @@ -119,9 +119,10 @@ class InboundCall { const 
google::protobuf::MessageLite& app_error_pb, ErrorStatusPB* err); - // Serialize the response packet for the finished call. + // Serialize the response packet for the finished call into 'slices'. // The resulting slices refer to memory in this object. - void SerializeResponseTo(std::vector<Slice>* slices) const; + // Returns the number of slices in the serialized response. + size_t SerializeResponseTo(TransferPayload* slices) const; // See RpcContext::AddRpcSidecar() Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/701cd503/be/src/kudu/rpc/outbound_call.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/outbound_call.cc b/be/src/kudu/rpc/outbound_call.cc index aab29d3..bcc39c3 100644 --- a/be/src/kudu/rpc/outbound_call.cc +++ b/be/src/kudu/rpc/outbound_call.cc @@ -99,7 +99,7 @@ OutboundCall::~OutboundCall() { DVLOG(4) << "OutboundCall " << this << " destroyed with state_: " << StateName(state_); } -Status OutboundCall::SerializeTo(vector<Slice>* slices) { +size_t OutboundCall::SerializeTo(TransferPayload* slices) { DCHECK_LT(0, request_buf_.size()) << "Must call SetRequestPayload() before SerializeTo()"; @@ -116,10 +116,16 @@ Status OutboundCall::SerializeTo(vector<Slice>* slices) { serialization::SerializeHeader( header_, sidecar_byte_size_ + request_buf_.size(), &header_buf_); - slices->push_back(Slice(header_buf_)); - slices->push_back(Slice(request_buf_)); - for (const unique_ptr<RpcSidecar>& car : sidecars_) slices->push_back(car->AsSlice()); - return Status::OK(); + size_t n_slices = 2 + sidecars_.size(); + DCHECK_LE(n_slices, slices->size()); + auto slice_iter = slices->begin(); + *slice_iter++ = Slice(header_buf_); + *slice_iter++ = Slice(request_buf_); + for (auto& sidecar : sidecars_) { + *slice_iter++ = sidecar->AsSlice(); + } + DCHECK_EQ(slice_iter - slices->begin(), n_slices); + return n_slices; } void 
OutboundCall::SetRequestPayload(const Message& req, @@ -127,6 +133,7 @@ void OutboundCall::SetRequestPayload(const Message& req, DCHECK_EQ(-1, sidecar_byte_size_); sidecars_ = move(sidecars); + DCHECK_LE(sidecars_.size(), TransferLimits::kMaxSidecars); // Compute total size of sidecar payload so that extra space can be reserved as part of // the request body. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/701cd503/be/src/kudu/rpc/outbound_call.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/outbound_call.h b/be/src/kudu/rpc/outbound_call.h index ba2df27..221c368 100644 --- a/be/src/kudu/rpc/outbound_call.h +++ b/be/src/kudu/rpc/outbound_call.h @@ -156,7 +156,8 @@ class OutboundCall { // Serialize the call for the wire. Requires that SetRequestPayload() // is called first. This is called from the Reactor thread. - Status SerializeTo(std::vector<Slice>* slices); + // Returns the number of slices in the serialized call. + size_t SerializeTo(TransferPayload* slices); // Mark in the call that cancellation has been requested. 
If the call hasn't yet // started sending or has finished sending the RPC request but is waiting for a http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/701cd503/be/src/kudu/rpc/transfer.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/transfer.cc b/be/src/kudu/rpc/transfer.cc index d24e94d..d660869 100644 --- a/be/src/kudu/rpc/transfer.cc +++ b/be/src/kudu/rpc/transfer.cc @@ -135,32 +135,32 @@ string InboundTransfer::StatusAsString() const { return Substitute("$0/$1 bytes received", cur_offset_, total_length_); } -OutboundTransfer* OutboundTransfer::CreateForCallRequest( - int32_t call_id, - const std::vector<Slice> &payload, - TransferCallbacks *callbacks) { - return new OutboundTransfer(call_id, payload, callbacks); +OutboundTransfer* OutboundTransfer::CreateForCallRequest(int32_t call_id, + const TransferPayload &payload, + size_t n_payload_slices, + TransferCallbacks *callbacks) { + return new OutboundTransfer(call_id, payload, n_payload_slices, callbacks); } -OutboundTransfer* OutboundTransfer::CreateForCallResponse(const std::vector<Slice> &payload, +OutboundTransfer* OutboundTransfer::CreateForCallResponse(const TransferPayload &payload, + size_t n_payload_slices, TransferCallbacks *callbacks) { - return new OutboundTransfer(kInvalidCallId, payload, callbacks); + return new OutboundTransfer(kInvalidCallId, payload, n_payload_slices, callbacks); } - OutboundTransfer::OutboundTransfer(int32_t call_id, - const std::vector<Slice> &payload, + const TransferPayload &payload, + size_t n_payload_slices, TransferCallbacks *callbacks) : cur_slice_idx_(0), cur_offset_in_slice_(0), callbacks_(callbacks), call_id_(call_id), aborted_(false) { - CHECK(!payload.empty()); - n_payload_slices_ = payload.size(); - CHECK_LE(n_payload_slices_, arraysize(payload_slices_)); - for (int i = 0; i < payload.size(); i++) { + n_payload_slices_ = n_payload_slices; + CHECK_LE(n_payload_slices_, payload_slices_.size()); + for 
(int i = 0; i < n_payload_slices; i++) { payload_slices_[i] = payload[i]; } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/701cd503/be/src/kudu/rpc/transfer.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/transfer.h b/be/src/kudu/rpc/transfer.h index 671347a..2a2b726 100644 --- a/be/src/kudu/rpc/transfer.h +++ b/be/src/kudu/rpc/transfer.h @@ -18,6 +18,7 @@ #ifndef KUDU_RPC_TRANSFER_H #define KUDU_RPC_TRANSFER_H +#include <array> #include <boost/intrusive/list.hpp> #include <gflags/gflags.h> #include <set> @@ -56,6 +57,8 @@ class TransferLimits { DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits); }; +typedef std::array<Slice, TransferLimits::kMaxPayloadSlices> TransferPayload; + // This class is used internally by the RPC layer to represent an inbound // transfer in progress. // @@ -119,12 +122,14 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> { // Create an outbound transfer for a call request. static OutboundTransfer* CreateForCallRequest(int32_t call_id, - const std::vector<Slice> &payload, + const TransferPayload &payload, + size_t n_payload_slices, TransferCallbacks *callbacks); // Create an outbound transfer for a call response. // See above for details. - static OutboundTransfer* CreateForCallResponse(const std::vector<Slice> &payload, + static OutboundTransfer* CreateForCallResponse(const TransferPayload &payload, + size_t n_payload_slices, TransferCallbacks *callbacks); // Destruct the transfer. A transfer object should never be deallocated @@ -162,12 +167,13 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> { private: OutboundTransfer(int32_t call_id, - const std::vector<Slice> &payload, + const TransferPayload& payload, + size_t n_payload_slices, TransferCallbacks *callbacks); // Slices to send. Uses an array here instead of a vector to avoid an expensive // vector construction (improved performance a couple percent). 
- Slice payload_slices_[TransferLimits::kMaxPayloadSlices]; + TransferPayload payload_slices_; size_t n_payload_slices_; // The current slice that is being sent.
