This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new fe15af8  Enable arenas for RPC request and response
fe15af8 is described below

commit fe15af8496af524b6de3392772edeec88dc8a626
Author: Todd Lipcon <[email protected]>
AuthorDate: Wed Jul 1 16:57:17 2020 -0700

    Enable arenas for RPC request and response
    
    This changes the RPC server side to allocate a protobuf Arena for each
    request. The request RPC and response are allocated from the Arena,
    ensuring that any sub-messages, strings, repeated fields, etc, use that
    Arena for allocation as well. Everything is deleted en-masse when the
    InboundCall object (which owns the Arena) is destructed.
    
    This is mostly a straight-forward change except for the change in
    RaftConsensus. Specifically, we used to do a dirty const_cast to mutate
    the inbound request and release the ReplicateMsgs, and move them into
    the raft subsystem. When the request is allocated from an Arena, that
    'release' is now actually making a copy, which broke the code path
    there.
    
    Given that there's now a copy happening nonetheless, I just made the
    code more explicitly construct a new ReplicateMsg copying out of the
    leader's request. There might be a slight performance degradation here
    but seemed worth it for code clarity. My assumption here is that
    anywhere that these copies are substantially expensive we'd probably be
    disk-bound anyway.
    
    Change-Id: I810931900fc2b5f1dec1265abadfb33fb41d29bf
    Reviewed-on: http://gerrit.cloudera.org:8080/16136
    Reviewed-by: Alexey Serbin <[email protected]>
    Tested-by: Todd Lipcon <[email protected]>
---
 src/kudu/consensus/raft_consensus.cc | 28 +++++++---------------------
 src/kudu/consensus/raft_consensus.h  |  2 +-
 src/kudu/rpc/inbound_call.h          |  7 +++++++
 src/kudu/rpc/rpc_context.cc          |  4 ++--
 src/kudu/rpc/rpc_context.h           |  8 ++++----
 src/kudu/rpc/service_if.cc           |  8 ++++----
 6 files changed, 25 insertions(+), 32 deletions(-)

diff --git a/src/kudu/consensus/raft_consensus.cc 
b/src/kudu/consensus/raft_consensus.cc
index d37e6c9..2bf40e9 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -1139,7 +1139,7 @@ string RaftConsensus::LeaderRequest::OpsRangeString() 
const {
   return ret;
 }
 
