This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new fe15af8 Enable arenas for RPC request and response
fe15af8 is described below
commit fe15af8496af524b6de3392772edeec88dc8a626
Author: Todd Lipcon <[email protected]>
AuthorDate: Wed Jul 1 16:57:17 2020 -0700
Enable arenas for RPC request and response
This changes the RPC server side to allocate a protobuf Arena for each
request. The request RPC and response are allocated from the Arena,
ensuring that any sub-messages, strings, repeated fields, etc, use that
Arena for allocation as well. Everything is deleted en masse when the
InboundCall object (which owns the Arena) is destructed.
This is mostly a straight-forward change except for the change in
RaftConsensus. Specifically, we used to do a dirty const_cast to mutate
the inbound request and release the ReplicateMsgs, and move them into
the raft subsystem. When the request is allocated from an Arena, that
'release' is now actually making a copy, which broke the code path
there.
Given that a copy now happens anyway, I just made the
code more explicitly construct a new ReplicateMsg copying out of the
leader's request. There might be a slight performance degradation here
but seemed worth it for code clarity. My assumption here is that
anywhere that these copies are substantially expensive we'd probably be
disk-bound anyway.
Change-Id: I810931900fc2b5f1dec1265abadfb33fb41d29bf
Reviewed-on: http://gerrit.cloudera.org:8080/16136
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Todd Lipcon <[email protected]>
---
src/kudu/consensus/raft_consensus.cc | 28 +++++++---------------------
src/kudu/consensus/raft_consensus.h | 2 +-
src/kudu/rpc/inbound_call.h | 7 +++++++
src/kudu/rpc/rpc_context.cc | 4 ++--
src/kudu/rpc/rpc_context.h | 8 ++++----
src/kudu/rpc/service_if.cc | 8 ++++----
6 files changed, 25 insertions(+), 32 deletions(-)
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index d37e6c9..2bf40e9 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -1139,7 +1139,7 @@ string RaftConsensus::LeaderRequest::OpsRangeString() const {
return ret;
}
-void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req,
+void RaftConsensus::DeduplicateLeaderRequestUnlocked(const ConsensusRequestPB* rpc_req,
                                                      LeaderRequest* deduplicated_req) {
DCHECK(lock_.is_locked());
@@ -1156,7 +1156,7 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
// In this loop we discard duplicates and advance the leader's preceding id
// accordingly.
for (int i = 0; i < rpc_req->ops_size(); i++) {
- ReplicateMsg* leader_msg = rpc_req->mutable_ops(i);
+ const ReplicateMsg* leader_msg = &rpc_req->ops(i);
if (leader_msg->id().index() <= last_committed_index) {
VLOG_WITH_PREFIX_UNLOCKED(2) << "Skipping op id " << leader_msg->id()
@@ -1191,7 +1191,8 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
if (deduplicated_req->first_message_idx == - 1) {
deduplicated_req->first_message_idx = i;
}
-    deduplicated_req->messages.push_back(make_scoped_refptr_replicate(leader_msg));
+ deduplicated_req->messages.emplace_back(
+ make_scoped_refptr_replicate(new ReplicateMsg(*leader_msg)));
}
if (deduplicated_req->messages.size() != rpc_req->ops_size()) {
@@ -1292,8 +1293,7 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque
"before restarting.");
}
- ConsensusRequestPB* mutable_req = const_cast<ConsensusRequestPB*>(request);
- DeduplicateLeaderRequestUnlocked(mutable_req, deduped_req);
+ DeduplicateLeaderRequestUnlocked(request, deduped_req);
   // This is an additional check for KUDU-639 that makes sure the message's index
   // and term are in the right sequence in the request, after we've deduplicated
@@ -1302,31 +1302,17 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque
// TODO move this to raft_consensus-state or whatever we transform that into.
// We should be able to do this check for each append, but right now the way
// we initialize raft_consensus-state is preventing us from doing so.
- Status s;
const OpId* prev = deduped_req->preceding_opid;
for (const ReplicateRefPtr& message : deduped_req->messages) {
- s = PendingRounds::CheckOpInSequence(*prev, message->get()->id());
+ Status s = PendingRounds::CheckOpInSequence(*prev, message->get()->id());
if (PREDICT_FALSE(!s.ok())) {
       LOG(ERROR) << "Leader request contained out-of-sequence messages. Status: "
                  << s.ToString() << ". Leader Request: " << SecureShortDebugString(*request);
- break;
+ return s;
}
prev = &message->get()->id();
}
-  // We only release the messages from the request after the above check so that
- // that we can print the original request, if it fails.
- if (!deduped_req->messages.empty()) {
- // We take ownership of the deduped ops.
- DCHECK_GE(deduped_req->first_message_idx, 0);
- mutable_req->mutable_ops()->ExtractSubrange(
- deduped_req->first_message_idx,
- deduped_req->messages.size(),
- nullptr);
- }
-
- RETURN_NOT_OK(s);
-
RETURN_NOT_OK(HandleLeaderRequestTermUnlocked(request, response));
if (response->status().has_error()) {
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index bc22c18..5ad4f93 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -522,7 +522,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
// haven't appended to our log yet.
// On return 'deduplicated_req' is instantiated with only the new messages
// and the correct preceding id.
- void DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req,
+ void DeduplicateLeaderRequestUnlocked(const ConsensusRequestPB* rpc_req,
LeaderRequest* deduplicated_req);
   // Handles a request from a leader, refusing the request if the term is lower than
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index c651aaa..1c83c7b 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -25,6 +25,7 @@
#include <vector>
#include <glog/logging.h>
+#include <google/protobuf/arena.h>
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
@@ -150,6 +151,10 @@ class InboundCall {
Trace* trace();
+ google::protobuf::Arena* pb_arena() {
+ return &arena_;
+ }
+
const InboundCallTiming& timing() const {
return timing_;
}
@@ -285,6 +290,8 @@ class InboundCall {
// client did not pass a timeout.
MonoTime deadline_;
+ google::protobuf::Arena arena_;
+
DISALLOW_COPY_AND_ASSIGN(InboundCall);
};
diff --git a/src/kudu/rpc/rpc_context.cc b/src/kudu/rpc/rpc_context.cc
index d39f36b..36c6ac3 100644
--- a/src/kudu/rpc/rpc_context.cc
+++ b/src/kudu/rpc/rpc_context.cc
@@ -70,7 +70,7 @@ void RpcContext::SetResultTracker(scoped_refptr<ResultTracker> result_tracker) {
void RpcContext::RespondSuccess() {
if (AreResultsTracked()) {
result_tracker_->RecordCompletionAndRespond(call_->header().request_id(),
- response_pb_.get());
+ response_pb_);
} else {
     VLOG(4) << call_->remote_method().service_name() << ": Sending RPC success response for "
             << call_->ToString() << ":" << std::endl << SecureDebugString(*response_pb_);
@@ -85,7 +85,7 @@ void RpcContext::RespondSuccess() {
void RpcContext::RespondNoCache() {
if (AreResultsTracked()) {
result_tracker_->FailAndRespond(call_->header().request_id(),
- response_pb_.get());
+ response_pb_);
} else {
VLOG(4) << call_->remote_method().service_name() << ": Sending RPC failure
response for "
<< call_->ToString() << ": " << SecureDebugString(*response_pb_);
diff --git a/src/kudu/rpc/rpc_context.h b/src/kudu/rpc/rpc_context.h
index b507dcf..dfffdc0 100644
--- a/src/kudu/rpc/rpc_context.h
+++ b/src/kudu/rpc/rpc_context.h
@@ -200,8 +200,8 @@ class RpcContext {
// Return the name of the RPC service being called.
const std::string& service_name() const;
-  const google::protobuf::Message *request_pb() const { return request_pb_.get(); }
- google::protobuf::Message *response_pb() const { return response_pb_.get(); }
+ const google::protobuf::Message* request_pb() const { return request_pb_; }
+ google::protobuf::Message* response_pb() const { return response_pb_; }
// Return an upper bound on the client timeout deadline. This does not
// account for transmission delays between the client and the server.
@@ -242,8 +242,8 @@ class RpcContext {
private:
friend class ResultTracker;
InboundCall* const call_;
- const std::unique_ptr<const google::protobuf::Message> request_pb_;
- const std::unique_ptr<google::protobuf::Message> response_pb_;
+ const google::protobuf::Message* const request_pb_;
+ google::protobuf::Message* const response_pb_;
scoped_refptr<ResultTracker> result_tracker_;
};
diff --git a/src/kudu/rpc/service_if.cc b/src/kudu/rpc/service_if.cc
index af53452..792ac95 100644
--- a/src/kudu/rpc/service_if.cc
+++ b/src/kudu/rpc/service_if.cc
@@ -104,13 +104,13 @@ void GeneratedServiceIf::Handle(InboundCall *call) {
RespondBadMethod(call);
return;
}
- unique_ptr<Message> req(method_info->req_prototype->New());
- if (PREDICT_FALSE(!ParseParam(call, req.get()))) {
+ Message* req = method_info->req_prototype->New(call->pb_arena());
+ if (PREDICT_FALSE(!ParseParam(call, req))) {
return;
}
- Message* resp = method_info->resp_prototype->New();
+ Message* resp = method_info->resp_prototype->New(call->pb_arena());
- RpcContext* ctx = new RpcContext(call, req.release(), resp);
+ RpcContext* ctx = new RpcContext(call, req, resp);
if (!method_info->authz_method(ctx->request_pb(), resp, ctx)) {
// The authz_method itself should have responded to the RPC.
return;