This is an automated email from the ASF dual-hosted git repository. wwbmmm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new 498c3e17 Support IOBuf Profiler (#2497) 498c3e17 is described below commit 498c3e17954ea36a88e5561ffdffa298a7b48dcf Author: Bright Chen <chenguangmin...@foxmail.com> AuthorDate: Mon Apr 8 11:07:45 2024 +0800 Support IOBuf Profiler (#2497) * Support IOBuf Profiler * Do not cache blocks in TLS * Use MPSC Queue * Add comments --- BUILD.bazel | 1 + CMakeLists.txt | 1 + Makefile | 1 + src/brpc/builtin/common.cpp | 1 + src/brpc/builtin/common.h | 1 + src/brpc/builtin/hotspots_service.cpp | 80 +++++++-- src/brpc/builtin/hotspots_service.h | 10 ++ src/brpc/builtin/pprof_perl.cpp | 3 +- src/brpc/builtin_service.proto | 2 + src/bthread/mutex.cpp | 2 +- src/butil/iobuf.cpp | 46 ++++- src/butil/iobuf_profiler.cpp | 316 ++++++++++++++++++++++++++++++++++ src/butil/iobuf_profiler.h | 169 ++++++++++++++++++ 13 files changed, 607 insertions(+), 26 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index 0bdaa4d3..b2a453e0 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -213,6 +213,7 @@ BUTIL_SRCS = [ "src/butil/crc32c.cc", "src/butil/containers/case_ignored_flat_map.cpp", "src/butil/iobuf.cpp", + "src/butil/iobuf_profiler.cpp", "src/butil/binary_printer.cpp", "src/butil/recordio.cc", "src/butil/popen.cpp", diff --git a/CMakeLists.txt b/CMakeLists.txt index 40eff79d..eaae961a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -444,6 +444,7 @@ set(BUTIL_SOURCES ${PROJECT_SOURCE_DIR}/src/butil/crc32c.cc ${PROJECT_SOURCE_DIR}/src/butil/containers/case_ignored_flat_map.cpp ${PROJECT_SOURCE_DIR}/src/butil/iobuf.cpp + ${PROJECT_SOURCE_DIR}/src/butil/iobuf_profiler.cpp ${PROJECT_SOURCE_DIR}/src/butil/binary_printer.cpp ${PROJECT_SOURCE_DIR}/src/butil/recordio.cc ${PROJECT_SOURCE_DIR}/src/butil/popen.cpp diff --git a/Makefile b/Makefile index 04d0a243..641a8a78 100644 --- a/Makefile +++ b/Makefile @@ -163,6 +163,7 @@ BUTIL_SOURCES = \ src/butil/crc32c.cc \ src/butil/containers/case_ignored_flat_map.cpp \ 
src/butil/iobuf.cpp \ + src/butil/iobuf_profiler.cpp \ src/butil/binary_printer.cpp \ src/butil/recordio.cc \ src/butil/popen.cpp diff --git a/src/brpc/builtin/common.cpp b/src/brpc/builtin/common.cpp index 9cdc72fe..7b5d33a3 100644 --- a/src/brpc/builtin/common.cpp +++ b/src/brpc/builtin/common.cpp @@ -308,6 +308,7 @@ const char* ProfilingType2String(ProfilingType t) { case PROFILING_HEAP: return "heap"; case PROFILING_GROWTH: return "growth"; case PROFILING_CONTENTION: return "contention"; + case PROFILING_IOBUF: return "iobuf"; } return "unknown"; } diff --git a/src/brpc/builtin/common.h b/src/brpc/builtin/common.h index f4d69627..5fd8a9c9 100644 --- a/src/brpc/builtin/common.h +++ b/src/brpc/builtin/common.h @@ -52,6 +52,7 @@ enum ProfilingType { PROFILING_HEAP = 1, PROFILING_GROWTH = 2, PROFILING_CONTENTION = 3, + PROFILING_IOBUF = 4, }; DECLARE_string(rpc_profiling_dir); diff --git a/src/brpc/builtin/hotspots_service.cpp b/src/brpc/builtin/hotspots_service.cpp index b70266f4..c01ce536 100644 --- a/src/brpc/builtin/hotspots_service.cpp +++ b/src/brpc/builtin/hotspots_service.cpp @@ -23,6 +23,7 @@ #include "butil/file_util.h" // butil::FilePath #include "butil/popen.h" // butil::read_command_output #include "butil/fd_guard.h" // butil::fd_guard +#include "butil/iobuf_profiler.h" #include "brpc/log.h" #include "brpc/controller.h" #include "brpc/server.h" @@ -154,7 +155,8 @@ struct ProfilingEnvironment { }; // Different ProfilingType have different env.
-static ProfilingEnvironment g_env[4] = { +static ProfilingEnvironment g_env[5] = { + { PTHREAD_MUTEX_INITIALIZER, 0, NULL, NULL, NULL }, { PTHREAD_MUTEX_INITIALIZER, 0, NULL, NULL, NULL }, { PTHREAD_MUTEX_INITIALIZER, 0, NULL, NULL, NULL }, { PTHREAD_MUTEX_INITIALIZER, 0, NULL, NULL, NULL }, @@ -399,7 +401,8 @@ static bool has_GOOGLE_PPROF_BINARY_PATH() { static void DisplayResult(Controller* cntl, google::protobuf::Closure* done, const char* prof_name, - const butil::IOBuf& result_prefix) { + const butil::IOBuf& result_prefix, + ProfilingType type) { ClosureGuard done_guard(done); butil::IOBuf prof_result; if (cntl->IsCanceled()) { @@ -488,7 +491,7 @@ static void DisplayResult(Controller* cntl, #if defined(OS_LINUX) cmd_builder << "perl " << pprof_tool << DisplayTypeToPProfArgument(display_type) - << (show_ccount ? " --contention " : ""); + << ((show_ccount || type == PROFILING_IOBUF) ? " --contention " : ""); if (base_name) { cmd_builder << "--base " << *base_name << ' '; } @@ -505,7 +508,7 @@ static void DisplayResult(Controller* cntl, #elif defined(OS_MACOSX) cmd_builder << s_pprof_binary_path << " " << DisplayTypeToPProfArgument(display_type) - << (show_ccount ? " -contentions " : ""); + << ((show_ccount || type == PROFILING_IOBUF) ? " -contentions " : ""); if (base_name) { cmd_builder << "-base " << *base_name << ' '; } @@ -637,7 +640,7 @@ static void DoProfiling(ProfilingType type, return cntl->SetFailed( EINVAL, "The profile denoted by `view' does not exist"); } - DisplayResult(cntl, done_guard.release(), view->c_str(), os.buf()); + DisplayResult(cntl, done_guard.release(), view->c_str(), os.buf(), type); return; } @@ -774,6 +777,15 @@ static void DoProfiling(ProfilingType type, PLOG(WARNING) << "Profiling has been interrupted"; } bthread::ContentionProfilerStop(); + } else if (type == PROFILING_IOBUF) { + if (!butil::IsIOBufProfilerEnabled()) { + os << "IOBuf profiler is not enabled" + << (use_html ?
"</body></html>" : "\n"); + os.move_to(resp); + cntl->http_response().set_status_code(HTTP_STATUS_FORBIDDEN); + return NotifyWaiters(type, cntl, view); + } + butil::IOBufProfilerFlush(prof_name); } else if (type == PROFILING_HEAP) { MallocExtension* malloc_ext = MallocExtension::instance(); if (malloc_ext == NULL || !has_TCMALLOC_SAMPLE_PARAMETER()) { @@ -827,11 +839,11 @@ static void DoProfiling(ProfilingType type, std::vector<ProfilingWaiter> waiters; // NOTE: Must be called before DisplayResult which calls done->Run() and // deletes cntl. - ConsumeWaiters(type, cntl, &waiters); - DisplayResult(cntl, done_guard.release(), prof_name, os.buf()); + ConsumeWaiters(type, cntl, &waiters); + DisplayResult(cntl, done_guard.release(), prof_name, os.buf(), type); for (size_t i = 0; i < waiters.size(); ++i) { - DisplayResult(waiters[i].cntl, waiters[i].done, prof_name, os.buf()); + DisplayResult(waiters[i].cntl, waiters[i].done, prof_name, os.buf(), type); } } @@ -849,7 +861,12 @@ static void StartProfiling(ProfilingType type, enabled = cpu_profiler_enabled; } else if (type == PROFILING_CONTENTION) { enabled = true; - } else if (type == PROFILING_HEAP) { + } else if (type == PROFILING_IOBUF) { + enabled = butil::IsIOBufProfilerEnabled(); + if (!enabled) { + extra_desc = " (no ENABLE_IOBUF_PROFILER=1 in env or tcmalloc is not linked)"; + } + } else if (type == PROFILING_HEAP) { enabled = IsHeapProfilerEnabled(); if (enabled && !has_TCMALLOC_SAMPLE_PARAMETER()) { enabled = false; @@ -925,7 +942,8 @@ static void StartProfiling(ProfilingType type, "<script type=\"text/javascript\">\n" "function generateURL() {\n" " var past_prof = document.getElementById('view_prof').value;\n" - " var base_prof = document.getElementById('base_prof').value;\n" + " var base_prof_el = document.getElementById('base_prof');\n" + " var base_prof = base_prof_el != null ?
base_prof_el.value : '';\n" " var display_type = document.getElementById('display_type').value;\n"; if (type == PROFILING_CONTENTION) { os << " var show_ccount = document.getElementById('ccount_cb').checked;\n"; @@ -1092,17 +1110,20 @@ static void StartProfiling(ProfilingType type, << (show_ccount ? " checked=''" : "") << " onclick='onChangedCB(this);'>count</label>"; } - os << "</div><div><pre style='display:inline'>Diff: </pre>" - "<select id='base_prof' onchange='onSelectProf()'>" - "<option value=''><none></option>"; - for (size_t i = 0; i < past_profs.size(); ++i) { - os << "<option value='" << past_profs[i] << "' "; - if (base_name != NULL && past_profs[i] == *base_name) { - os << "selected"; + os << "</div>"; + if (type != PROFILING_IOBUF) { + os << "<div><pre style='display:inline'>Diff: </pre>" + "<select id='base_prof' onchange='onSelectProf()'>" + "<option value=''><none></option>"; + for (size_t i = 0; i < past_profs.size(); ++i) { + os << "<option value='" << past_profs[i] << "' "; + if (base_name != NULL && past_profs[i] == *base_name) { + os << "selected"; + } + os << '>' << GetBaseName(&past_profs[i]); } - os << '>' << GetBaseName(&past_profs[i]); + os << "</select></div>"; } - os << "</select></div>"; if (!enabled && view == NULL) { os << "<p><span style='color:red'>Error:</span> " @@ -1194,6 +1215,14 @@ void HotspotsService::contention( return StartProfiling(PROFILING_CONTENTION, cntl_base, done); } +void HotspotsService::iobuf(::google::protobuf::RpcController* cntl_base, + const ::brpc::HotspotsRequest* request, + ::brpc::HotspotsResponse* response, + ::google::protobuf::Closure* done) { + return StartProfiling(PROFILING_IOBUF, cntl_base, done); +} + + void HotspotsService::cpu_non_responsive( ::google::protobuf::RpcController* cntl_base, const ::brpc::HotspotsRequest*, @@ -1226,19 +1255,33 @@ void HotspotsService::contention_non_responsive( return DoProfiling(PROFILING_CONTENTION, cntl_base, done); } +void
HotspotsService::iobuf_non_responsive(::google::protobuf::RpcController* cntl_base, + const ::brpc::HotspotsRequest* request, + ::brpc::HotspotsResponse* response, + ::google::protobuf::Closure* done) { + return DoProfiling(PROFILING_IOBUF, cntl_base, done); +} + void HotspotsService::GetTabInfo(TabInfoList* info_list) const { TabInfo* info = info_list->add(); info->path = "/hotspots/cpu"; info->tab_name = "cpu"; + info = info_list->add(); info->path = "/hotspots/heap"; info->tab_name = "heap"; + info = info_list->add(); info->path = "/hotspots/growth"; info->tab_name = "growth"; + info = info_list->add(); info->path = "/hotspots/contention"; info->tab_name = "contention"; + + info = info_list->add(); + info->path = "/hotspots/iobuf"; + info->tab_name = "iobuf"; } } // namespace brpc diff --git a/src/brpc/builtin/hotspots_service.h b/src/brpc/builtin/hotspots_service.h index 23696eec..cdd90b6b 100644 --- a/src/brpc/builtin/hotspots_service.h +++ b/src/brpc/builtin/hotspots_service.h @@ -50,6 +50,11 @@ public: ::brpc::HotspotsResponse* response, ::google::protobuf::Closure* done); + void iobuf(::google::protobuf::RpcController* cntl_base, + const ::brpc::HotspotsRequest* request, + ::brpc::HotspotsResponse* response, + ::google::protobuf::Closure* done); + void cpu_non_responsive(::google::protobuf::RpcController* cntl_base, const ::brpc::HotspotsRequest* request, ::brpc::HotspotsResponse* response, @@ -70,6 +75,11 @@ public: ::brpc::HotspotsResponse* response, ::google::protobuf::Closure* done); + void iobuf_non_responsive(::google::protobuf::RpcController* cntl_base, + const ::brpc::HotspotsRequest* request, + ::brpc::HotspotsResponse* response, + ::google::protobuf::Closure* done); + void GetTabInfo(brpc::TabInfoList*) const; }; diff --git a/src/brpc/builtin/pprof_perl.cpp b/src/brpc/builtin/pprof_perl.cpp index 798df7bd..0d859165 100644 --- a/src/brpc/builtin/pprof_perl.cpp +++ b/src/brpc/builtin/pprof_perl.cpp @@ -4172,7 +4172,8 @@ const char* pprof_perl() {
"\n" " while ( $line = <PROFILE> ) {\n" " $line =~ s/\\r//g; # turn windows-looking lines into unix-looking lines\n" - " if ( $line =~ /^\\s*(\\d+)\\s+(\\d+) \\@\\s*(.*?)\\s*$/ ) {\n" + " # Support negative count for IOBuf Profiler\n" + " if ( $line =~ /^\\s*(\\d+)\\s+(-?\\d+) \\@\\s*(.*?)\\s*$/ ) {\n" " my ($cycles, $count, $stack) = ($1, $2, $3);\n" "\n" " # Convert cycles to nanoseconds\n" diff --git a/src/brpc/builtin_service.proto b/src/brpc/builtin_service.proto index 95d26bfa..bb28f44e 100644 --- a/src/brpc/builtin_service.proto +++ b/src/brpc/builtin_service.proto @@ -149,6 +149,8 @@ service hotspots { rpc growth_non_responsive(HotspotsRequest) returns (HotspotsResponse); rpc contention(HotspotsRequest) returns (HotspotsResponse); rpc contention_non_responsive(HotspotsRequest) returns (HotspotsResponse); + rpc iobuf(HotspotsRequest) returns (HotspotsResponse); + rpc iobuf_non_responsive(HotspotsRequest) returns (HotspotsResponse); } service flags { diff --git a/src/bthread/mutex.cpp b/src/bthread/mutex.cpp index b4008fd7..d22c8753 100644 --- a/src/bthread/mutex.cpp +++ b/src/bthread/mutex.cpp @@ -268,7 +268,7 @@ BAIDU_CACHELINE_ALIGNMENT static ContentionProfiler* g_cp = NULL; // Need this version to solve an issue that non-empty entries left by // previous contention profilers should be detected and overwritten. static uint64_t g_cp_version = 0; -// Protecting accesss to g_cp. +// Protecting accesses to g_cp. static pthread_mutex_t g_cp_mutex = PTHREAD_MUTEX_INITIALIZER; // The map storing information for profiling pthread_mutex.
Different from diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp index 3418f671..8895fb16 100644 --- a/src/butil/iobuf.cpp +++ b/src/butil/iobuf.cpp @@ -38,6 +38,7 @@ #include "butil/logging.h" // CHECK, LOG #include "butil/fd_guard.h" // butil::fd_guard #include "butil/iobuf.h" +#include "butil/iobuf_profiler.h" namespace butil { namespace iobuf { @@ -155,7 +156,7 @@ inline iov_function get_pwritev_func() { #endif // ARCH_CPU_X86_64 -inline void* cp(void *__restrict dest, const void *__restrict src, size_t n) { +void* cp(void *__restrict dest, const void *__restrict src, size_t n) { // memcpy in gcc 4.8 seems to be faster enough. return memcpy(dest, src, n); } @@ -193,7 +194,8 @@ size_t IOBuf::new_bigview_count() { return iobuf::g_newbigview.load(butil::memory_order_relaxed); } -const uint16_t IOBUF_BLOCK_FLAGS_USER_DATA = 0x1; +const uint16_t IOBUF_BLOCK_FLAGS_USER_DATA = 1 << 0; +const uint16_t IOBUF_BLOCK_FLAGS_SAMPLED = 1 << 1; using UserDataDeleter = std::function<void(void*)>; struct UserDataExtension { @@ -228,6 +230,9 @@ struct IOBuf::Block { iobuf::g_nblock.fetch_add(1, butil::memory_order_relaxed); iobuf::g_blockmem.fetch_add(data_size + sizeof(Block), butil::memory_order_relaxed); + if (try_mark_sampled()) { + SubmitIOBufSample(this, 1); + } } Block(char* data_in, uint32_t data_size, UserDataDeleter deleter) @@ -240,6 +245,9 @@ struct IOBuf::Block { , data(data_in) { auto ext = new (get_user_data_extension()) UserDataExtension(); ext->deleter = std::move(deleter); + if (try_mark_sampled()) { + SubmitIOBufSample(this, 1); + } } // Undefined behavior when (flags & IOBUF_BLOCK_FLAGS_USER_DATA) is 0.
@@ -260,13 +268,19 @@ struct IOBuf::Block { void inc_ref() { check_abi(); nshared.fetch_add(1, butil::memory_order_relaxed); + if (sampled()) { + SubmitIOBufSample(this, 1); + } } void dec_ref() { check_abi(); + if (sampled()) { + SubmitIOBufSample(this, -1); + } if (nshared.fetch_sub(1, butil::memory_order_release) == 1) { butil::atomic_thread_fence(butil::memory_order_acquire); - if (!flags) { + if (!is_user_data()) { iobuf::g_nblock.fetch_sub(1, butil::memory_order_relaxed); iobuf::g_blockmem.fetch_sub(cap + sizeof(Block), butil::memory_order_relaxed); @@ -288,6 +302,23 @@ struct IOBuf::Block { bool full() const { return size >= cap; } size_t left_space() const { return cap - size; } + +private: + bool try_mark_sampled() { + if (IsIOBufProfilerSamplable()) { + flags |= IOBUF_BLOCK_FLAGS_SAMPLED; + return true; + } + return false; + } + + bool sampled() const { + return flags & IOBUF_BLOCK_FLAGS_SAMPLED; + } + + bool is_user_data() const { + return flags & IOBUF_BLOCK_FLAGS_USER_DATA; + } }; namespace iobuf { @@ -329,6 +360,11 @@ inline IOBuf::Block* create_block() { // release_tls_block_chain() may exceed this limit sometimes. const int MAX_BLOCKS_PER_THREAD = 8; +inline int max_blocks_per_thread() { + // If IOBufProfiler is enabled, do not cache blocks in TLS. + return IsIOBufProfilerEnabled() ? 0 : MAX_BLOCKS_PER_THREAD; +} + struct TLSData { // Head of the TLS block chain.
IOBuf::Block* block_head; @@ -410,7 +446,7 @@ inline void release_tls_block(IOBuf::Block* b) { TLSData& tls_data = g_tls_data; if (b->full()) { b->dec_ref(); - } else if (tls_data.num_blocks >= MAX_BLOCKS_PER_THREAD) { + } else if (tls_data.num_blocks >= max_blocks_per_thread()) { b->dec_ref(); g_num_hit_tls_threshold.fetch_add(1, butil::memory_order_relaxed); } else { @@ -429,7 +465,7 @@ inline void release_tls_block(IOBuf::Block* b) { void release_tls_block_chain(IOBuf::Block* b) { TLSData& tls_data = g_tls_data; size_t n = 0; - if (tls_data.num_blocks >= MAX_BLOCKS_PER_THREAD) { + if (tls_data.num_blocks >= max_blocks_per_thread()) { do { ++n; IOBuf::Block* const saved_next = b->u.portal_next; diff --git a/src/butil/iobuf_profiler.cpp b/src/butil/iobuf_profiler.cpp new file mode 100644 index 00000000..bcb53391 --- /dev/null +++ b/src/butil/iobuf_profiler.cpp @@ -0,0 +1,316 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include <fcntl.h> +#include "butil/iobuf_profiler.h" +#include "butil/strings/string_number_conversions.h" +#include "butil/file_util.h" +#include "butil/fd_guard.h" +#include "butil/fast_rand.h" +#include "butil/hash.h" +#include <execinfo.h> + +extern int __attribute__((weak)) GetStackTrace(void** result, int max_depth, int skip_count); + +namespace butil { + +namespace iobuf { +extern void* cp(void *__restrict dest, const void *__restrict src, size_t n); +} + +// Max and min sleep time for IOBuf profiler consuming thread +// when `_sample_queue' is empty. +const uint32_t IOBufProfiler::MIN_SLEEP_MS = 10; +const uint32_t IOBufProfiler::MAX_SLEEP_MS = 1000; + +static pthread_once_t g_iobuf_profiler_info_once = PTHREAD_ONCE_INIT; +static bool g_iobuf_profiler_enabled = false; +static uint32_t g_iobuf_profiler_sample_rate = 100; + +// Environment variables: +// 1. ENABLE_IOBUF_PROFILER: set value to 1 to enable IOBuf profiler. +// 2. IOBUF_PROFILER_SAMPLE_RATE: set value between (0, 100] to control sample rate (default 100). Only read when ENABLE_IOBUF_PROFILER=1.
+static void InitGlobalIOBufProfilerInfo() { + const char* enabled = getenv("ENABLE_IOBUF_PROFILER"); + g_iobuf_profiler_enabled = enabled && strcmp("1", enabled) == 0 && ::GetStackTrace != NULL; + if (!g_iobuf_profiler_enabled) { + return; + } + + const char* rate = getenv("IOBUF_PROFILER_SAMPLE_RATE"); + if (rate) { + int tmp = 0; + if (butil::StringToInt(rate, &tmp)) { + if (tmp > 0 && tmp <= 100) { + g_iobuf_profiler_sample_rate = tmp; + } else { + LOG(ERROR) << "IOBUF_PROFILER_SAMPLE_RATE should be in (0, 100], but get " << rate; + } + } else { + LOG(ERROR) << "IOBUF_PROFILER_SAMPLE_RATE should be a number, but get " << rate; + } + } + LOG(INFO) << "g_iobuf_profiler_sample_rate=" << g_iobuf_profiler_sample_rate; +} + +bool IsIOBufProfilerEnabled() { + pthread_once(&g_iobuf_profiler_info_once, InitGlobalIOBufProfilerInfo); + return g_iobuf_profiler_enabled; +} + +bool IsIOBufProfilerSamplable() { + pthread_once(&g_iobuf_profiler_info_once, InitGlobalIOBufProfilerInfo); + if (!g_iobuf_profiler_enabled || g_iobuf_profiler_sample_rate == 100) { + return g_iobuf_profiler_enabled; + } + return fast_rand_less_than(100) + 1 <= g_iobuf_profiler_sample_rate; +} + +size_t IOBufSample::stack_hash_code() const { + if (nframes == 0) { + return 0; + } + if (_hash_code == 0) { + _hash_code = SuperFastHash(reinterpret_cast<const char*>(stack), + sizeof(void*) * nframes); + } + return _hash_code; +} + +IOBufProfiler* IOBufProfiler::GetInstance() { + return ::Singleton<IOBufProfiler, LeakySingletonTraits<IOBufProfiler>>::get(); +} + +IOBufProfiler::IOBufProfiler() + : butil::SimpleThread("IOBufProfiler") + , _stop(false) + , _sleep_ms(MIN_SLEEP_MS) { + CHECK_EQ(0, _stack_map.init(1024)); + CHECK_EQ(0, _block_info_map.init(1024)); + Start(); +} + +IOBufProfiler::~IOBufProfiler() { + StopAndJoin(); + _block_info_map.clear(); + _stack_map.clear(); + + // Clear `_sample_queue'.
+ IOBufSample* sample = NULL; + while (_sample_queue.Dequeue(sample)) { + IOBufSample::Destroy(sample); + } + +} + +void IOBufProfiler::Submit(IOBufSample* s) { + if (_stop.load(butil::memory_order_relaxed)) { + return IOBufSample::Destroy(s); + } + + _sample_queue.Enqueue(s); +} + +void IOBufProfiler::Dump(IOBufSample* s) { + do { + BAIDU_SCOPED_LOCK(_mutex); + // Categorize the stack. + IOBufRefSampleSharedPtr stack_sample; + IOBufRefSampleSharedPtr* stack_ptr = _stack_map.seek(s); + if (!stack_ptr) { + stack_sample = IOBufSample::CopyAndSharedWithDestroyer(s); + stack_sample->block = NULL; + stack_ptr = &_stack_map[stack_sample.get()]; + *stack_ptr = stack_sample; + } else { + (*stack_ptr)->count += s->count; + } + + BlockInfo* info = _block_info_map.seek(s->block); + if (info) { + // Categorize the IOBufSample. + info->stack_count_map[*stack_ptr] += s->count; + info->ref += s->count; + if (info->ref == 0) { + for (const auto& iter : info->stack_count_map) { + IOBufRefSampleSharedPtr* stack_ptr2 = _stack_map.seek(iter.first.get()); + if (stack_ptr2 && *stack_ptr2) { + (*stack_ptr2)->count -= iter.second; + if ((*stack_ptr2)->count == 0) { + _stack_map.erase(iter.first.get()); + } + } + } + _block_info_map.erase(s->block); + break; + } + } else { + BlockInfo& new_info = _block_info_map[s->block]; + new_info.ref += s->count; + CHECK_EQ(0, new_info.stack_count_map.init(64)); + new_info.stack_count_map[*stack_ptr] = s->count; + } + } while (false); + s->block = NULL; +} + +IOBufSample* IOBufSample::Copy(IOBufSample* ref) { + auto copied = IOBufSample::New(); + copied->block = ref->block; + copied->count = ref->count; + copied->_hash_code = ref->_hash_code; + copied->nframes = ref->nframes; + iobuf::cp(copied->stack, ref->stack, sizeof(void*) * ref->nframes); + return copied; +} + +IOBufRefSampleSharedPtr IOBufSample::CopyAndSharedWithDestroyer(IOBufSample* ref) { + return { Copy(ref), detail::Destroyer() }; +} + +void IOBufProfiler::Flush2Disk(const char* filename) { + if (!filename) {
+ LOG(ERROR) << "Parameter [filename] is NULL"; + return; + } + _disk_buf.clear(); // Drop data left behind by a previously failed flush. + _disk_buf.append("--- contention\ncycles/second=1000000000\n"); + butil::IOBufBuilder os; + { + BAIDU_SCOPED_LOCK(_mutex); + for (auto it = _stack_map.begin(); it != _stack_map.end(); ++it) { + const IOBufRefSampleSharedPtr& c = it->second; + if (c->nframes == 0) { + LOG_EVERY_SECOND(WARNING) << "Invalid stack with nframes=0, count=" << c->count; + continue; + } + os << "0 " << c->count << " @"; + for (int i = 0; i < c->nframes; ++i) { + os << " " << c->stack[i]; + } + os << "\n"; + } + } + _disk_buf.append(os.buf().movable()); + + // Append /proc/self/maps to the end of the contention file, required by + // pprof.pl, otherwise the functions in sys libs are not interpreted. + butil::IOPortal mem_maps; + const butil::fd_guard maps_fd(open("/proc/self/maps", O_RDONLY)); + if (maps_fd >= 0) { + while (true) { + ssize_t nr = mem_maps.append_from_file_descriptor(maps_fd, 8192); + if (nr < 0) { + if (errno == EINTR) { + continue; + } + PLOG(ERROR) << "Fail to read /proc/self/maps"; + break; + } + if (nr == 0) { + _disk_buf.append(mem_maps); + break; + } + } + } else { + PLOG(ERROR) << "Fail to open /proc/self/maps"; + } + // Write _disk_buf into `filename'. + butil::File::Error error; + butil::FilePath file_path(filename); + butil::FilePath dir = file_path.DirName(); + if (!butil::CreateDirectoryAndGetError(dir, &error)) { + LOG(ERROR) << "Fail to create directory=`" << dir.value() + << ": " << error; + return; + } + + butil::fd_guard fd(open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0666)); + if (fd < 0) { + PLOG(ERROR) << "Fail to open " << filename; + return; + } + // Write once normally, write until empty in the end.
+ do { + ssize_t nw = _disk_buf.cut_into_file_descriptor(fd); + if (nw < 0) { + if (errno == EINTR) { + continue; + } + PLOG(ERROR) << "Fail to write into " << filename; + return; + } + LOG(INFO) << "Write " << nw << " bytes into " << filename; + } while (!_disk_buf.empty()); +} + +void IOBufProfiler::StopAndJoin() { + if (_stop.exchange(true, butil::memory_order_relaxed)) { + return; + } + if (!HasBeenJoined()) { + Join(); + } +} + +void IOBufProfiler::Run() { + while (!_stop.load(butil::memory_order_relaxed)) { + Consume(); + ::usleep(_sleep_ms * 1000); + } +} + +void IOBufProfiler::Consume() { + IOBufSample* sample = NULL; + bool is_empty = true; + while (_sample_queue.Dequeue(sample)) { + Dump(sample); + IOBufSample::Destroy(sample); + is_empty = false; + } + + // If `_sample_queue' is empty, exponential increase in sleep time. + _sleep_ms = !is_empty ? MIN_SLEEP_MS : + std::min(_sleep_ms * 2, MAX_SLEEP_MS); +} + +void SubmitIOBufSample(IOBuf::Block* block, int64_t ref) { + if (!IsIOBufProfilerEnabled()) { + return; + } + + auto sample = IOBufSample::New(); + sample->block = block; + sample->count = ref; + sample->nframes = GetStackTrace(sample->stack, + arraysize(sample->stack), + 0); // may lock + IOBufProfiler::GetInstance()->Submit(sample); +} + +bool IOBufProfilerFlush(const char* filename) { + if (!filename) { + LOG(ERROR) << "Parameter [filename] is NULL"; + return false; + } + + auto profiler = IOBufProfiler::GetInstance(); + profiler->Flush2Disk(filename); + return true; +} + +} // namespace butil diff --git a/src/butil/iobuf_profiler.h b/src/butil/iobuf_profiler.h new file mode 100644 index 00000000..7178f20a --- /dev/null +++ b/src/butil/iobuf_profiler.h @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BUTIL_IOBUF_PROFILER_H +#define BUTIL_IOBUF_PROFILER_H + +#include <memory> +#include <unordered_map> +#include <unordered_set> +#include "butil/iobuf.h" +#include "butil/object_pool.h" +#include "butil/threading/simple_thread.h" +#include "butil/memory/singleton.h" +#include "butil/containers/flat_map.h" +#include "butil/containers/mpsc_queue.h" + +namespace butil { + +struct IOBufSample; +typedef std::shared_ptr<IOBufSample> IOBufRefSampleSharedPtr; + +struct IOBufSample { + IOBufSample* next; + IOBuf::Block* block; + int64_t count; // reference count (delta) of the block. + void* stack[28]; // backtrace. + int nframes; // num of frames in stack. + + static IOBufSample* New() { + return get_object<IOBufSample>(); + } + + static IOBufSample* Copy(IOBufSample* ref); + static IOBufRefSampleSharedPtr CopyAndSharedWithDestroyer(IOBufSample* ref); + static void Destroy(IOBufSample* ref) { + ref->_hash_code = 0; + return_object(ref); + } + + size_t stack_hash_code() const; + +private: + friend ObjectPool<IOBufSample>; + + IOBufSample() + : next(NULL) + , block(NULL) + , count(0) + , stack{} + , nframes(0) + , _hash_code(0) {} + + ~IOBufSample() = default; + + mutable uint32_t _hash_code; // For combining samples with hashmap.
+}; + +BAIDU_CASSERT(sizeof(IOBufSample) == 256, be_friendly_to_allocator); + +namespace detail { +// Functor to compare IOBufRefSample. +template <typename T> +struct IOBufSampleEqual { + bool operator()(const T& c1, const T& c2) const { + return c1->stack_hash_code() == c2->stack_hash_code() && + c1->nframes == c2->nframes && + memcmp(c1->stack, c2->stack, sizeof(void*) * c1->nframes) == 0; + } +}; + +// Functor to hash IOBufRefSample. +template <typename T> +struct IOBufSampleHash { + size_t operator()(const T& c) const { + return c->stack_hash_code(); + } +}; + +struct Destroyer { + void operator()(IOBufSample* ref) const { + if (ref) { + IOBufSample::Destroy(ref); + } + } +}; +} // namespace detail + +class IOBufProfiler : public butil::SimpleThread { +public: + static IOBufProfiler* GetInstance(); + + // Submit the IOBuf sample along with stacktrace. + void Submit(IOBufSample* s); + // Dump IOBuf sample to map. + void Dump(IOBufSample* s); + // Write buffered data into resulting file. + void Flush2Disk(const char* filename); + + void StopAndJoin(); + +private: + friend struct DefaultSingletonTraits<IOBufProfiler>; + + typedef butil::FlatMap<IOBufSample*, + IOBufRefSampleSharedPtr, + detail::IOBufSampleHash<IOBufSample*>, + detail::IOBufSampleEqual<IOBufSample*>> IOBufRefMap; + + // <iobuf stack, ref count> + typedef butil::FlatMap<IOBufRefSampleSharedPtr, + int64_t, + detail::IOBufSampleHash<IOBufRefSampleSharedPtr>, + detail::IOBufSampleEqual<IOBufRefSampleSharedPtr>> StackCountMap; + struct BlockInfo { + int64_t ref{0}; + StackCountMap stack_count_map; + }; + typedef butil::FlatMap<IOBuf::Block*, BlockInfo> BlockInfoMap; + + IOBufProfiler(); + ~IOBufProfiler() override; + DISALLOW_COPY_AND_ASSIGN(IOBufProfiler); + + void Run() override; + // Consume the IOBuf sample in _sample_queue. + void Consume(); + + // Stop flag of IOBufProfiler. + butil::atomic<bool> _stop; + // IOBuf sample queue. + MPSCQueue<IOBufSample*> _sample_queue; + + // Temp buf before saving the file.
+ butil::IOBuf _disk_buf; + // Combining same samples to make result smaller. + IOBufRefMap _stack_map; + // Record block info. + BlockInfoMap _block_info_map; + Mutex _mutex; // Guards _stack_map and _block_info_map. + + // Sleep when `_sample_queue' is empty. + uint32_t _sleep_ms; + static const uint32_t MIN_SLEEP_MS; + static const uint32_t MAX_SLEEP_MS; +}; + +bool IsIOBufProfilerEnabled(); +bool IsIOBufProfilerSamplable(); + +void SubmitIOBufSample(IOBuf::Block* block, int64_t ref); + +bool IOBufProfilerFlush(const char* filename); + +} // namespace butil +#endif // BUTIL_IOBUF_PROFILER_H --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org