This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 498c3e17 Support IOBuf Profiler (#2497)
498c3e17 is described below
commit 498c3e17954ea36a88e5561ffdffa298a7b48dcf
Author: Bright Chen <[email protected]>
AuthorDate: Mon Apr 8 11:07:45 2024 +0800
Support IOBuf Profiler (#2497)
* Support IOBuf Profiler
* Do not cache blocks in TLS
* Use MPSC Queue
* Add comments
---
BUILD.bazel | 1 +
CMakeLists.txt | 1 +
Makefile | 1 +
src/brpc/builtin/common.cpp | 1 +
src/brpc/builtin/common.h | 1 +
src/brpc/builtin/hotspots_service.cpp | 80 +++++++--
src/brpc/builtin/hotspots_service.h | 10 ++
src/brpc/builtin/pprof_perl.cpp | 3 +-
src/brpc/builtin_service.proto | 2 +
src/bthread/mutex.cpp | 2 +-
src/butil/iobuf.cpp | 46 ++++-
src/butil/iobuf_profiler.cpp | 316 ++++++++++++++++++++++++++++++++++
src/butil/iobuf_profiler.h | 169 ++++++++++++++++++
13 files changed, 607 insertions(+), 26 deletions(-)
diff --git a/BUILD.bazel b/BUILD.bazel
index 0bdaa4d3..b2a453e0 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -213,6 +213,7 @@ BUTIL_SRCS = [
"src/butil/crc32c.cc",
"src/butil/containers/case_ignored_flat_map.cpp",
"src/butil/iobuf.cpp",
+ "src/butil/iobuf_profiler.cpp",
"src/butil/binary_printer.cpp",
"src/butil/recordio.cc",
"src/butil/popen.cpp",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 40eff79d..eaae961a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -444,6 +444,7 @@ set(BUTIL_SOURCES
${PROJECT_SOURCE_DIR}/src/butil/crc32c.cc
${PROJECT_SOURCE_DIR}/src/butil/containers/case_ignored_flat_map.cpp
${PROJECT_SOURCE_DIR}/src/butil/iobuf.cpp
+ ${PROJECT_SOURCE_DIR}/src/butil/iobuf_profiler.cpp
${PROJECT_SOURCE_DIR}/src/butil/binary_printer.cpp
${PROJECT_SOURCE_DIR}/src/butil/recordio.cc
${PROJECT_SOURCE_DIR}/src/butil/popen.cpp
diff --git a/Makefile b/Makefile
index 04d0a243..641a8a78 100644
--- a/Makefile
+++ b/Makefile
@@ -163,6 +163,7 @@ BUTIL_SOURCES = \
src/butil/crc32c.cc \
src/butil/containers/case_ignored_flat_map.cpp \
src/butil/iobuf.cpp \
+ src/butil/iobuf_profiler.cpp \
src/butil/binary_printer.cpp \
src/butil/recordio.cc \
src/butil/popen.cpp
diff --git a/src/brpc/builtin/common.cpp b/src/brpc/builtin/common.cpp
index 9cdc72fe..7b5d33a3 100644
--- a/src/brpc/builtin/common.cpp
+++ b/src/brpc/builtin/common.cpp
@@ -308,6 +308,7 @@ const char* ProfilingType2String(ProfilingType t) {
case PROFILING_HEAP: return "heap";
case PROFILING_GROWTH: return "growth";
case PROFILING_CONTENTION: return "contention";
+ case PROFILING_IOBUF: return "iobuf";
}
return "unknown";
}
diff --git a/src/brpc/builtin/common.h b/src/brpc/builtin/common.h
index f4d69627..5fd8a9c9 100644
--- a/src/brpc/builtin/common.h
+++ b/src/brpc/builtin/common.h
@@ -52,6 +52,7 @@ enum ProfilingType {
PROFILING_HEAP = 1,
PROFILING_GROWTH = 2,
PROFILING_CONTENTION = 3,
+ PROFILING_IOBUF = 4,
};
DECLARE_string(rpc_profiling_dir);
diff --git a/src/brpc/builtin/hotspots_service.cpp
b/src/brpc/builtin/hotspots_service.cpp
index b70266f4..c01ce536 100644
--- a/src/brpc/builtin/hotspots_service.cpp
+++ b/src/brpc/builtin/hotspots_service.cpp
@@ -23,6 +23,7 @@
#include "butil/file_util.h" // butil::FilePath
#include "butil/popen.h" // butil::read_command_output
#include "butil/fd_guard.h" // butil::fd_guard
+#include "butil/iobuf_profiler.h"
#include "brpc/log.h"
#include "brpc/controller.h"
#include "brpc/server.h"
@@ -154,7 +155,8 @@ struct ProfilingEnvironment {
};
// Different ProfilingType have different env.
-static ProfilingEnvironment g_env[4] = {
+static ProfilingEnvironment g_env[5] = {
+ { PTHREAD_MUTEX_INITIALIZER, 0, NULL, NULL, NULL },
{ PTHREAD_MUTEX_INITIALIZER, 0, NULL, NULL, NULL },
{ PTHREAD_MUTEX_INITIALIZER, 0, NULL, NULL, NULL },
{ PTHREAD_MUTEX_INITIALIZER, 0, NULL, NULL, NULL },
@@ -399,7 +401,8 @@ static bool has_GOOGLE_PPROF_BINARY_PATH() {
static void DisplayResult(Controller* cntl,
google::protobuf::Closure* done,
const char* prof_name,
- const butil::IOBuf& result_prefix) {
+ const butil::IOBuf& result_prefix,
+ ProfilingType type) {
ClosureGuard done_guard(done);
butil::IOBuf prof_result;
if (cntl->IsCanceled()) {
@@ -488,7 +491,7 @@ static void DisplayResult(Controller* cntl,
#if defined(OS_LINUX)
cmd_builder << "perl " << pprof_tool
<< DisplayTypeToPProfArgument(display_type)
- << (show_ccount ? " --contention " : "");
+ << ((show_ccount || type == PROFILING_IOBUF) ? " --contention
" : "");
if (base_name) {
cmd_builder << "--base " << *base_name << ' ';
}
@@ -505,7 +508,7 @@ static void DisplayResult(Controller* cntl,
#elif defined(OS_MACOSX)
cmd_builder << s_pprof_binary_path << " "
<< DisplayTypeToPProfArgument(display_type)
- << (show_ccount ? " -contentions " : "");
+                // This branch runs the pprof binary at s_pprof_binary_path, whose
+                // flag is spelled `-contentions' (as on the line this replaces and
+                // like `-base' below); `--contention' is the perl pprof spelling.
+                << ((show_ccount || type == PROFILING_IOBUF) ? " -contentions " : "");
if (base_name) {
cmd_builder << "-base " << *base_name << ' ';
}
@@ -637,7 +640,7 @@ static void DoProfiling(ProfilingType type,
return cntl->SetFailed(
EINVAL, "The profile denoted by `view' does not exist");
}
- DisplayResult(cntl, done_guard.release(), view->c_str(), os.buf());
+ DisplayResult(cntl, done_guard.release(), view->c_str(), os.buf(),
type);
return;
}
@@ -774,6 +777,15 @@ static void DoProfiling(ProfilingType type,
PLOG(WARNING) << "Profiling has been interrupted";
}
bthread::ContentionProfilerStop();
+ } else if (type == PROFILING_IOBUF) {
+ if (!butil::IsIOBufProfilerEnabled()) {
+ os << "IOBuf profiler is not enabled"
+ << (use_html ? "</body></html>" : "\n");
+ os.move_to(resp);
+ cntl->http_response().set_status_code(HTTP_STATUS_FORBIDDEN);
+ return NotifyWaiters(type, cntl, view);
+ }
+ butil::IOBufProfilerFlush(prof_name);
} else if (type == PROFILING_HEAP) {
MallocExtension* malloc_ext = MallocExtension::instance();
if (malloc_ext == NULL || !has_TCMALLOC_SAMPLE_PARAMETER()) {
@@ -827,11 +839,11 @@ static void DoProfiling(ProfilingType type,
std::vector<ProfilingWaiter> waiters;
// NOTE: Must be called before DisplayResult which calls done->Run() and
// deletes cntl.
- ConsumeWaiters(type, cntl, &waiters);
- DisplayResult(cntl, done_guard.release(), prof_name, os.buf());
+ ConsumeWaiters(type, cntl, &waiters);
+ DisplayResult(cntl, done_guard.release(), prof_name, os.buf(), type);
for (size_t i = 0; i < waiters.size(); ++i) {
- DisplayResult(waiters[i].cntl, waiters[i].done, prof_name, os.buf());
+ DisplayResult(waiters[i].cntl, waiters[i].done, prof_name, os.buf(),
type);
}
}
@@ -849,7 +861,12 @@ static void StartProfiling(ProfilingType type,
enabled = cpu_profiler_enabled;
} else if (type == PROFILING_CONTENTION) {
enabled = true;
- } else if (type == PROFILING_HEAP) {
+ } else if (type == PROFILING_IOBUF) {
+ enabled = butil::IsIOBufProfilerEnabled();
+ if (!enabled) {
+ extra_desc = " (no ENABLE_IOBUF_PROFILER=1 in env or no link
tcmalloc )";
+ }
+ } else if (type == PROFILING_HEAP) {
enabled = IsHeapProfilerEnabled();
if (enabled && !has_TCMALLOC_SAMPLE_PARAMETER()) {
enabled = false;
@@ -925,7 +942,8 @@ static void StartProfiling(ProfilingType type,
"<script type=\"text/javascript\">\n"
"function generateURL() {\n"
" var past_prof = document.getElementById('view_prof').value;\n"
- " var base_prof = document.getElementById('base_prof').value;\n"
+ " var base_prof_el = document.getElementById('base_prof');\n"
+ " var base_prof = base_prof_el != null ? base_prof_el.value : '';\n"
" var display_type =
document.getElementById('display_type').value;\n";
if (type == PROFILING_CONTENTION) {
os << " var show_ccount =
document.getElementById('ccount_cb').checked;\n";
@@ -1092,17 +1110,19 @@ static void StartProfiling(ProfilingType type,
<< (show_ccount ? " checked=''" : "") <<
" onclick='onChangedCB(this);'>count</label>";
}
- os << "</div><div><pre style='display:inline'>Diff: </pre>"
- "<select id='base_prof' onchange='onSelectProf()'>"
- "<option value=''><none></option>";
- for (size_t i = 0; i < past_profs.size(); ++i) {
- os << "<option value='" << past_profs[i] << "' ";
- if (base_name != NULL && past_profs[i] == *base_name) {
- os << "selected";
+ if (type != PROFILING_IOBUF) {
+ os << "</div><div><pre style='display:inline'>Diff: </pre>"
+ "<select id='base_prof' onchange='onSelectProf()'>"
+ "<option value=''><none></option>";
+ for (size_t i = 0; i<past_profs.size(); ++i) {
+ os << "<option value='" << past_profs[i] << "' ";
+ if (base_name!=NULL && past_profs[i]==*base_name) {
+ os << "selected";
+ }
+ os << '>' << GetBaseName(&past_profs[i]);
}
- os << '>' << GetBaseName(&past_profs[i]);
+ os << "</select></div>";
}
- os << "</select></div>";
if (!enabled && view == NULL) {
os << "<p><span style='color:red'>Error:</span> "
@@ -1194,6 +1214,14 @@ void HotspotsService::contention(
return StartProfiling(PROFILING_CONTENTION, cntl_base, done);
}
+// RPC method `iobuf' of the hotspots service: forwards the call to
+// StartProfiling() with PROFILING_IOBUF. The request and response
+// messages are not used.
+void HotspotsService::iobuf(::google::protobuf::RpcController* cntl_base,
+                            const ::brpc::HotspotsRequest* /*request*/,
+                            ::brpc::HotspotsResponse* /*response*/,
+                            ::google::protobuf::Closure* done) {
+    StartProfiling(PROFILING_IOBUF, cntl_base, done);
+}
+
+
void HotspotsService::cpu_non_responsive(
::google::protobuf::RpcController* cntl_base,
const ::brpc::HotspotsRequest*,
@@ -1226,19 +1254,33 @@ void HotspotsService::contention_non_responsive(
return DoProfiling(PROFILING_CONTENTION, cntl_base, done);
}
+// RPC method `iobuf_non_responsive' of the hotspots service: forwards the
+// call to DoProfiling() with PROFILING_IOBUF. The request and response
+// messages are not used.
+void HotspotsService::iobuf_non_responsive(
+    ::google::protobuf::RpcController* cntl_base,
+    const ::brpc::HotspotsRequest* /*request*/,
+    ::brpc::HotspotsResponse* /*response*/,
+    ::google::protobuf::Closure* done) {
+    DoProfiling(PROFILING_IOBUF, cntl_base, done);
+}
+
void HotspotsService::GetTabInfo(TabInfoList* info_list) const {
TabInfo* info = info_list->add();
info->path = "/hotspots/cpu";
info->tab_name = "cpu";
+
info = info_list->add();
info->path = "/hotspots/heap";
info->tab_name = "heap";
+
info = info_list->add();
info->path = "/hotspots/growth";
info->tab_name = "growth";
+
info = info_list->add();
info->path = "/hotspots/contention";
info->tab_name = "contention";
+
+ info = info_list->add();
+ info->path = "/hotspots/iobuf";
+ info->tab_name = "iobuf";
}
} // namespace brpc
diff --git a/src/brpc/builtin/hotspots_service.h
b/src/brpc/builtin/hotspots_service.h
index 23696eec..cdd90b6b 100644
--- a/src/brpc/builtin/hotspots_service.h
+++ b/src/brpc/builtin/hotspots_service.h
@@ -50,6 +50,11 @@ public:
::brpc::HotspotsResponse* response,
::google::protobuf::Closure* done);
+ void iobuf(::google::protobuf::RpcController* cntl_base,
+ const ::brpc::HotspotsRequest* request,
+ ::brpc::HotspotsResponse* response,
+ ::google::protobuf::Closure* done);
+
void cpu_non_responsive(::google::protobuf::RpcController* cntl_base,
const ::brpc::HotspotsRequest* request,
::brpc::HotspotsResponse* response,
@@ -70,6 +75,11 @@ public:
::brpc::HotspotsResponse* response,
::google::protobuf::Closure* done);
+ void iobuf_non_responsive(::google::protobuf::RpcController* cntl_base,
+ const ::brpc::HotspotsRequest* request,
+ ::brpc::HotspotsResponse* response,
+ ::google::protobuf::Closure* done);
+
void GetTabInfo(brpc::TabInfoList*) const;
};
diff --git a/src/brpc/builtin/pprof_perl.cpp b/src/brpc/builtin/pprof_perl.cpp
index 798df7bd..0d859165 100644
--- a/src/brpc/builtin/pprof_perl.cpp
+++ b/src/brpc/builtin/pprof_perl.cpp
@@ -4172,7 +4172,8 @@ const char* pprof_perl() {
"\n"
" while ( $line = <PROFILE> ) {\n"
" $line =~ s/\\r//g; # turn windows-looking lines into
unix-looking lines\n"
- " if ( $line =~ /^\\s*(\\d+)\\s+(\\d+) \\@\\s*(.*?)\\s*$/ ) {\n"
+ " # Support negative count for IOBuf Profiler\n"
+ " if ( $line =~ /^\\s*(\\d+)\\s+(-?\\d+) \\@\\s*(.*?)\\s*$/ ) {\n"
" my ($cycles, $count, $stack) = ($1, $2, $3);\n"
"\n"
" # Convert cycles to nanoseconds\n"
diff --git a/src/brpc/builtin_service.proto b/src/brpc/builtin_service.proto
index 95d26bfa..bb28f44e 100644
--- a/src/brpc/builtin_service.proto
+++ b/src/brpc/builtin_service.proto
@@ -149,6 +149,8 @@ service hotspots {
rpc growth_non_responsive(HotspotsRequest) returns (HotspotsResponse);
rpc contention(HotspotsRequest) returns (HotspotsResponse);
rpc contention_non_responsive(HotspotsRequest) returns (HotspotsResponse);
+ rpc iobuf(HotspotsRequest) returns (HotspotsResponse);
+ rpc iobuf_non_responsive(HotspotsRequest) returns (HotspotsResponse);
}
service flags {
diff --git a/src/bthread/mutex.cpp b/src/bthread/mutex.cpp
index b4008fd7..d22c8753 100644
--- a/src/bthread/mutex.cpp
+++ b/src/bthread/mutex.cpp
@@ -268,7 +268,7 @@ BAIDU_CACHELINE_ALIGNMENT static ContentionProfiler* g_cp =
NULL;
// Need this version to solve an issue that non-empty entries left by
// previous contention profilers should be detected and overwritten.
static uint64_t g_cp_version = 0;
-// Protecting accesss to g_cp.
+// Protecting accesses to g_cp.
static pthread_mutex_t g_cp_mutex = PTHREAD_MUTEX_INITIALIZER;
// The map storing information for profiling pthread_mutex. Different from
diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp
index 3418f671..8895fb16 100644
--- a/src/butil/iobuf.cpp
+++ b/src/butil/iobuf.cpp
@@ -38,6 +38,7 @@
#include "butil/logging.h" // CHECK, LOG
#include "butil/fd_guard.h" // butil::fd_guard
#include "butil/iobuf.h"
+#include "butil/iobuf_profiler.h"
namespace butil {
namespace iobuf {
@@ -155,7 +156,7 @@ inline iov_function get_pwritev_func() {
#endif // ARCH_CPU_X86_64
-inline void* cp(void *__restrict dest, const void *__restrict src, size_t n) {
+void* cp(void *__restrict dest, const void *__restrict src, size_t n) {
// memcpy in gcc 4.8 seems to be faster enough.
return memcpy(dest, src, n);
}
@@ -193,7 +194,8 @@ size_t IOBuf::new_bigview_count() {
return iobuf::g_newbigview.load(butil::memory_order_relaxed);
}
-const uint16_t IOBUF_BLOCK_FLAGS_USER_DATA = 0x1;
+const uint16_t IOBUF_BLOCK_FLAGS_USER_DATA = 1 << 0;
+const uint16_t IOBUF_BLOCK_FLAGS_SAMPLED = 1 << 1;
using UserDataDeleter = std::function<void(void*)>;
struct UserDataExtension {
@@ -228,6 +230,9 @@ struct IOBuf::Block {
iobuf::g_nblock.fetch_add(1, butil::memory_order_relaxed);
iobuf::g_blockmem.fetch_add(data_size + sizeof(Block),
butil::memory_order_relaxed);
+ if (is_samplable()) {
+ SubmitIOBufSample(this, 1);
+ }
}
Block(char* data_in, uint32_t data_size, UserDataDeleter deleter)
@@ -240,6 +245,9 @@ struct IOBuf::Block {
, data(data_in) {
auto ext = new (get_user_data_extension()) UserDataExtension();
ext->deleter = std::move(deleter);
+ if (is_samplable()) {
+ SubmitIOBufSample(this, 1);
+ }
}
// Undefined behavior when (flags & IOBUF_BLOCK_FLAGS_USER_DATA) is 0.
@@ -260,13 +268,19 @@ struct IOBuf::Block {
void inc_ref() {
check_abi();
nshared.fetch_add(1, butil::memory_order_relaxed);
+ if (sampled()) {
+ SubmitIOBufSample(this, 1);
+ }
}
void dec_ref() {
check_abi();
+ if (sampled()) {
+ SubmitIOBufSample(this, -1);
+ }
if (nshared.fetch_sub(1, butil::memory_order_release) == 1) {
butil::atomic_thread_fence(butil::memory_order_acquire);
- if (!flags) {
+ if (!is_user_data()) {
iobuf::g_nblock.fetch_sub(1, butil::memory_order_relaxed);
iobuf::g_blockmem.fetch_sub(cap + sizeof(Block),
butil::memory_order_relaxed);
@@ -288,6 +302,23 @@ struct IOBuf::Block {
bool full() const { return size >= cap; }
size_t left_space() const { return cap - size; }
+
+private:
+    // Asks IsIOBufProfilerSamplable() whether this block should be sampled.
+    // NOTE: not a pure predicate -- on a positive answer the block is tagged
+    // with IOBUF_BLOCK_FLAGS_SAMPLED before true is returned.
+    bool is_samplable() {
+        const bool chosen = IsIOBufProfilerSamplable();
+        if (chosen) {
+            flags |= IOBUF_BLOCK_FLAGS_SAMPLED;
+        }
+        return chosen;
+    }
+
+    // True when IOBUF_BLOCK_FLAGS_SAMPLED is set in `flags'.
+    bool sampled() const {
+        return (flags & IOBUF_BLOCK_FLAGS_SAMPLED) != 0;
+    }
+
+    // True when IOBUF_BLOCK_FLAGS_USER_DATA is set in `flags'.
+    bool is_user_data() const {
+        return (flags & IOBUF_BLOCK_FLAGS_USER_DATA) != 0;
+    }
};
namespace iobuf {
@@ -329,6 +360,11 @@ inline IOBuf::Block* create_block() {
// release_tls_block_chain() may exceed this limit sometimes.
const int MAX_BLOCKS_PER_THREAD = 8;
+// Limit used when deciding whether a released block may stay in the TLS
+// chain: 0 while the IOBuf profiler is enabled (do not cache blocks in TLS),
+// MAX_BLOCKS_PER_THREAD otherwise.
+inline int max_blocks_per_thread() {
+    if (IsIOBufProfilerEnabled()) {
+        return 0;
+    }
+    return MAX_BLOCKS_PER_THREAD;
+}
+
struct TLSData {
// Head of the TLS block chain.
IOBuf::Block* block_head;
@@ -410,7 +446,7 @@ inline void release_tls_block(IOBuf::Block* b) {
TLSData& tls_data = g_tls_data;
if (b->full()) {
b->dec_ref();
- } else if (tls_data.num_blocks >= MAX_BLOCKS_PER_THREAD) {
+ } else if (tls_data.num_blocks >= max_blocks_per_thread()) {
b->dec_ref();
g_num_hit_tls_threshold.fetch_add(1, butil::memory_order_relaxed);
} else {
@@ -429,7 +465,7 @@ inline void release_tls_block(IOBuf::Block* b) {
void release_tls_block_chain(IOBuf::Block* b) {
TLSData& tls_data = g_tls_data;
size_t n = 0;
- if (tls_data.num_blocks >= MAX_BLOCKS_PER_THREAD) {
+ if (tls_data.num_blocks >= max_blocks_per_thread()) {
do {
++n;
IOBuf::Block* const saved_next = b->u.portal_next;
diff --git a/src/butil/iobuf_profiler.cpp b/src/butil/iobuf_profiler.cpp
new file mode 100644
index 00000000..bcb53391
--- /dev/null
+++ b/src/butil/iobuf_profiler.cpp
@@ -0,0 +1,316 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <fcntl.h>
+#include "butil/iobuf_profiler.h"
+#include "butil/strings/string_number_conversions.h"
+#include "butil/file_util.h"
+#include "butil/fd_guard.h"
+#include "butil/fast_rand.h"
+#include "butil/hash.h"
+#include <execinfo.h>
+
+extern int __attribute__((weak)) GetStackTrace(void** result, int max_depth,
int skip_count);
+
+namespace butil {
+
+namespace iobuf {
+extern void* cp(void *__restrict dest, const void *__restrict src, size_t n);
+}
+
+// Max and min sleep time for IOBuf profiler consuming thread
+// when `_sample_queue' is empty.
+const uint32_t IOBufProfiler::MIN_SLEEP_MS = 10;  // value Consume() resets _sleep_ms to after draining samples
+const uint32_t IOBufProfiler::MAX_SLEEP_MS = 1000;  // cap applied by Consume() when doubling _sleep_ms
+
+static pthread_once_t g_iobuf_profiler_info_once = PTHREAD_ONCE_INIT;  // runs InitGlobalIOBufProfilerInfo() once
+static bool g_iobuf_profiler_enabled = false;  // assigned by InitGlobalIOBufProfilerInfo()
+static uint g_iobuf_profiler_sample_rate = 100;  // percentage in (0, 100]; 100 makes every block samplable
+
+// Reads the IOBuf profiler configuration from the environment. Invoked
+// through pthread_once() by IsIOBufProfilerEnabled() and
+// IsIOBufProfilerSamplable().
+//
+// Environment variables:
+// 1. ENABLE_IOBUF_PROFILER: set value to 1 to enable IOBuf profiler. The
+//    profiler is additionally enabled only if the weak symbol
+//    GetStackTrace() is present.
+// 2. IOBUF_PROFILER_SAMPLE_RATE: set value between (0, 100] to control
+//    sample rate. Only read when the profiler is enabled; a value that is
+//    not a number or is out of range is reported and the default is kept.
+static void InitGlobalIOBufProfilerInfo() {
+    const char* enabled = getenv("ENABLE_IOBUF_PROFILER");
+    g_iobuf_profiler_enabled = enabled && strcmp("1", enabled) == 0 &&
+                               ::GetStackTrace != NULL;
+    if (!g_iobuf_profiler_enabled) {
+        // Profiler is off: the sample rate is irrelevant, do not parse it.
+        return;
+    }
+
+    const char* rate = getenv("IOBUF_PROFILER_SAMPLE_RATE");
+    if (rate) {
+        int tmp = 0;
+        if (butil::StringToInt(rate, &tmp)) {
+            if (tmp > 0 && tmp <= 100) {
+                g_iobuf_profiler_sample_rate = tmp;
+            } else {
+                LOG(ERROR) << "IOBUF_PROFILER_SAMPLE_RATE should be in (0, 100], but get " << rate;
+            }
+        } else {
+            LOG(ERROR) << "IOBUF_PROFILER_SAMPLE_RATE should be a number, but get " << rate;
+        }
+    }
+    LOG(INFO) << "g_iobuf_profiler_sample_rate=" << g_iobuf_profiler_sample_rate;
+}
+
+// Reports whether the IOBuf profiler is enabled. The one-time initialization
+// (InitGlobalIOBufProfilerInfo) is performed through pthread_once().
+bool IsIOBufProfilerEnabled() {
+    (void)pthread_once(&g_iobuf_profiler_info_once, InitGlobalIOBufProfilerInfo);
+    return g_iobuf_profiler_enabled;
+}
+
+// Decides whether the next block should be sampled, according to
+// g_iobuf_profiler_sample_rate (a percentage). Does not look at
+// g_iobuf_profiler_enabled.
+bool IsIOBufProfilerSamplable() {
+    (void)pthread_once(&g_iobuf_profiler_info_once, InitGlobalIOBufProfilerInfo);
+    const uint rate = g_iobuf_profiler_sample_rate;
+    // A rate of 100 samples everything, so the random draw is skipped.
+    // Otherwise sample when `fast_rand_less_than(100) + 1' does not exceed the rate.
+    return rate == 100 || fast_rand_less_than(100) + 1 <= rate;
+}
+
+// Hash of the first `nframes' entries of `stack', computed with
+// SuperFastHash and cached in `_hash_code'. An empty stack hashes to 0.
+// A cached value of 0 is treated as "not computed yet".
+size_t IOBufSample::stack_hash_code() const {
+    if (nframes == 0) {
+        return 0;
+    }
+    if (_hash_code != 0) {
+        return _hash_code;
+    }
+    const char* const raw = reinterpret_cast<const char*>(stack);
+    _hash_code = SuperFastHash(raw, sizeof(void*) * nframes);
+    return _hash_code;
+}
+
+// Returns the process-wide profiler, obtained from ::Singleton with
+// LeakySingletonTraits.
+IOBufProfiler* IOBufProfiler::GetInstance() {
+    typedef ::Singleton<IOBufProfiler, LeakySingletonTraits<IOBufProfiler>> Holder;
+    return Holder::get();
+}
+
+// Initializes both aggregation maps with 1024 buckets (aborting through
+// CHECK_EQ if either init fails) and then starts the thread via Start().
+IOBufProfiler::IOBufProfiler()
+    : butil::SimpleThread("IOBufProfiler")
+    , _stop(false)
+    , _sleep_ms(MIN_SLEEP_MS) {
+    const int stack_map_rc = _stack_map.init(1024);
+    CHECK_EQ(0, stack_map_rc);
+    const int block_map_rc = _block_info_map.init(1024);
+    CHECK_EQ(0, block_map_rc);
+    Start();
+}
+
+// Stops and joins the thread, empties both maps, then hands every
+// sample still waiting in `_sample_queue' to IOBufSample::Destroy().
+IOBufProfiler::~IOBufProfiler() {
+    StopAndJoin();
+    _block_info_map.clear();
+    _stack_map.clear();
+
+    // Clear `_sample_queue'.
+    for (IOBufSample* pending = NULL; _sample_queue.Dequeue(pending); ) {
+        IOBufSample::Destroy(pending);
+    }
+}
+
+// Hands sample `s' over to the profiler. The caller must not touch `s'
+// afterwards: a queued sample is destroyed by the consuming side
+// (Consume() or the destructor), and a sample submitted after the profiler
+// was stopped is destroyed right here instead of being dropped, so it is
+// not leaked.
+void IOBufProfiler::Submit(IOBufSample* s) {
+    if (_stop.load(butil::memory_order_relaxed)) {
+        // Nobody will dequeue it any more; release it now.
+        IOBufSample::Destroy(s);
+        return;
+    }
+
+    _sample_queue.Enqueue(s);
+}
+
+// Folds one sample into the aggregation maps while holding `_mutex':
+//  - `_stack_map' accumulates `count' per distinct call stack;
+//  - `_block_info_map' keeps, per block, the net count (`ref') and how much
+//    each stack contributed to it.
+// When a block's net count returns to 0, every stack's contribution to that
+// block is subtracted again (stacks whose count reaches 0 are erased) and the
+// block's entry is removed. `s->block' is reset to NULL before returning; the
+// sample itself is not destroyed here.
+void IOBufProfiler::Dump(IOBufSample* s) {
+    {
+        BAIDU_SCOPED_LOCK(_mutex);
+        // Categorize the stack.
+        IOBufRefSampleSharedPtr fresh;
+        IOBufRefSampleSharedPtr* stack_slot = _stack_map.seek(s);
+        if (stack_slot != NULL) {
+            (*stack_slot)->count += s->count;
+        } else {
+            // First sample with this stack: store a copy that is detached
+            // from the block.
+            fresh = IOBufSample::CopyAndSharedWithDestroyer(s);
+            fresh->block = NULL;
+            stack_slot = &_stack_map[fresh.get()];
+            *stack_slot = fresh;
+        }
+
+        BlockInfo* known = _block_info_map.seek(s->block);
+        if (known == NULL) {
+            BlockInfo& created = _block_info_map[s->block];
+            created.ref += s->count;
+            CHECK_EQ(0, created.stack_count_map.init(64));
+            created.stack_count_map[*stack_slot] = s->count;
+        } else {
+            // Categorize the IOBufSample.
+            known->stack_count_map[*stack_slot] += s->count;
+            known->ref += s->count;
+            if (known->ref == 0) {
+                // Net count is back to 0: undo what this block added to
+                // each stack, then forget the block.
+                for (const auto& entry : known->stack_count_map) {
+                    IOBufSample* const key = entry.first.get();
+                    IOBufRefSampleSharedPtr* owner = _stack_map.seek(key);
+                    if (owner && *owner) {
+                        (*owner)->count -= entry.second;
+                        if ((*owner)->count == 0) {
+                            _stack_map.erase(key);
+                        }
+                    }
+                }
+                _block_info_map.erase(s->block);
+            }
+        }
+    }
+    s->block = NULL;
+}
+
+// Takes a sample from IOBufSample::New() and copies `block', `count', the
+// cached hash, `nframes' and the first `nframes' stack entries of `ref'
+// into it. `next' is not copied.
+IOBufSample* IOBufSample::Copy(IOBufSample* ref) {
+    IOBufSample* const dup = IOBufSample::New();
+    dup->block = ref->block;
+    dup->count = ref->count;
+    dup->nframes = ref->nframes;
+    dup->_hash_code = ref->_hash_code;
+    const size_t stack_bytes = sizeof(void*) * ref->nframes;
+    iobuf::cp(dup->stack, ref->stack, stack_bytes);
+    return dup;
+}
+
+// Same as Copy(), but the copy is owned by a shared_ptr whose deleter is
+// detail::Destroyer.
+IOBufRefSampleSharedPtr IOBufSample::CopyAndSharedWithDestroyer(IOBufSample* ref) {
+    return IOBufRefSampleSharedPtr(Copy(ref), detail::Destroyer());
+}
+
+// Writes the aggregated stacks to `filename': a "--- contention" header,
+// one "0 <count> @ <frames...>" line per stack in `_stack_map', followed by
+// the content of /proc/self/maps. The parent directory is created when
+// needed and an existing file is truncated. Failures are logged; nothing is
+// returned to the caller.
+void IOBufProfiler::Flush2Disk(const char* filename) {
+    if (!filename) {
+        LOG(ERROR) << "Parameter [filename] is NULL";
+        return;
+    }
+    // A previous flush that bailed out early (directory, open or write
+    // failure) leaves its bytes in _disk_buf. Drop them so this profile does
+    // not start with stale data and a second header.
+    _disk_buf.clear();
+    // Serialize contentions in _stack_map into _disk_buf.
+    _disk_buf.append("--- contention\ncycles/second=1000000000\n");
+    butil::IOBufBuilder os;
+    {
+        BAIDU_SCOPED_LOCK(_mutex);
+        for (auto it = _stack_map.begin(); it != _stack_map.end(); ++it) {
+            const IOBufRefSampleSharedPtr& c = it->second;
+            if (c->nframes == 0) {
+                LOG_EVERY_SECOND(WARNING) << "Invalid stack with nframes=0, count=" << c->count;
+                continue;
+            }
+            os << "0 " << c->count << " @";
+            for (int i = 0; i < c->nframes; ++i) {
+                os << " " << c->stack[i];
+            }
+            os << "\n";
+        }
+    }
+    _disk_buf.append(os.buf().movable());
+
+    // Append /proc/self/maps to the end of the contention file, required by
+    // pprof.pl, otherwise the functions in sys libs are not interpreted.
+    butil::IOPortal mem_maps;
+    const butil::fd_guard maps_fd(open("/proc/self/maps", O_RDONLY));
+    if (maps_fd >= 0) {
+        while (true) {
+            ssize_t nr = mem_maps.append_from_file_descriptor(maps_fd, 8192);
+            if (nr < 0) {
+                if (errno == EINTR) {
+                    continue;
+                }
+                PLOG(ERROR) << "Fail to read /proc/self/maps";
+                break;
+            }
+            if (nr == 0) {
+                // End of file: only a completely read map is appended.
+                _disk_buf.append(mem_maps);
+                break;
+            }
+        }
+    } else {
+        PLOG(ERROR) << "Fail to open /proc/self/maps";
+    }
+    // Write _disk_buf into _filename
+    butil::File::Error error;
+    butil::FilePath file_path(filename);
+    butil::FilePath dir = file_path.DirName();
+    if (!butil::CreateDirectoryAndGetError(dir, &error)) {
+        LOG(ERROR) << "Fail to create directory=`" << dir.value()
+                   << ": " << error;
+        return;
+    }
+
+    butil::fd_guard fd(open(filename, O_WRONLY | O_CREAT | O_APPEND | O_TRUNC, 0666));
+    if (fd < 0) {
+        PLOG(ERROR) << "Fail to open " << filename;
+        return;
+    }
+    // Write once normally, write until empty in the end.
+    do {
+        ssize_t nw = _disk_buf.cut_into_file_descriptor(fd);
+        if (nw < 0) {
+            if (errno == EINTR) {
+                continue;
+            }
+            PLOG(ERROR) << "Fail to write into " << filename;
+            return;
+        }
+        // Progress of a successful write, not an error.
+        LOG(INFO) << "Write " << nw << " bytes into " << filename;
+    } while (!_disk_buf.empty());
+}
+
+// Sets the stop flag and, if this call is the one that flipped it and the
+// thread has not been joined yet, joins the thread. Later calls return
+// immediately.
+void IOBufProfiler::StopAndJoin() {
+    const bool already_stopped = _stop.exchange(true, butil::memory_order_relaxed);
+    if (!already_stopped && !HasBeenJoined()) {
+        Join();
+    }
+}
+
+// Thread body: until the stop flag is observed, drain the queue with
+// Consume() and then sleep for `_sleep_ms' milliseconds.
+void IOBufProfiler::Run() {
+    for (;;) {
+        if (_stop.load(butil::memory_order_relaxed)) {
+            break;
+        }
+        Consume();
+        ::usleep(_sleep_ms * 1000);
+    }
+}
+
+// Dequeues every pending sample, passing each one to Dump() and then to
+// IOBufSample::Destroy(), and adapts `_sleep_ms' for the next pause.
+void IOBufProfiler::Consume() {
+    bool got_any = false;
+    IOBufSample* sample = NULL;
+    while (_sample_queue.Dequeue(sample)) {
+        got_any = true;
+        Dump(sample);
+        IOBufSample::Destroy(sample);
+    }
+
+    // If `_sample_queue' is empty, exponential increase in sleep time.
+    if (got_any) {
+        _sleep_ms = MIN_SLEEP_MS;
+    } else {
+        _sleep_ms = std::min(_sleep_ms * 2, MAX_SLEEP_MS);
+    }
+}
+
+// Records a reference-count change `ref' of `block' together with the
+// current stack trace and submits it to the profiler. Does nothing when the
+// profiler is not enabled.
+void SubmitIOBufSample(IOBuf::Block* block, int64_t ref) {
+    if (IsIOBufProfilerEnabled()) {
+        IOBufSample* const sample = IOBufSample::New();
+        sample->block = block;
+        sample->count = ref;
+        sample->nframes = GetStackTrace(sample->stack,
+                                        arraysize(sample->stack),
+                                        0); // may lock
+        IOBufProfiler::GetInstance()->Submit(sample);
+    }
+}
+
+// Asks the profiler singleton to write its profile into `filename'.
+// Returns false only when `filename' is NULL; true does not tell whether
+// Flush2Disk() succeeded, since it returns nothing.
+bool IOBufProfilerFlush(const char* filename) {
+    if (filename == NULL) {
+        LOG(ERROR) << "Parameter [filename] is NULL";
+        return false;
+    }
+
+    IOBufProfiler::GetInstance()->Flush2Disk(filename);
+    return true;
+}
+
+}
\ No newline at end of file
diff --git a/src/butil/iobuf_profiler.h b/src/butil/iobuf_profiler.h
new file mode 100644
index 00000000..7178f20a
--- /dev/null
+++ b/src/butil/iobuf_profiler.h
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BIGO_BRPC_IOBUF_PROFILER_H
+#define BIGO_BRPC_IOBUF_PROFILER_H
+
+#include <unordered_map>
+#include <unordered_set>
+#include "butil/iobuf.h"
+#include "butil/object_pool.h"
+#include "butil/threading/simple_thread.h"
+#include "butil/threading/simple_thread.h"
+#include "butil/memory/singleton.h"
+#include "butil/containers/flat_map.h"
+#include "butil/containers/mpsc_queue.h"
+
+namespace butil {
+
+struct IOBufSample;
+typedef std::shared_ptr<IOBufSample> IOBufRefSampleSharedPtr;
+
+// One profiling record: a change of the reference count of `block' and the
+// stack captured for it. Instances are taken from and returned to the object
+// pool through New()/Destroy(); constructor and destructor are private.
+// Member order and sizes are left untouched (sizeof is asserted to be 256).
+struct IOBufSample {
+    IOBufSample* next;
+    IOBuf::Block* block;
+    int64_t count;    // reference count of the block.
+    void* stack[28];  // backtrace.
+    int nframes;      // num of frames in stack.
+
+    // Gets a sample from the object pool.
+    static IOBufSample* New() { return get_object<IOBufSample>(); }
+
+    static IOBufSample* Copy(IOBufSample* ref);
+    static IOBufRefSampleSharedPtr CopyAndSharedWithDestroyer(IOBufSample* ref);
+
+    // Clears the cached hash and returns `ref' to the object pool.
+    static void Destroy(IOBufSample* ref) {
+        ref->_hash_code = 0;
+        return_object(ref);
+    }
+
+    size_t stack_hash_code() const;
+
+private:
+    friend ObjectPool<IOBufSample>;
+
+    IOBufSample()
+        : next(NULL)
+        , block(NULL)
+        , count(0)
+        , stack{}
+        , nframes(0)
+        , _hash_code(0) {}
+
+    ~IOBufSample() = default;
+
+    // For combining samples with hashmap. Filled in by stack_hash_code().
+    mutable uint32_t _hash_code;
+};
+
+BAIDU_CASSERT(sizeof(IOBufSample) == 256, be_friendly_to_allocator);
+
+namespace detail {
+// Equality functor for pointer-like handles to IOBufSample: two samples are
+// equal when their stack hashes, frame counts and stack contents all match.
+template <typename T>
+struct IOBufSampleEqual {
+    bool operator()(const T& c1, const T& c2) const {
+        if (c1->stack_hash_code() != c2->stack_hash_code()) {
+            return false;
+        }
+        if (c1->nframes != c2->nframes) {
+            return false;
+        }
+        return memcmp(c1->stack, c2->stack, sizeof(void*) * c1->nframes) == 0;
+    }
+};
+
+// Hash functor for pointer-like handles to IOBufSample: the stack hash.
+template <typename T>
+struct IOBufSampleHash {
+    size_t operator()(const T& c) const {
+        return c->stack_hash_code();
+    }
+};
+
+// Deleter that gives a non-NULL sample to IOBufSample::Destroy().
+struct Destroyer {
+    void operator()(IOBufSample* ref) const {
+        if (ref == NULL) {
+            return;
+        }
+        IOBufSample::Destroy(ref);
+    }
+};
+}
+
+// Thread that consumes IOBufSample objects from a queue, aggregates them per
+// stack and per block, and can write the aggregate to a file. Obtained via
+// GetInstance().
+class IOBufProfiler : public butil::SimpleThread {
+public:
+    static IOBufProfiler* GetInstance();
+
+    // Submit the IOBuf sample along with stacktrace.
+    void Submit(IOBufSample* s);
+    // Dump IOBuf sample to map.
+    void Dump(IOBufSample* s);
+    // Write buffered data into resulting file.
+    void Flush2Disk(const char* filename);
+
+    void StopAndJoin();
+
+private:
+    friend struct DefaultSingletonTraits<IOBufProfiler>;
+
+    // <stack (looked up by raw sample pointer), shared sample holding its count>
+    using IOBufRefMap = butil::FlatMap<IOBufSample*,
+                                       IOBufRefSampleSharedPtr,
+                                       detail::IOBufSampleHash<IOBufSample*>,
+                                       detail::IOBufSampleEqual<IOBufSample*>>;
+
+    // <iobuf stack, ref count>
+    using StackCountMap = butil::FlatMap<IOBufRefSampleSharedPtr,
+                                         int64_t,
+                                         detail::IOBufSampleHash<IOBufRefSampleSharedPtr>,
+                                         detail::IOBufSampleEqual<IOBufRefSampleSharedPtr>>;
+    struct BlockInfo {
+        int64_t ref{0};
+        StackCountMap stack_count_map;
+    };
+    using BlockInfoMap = butil::FlatMap<IOBuf::Block*, BlockInfo>;
+
+    IOBufProfiler();
+    ~IOBufProfiler() override;
+    DISALLOW_COPY_AND_ASSIGN(IOBufProfiler);
+
+    void Run() override;
+    // Consume the IOBuf sample in _sample_queue.
+    void Consume();
+
+    // Stop flag of IOBufProfiler.
+    butil::atomic<bool> _stop;
+    // IOBuf sample queue.
+    MPSCQueue<IOBufSample*> _sample_queue;
+
+    // Temp buf before saving the file.
+    butil::IOBuf _disk_buf;
+    // Combining same samples to make result smaller.
+    IOBufRefMap _stack_map;
+    // Record block info.
+    BlockInfoMap _block_info_map;
+    // Held by Dump() and Flush2Disk() while they access the two maps above.
+    Mutex _mutex;
+
+    // Sleep when `_sample_queue' is empty.
+    uint32_t _sleep_ms;
+    static const uint32_t MIN_SLEEP_MS;
+    static const uint32_t MAX_SLEEP_MS;
+};
+
+bool IsIOBufProfilerEnabled();
+bool IsIOBufProfilerSamplable();
+
+void SubmitIOBufSample(IOBuf::Block* block, int64_t ref);
+
+bool IOBufProfilerFlush(const char* filename);
+
+}
+#endif //BIGO_BRPC_IOBUF_PROFILER_H
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]