-void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* 
rpc_req,
+void RaftConsensus::DeduplicateLeaderRequestUnlocked(const ConsensusRequestPB* 
rpc_req,
                                                      LeaderRequest* 
deduplicated_req) {
   DCHECK(lock_.is_locked());
 
@@ -1156,7 +1156,7 @@ void 
RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
   // In this loop we discard duplicates and advance the leader's preceding id
   // accordingly.
   for (int i = 0; i < rpc_req->ops_size(); i++) {
-    ReplicateMsg* leader_msg = rpc_req->mutable_ops(i);
+    const ReplicateMsg* leader_msg = &rpc_req->ops(i);
 
     if (leader_msg->id().index() <= last_committed_index) {
       VLOG_WITH_PREFIX_UNLOCKED(2) << "Skipping op id " << leader_msg->id()
@@ -1191,7 +1191,8 @@ void 
RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
     if (deduplicated_req->first_message_idx == - 1) {
       deduplicated_req->first_message_idx = i;
     }
-    
deduplicated_req->messages.push_back(make_scoped_refptr_replicate(leader_msg));
+    deduplicated_req->messages.emplace_back(
+        make_scoped_refptr_replicate(new ReplicateMsg(*leader_msg)));
   }
 
   if (deduplicated_req->messages.size() != rpc_req->ops_size()) {
@@ -1292,8 +1293,7 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const 
ConsensusRequestPB* reque
                                    "before restarting.");
   }
 
-  ConsensusRequestPB* mutable_req = const_cast<ConsensusRequestPB*>(request);
-  DeduplicateLeaderRequestUnlocked(mutable_req, deduped_req);
+  DeduplicateLeaderRequestUnlocked(request, deduped_req);
 
   // This is an additional check for KUDU-639 that makes sure the message's 
index
   // and term are in the right sequence in the request, after we've 
deduplicated
@@ -1302,31 +1302,17 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const 
ConsensusRequestPB* reque
   // TODO move this to raft_consensus-state or whatever we transform that into.
   // We should be able to do this check for each append, but right now the way
   // we initialize raft_consensus-state is preventing us from doing so.
-  Status s;
   const OpId* prev = deduped_req->preceding_opid;
   for (const ReplicateRefPtr& message : deduped_req->messages) {
-    s = PendingRounds::CheckOpInSequence(*prev, message->get()->id());
+    Status s = PendingRounds::CheckOpInSequence(*prev, message->get()->id());
     if (PREDICT_FALSE(!s.ok())) {
       LOG(ERROR) << "Leader request contained out-of-sequence messages. 
Status: "
           << s.ToString() << ". Leader Request: " << 
SecureShortDebugString(*request);
-      break;
+      return s;
     }
     prev = &message->get()->id();
   }
 
-  // We only release the messages from the request after the above check so 
that
-  // that we can print the original request, if it fails.
-  if (!deduped_req->messages.empty()) {
-    // We take ownership of the deduped ops.
-    DCHECK_GE(deduped_req->first_message_idx, 0);
-    mutable_req->mutable_ops()->ExtractSubrange(
-        deduped_req->first_message_idx,
-        deduped_req->messages.size(),
-        nullptr);
-  }
-
-  RETURN_NOT_OK(s);
-
   RETURN_NOT_OK(HandleLeaderRequestTermUnlocked(request, response));
 
   if (response->status().has_error()) {
diff --git a/src/kudu/consensus/raft_consensus.h 
b/src/kudu/consensus/raft_consensus.h
index bc22c18..5ad4f93 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -522,7 +522,7 @@ class RaftConsensus : public 
std::enable_shared_from_this<RaftConsensus>,
   // haven't appended to our log yet.
   // On return 'deduplicated_req' is instantiated with only the new messages
   // and the correct preceding id.
-  void DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req,
+  void DeduplicateLeaderRequestUnlocked(const ConsensusRequestPB* rpc_req,
                                         LeaderRequest* deduplicated_req);
 
   // Handles a request from a leader, refusing the request if the term is 
lower than
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index c651aaa..1c83c7b 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include <glog/logging.h>
+#include <google/protobuf/arena.h>
 
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
@@ -150,6 +151,10 @@ class InboundCall {
 
   Trace* trace();
 
+  google::protobuf::Arena* pb_arena() {
+    return &arena_;
+  }
+
   const InboundCallTiming& timing() const {
     return timing_;
   }
@@ -285,6 +290,8 @@ class InboundCall {
   // client did not pass a timeout.
   MonoTime deadline_;
 
+  google::protobuf::Arena arena_;
+
   DISALLOW_COPY_AND_ASSIGN(InboundCall);
 };
 
diff --git a/src/kudu/rpc/rpc_context.cc b/src/kudu/rpc/rpc_context.cc
index d39f36b..36c6ac3 100644
--- a/src/kudu/rpc/rpc_context.cc
+++ b/src/kudu/rpc/rpc_context.cc
@@ -70,7 +70,7 @@ void 
RpcContext::SetResultTracker(scoped_refptr<ResultTracker> result_tracker) {
 void RpcContext::RespondSuccess() {
   if (AreResultsTracked()) {
     result_tracker_->RecordCompletionAndRespond(call_->header().request_id(),
-                                                response_pb_.get());
+                                                response_pb_);
   } else {
     VLOG(4) << call_->remote_method().service_name() << ": Sending RPC success 
response for "
         << call_->ToString() << ":" << std::endl << 
SecureDebugString(*response_pb_);
@@ -85,7 +85,7 @@ void RpcContext::RespondSuccess() {
 void RpcContext::RespondNoCache() {
   if (AreResultsTracked()) {
     result_tracker_->FailAndRespond(call_->header().request_id(),
-                                    response_pb_.get());
+                                    response_pb_);
   } else {
     VLOG(4) << call_->remote_method().service_name() << ": Sending RPC failure 
response for "
         << call_->ToString() << ": " << SecureDebugString(*response_pb_);
diff --git a/src/kudu/rpc/rpc_context.h b/src/kudu/rpc/rpc_context.h
index b507dcf..dfffdc0 100644
--- a/src/kudu/rpc/rpc_context.h
+++ b/src/kudu/rpc/rpc_context.h
@@ -200,8 +200,8 @@ class RpcContext {
   // Return the name of the RPC service being called.
   const std::string& service_name() const;
 
-  const google::protobuf::Message *request_pb() const { return 
request_pb_.get(); }
-  google::protobuf::Message *response_pb() const { return response_pb_.get(); }
+  const google::protobuf::Message* request_pb() const { return request_pb_; }
+  google::protobuf::Message* response_pb() const { return response_pb_; }
 
   // Return an upper bound on the client timeout deadline. This does not
   // account for transmission delays between the client and the server.
@@ -242,8 +242,8 @@ class RpcContext {
  private:
   friend class ResultTracker;
   InboundCall* const call_;
-  const std::unique_ptr<const google::protobuf::Message> request_pb_;
-  const std::unique_ptr<google::protobuf::Message> response_pb_;
+  const google::protobuf::Message* const request_pb_;
+  google::protobuf::Message* const response_pb_;
   scoped_refptr<ResultTracker> result_tracker_;
 };
 
diff --git a/src/kudu/rpc/service_if.cc b/src/kudu/rpc/service_if.cc
index af53452..792ac95 100644
--- a/src/kudu/rpc/service_if.cc
+++ b/src/kudu/rpc/service_if.cc
@@ -104,13 +104,13 @@ void GeneratedServiceIf::Handle(InboundCall *call) {
     RespondBadMethod(call);
     return;
   }
-  unique_ptr<Message> req(method_info->req_prototype->New());
-  if (PREDICT_FALSE(!ParseParam(call, req.get()))) {
+  Message* req = method_info->req_prototype->New(call->pb_arena());
+  if (PREDICT_FALSE(!ParseParam(call, req))) {
     return;
   }
-  Message* resp = method_info->resp_prototype->New();
+  Message* resp = method_info->resp_prototype->New(call->pb_arena());
 
-  RpcContext* ctx = new RpcContext(call, req.release(), resp);
+  RpcContext* ctx = new RpcContext(call, req, resp);
   if (!method_info->authz_method(ctx->request_pb(), resp, ctx)) {
     // The authz_method itself should have responded to the RPC.
     return;

Reply via email to