This is an automated email from the ASF dual-hosted git repository.
achennaka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 66815148a [rpc] micro-optimizations on RpczStore::LogTrace
66815148a is described below
commit 66815148adb79cdd25cfc0399dc1e1fa9622bff5
Author: Alexey Serbin <[email protected]>
AuthorDate: Mon Dec 4 15:41:00 2023 -0800
[rpc] micro-optimizations on RpczStore::LogTrace
This patch micro-optimizes the code in RpczStore::LogTrace():
* faster computation of the logging threshold
* std::string instance is no longer allocated when outputting
the trace of an RPC call
* the LogTrace() method is now static
As an extra style-related change, 'final' specifier has been added to
the RpczStore class' declaration since it was not meant for adding
derived classes on top (its destructor wasn't virtual). Also, switched
samples' timestamp from Atomics to std::atomics and did other unsorted
changes to avoid memory allocation while holding a spinlock and shorten
duration of critical sections overall.
This patch also contains a few nano-optimizations for AsyncLogger class:
* 'final' specifier was added to allow for more run-time optimizations
* unsorted minor changes
Change-Id: I4b9cd537ad9797773f6934ae72d0e865db9770a1
Reviewed-on: http://gerrit.cloudera.org:8080/20748
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Abhishek Chennaka <[email protected]>
---
src/kudu/rpc/rpcz_store.cc | 109 ++++++++++++++++++++++--------------------
src/kudu/rpc/rpcz_store.h | 12 ++---
src/kudu/util/async_logger.cc | 11 ++---
src/kudu/util/async_logger.h | 10 ++--
src/kudu/util/logging.cc | 9 ++--
5 files changed, 79 insertions(+), 72 deletions(-)
diff --git a/src/kudu/rpc/rpcz_store.cc b/src/kudu/rpc/rpcz_store.cc
index 83c62c976..70b38d72c 100644
--- a/src/kudu/rpc/rpcz_store.cc
+++ b/src/kudu/rpc/rpcz_store.cc
@@ -19,11 +19,15 @@
#include <algorithm> // IWYU pragma: keep
#include <array>
+#include <atomic>
+#include <cstddef>
#include <cstdint>
#include <map>
#include <mutex> // for unique_lock
#include <ostream>
#include <string>
+#include <tuple>
+#include <type_traits>
#include <utility>
#include <vector>
@@ -33,14 +37,13 @@
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/strings/human_readable.h"
#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
#include "kudu/rpc/inbound_call.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/rpc_introspection.pb.h"
#include "kudu/rpc/service_if.h"
-#include "kudu/util/atomic.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/monotime.h"
#include "kudu/util/trace.h"
@@ -62,6 +65,7 @@ using std::pair;
using std::string;
using std::vector;
using std::unique_ptr;
+using strings::Substitute;
namespace kudu {
namespace rpc {
@@ -69,10 +73,13 @@ namespace rpc {
// Sample an RPC call once every N milliseconds within each
// bucket. If the current sample in a latency bucket is older
// than this threshold, a new sample will be taken.
-static const int kSampleIntervalMs = 1000;
+//
+// NOTE: this constant is in microsecond units since corresponding timestamps
+// are captured using the GetMonoTimeMicros() timer.
+static constexpr int64_t kSampleIntervalUs = 1000 * 1000;
-static const int kBucketThresholdsMs[] = {10, 100, 1000};
-static constexpr int kNumBuckets = arraysize(kBucketThresholdsMs) + 1;
+static constexpr size_t kBucketThresholdsMs[] = {10, 100, 1000};
+static constexpr size_t kNumBuckets = arraysize(kBucketThresholdsMs) + 1;
// An instance of this class is created For each RPC method implemented
// on the server. It keeps several recent samples for each RPC, currently
@@ -108,7 +115,7 @@ class MethodSampler {
struct SampleBucket {
SampleBucket() : last_sample_time(0) {}
- AtomicInt<int64_t> last_sample_time;
+ std::atomic<int64_t> last_sample_time;
simple_spinlock sample_lock;
Sample sample;
};
@@ -118,29 +125,26 @@ class MethodSampler {
};
MethodSampler* RpczStore::SamplerForCall(InboundCall* call) {
- if (PREDICT_FALSE(!call->method_info())) {
+ auto* method_info = call->method_info();
+ if (PREDICT_FALSE(!method_info)) {
return nullptr;
}
- // Most likely, we already have a sampler created for the call.
+ // Most likely, we already have a sampler created for the call once received
+ // the very first call for a particular method of an RPC interface.
{
shared_lock<rw_spinlock> l(samplers_lock_.get_lock());
- auto it = method_samplers_.find(call->method_info());
+ auto it = method_samplers_.find(method_info);
if (PREDICT_TRUE(it != method_samplers_.end())) {
return it->second.get();
}
}
// If missing, create a new sampler for this method and try to insert it.
- unique_ptr<MethodSampler> ms(new MethodSampler());
+ unique_ptr<MethodSampler> ms(new MethodSampler);
std::lock_guard<percpu_rwlock> lock(samplers_lock_);
- auto it = method_samplers_.find(call->method_info());
- if (it != method_samplers_.end()) {
- return it->second.get();
- }
- auto* ret = ms.get();
- method_samplers_[call->method_info()] = std::move(ms);
- return ret;
+ const auto& [it, _] = method_samplers_.try_emplace(method_info, std::move(ms));
+ return it->second.get();
}
void MethodSampler::SampleCall(InboundCall* call) {
@@ -156,18 +160,17 @@ void MethodSampler::SampleCall(InboundCall* call) {
}
MicrosecondsInt64 now = GetMonoTimeMicros();
- int64_t us_since_trace = now - bucket->last_sample_time.Load();
- if (us_since_trace > kSampleIntervalMs * 1000) {
- Sample new_sample = {call->header(), call->trace(), duration_ms};
+ int64_t us_since_trace = now - bucket->last_sample_time;
+ if (us_since_trace > kSampleIntervalUs) {
{
std::unique_lock<simple_spinlock> lock(bucket->sample_lock, std::try_to_lock);
// If another thread is already taking a sample, it's not worth waiting.
if (!lock.owns_lock()) {
return;
}
- std::swap(bucket->sample, new_sample);
- bucket->last_sample_time.Store(now);
+ bucket->sample = {call->header(), call->trace(), duration_ms};
}
+ bucket->last_sample_time = now;
VLOG(2) << "Sampled call " << call->ToString();
}
}
@@ -197,10 +200,12 @@ void MethodSampler::GetTraceMetrics(const Trace& t,
void MethodSampler::GetSamplePBs(RpczMethodPB* method_pb) {
for (auto& bucket : buckets_) {
- if (bucket.last_sample_time.Load() == 0) continue;
+ if (bucket.last_sample_time == 0) {
+ continue;
+ }
- std::unique_lock<simple_spinlock> lock(bucket.sample_lock);
auto* sample_pb = method_pb->add_samples();
+ std::unique_lock<simple_spinlock> lock(bucket.sample_lock);
sample_pb->mutable_header()->CopyFrom(bucket.sample.header);
sample_pb->set_trace(bucket.sample.trace->DumpToString(Trace::INCLUDE_TIME_DELTAS));
@@ -215,7 +220,9 @@ RpczStore::~RpczStore() {}
void RpczStore::AddCall(InboundCall* call) {
LogTrace(call);
auto* sampler = SamplerForCall(call);
- if (PREDICT_FALSE(!sampler)) return;
+ if (PREDICT_FALSE(!sampler)) {
+ return;
+ }
sampler->SampleCall(call);
}
@@ -225,49 +232,47 @@ void RpczStore::DumpPB(const DumpRpczStoreRequestPB& req,
vector<pair<RpcMethodInfo*, MethodSampler*>> samplers;
{
shared_lock<rw_spinlock> l(samplers_lock_.get_lock());
- for (const auto& p : method_samplers_) {
- samplers.emplace_back(p.first, p.second.get());
+ for (const auto& [mi, ms] : method_samplers_) {
+ samplers.emplace_back(mi, ms.get());
}
}
- for (const auto& p : samplers) {
- auto* sampler = p.second;
-
+ for (const auto& [mi, ms] : samplers) {
RpczMethodPB* method_pb = resp->add_methods();
- // TODO: use the actual RPC name instead of the request type name.
+ // TODO(todd): use the actual RPC name instead of the request type name.
// Currently this isn't conveniently plumbed here, but the type name
// is close enough.
- method_pb->set_method_name(p.first->req_prototype->GetTypeName());
- sampler->GetSamplePBs(method_pb);
+ method_pb->set_method_name(mi->req_prototype->GetTypeName());
+ ms->GetSamplePBs(method_pb);
}
}
void RpczStore::LogTrace(InboundCall* call) {
- int duration_ms = call->timing().TotalDuration().ToMilliseconds();
+ const auto duration_ms = call->timing().TotalDuration().ToMilliseconds();
if (call->header_.has_timeout_millis() && call->header_.timeout_millis() > 0) {
- double log_threshold = call->header_.timeout_millis() * 0.75f;
+ const int64_t timeout_ms = call->header_.timeout_millis();
+ const int64_t log_threshold = 3LL * timeout_ms / 4; // 75% of timeout
if (duration_ms > log_threshold) {
- // TODO: consider pushing this onto another thread since it may be slow.
- // The traces may also be too large to fit in a log message.
- int64_t timeout_ms = call->header_.timeout_millis();
- LOG(WARNING) << call->ToString() << " took " << duration_ms << " ms "
- << "(" << HumanReadableElapsedTime::ToShortString(duration_ms * .001) << "). "
- << "Client timeout " << timeout_ms << " ms "
- << "(" << HumanReadableElapsedTime::ToShortString(timeout_ms * .001) << ")";
- string s = call->trace()->DumpToString();
- if (!s.empty()) {
- LOG(WARNING) << "Trace:\n" << s;
- }
+ // This might be slow, but with AsyncLogger it's mostly OK. The only issue
+ // is when the logger decides to flush exactly at the point of logging
+ // these messages.
+ //
+ // TODO(aserbin): introduce a hint to the logger to avoid flushing for
+ // a while when logging from a reactor thread or similar,
+ // even if going slightly over the allowed threshold
+ // for the amount of accumulated log messages
+ LOG(WARNING) << Substitute("$0 took $1 ms (client timeout $2 ms). Trace:\n",
+ call->ToString(), duration_ms, timeout_ms);
+ call->trace()->Dump(&LOG(WARNING), Trace::INCLUDE_ALL);
return;
}
}
- if (duration_ms > FLAGS_rpc_duration_too_long_ms ||
- PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) {
- const auto flags = (duration_ms > FLAGS_rpc_duration_too_long_ms)
- ? Trace::INCLUDE_ALL : Trace::INCLUDE_TIME_DELTAS;
- LOG(INFO) << call->ToString() << " took " << duration_ms << "ms. Trace:";
- call->trace()->Dump(&LOG(INFO), flags);
+ const bool too_long = duration_ms > FLAGS_rpc_duration_too_long_ms;
+ if (too_long || PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) {
+ LOG(INFO) << Substitute("$0 took $1 ms. Trace:\n", call->ToString(), duration_ms);
+ call->trace()->Dump(&LOG(INFO), too_long ? Trace::INCLUDE_ALL
+ : Trace::INCLUDE_TIME_DELTAS);
}
}
diff --git a/src/kudu/rpc/rpcz_store.h b/src/kudu/rpc/rpcz_store.h
index 48e447442..563f51754 100644
--- a/src/kudu/rpc/rpcz_store.h
+++ b/src/kudu/rpc/rpcz_store.h
@@ -34,7 +34,7 @@ struct RpcMethodInfo;
// Responsible for storing sampled traces associated with completed calls.
// Before each call is responded to, it is added to this store.
-class RpczStore {
+class RpczStore final {
public:
RpczStore();
~RpczStore();
@@ -52,15 +52,15 @@ class RpczStore {
DumpRpczStoreResponsePB* resp);
private:
- // Look up or create the particular MethodSampler instance which should
- // store samples for this call.
- MethodSampler* SamplerForCall(InboundCall* call);
-
// Log a WARNING message if the RPC response was slow enough that the
// client likely timed out. This is based on the client-provided timeout
// value.
// Also can be configured to log _all_ RPC traces for help debugging.
- void LogTrace(InboundCall* call);
+ static void LogTrace(InboundCall* call);
+
+ // Look up or create the particular MethodSampler instance which should
+ // store samples for this call.
+ MethodSampler* SamplerForCall(InboundCall* call);
percpu_rwlock samplers_lock_;
diff --git a/src/kudu/util/async_logger.cc b/src/kudu/util/async_logger.cc
index be1678938..3a2617740 100644
--- a/src/kudu/util/async_logger.cc
+++ b/src/kudu/util/async_logger.cc
@@ -20,6 +20,7 @@
#include <string>
#include <thread>
+#include "kudu/gutil/port.h"
#include "kudu/util/monotime.h"
using std::string;
@@ -27,7 +28,7 @@ using std::string;
namespace kudu {
AsyncLogger::AsyncLogger(google::base::Logger* wrapped,
- int max_buffer_bytes) :
+ size_t max_buffer_bytes) :
max_buffer_bytes_(max_buffer_bytes),
wrapped_(DCHECK_NOTNULL(wrapped)),
wake_flusher_cond_(&lock_),
@@ -38,10 +39,8 @@ AsyncLogger::AsyncLogger(google::base::Logger* wrapped,
DCHECK_GT(max_buffer_bytes_, 0);
}
-AsyncLogger::~AsyncLogger() {}
-
void AsyncLogger::Start() {
- CHECK_EQ(state_, INITTED);
+ DCHECK_EQ(state_, INITTED);
state_ = RUNNING;
thread_ = std::thread(&AsyncLogger::RunThread, this);
}
@@ -49,7 +48,7 @@ void AsyncLogger::Start() {
void AsyncLogger::Stop() {
{
MutexLock l(lock_);
- CHECK_EQ(state_, RUNNING);
+ DCHECK_EQ(state_, RUNNING);
state_ = STOPPED;
wake_flusher_cond_.Signal();
}
@@ -86,7 +85,7 @@ void AsyncLogger::Write(bool force_flush,
//
// Unfortunately, the underlying log level isn't passed through to this interface, so we
// have to use this hack: messages from FATAL errors start with the character 'F'.
- if (message_len > 0 && message[0] == 'F') {
+ if (PREDICT_FALSE(message_len > 0 && message[0] == 'F')) {
Flush();
}
}
diff --git a/src/kudu/util/async_logger.h b/src/kudu/util/async_logger.h
index 48a304657..8c7c347e2 100644
--- a/src/kudu/util/async_logger.h
+++ b/src/kudu/util/async_logger.h
@@ -58,11 +58,11 @@ namespace kudu {
// NOTE: the logger limits the total amount of buffer space, so if the underlying
// log blocks for too long, eventually the threads generating the log messages
// will block as well. This prevents runaway memory usage.
-class AsyncLogger : public google::base::Logger {
+class AsyncLogger final : public google::base::Logger {
public:
AsyncLogger(google::base::Logger* wrapped,
- int max_buffer_bytes);
- ~AsyncLogger();
+ size_t max_buffer_bytes);
+ ~AsyncLogger() override = default;
void Start();
@@ -126,7 +126,7 @@ class AsyncLogger : public google::base::Logger {
std::vector<Msg> messages;
// Estimate of the size of 'messages'.
- int size = 0;
+ size_t size = 0;
// Whether this buffer needs an explicit flush of the
// underlying logger.
@@ -158,7 +158,7 @@ class AsyncLogger : public google::base::Logger {
void RunThread();
// The maximum number of bytes used by the entire class.
- const int max_buffer_bytes_;
+ const size_t max_buffer_bytes_;
google::base::Logger* const wrapped_;
std::thread thread_;
diff --git a/src/kudu/util/logging.cc b/src/kudu/util/logging.cc
index 30c060260..5e21493f4 100644
--- a/src/kudu/util/logging.cc
+++ b/src/kudu/util/logging.cc
@@ -58,9 +58,9 @@ DEFINE_bool(log_async, true,
"latency and stability.");
TAG_FLAG(log_async, hidden);
-DEFINE_int32(log_async_buffer_bytes_per_level, 2 * 1024 * 1024,
- "The number of bytes of buffer space used by each log "
- "level. Only relevant when --log_async is enabled.");
+DEFINE_uint32(log_async_buffer_bytes_per_level, 2 * 1024 * 1024,
+ "The number of bytes of buffer space used by each log "
+ "level. Only relevant when --log_async is enabled.");
TAG_FLAG(log_async_buffer_bytes_per_level, hidden);
DEFINE_int32(max_log_files, 10,
@@ -282,6 +282,9 @@ void InitGoogleLoggingSafe(const char* arg) {
if (FLAGS_log_async) {
EnableAsyncLogging();
+ } else {
+ LOG(WARNING) <<
+ "disabling asynchronous logging adversely affects Kudu's performance";
}
logging_initialized = true;