This is an automated email from the ASF dual-hosted git repository. todd pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 1a21e92057f61bd3b0581ac590a84bc7d7a21ac6 Author: Todd Lipcon <[email protected]> AuthorDate: Thu Mar 5 12:49:07 2020 -0800 rpc: reduce context switches and receive calls * When queueing a task to execute on the reactor, avoid writing to the eventfd to wake it up if such a write has already been done. This should reduce the number of read/write syscalls to the eventfd and avoid "spurious" wakeups of the reactor. * When reading inbound data, read an extra 4 bytes, and if it's available, loop around to read another call without putting the reactor back to sleep. The effect on context switches is clearly visible using rpc-bench --gtest_filter=\*Async Before: I0305 12:50:56.463312 7468 rpc-bench.cc:128] Ctx Sw. per req: 0.640409 I0305 12:50:58.015260 7542 rpc-bench.cc:128] Ctx Sw. per req: 0.613172 I0305 12:50:59.563201 7587 rpc-bench.cc:128] Ctx Sw. per req: 0.589479 I0305 12:51:01.014848 7662 rpc-bench.cc:128] Ctx Sw. per req: 0.562744 I0305 12:51:02.666339 7736 rpc-bench.cc:128] Ctx Sw. per req: 0.569126 After: I0305 12:52:03.567790 9005 rpc-bench.cc:128] Ctx Sw. per req: 0.383251 I0305 12:52:05.050909 9079 rpc-bench.cc:128] Ctx Sw. per req: 0.454404 I0305 12:52:06.626401 9138 rpc-bench.cc:128] Ctx Sw. per req: 0.3308 I0305 12:52:08.123154 9198 rpc-bench.cc:128] Ctx Sw. per req: 0.317752 I0305 12:52:09.666586 9272 rpc-bench.cc:128] Ctx Sw. 
per req: 0.391739 And on system CPU: Before: I0305 12:50:56.463310 7468 rpc-bench.cc:127] Sys CPU per req: 16.5524us I0305 12:50:58.015259 7542 rpc-bench.cc:127] Sys CPU per req: 16.1158us I0305 12:50:59.563199 7587 rpc-bench.cc:127] Sys CPU per req: 17.3184us I0305 12:51:01.014847 7662 rpc-bench.cc:127] Sys CPU per req: 16.7911us I0305 12:51:02.666337 7736 rpc-bench.cc:127] Sys CPU per req: 15.7659us After: I0305 12:52:03.567787 9005 rpc-bench.cc:127] Sys CPU per req: 13.0533us I0305 12:52:05.050906 9079 rpc-bench.cc:127] Sys CPU per req: 13.7925us I0305 12:52:06.626399 9138 rpc-bench.cc:127] Sys CPU per req: 11.6987us I0305 12:52:08.123152 9198 rpc-bench.cc:127] Sys CPU per req: 11.9214us I0305 12:52:09.666584 9272 rpc-bench.cc:127] Sys CPU per req: 13.4031us And on syscalls: todd@turbo:~/kudu$ grep recvfr /tmp/before /tmp/after /tmp/before: 1458969 syscalls:sys_enter_recvfrom ( +- 1.99% ) /tmp/before: 1458969 syscalls:sys_exit_recvfrom ( +- 1.99% ) /tmp/after: 1252328 syscalls:sys_enter_recvfrom ( +- 1.82% ) /tmp/after: 1252328 syscalls:sys_exit_recvfrom ( +- 1.82% ) todd@turbo:~/kudu$ grep epoll_ctl /tmp/before /tmp/after /tmp/before: 915862 syscalls:sys_enter_epoll_ctl ( +- 1.47% ) /tmp/before: 915862 syscalls:sys_exit_epoll_ctl ( +- 1.47% ) /tmp/after: 475978 syscalls:sys_enter_epoll_ctl ( +- 3.61% ) /tmp/after: 475978 syscalls:sys_exit_epoll_ctl ( +- 3.61% ) On a more macro-benchmark (TSBS single-groupby-1-1-1 16 workers on an 8-core machine) this also reduces syscalls a bit, though the end-to-end improvement is minimal. 
Before: Performance counter stats for 'system wide' (10 runs): 340,444 cs ( +- 0.30% ) 144,024 syscalls:sys_enter_recvfrom ( +- 0.00% ) 94,379 syscalls:sys_enter_epoll_ctl ( +- 0.06% ) 129,376 syscalls:sys_enter_epoll_wait ( +- 0.10% ) 2.025755946 seconds time elapsed ( +- 0.43% ) After: Performance counter stats for 'system wide' (10 runs): 333,865 cs ( +- 0.27% ) 119,216 syscalls:sys_enter_recvfrom ( +- 0.04% ) 88,731 syscalls:sys_enter_epoll_ctl ( +- 0.08% ) 104,149 syscalls:sys_enter_epoll_wait ( +- 0.08% ) 2.005614271 seconds time elapsed ( +- 0.19% ) Change-Id: I32c5e4d146c25be8e90665a0cb8385fcd017b15c Reviewed-on: http://gerrit.cloudera.org:8080/15440 Reviewed-by: Andrew Wong <[email protected]> Tested-by: Andrew Wong <[email protected]> Reviewed-by: Bankim Bhavsar <[email protected]> --- src/kudu/rpc/connection.cc | 17 ++++++----- src/kudu/rpc/reactor.cc | 6 +++- src/kudu/rpc/transfer.cc | 70 +++++++++++++++++++++++++++++++--------------- src/kudu/rpc/transfer.h | 11 ++++++-- 4 files changed, 69 insertions(+), 35 deletions(-) diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc index 7dc0c03..a49dbe2 100644 --- a/src/kudu/rpc/connection.cc +++ b/src/kudu/rpc/connection.cc @@ -43,6 +43,7 @@ #include "kudu/rpc/rpc_header.pb.h" #include "kudu/rpc/rpc_introspection.pb.h" #include "kudu/rpc/transfer.h" +#include "kudu/util/faststring.h" #include "kudu/util/net/sockaddr.h" #include "kudu/util/net/socket.h" #include "kudu/util/slice.h" @@ -645,11 +646,12 @@ void Connection::ReadHandler(ev::io &watcher, int revents) { } last_activity_time_ = reactor_thread_->cur_time(); + faststring extra_buf; while (true) { if (!inbound_) { inbound_.reset(new InboundTransfer()); } - Status status = inbound_->ReceiveBuffer(*socket_); + Status status = inbound_->ReceiveBuffer(socket_.get(), &extra_buf); if (PREDICT_FALSE(!status.ok())) { if (status.posix_code() == ESHUTDOWN) { VLOG(1) << ToString() << " shut down by remote end."; @@ -673,14 +675,11 @@ void 
Connection::ReadHandler(ev::io &watcher, int revents) { LOG(FATAL) << "Invalid direction: " << direction_; } - // TODO: it would seem that it would be good to loop around and see if - // there is more data on the socket by trying another recv(), but it turns - // out that it really hurts throughput to do so. A better approach - // might be for each InboundTransfer to actually try to read an extra byte, - // and if it succeeds, then we'd copy that byte into a new InboundTransfer - // and loop around, since it's likely the next call also arrived at the - // same time. - break; + if (extra_buf.size() > 0) { + inbound_.reset(new InboundTransfer(std::move(extra_buf))); + } else { + break; + } } } diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc index 231783b..e77488c 100644 --- a/src/kudu/rpc/reactor.cc +++ b/src/kudu/rpc/reactor.cc @@ -928,6 +928,7 @@ void Reactor::QueueCancellation(const shared_ptr<OutboundCall>& call) { } void Reactor::ScheduleReactorTask(ReactorTask *task) { + bool was_empty; { std::unique_lock<LockType> l(lock_); if (closing_) { @@ -936,9 +937,12 @@ void Reactor::ScheduleReactorTask(ReactorTask *task) { task->Abort(ShutdownError(false)); return; } + was_empty = pending_tasks_.empty(); pending_tasks_.push_back(*task); } - thread_.WakeThread(); + if (was_empty) { + thread_.WakeThread(); + } } bool Reactor::DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks) { // NOLINT(*) diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc index fb6d6a2..7693a91 100644 --- a/src/kudu/rpc/transfer.cc +++ b/src/kudu/rpc/transfer.cc @@ -20,6 +20,7 @@ #include <sys/uio.h> #include <algorithm> +#include <cstddef> #include <cstdint> #include <iostream> #include <limits> @@ -88,32 +89,43 @@ TransferCallbacks::~TransferCallbacks() {} InboundTransfer::InboundTransfer() - : total_length_(kMsgLengthPrefixLength), + : total_length_(0), cur_offset_(0) { buf_.resize(kMsgLengthPrefixLength); } -Status InboundTransfer::ReceiveBuffer(Socket &socket) 
{ - if (cur_offset_ < kMsgLengthPrefixLength) { - // receive uint32 length prefix - int32_t rem = kMsgLengthPrefixLength - cur_offset_; - int32_t nread; - Status status = socket.Recv(&buf_[cur_offset_], rem, &nread); - RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status); - if (nread == 0) { - return Status::OK(); - } - DCHECK_GE(nread, 0); - cur_offset_ += nread; +InboundTransfer::InboundTransfer(faststring initial_buf) + : buf_(std::move(initial_buf)), + total_length_(0), + cur_offset_(buf_.size()) { + buf_.resize(std::max<size_t>(kMsgLengthPrefixLength, buf_.size())); +} + +Status InboundTransfer::ReceiveBuffer(Socket* socket, faststring* extra_4) { + static constexpr int kExtraReadLength = kMsgLengthPrefixLength; + if (total_length_ == 0) { + // We haven't yet parsed the message length. It's possible that the + // length is already available in the buffer passed in the constructor. if (cur_offset_ < kMsgLengthPrefixLength) { - // If we still don't have the full length prefix, we can't continue - // reading yet. - return Status::OK(); + // receive uint32 length prefix + int32_t rem = kMsgLengthPrefixLength - cur_offset_; + int32_t nread; + Status status = socket->Recv(&buf_[cur_offset_], rem, &nread); + RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status); + if (nread == 0) { + return Status::OK(); + } + DCHECK_GE(nread, 0); + cur_offset_ += nread; + if (cur_offset_ < kMsgLengthPrefixLength) { + // If we still don't have the full length prefix, we can't continue + // reading yet. + return Status::OK(); + } } - // Since we only read 'rem' bytes above, we should now have exactly - // the length prefix in our buffer and no more. - DCHECK_EQ(cur_offset_, kMsgLengthPrefixLength); + // Parse the message length out of the prefix. + DCHECK_GE(cur_offset_, kMsgLengthPrefixLength); // The length prefix doesn't include its own 4 bytes, so we have to // add that back in. 
total_length_ = NetworkByteOrder::Load32(&buf_[0]) + kMsgLengthPrefixLength; @@ -126,7 +138,7 @@ Status InboundTransfer::ReceiveBuffer(Socket &socket) { return Status::NetworkError(Substitute("RPC frame had invalid length of $0", total_length_)); } - buf_.resize(total_length_); + buf_.resize(total_length_ + kExtraReadLength); // Fall through to receive the message body, which is likely to be already // available on the socket. @@ -139,12 +151,24 @@ Status InboundTransfer::ReceiveBuffer(Socket &socket) { // INT_MAX. The message will be split across multiple Recv() calls. // Note that this is only needed when rpc_max_message_size > INT_MAX, which is // currently only used for unit tests. - int32_t rem = std::min(total_length_ - cur_offset_, + int32_t rem = std::min(total_length_ - cur_offset_ + kExtraReadLength, static_cast<uint32_t>(std::numeric_limits<int32_t>::max())); - Status status = socket.Recv(&buf_[cur_offset_], rem, &nread); + Status status = socket->Recv(&buf_[cur_offset_], rem, &nread); RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status); cur_offset_ += nread; + // We may have read some extra bytes, in which case we need to trim them off + // and write them into the provided buffer. 
+ if (cur_offset_ >= total_length_) { + int64_t extra_read = cur_offset_ - total_length_; + DCHECK_LE(extra_read, kExtraReadLength); + DCHECK_GE(extra_read, 0); + extra_4->clear(); + extra_4->append(&buf_[total_length_], extra_read); + cur_offset_ = total_length_; + buf_.resize(total_length_); + } + return Status::OK(); } @@ -153,7 +177,7 @@ bool InboundTransfer::TransferStarted() const { } bool InboundTransfer::TransferFinished() const { - return cur_offset_ == total_length_; + return total_length_ > 0 && cur_offset_ == total_length_; } string InboundTransfer::StatusAsString() const { diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h index 0628f2c..3cab6b1 100644 --- a/src/kudu/rpc/transfer.h +++ b/src/kudu/rpc/transfer.h @@ -67,9 +67,15 @@ class InboundTransfer { public: InboundTransfer(); + explicit InboundTransfer(faststring initial_buf); - // read from the socket into our buffer - Status ReceiveBuffer(Socket &socket); + // Read from the socket into our buffer. + // + // If this is the last read of the transfer (i.e. if TransferFinished() is true + // after this call returns OK), up to 4 extra bytes may have been read + // from the socket and stored in 'extra_4'. In that case, any previous content of + // 'extra_4' is replaced by these extra bytes. + Status ReceiveBuffer(Socket *socket, faststring* extra_4); // Return true if any bytes have yet been sent. bool TransferStarted() const; @@ -91,6 +97,7 @@ class InboundTransfer { faststring buf_; + // 0 indicates not yet set uint32_t total_length_; uint32_t cur_offset_;
