This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 498c3e17 Support IOBuf Profiler (#2497)
498c3e17 is described below

commit 498c3e17954ea36a88e5561ffdffa298a7b48dcf
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Mon Apr 8 11:07:45 2024 +0800

    Support IOBuf Profiler (#2497)
    
    * Support IOBuf Profiler
    
    * Do not cache blocks in TLS
    
    * Use MPSC Queue
    
    * Add comments
---
 BUILD.bazel                           |   1 +
 CMakeLists.txt                        |   1 +
 Makefile                              |   1 +
 src/brpc/builtin/common.cpp           |   1 +
 src/brpc/builtin/common.h             |   1 +
 src/brpc/builtin/hotspots_service.cpp |  80 +++++++--
 src/brpc/builtin/hotspots_service.h   |  10 ++
 src/brpc/builtin/pprof_perl.cpp       |   3 +-
 src/brpc/builtin_service.proto        |   2 +
 src/bthread/mutex.cpp                 |   2 +-
 src/butil/iobuf.cpp                   |  46 ++++-
 src/butil/iobuf_profiler.cpp          | 316 ++++++++++++++++++++++++++++++++++
 src/butil/iobuf_profiler.h            | 169 ++++++++++++++++++
 13 files changed, 607 insertions(+), 26 deletions(-)

diff --git a/BUILD.bazel b/BUILD.bazel
index 0bdaa4d3..b2a453e0 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -213,6 +213,7 @@ BUTIL_SRCS = [
     "src/butil/crc32c.cc",
     "src/butil/containers/case_ignored_flat_map.cpp",
     "src/butil/iobuf.cpp",
+    "src/butil/iobuf_profiler.cpp",
     "src/butil/binary_printer.cpp",
     "src/butil/recordio.cc",
     "src/butil/popen.cpp",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 40eff79d..eaae961a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -444,6 +444,7 @@ set(BUTIL_SOURCES
     ${PROJECT_SOURCE_DIR}/src/butil/crc32c.cc
     ${PROJECT_SOURCE_DIR}/src/butil/containers/case_ignored_flat_map.cpp
     ${PROJECT_SOURCE_DIR}/src/butil/iobuf.cpp
+    ${PROJECT_SOURCE_DIR}/src/butil/iobuf_profiler.cpp
     ${PROJECT_SOURCE_DIR}/src/butil/binary_printer.cpp
     ${PROJECT_SOURCE_DIR}/src/butil/recordio.cc
     ${PROJECT_SOURCE_DIR}/src/butil/popen.cpp
diff --git a/Makefile b/Makefile
index 04d0a243..641a8a78 100644
--- a/Makefile
+++ b/Makefile
@@ -163,6 +163,7 @@ BUTIL_SOURCES = \
     src/butil/crc32c.cc \
     src/butil/containers/case_ignored_flat_map.cpp \
     src/butil/iobuf.cpp \
+    src/butil/iobuf_profiler.cpp \
     src/butil/binary_printer.cpp \
     src/butil/recordio.cc \
     src/butil/popen.cpp
diff --git a/src/brpc/builtin/common.cpp b/src/brpc/builtin/common.cpp
index 9cdc72fe..7b5d33a3 100644
--- a/src/brpc/builtin/common.cpp
+++ b/src/brpc/builtin/common.cpp
@@ -308,6 +308,7 @@ const char* ProfilingType2String(ProfilingType t) {
     case PROFILING_HEAP: return "heap";
     case PROFILING_GROWTH: return "growth";
     case PROFILING_CONTENTION: return "contention";
+    case PROFILING_IOBUF: return "iobuf";
     }
     return "unknown";
 }
diff --git a/src/brpc/builtin/common.h b/src/brpc/builtin/common.h
index f4d69627..5fd8a9c9 100644
--- a/src/brpc/builtin/common.h
+++ b/src/brpc/builtin/common.h
@@ -52,6 +52,7 @@ enum ProfilingType {
     PROFILING_HEAP = 1,
     PROFILING_GROWTH = 2,
     PROFILING_CONTENTION = 3,
+    PROFILING_IOBUF = 4,
 };
 
 DECLARE_string(rpc_profiling_dir);
diff --git a/src/brpc/builtin/hotspots_service.cpp 
b/src/brpc/builtin/hotspots_service.cpp
index b70266f4..c01ce536 100644
--- a/src/brpc/builtin/hotspots_service.cpp
+++ b/src/brpc/builtin/hotspots_service.cpp
@@ -23,6 +23,7 @@
 #include "butil/file_util.h"                     // butil::FilePath
 #include "butil/popen.h"                         // butil::read_command_output
 #include "butil/fd_guard.h"                      // butil::fd_guard
+#include "butil/iobuf_profiler.h"
 #include "brpc/log.h"
 #include "brpc/controller.h"
 #include "brpc/server.h"
@@ -154,7 +155,8 @@ struct ProfilingEnvironment {
 };
 
 // Different ProfilingType have different env.
-static ProfilingEnvironment g_env[4] = {
+static ProfilingEnvironment g_env[5] = {
+    { PTHREAD_MUTEX_INITIALIZER, 0, NULL, NULL, NULL },
     { PTHREAD_MUTEX_INITIALIZER, 0, NULL, NULL, NULL },
     { PTHREAD_MUTEX_INITIALIZER, 0, NULL, NULL, NULL },
     { PTHREAD_MUTEX_INITIALIZER, 0, NULL, NULL, NULL },
@@ -399,7 +401,8 @@ static bool has_GOOGLE_PPROF_BINARY_PATH() {
 static void DisplayResult(Controller* cntl,
                           google::protobuf::Closure* done,
                           const char* prof_name,
-                          const butil::IOBuf& result_prefix) {
+                          const butil::IOBuf& result_prefix,
+                          ProfilingType type) {
     ClosureGuard done_guard(done);
     butil::IOBuf prof_result;
     if (cntl->IsCanceled()) {
@@ -488,7 +491,7 @@ static void DisplayResult(Controller* cntl,
 #if defined(OS_LINUX)
     cmd_builder << "perl " << pprof_tool
                 << DisplayTypeToPProfArgument(display_type)
-                << (show_ccount ? " --contention " : "");
+                << ((show_ccount || type == PROFILING_IOBUF) ? " --contention 
" : "");
     if (base_name) {
         cmd_builder << "--base " << *base_name << ' ';
     }
@@ -505,7 +508,7 @@ static void DisplayResult(Controller* cntl,
 #elif defined(OS_MACOSX)
     cmd_builder << s_pprof_binary_path << " "
                 << DisplayTypeToPProfArgument(display_type)
-                << (show_ccount ? " -contentions " : "");
+                << ((show_ccount || type == PROFILING_IOBUF) ? " --contention 
" : "");
     if (base_name) {
         cmd_builder << "-base " << *base_name << ' ';
     }
@@ -637,7 +640,7 @@ static void DoProfiling(ProfilingType type,
             return cntl->SetFailed(
                 EINVAL, "The profile denoted by `view' does not exist");
         }
-        DisplayResult(cntl, done_guard.release(), view->c_str(), os.buf());
+        DisplayResult(cntl, done_guard.release(), view->c_str(), os.buf(), 
type);
         return;
     }
 
@@ -774,6 +777,15 @@ static void DoProfiling(ProfilingType type,
             PLOG(WARNING) << "Profiling has been interrupted";
         }
         bthread::ContentionProfilerStop();
+    } else if (type == PROFILING_IOBUF) {
+        if (!butil::IsIOBufProfilerEnabled()) {
+            os << "IOBuf profiler is not enabled"
+               << (use_html ? "</body></html>" : "\n");
+            os.move_to(resp);
+            cntl->http_response().set_status_code(HTTP_STATUS_FORBIDDEN);
+            return NotifyWaiters(type, cntl, view);
+        }
+        butil::IOBufProfilerFlush(prof_name);
     } else if (type == PROFILING_HEAP) {
         MallocExtension* malloc_ext = MallocExtension::instance();
         if (malloc_ext == NULL || !has_TCMALLOC_SAMPLE_PARAMETER()) {
@@ -827,11 +839,11 @@ static void DoProfiling(ProfilingType type,
     std::vector<ProfilingWaiter> waiters;
     // NOTE: Must be called before DisplayResult which calls done->Run() and
     // deletes cntl.
-    ConsumeWaiters(type, cntl, &waiters);    
-    DisplayResult(cntl, done_guard.release(), prof_name, os.buf());
+    ConsumeWaiters(type, cntl, &waiters);
+    DisplayResult(cntl, done_guard.release(), prof_name, os.buf(), type);
 
     for (size_t i = 0; i < waiters.size(); ++i) {
-        DisplayResult(waiters[i].cntl, waiters[i].done, prof_name, os.buf());
+        DisplayResult(waiters[i].cntl, waiters[i].done, prof_name, os.buf(), 
type);
     }
 }
 
@@ -849,7 +861,12 @@ static void StartProfiling(ProfilingType type,
         enabled = cpu_profiler_enabled;
     } else if (type == PROFILING_CONTENTION) {
         enabled = true;
-    } else if (type == PROFILING_HEAP) {
+    } else if (type == PROFILING_IOBUF) {
+        enabled = butil::IsIOBufProfilerEnabled();
+        if (!enabled) {
+            extra_desc = " (no ENABLE_IOBUF_PROFILER=1 in env or no link 
tcmalloc )";
+        }
+    }  else if (type == PROFILING_HEAP) {
         enabled = IsHeapProfilerEnabled();
         if (enabled && !has_TCMALLOC_SAMPLE_PARAMETER()) {
             enabled = false;
@@ -925,7 +942,8 @@ static void StartProfiling(ProfilingType type,
         "<script type=\"text/javascript\">\n"
         "function generateURL() {\n"
         "  var past_prof = document.getElementById('view_prof').value;\n"
-        "  var base_prof = document.getElementById('base_prof').value;\n"
+          "  var base_prof_el = document.getElementById('base_prof');\n"
+          "  var base_prof = base_prof_el != null ? base_prof_el.value : '';\n"
         "  var display_type = 
document.getElementById('display_type').value;\n";
     if (type == PROFILING_CONTENTION) {
         os << "  var show_ccount = 
document.getElementById('ccount_cb').checked;\n";
@@ -1092,17 +1110,19 @@ static void StartProfiling(ProfilingType type,
            << (show_ccount ? " checked=''" : "") <<
             " onclick='onChangedCB(this);'>count</label>";
     }
-    os << "</div><div><pre style='display:inline'>Diff: </pre>"
-        "<select id='base_prof' onchange='onSelectProf()'>"
-        "<option value=''>&lt;none&gt;</option>";
-    for (size_t i = 0; i < past_profs.size(); ++i) {
-        os << "<option value='" << past_profs[i] << "' ";
-        if (base_name != NULL && past_profs[i] == *base_name) {
-            os << "selected";
+    if (type != PROFILING_IOBUF) {
+        os << "</div><div><pre style='display:inline'>Diff: </pre>"
+              "<select id='base_prof' onchange='onSelectProf()'>"
+              "<option value=''>&lt;none&gt;</option>";
+        for (size_t i = 0; i<past_profs.size(); ++i) {
+            os << "<option value='" << past_profs[i] << "' ";
+            if (base_name!=NULL && past_profs[i]==*base_name) {
+                os << "selected";
+            }
+            os << '>' << GetBaseName(&past_profs[i]);
         }
-        os << '>' << GetBaseName(&past_profs[i]);
+        os << "</select></div>";
     }
-    os << "</select></div>";
     
     if (!enabled && view == NULL) {
         os << "<p><span style='color:red'>Error:</span> "
@@ -1194,6 +1214,14 @@ void HotspotsService::contention(
     return StartProfiling(PROFILING_CONTENTION, cntl_base, done);
 }
 
+// Entry point of /hotspots/iobuf: renders the profiling page for the
+// IOBuf profiler. The request/response messages are unused (and left
+// unnamed, like the sibling handlers) because all input and output go
+// through the controller.
+void HotspotsService::iobuf(
+    ::google::protobuf::RpcController* cntl_base,
+    const ::brpc::HotspotsRequest*,
+    ::brpc::HotspotsResponse*,
+    ::google::protobuf::Closure* done) {
+    return StartProfiling(PROFILING_IOBUF, cntl_base, done);
+}
+
+
 void HotspotsService::cpu_non_responsive(
     ::google::protobuf::RpcController* cntl_base,
     const ::brpc::HotspotsRequest*,
@@ -1226,19 +1254,33 @@ void HotspotsService::contention_non_responsive(
     return DoProfiling(PROFILING_CONTENTION, cntl_base, done);
 }
 
+// Entry point of /hotspots/iobuf_non_responsive: runs the IOBuf
+// profiling directly via DoProfiling. The request/response messages are
+// unused and left unnamed, like the sibling handlers.
+void HotspotsService::iobuf_non_responsive(
+    ::google::protobuf::RpcController* cntl_base,
+    const ::brpc::HotspotsRequest*,
+    ::brpc::HotspotsResponse*,
+    ::google::protobuf::Closure* done) {
+    return DoProfiling(PROFILING_IOBUF, cntl_base, done);
+}
+
 void HotspotsService::GetTabInfo(TabInfoList* info_list) const {
     TabInfo* info = info_list->add();
     info->path = "/hotspots/cpu";
     info->tab_name = "cpu";
+
     info = info_list->add();
     info->path = "/hotspots/heap";
     info->tab_name = "heap";
+
     info = info_list->add();
     info->path = "/hotspots/growth";
     info->tab_name = "growth";
+
     info = info_list->add();
     info->path = "/hotspots/contention";
     info->tab_name = "contention";
+
+    info = info_list->add();
+    info->path = "/hotspots/iobuf";
+    info->tab_name = "iobuf";
 }
 
 } // namespace brpc
diff --git a/src/brpc/builtin/hotspots_service.h 
b/src/brpc/builtin/hotspots_service.h
index 23696eec..cdd90b6b 100644
--- a/src/brpc/builtin/hotspots_service.h
+++ b/src/brpc/builtin/hotspots_service.h
@@ -50,6 +50,11 @@ public:
                     ::brpc::HotspotsResponse* response,
                     ::google::protobuf::Closure* done);
 
+    void iobuf(::google::protobuf::RpcController* cntl_base,
+               const ::brpc::HotspotsRequest* request,
+               ::brpc::HotspotsResponse* response,
+               ::google::protobuf::Closure* done);
+
     void cpu_non_responsive(::google::protobuf::RpcController* cntl_base,
                             const ::brpc::HotspotsRequest* request,
                             ::brpc::HotspotsResponse* response,
@@ -70,6 +75,11 @@ public:
                                    ::brpc::HotspotsResponse* response,
                                    ::google::protobuf::Closure* done);
 
+    void iobuf_non_responsive(::google::protobuf::RpcController* cntl_base,
+                              const ::brpc::HotspotsRequest* request,
+                              ::brpc::HotspotsResponse* response,
+                              ::google::protobuf::Closure* done);
+
     void GetTabInfo(brpc::TabInfoList*) const;
 };
 
diff --git a/src/brpc/builtin/pprof_perl.cpp b/src/brpc/builtin/pprof_perl.cpp
index 798df7bd..0d859165 100644
--- a/src/brpc/builtin/pprof_perl.cpp
+++ b/src/brpc/builtin/pprof_perl.cpp
@@ -4172,7 +4172,8 @@ const char* pprof_perl() {
         "\n"
         "  while ( $line = <PROFILE> ) {\n"
         "    $line =~ s/\\r//g;      # turn windows-looking lines into 
unix-looking lines\n"
-        "    if ( $line =~ /^\\s*(\\d+)\\s+(\\d+) \\@\\s*(.*?)\\s*$/ ) {\n"
+        "    # Support negative count for IOBuf Profiler\n"
+        "    if ( $line =~ /^\\s*(\\d+)\\s+(-?\\d+) \\@\\s*(.*?)\\s*$/ ) {\n"
         "      my ($cycles, $count, $stack) = ($1, $2, $3);\n"
         "\n"
         "      # Convert cycles to nanoseconds\n"
diff --git a/src/brpc/builtin_service.proto b/src/brpc/builtin_service.proto
index 95d26bfa..bb28f44e 100644
--- a/src/brpc/builtin_service.proto
+++ b/src/brpc/builtin_service.proto
@@ -149,6 +149,8 @@ service hotspots {
     rpc growth_non_responsive(HotspotsRequest) returns (HotspotsResponse);
     rpc contention(HotspotsRequest) returns (HotspotsResponse);
     rpc contention_non_responsive(HotspotsRequest) returns (HotspotsResponse);
+    rpc iobuf(HotspotsRequest) returns (HotspotsResponse);
+    rpc iobuf_non_responsive(HotspotsRequest) returns (HotspotsResponse);
 }
 
 service flags {
diff --git a/src/bthread/mutex.cpp b/src/bthread/mutex.cpp
index b4008fd7..d22c8753 100644
--- a/src/bthread/mutex.cpp
+++ b/src/bthread/mutex.cpp
@@ -268,7 +268,7 @@ BAIDU_CACHELINE_ALIGNMENT static ContentionProfiler* g_cp = 
NULL;
 // Need this version to solve an issue that non-empty entries left by
 // previous contention profilers should be detected and overwritten.
 static uint64_t g_cp_version = 0;
-// Protecting accesss to g_cp.
+// Protecting accesses to g_cp.
 static pthread_mutex_t g_cp_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 // The map storing information for profiling pthread_mutex. Different from
diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp
index 3418f671..8895fb16 100644
--- a/src/butil/iobuf.cpp
+++ b/src/butil/iobuf.cpp
@@ -38,6 +38,7 @@
 #include "butil/logging.h"                  // CHECK, LOG
 #include "butil/fd_guard.h"                 // butil::fd_guard
 #include "butil/iobuf.h"
+#include "butil/iobuf_profiler.h"
 
 namespace butil {
 namespace iobuf {
@@ -155,7 +156,7 @@ inline iov_function get_pwritev_func() {
 
 #endif  // ARCH_CPU_X86_64
 
-inline void* cp(void *__restrict dest, const void *__restrict src, size_t n) {
+void* cp(void *__restrict dest, const void *__restrict src, size_t n) {
     // memcpy in gcc 4.8 seems to be faster enough.
     return memcpy(dest, src, n);
 }
@@ -193,7 +194,8 @@ size_t IOBuf::new_bigview_count() {
     return iobuf::g_newbigview.load(butil::memory_order_relaxed);
 }
 
-const uint16_t IOBUF_BLOCK_FLAGS_USER_DATA = 0x1;
+const uint16_t IOBUF_BLOCK_FLAGS_USER_DATA = 1 << 0;
+const uint16_t IOBUF_BLOCK_FLAGS_SAMPLED = 1 << 1;
 using UserDataDeleter = std::function<void(void*)>;
 
 struct UserDataExtension {
@@ -228,6 +230,9 @@ struct IOBuf::Block {
         iobuf::g_nblock.fetch_add(1, butil::memory_order_relaxed);
         iobuf::g_blockmem.fetch_add(data_size + sizeof(Block),
                                     butil::memory_order_relaxed);
+        if (is_samplable()) {
+            SubmitIOBufSample(this, 1);
+        }
     }
 
     Block(char* data_in, uint32_t data_size, UserDataDeleter deleter)
@@ -240,6 +245,9 @@ struct IOBuf::Block {
         , data(data_in) {
         auto ext = new (get_user_data_extension()) UserDataExtension();
         ext->deleter = std::move(deleter);
+        if (is_samplable()) {
+            SubmitIOBufSample(this, 1);
+        }
     }
 
     // Undefined behavior when (flags & IOBUF_BLOCK_FLAGS_USER_DATA) is 0.
@@ -260,13 +268,19 @@ struct IOBuf::Block {
     void inc_ref() {
         check_abi();
         nshared.fetch_add(1, butil::memory_order_relaxed);
+        if (sampled()) {
+            SubmitIOBufSample(this, 1);
+        }
     }
         
     void dec_ref() {
         check_abi();
+        if (sampled()) {
+            SubmitIOBufSample(this, -1);
+        }
         if (nshared.fetch_sub(1, butil::memory_order_release) == 1) {
             butil::atomic_thread_fence(butil::memory_order_acquire);
-            if (!flags) {
+            if (!is_user_data()) {
                 iobuf::g_nblock.fetch_sub(1, butil::memory_order_relaxed);
                 iobuf::g_blockmem.fetch_sub(cap + sizeof(Block),
                                             butil::memory_order_relaxed);
@@ -288,6 +302,23 @@ struct IOBuf::Block {
 
     bool full() const { return size >= cap; }
     size_t left_space() const { return cap - size; }
+
+private:
+    // Asks the profiler whether this block should be sampled and, if so,
+    // records the decision in `flags' so that later inc_ref()/dec_ref()
+    // calls can check it through sampled().
+    bool is_samplable() {
+        const bool picked = IsIOBufProfilerSamplable();
+        if (picked) {
+            flags |= IOBUF_BLOCK_FLAGS_SAMPLED;
+        }
+        return picked;
+    }
+
+    // True iff this block carries the IOBUF_BLOCK_FLAGS_SAMPLED bit.
+    bool sampled() const {
+        return (flags & IOBUF_BLOCK_FLAGS_SAMPLED) != 0;
+    }
+
+    // True iff this block carries the IOBUF_BLOCK_FLAGS_USER_DATA bit.
+    bool is_user_data() const {
+        return (flags & IOBUF_BLOCK_FLAGS_USER_DATA) != 0;
+    }
 };
 
 namespace iobuf {
@@ -329,6 +360,11 @@ inline IOBuf::Block* create_block() {
 // release_tls_block_chain() may exceed this limit sometimes.
 const int MAX_BLOCKS_PER_THREAD = 8;
 
+// Upper bound of blocks cached in TLS. It drops to 0 while IOBufProfiler
+// is enabled so that blocks are not cached in TLS.
+inline int max_blocks_per_thread() {
+    if (IsIOBufProfilerEnabled()) {
+        return 0;
+    }
+    return MAX_BLOCKS_PER_THREAD;
+}
+
 struct TLSData {
     // Head of the TLS block chain.
     IOBuf::Block* block_head;
@@ -410,7 +446,7 @@ inline void release_tls_block(IOBuf::Block* b) {
     TLSData& tls_data = g_tls_data;
     if (b->full()) {
         b->dec_ref();
-    } else if (tls_data.num_blocks >= MAX_BLOCKS_PER_THREAD) {
+    } else if (tls_data.num_blocks >= max_blocks_per_thread()) {
         b->dec_ref();
         g_num_hit_tls_threshold.fetch_add(1, butil::memory_order_relaxed);
     } else {
@@ -429,7 +465,7 @@ inline void release_tls_block(IOBuf::Block* b) {
 void release_tls_block_chain(IOBuf::Block* b) {
     TLSData& tls_data = g_tls_data;
     size_t n = 0;
-    if (tls_data.num_blocks >= MAX_BLOCKS_PER_THREAD) {
+    if (tls_data.num_blocks >= max_blocks_per_thread()) {
         do {
             ++n;
             IOBuf::Block* const saved_next = b->u.portal_next;
diff --git a/src/butil/iobuf_profiler.cpp b/src/butil/iobuf_profiler.cpp
new file mode 100644
index 00000000..bcb53391
--- /dev/null
+++ b/src/butil/iobuf_profiler.cpp
@@ -0,0 +1,316 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <fcntl.h>
+#include "butil/iobuf_profiler.h"
+#include "butil/strings/string_number_conversions.h"
+#include "butil/file_util.h"
+#include "butil/fd_guard.h"
+#include "butil/fast_rand.h"
+#include "butil/hash.h"
+#include <execinfo.h>
+
+extern int __attribute__((weak)) GetStackTrace(void** result, int max_depth, 
int skip_count);
+
+namespace butil {
+
+namespace iobuf {
+extern void* cp(void *__restrict dest, const void *__restrict src, size_t n);
+}
+
+// Max and min sleep time for IOBuf profiler consuming thread
+// when `_sample_queue' is empty.
+const uint32_t IOBufProfiler::MIN_SLEEP_MS = 10;
+const uint32_t IOBufProfiler::MAX_SLEEP_MS = 1000;
+
+static pthread_once_t g_iobuf_profiler_info_once = PTHREAD_ONCE_INIT;
+static bool g_iobuf_profiler_enabled = false;
+static uint g_iobuf_profiler_sample_rate = 100;
+
+// Reads the profiler configuration from the environment. Intended to run
+// exactly once, through pthread_once on g_iobuf_profiler_info_once.
+//
+// Environment variables:
+// 1. ENABLE_IOBUF_PROFILER: set value to 1 to enable IOBuf profiler.
+//    The profiler is additionally enabled only when the weak symbol
+//    GetStackTrace has been resolved.
+// 2. IOBUF_PROFILER_SAMPLE_RATE: set value between (0, 100] to control
+//    sample rate. Invalid values are logged and the default is kept.
+static void InitGlobalIOBufProfilerInfo() {
+    const char* enabled = getenv("ENABLE_IOBUF_PROFILER");
+    g_iobuf_profiler_enabled =
+        enabled && strcmp("1", enabled) == 0 && ::GetStackTrace != NULL;
+    // The sample rate only matters when the profiler is on, so skip the
+    // parsing (and the log line) when it is off.
+    if (!g_iobuf_profiler_enabled) {
+        return;
+    }
+
+    const char* rate = getenv("IOBUF_PROFILER_SAMPLE_RATE");
+    if (rate) {
+        int tmp = 0;
+        if (butil::StringToInt(rate, &tmp)) {
+            if (tmp > 0 && tmp <= 100) {
+                g_iobuf_profiler_sample_rate = tmp;
+            } else {
+                LOG(ERROR) << "IOBUF_PROFILER_SAMPLE_RATE should be in (0, 100], but get " << rate;
+            }
+        } else {
+            LOG(ERROR) << "IOBUF_PROFILER_SAMPLE_RATE should be a number, but get " << rate;
+        }
+    }
+    LOG(INFO) << "g_iobuf_profiler_sample_rate=" << g_iobuf_profiler_sample_rate;
+}
+
+// Returns whether the IOBuf profiler is enabled. The first call (from any
+// accessor sharing g_iobuf_profiler_info_once) loads the settings from
+// the environment.
+bool IsIOBufProfilerEnabled() {
+    (void)pthread_once(&g_iobuf_profiler_info_once,
+                       InitGlobalIOBufProfilerInfo);
+    const bool on = g_iobuf_profiler_enabled;
+    return on;
+}
+
+// Decides whether a newly created block should be sampled.
+// Returns false when the profiler is disabled, so that blocks are not
+// flagged as sampled (and do not pay for a SubmitIOBufSample() call on
+// every reference change) when nothing would be recorded anyway.
+// Otherwise returns true with a probability of
+// g_iobuf_profiler_sample_rate percent.
+bool IsIOBufProfilerSamplable() {
+    pthread_once(&g_iobuf_profiler_info_once, InitGlobalIOBufProfilerInfo);
+    if (!g_iobuf_profiler_enabled) {
+        return false;
+    }
+    // Fast path: no random number needed when everything is sampled.
+    if (g_iobuf_profiler_sample_rate == 100) {
+        return true;
+    }
+    // fast_rand_less_than(100) + 1 is in [1, 100].
+    return fast_rand_less_than(100) + 1 <= g_iobuf_profiler_sample_rate;
+}
+
+// Hash of the captured stack, computed on first use and cached in
+// `_hash_code'. A sample without frames always hashes to 0.
+size_t IOBufSample::stack_hash_code() const {
+    const bool has_frames = (nframes != 0);
+    if (has_frames && _hash_code == 0) {
+        const char* const raw = reinterpret_cast<const char*>(stack);
+        _hash_code = SuperFastHash(raw, sizeof(void*) * nframes);
+    }
+    return has_frames ? _hash_code : 0;
+}
+
+// Returns the process-wide profiler held by a Singleton that uses
+// LeakySingletonTraits.
+IOBufProfiler* IOBufProfiler::GetInstance() {
+    typedef ::Singleton<IOBufProfiler,
+                        LeakySingletonTraits<IOBufProfiler>> Holder;
+    return Holder::get();
+}
+
+// Constructs the profiler: names the underlying SimpleThread
+// "IOBufProfiler", starts with the shortest sleep interval, initializes
+// both maps and finally starts the thread.
+IOBufProfiler::IOBufProfiler()
+    : butil::SimpleThread("IOBufProfiler")
+    , _stop(false)
+    , _sleep_ms(MIN_SLEEP_MS) {
+    // The maps must be initialized before Start(): Run() -> Consume() ->
+    // Dump() uses them.
+    CHECK_EQ(0, _stack_map.init(1024));
+    CHECK_EQ(0, _block_info_map.init(1024));
+    Start();
+}
+
+// Stops the consuming thread, drops all aggregated data and destroys the
+// samples that are still waiting in `_sample_queue'.
+IOBufProfiler::~IOBufProfiler() {
+    StopAndJoin();
+    _block_info_map.clear();
+    _stack_map.clear();
+
+    // Drain `_sample_queue'.
+    for (IOBufSample* pending = NULL; _sample_queue.Dequeue(pending); ) {
+        IOBufSample::Destroy(pending);
+    }
+}
+
+// Hands a sample over to the consuming thread through `_sample_queue'.
+// The caller must not touch `s' afterwards: it is either enqueued, or,
+// when the profiler has been stopped, destroyed right here so that it is
+// not leaked.
+void IOBufProfiler::Submit(IOBufSample* s) {
+    if (_stop.load(butil::memory_order_relaxed)) {
+        // Nobody will consume the queue anymore; give the sample back
+        // instead of dropping the pointer.
+        IOBufSample::Destroy(s);
+        return;
+    }
+
+    _sample_queue.Enqueue(s);
+}
+
+// Merges one sample into the aggregated state, under `_mutex':
+//  - `_stack_map' accumulates `count' per distinct stack;
+//  - `_block_info_map' accumulates, per block, the total `ref' and the
+//    per-stack contribution of that block.
+// When the accumulated `ref' of a block reaches 0, the contribution of
+// the block is subtracted from every stack it touched and the block is
+// forgotten. `s->block' is reset to NULL before returning; `s' itself is
+// not destroyed here.
+void IOBufProfiler::Dump(IOBufSample* s) {
+    // do { ... } while (false) only exists so that `break' can leave the
+    // locked scope early.
+    do {
+        BAIDU_SCOPED_LOCK(_mutex);
+        // Categorize the stack.
+        IOBufRefSampleSharedPtr stack_sample;
+        IOBufRefSampleSharedPtr* stack_ptr = _stack_map.seek(s);
+        if (!stack_ptr) {
+            // First time this stack is seen: store a copy of the sample
+            // (which carries s->count) keyed by the copy itself.
+            stack_sample = IOBufSample::CopyAndSharedWithDestroyer(s);
+            stack_sample->block = NULL;
+            stack_ptr = &_stack_map[stack_sample.get()];
+            *stack_ptr = stack_sample;
+        } else {
+            (*stack_ptr)->count += s->count;
+        }
+
+        BlockInfo* info = _block_info_map.seek(s->block);
+        if (info) {
+            // Categorize the IOBufSample.
+            info->stack_count_map[*stack_ptr] += s->count;
+            info->ref += s->count;
+            if (info->ref == 0) {
+                // Every recorded reference of the block is gone: undo the
+                // block's contribution to each stack and drop stacks
+                // whose count falls to 0.
+                for (const auto& iter : info->stack_count_map) {
+                    IOBufRefSampleSharedPtr* stack_ptr2 = 
_stack_map.seek(iter.first.get());
+                    if (stack_ptr2 && *stack_ptr2) {
+                        (*stack_ptr2)->count -= iter.second;
+                        if ((*stack_ptr2)->count == 0) {
+                            _stack_map.erase(iter.first.get());
+                        }
+                    }
+                }
+                _block_info_map.erase(s->block);
+                break;
+            }
+        } else {
+            // First sample of this block.
+            BlockInfo& new_info = _block_info_map[s->block];
+            new_info.ref += s->count;
+            CHECK_EQ(0, new_info.stack_count_map.init(64));
+            new_info.stack_count_map[*stack_ptr] = s->count;
+        }
+    } while (false);
+    s->block = NULL;
+}
+
+// Returns a new pooled sample carrying the same block, count, cached hash
+// and stack frames as `ref'. `next' is not copied.
+IOBufSample* IOBufSample::Copy(IOBufSample* ref) {
+    IOBufSample* const dup = IOBufSample::New();
+    dup->block = ref->block;
+    dup->count = ref->count;
+    dup->_hash_code = ref->_hash_code;
+    dup->nframes = ref->nframes;
+    // Only the used part of the stack is copied.
+    const size_t stack_bytes = sizeof(void*) * ref->nframes;
+    iobuf::cp(dup->stack, ref->stack, stack_bytes);
+    return dup;
+}
+
+// Copies `ref' and wraps the copy into a shared_ptr whose deleter is
+// detail::Destroyer, which calls IOBufSample::Destroy() on the copy.
+IOBufRefSampleSharedPtr IOBufSample::CopyAndSharedWithDestroyer(
+    IOBufSample* ref) {
+    IOBufSample* const dup = Copy(ref);
+    return IOBufRefSampleSharedPtr(dup, detail::Destroyer());
+}
+
+// Writes the aggregated stacks to `filename' in the text format of a
+// contention profile ("<cycles> <count> @ <frames...>" per line, with
+// cycles fixed to 0), followed by the content of /proc/self/maps.
+// The parent directory is created if needed and an existing file is
+// truncated. Failures are logged; nothing is reported to the caller.
+void IOBufProfiler::Flush2Disk(const char* filename) {
+    if (!filename) {
+        LOG(ERROR) << "Parameter [filename] is NULL";
+        return;
+    }
+    // Drop whatever an earlier flush left behind when it bailed out
+    // before writing everything, otherwise it would end up in this file.
+    _disk_buf.clear();
+    // Serialize contentions in _stack_map into _disk_buf.
+    _disk_buf.append("--- contention\ncycles/second=1000000000\n");
+    butil::IOBufBuilder os;
+    {
+        // Only the snapshot of _stack_map is taken under the lock.
+        BAIDU_SCOPED_LOCK(_mutex);
+        for (auto it = _stack_map.begin(); it != _stack_map.end(); ++it) {
+            const IOBufRefSampleSharedPtr& c = it->second;
+            if (c->nframes == 0) {
+                LOG_EVERY_SECOND(WARNING) << "Invalid stack with nframes=0, count=" << c->count;
+                continue;
+            }
+            os << "0 " << c->count << " @";
+            for (int i = 0; i < c->nframes; ++i) {
+                os << " " << c->stack[i];
+            }
+            os << "\n";
+        }
+    }
+    _disk_buf.append(os.buf().movable());
+
+    // Append /proc/self/maps to the end of the contention file, required by
+    // pprof.pl, otherwise the functions in sys libs are not interpreted.
+    butil::IOPortal mem_maps;
+    const butil::fd_guard maps_fd(open("/proc/self/maps", O_RDONLY));
+    if (maps_fd >= 0) {
+        while (true) {
+            ssize_t nr = mem_maps.append_from_file_descriptor(maps_fd, 8192);
+            if (nr < 0) {
+                if (errno == EINTR) {
+                    continue;
+                }
+                PLOG(ERROR) << "Fail to read /proc/self/maps";
+                break;
+            }
+            if (nr == 0) {
+                // EOF: the maps are appended only when read completely.
+                _disk_buf.append(mem_maps);
+                break;
+            }
+        }
+    } else {
+        PLOG(ERROR) << "Fail to open /proc/self/maps";
+    }
+    // Write _disk_buf into _filename
+    butil::File::Error error;
+    butil::FilePath file_path(filename);
+    butil::FilePath dir = file_path.DirName();
+    if (!butil::CreateDirectoryAndGetError(dir, &error)) {
+        LOG(ERROR) << "Fail to create directory=`" << dir.value()
+                   << ": " << error;
+        return;
+    }
+
+    butil::fd_guard fd(open(filename, O_WRONLY | O_CREAT | O_APPEND | O_TRUNC, 0666));
+    if (fd < 0) {
+        PLOG(ERROR) << "Fail to open " << filename;
+        return;
+    }
+    // Write once normally, write until empty in the end.
+    do {
+        ssize_t nw = _disk_buf.cut_into_file_descriptor(fd);
+        if (nw < 0) {
+            if (errno == EINTR) {
+                continue;
+            }
+            PLOG(ERROR) << "Fail to write into " << filename;
+            return;
+        }
+        // A successful write is not an error.
+        LOG(INFO) << "Write " << nw << " bytes into " << filename;
+    } while (!_disk_buf.empty());
+}
+
+// Raises the stop flag and joins the consuming thread. Only the first
+// call does the work; later calls see the flag already set and return.
+void IOBufProfiler::StopAndJoin() {
+    const bool already_stopped =
+        _stop.exchange(true, butil::memory_order_relaxed);
+    if (!already_stopped && !HasBeenJoined()) {
+        Join();
+    }
+}
+
+// Thread body: alternates between draining the sample queue and sleeping
+// for `_sleep_ms' milliseconds until the stop flag is raised.
+void IOBufProfiler::Run() {
+    for (;;) {
+        if (_stop.load(butil::memory_order_relaxed)) {
+            break;
+        }
+        Consume();
+        ::usleep(_sleep_ms * 1000);
+    }
+}
+
+// Drains `_sample_queue': every sample is merged by Dump() and then
+// destroyed. Afterwards the sleep interval is adjusted: reset to
+// MIN_SLEEP_MS when something was consumed, doubled (capped at
+// MAX_SLEEP_MS) when the queue was empty.
+void IOBufProfiler::Consume() {
+    bool consumed_any = false;
+    for (IOBufSample* cur = NULL; _sample_queue.Dequeue(cur); ) {
+        Dump(cur);
+        IOBufSample::Destroy(cur);
+        consumed_any = true;
+    }
+
+    if (consumed_any) {
+        _sleep_ms = MIN_SLEEP_MS;
+    } else {
+        _sleep_ms = std::min(_sleep_ms * 2, MAX_SLEEP_MS);
+    }
+}
+
+// Records a reference change `ref' of `block' together with the current
+// stack trace and submits it to the profiler. No-op when the profiler is
+// disabled.
+void SubmitIOBufSample(IOBuf::Block* block, int64_t ref) {
+    if (IsIOBufProfilerEnabled()) {
+        IOBufSample* const s = IOBufSample::New();
+        s->block = block;
+        s->count = ref;
+        // may lock
+        s->nframes = GetStackTrace(s->stack, arraysize(s->stack), 0);
+        IOBufProfiler::GetInstance()->Submit(s);
+    }
+}
+
+// Flushes the profiler state into `filename'.
+// Returns false only when `filename' is NULL; failures inside
+// Flush2Disk() are not reflected in the return value.
+bool IOBufProfilerFlush(const char* filename) {
+    if (filename == NULL) {
+        LOG(ERROR) << "Parameter [filename] is NULL";
+        return false;
+    }
+
+    IOBufProfiler::GetInstance()->Flush2Disk(filename);
+    return true;
+}
+
+}
\ No newline at end of file
diff --git a/src/butil/iobuf_profiler.h b/src/butil/iobuf_profiler.h
new file mode 100644
index 00000000..7178f20a
--- /dev/null
+++ b/src/butil/iobuf_profiler.h
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BIGO_BRPC_IOBUF_PROFILER_H
+#define BIGO_BRPC_IOBUF_PROFILER_H
+
+#include <unordered_map>
+#include <unordered_set>
+#include "butil/iobuf.h"
+#include "butil/object_pool.h"
+#include "butil/threading/simple_thread.h"
+#include "butil/threading/simple_thread.h"
+#include "butil/memory/singleton.h"
+#include "butil/containers/flat_map.h"
+#include "butil/containers/mpsc_queue.h"
+
+namespace butil {
+
+struct IOBufSample;
+typedef std::shared_ptr<IOBufSample> IOBufRefSampleSharedPtr;
+
+// One observation of a reference change on an IOBuf::Block together with
+// the stack trace where it happened. Instances come from and return to
+// ObjectPool<IOBufSample> (see New()/Destroy()); the constructor and
+// destructor are private for that reason.
+struct IOBufSample {
+    // NOTE(review): not referenced by the code in this change; presumably
+    // an intrusive link - confirm before relying on it.
+    IOBufSample* next;
+    // The block this sample belongs to.
+    IOBuf::Block* block;
+    int64_t count; // reference count change of the block.
+    void* stack[28]; // backtrace.
+    int nframes; // num of frames in stack.
+
+    // Gets a sample from the object pool.
+    static IOBufSample* New() {
+        return get_object<IOBufSample>();
+    }
+
+    static IOBufSample* Copy(IOBufSample* ref);
+    static IOBufRefSampleSharedPtr CopyAndSharedWithDestroyer(IOBufSample* 
ref);
+    // Resets the cached hash and returns `ref' to the object pool.
+    static void Destroy(IOBufSample* ref) {
+        ref->_hash_code = 0;
+        return_object(ref);
+    }
+
+    // Hash of `stack', cached in `_hash_code'.
+    size_t stack_hash_code() const;
+
+private:
+    friend ObjectPool<IOBufSample>;
+
+    IOBufSample()
+        : next(NULL)
+        , block(NULL)
+        , count(0)
+        , stack{}
+        , nframes(0)
+        , _hash_code(0) {}
+
+    ~IOBufSample() = default;
+
+    mutable uint32_t _hash_code; // For combining samples with hashmap.
+};
+
+BAIDU_CASSERT(sizeof(IOBufSample) == 256, be_friendly_to_allocator);
+
+namespace detail {
+// Equality functor for pointer-like handles to IOBufSample: two samples
+// are equal when their stack hashes, frame counts and frames all match.
+template <typename T>
+struct IOBufSampleEqual {
+    bool operator()(const T& c1, const T& c2) const {
+        if (c1->stack_hash_code() != c2->stack_hash_code()) {
+            return false;
+        }
+        if (c1->nframes != c2->nframes) {
+            return false;
+        }
+        return memcmp(c1->stack, c2->stack, sizeof(void*) * c1->nframes) == 0;
+    }
+};
+
+// Hash functor for pointer-like handles to IOBufSample: the hash of the
+// captured stack.
+template <typename T>
+struct IOBufSampleHash {
+    size_t operator()(const T& c) const {
+        const size_t code = c->stack_hash_code();
+        return code;
+    }
+};
+
+// Deleter that gives a sample back through IOBufSample::Destroy().
+// NULL is ignored.
+struct Destroyer {
+    void operator()(IOBufSample* ref) const {
+        if (ref == NULL) {
+            return;
+        }
+        IOBufSample::Destroy(ref);
+    }
+};
+}
+
+// Collects IOBufSample objects submitted by arbitrary threads and
+// aggregates them on its own thread (it is a butil::SimpleThread whose
+// Run() repeatedly calls Consume()). The aggregated stacks can be written
+// to a file with Flush2Disk(). Obtained through GetInstance().
+class IOBufProfiler : public butil::SimpleThread {
+public:
+    static IOBufProfiler* GetInstance();
+
+    // Submit the IOBuf sample along with stacktrace.
+    void Submit(IOBufSample* s);
+    // Dump IOBuf sample to map.
+    void Dump(IOBufSample* s);
+    // Write buffered data into resulting file.
+    void Flush2Disk(const char* filename);
+
+    // Raise the stop flag and join the consuming thread.
+    void StopAndJoin();
+
+private:
+    friend struct DefaultSingletonTraits<IOBufProfiler>;
+
+    // <iobuf stack, aggregated sample>. Keys are hashed and compared by
+    // stack content, not by pointer value.
+    typedef butil::FlatMap<IOBufSample*,
+                           IOBufRefSampleSharedPtr,
+                           detail::IOBufSampleHash<IOBufSample*>,
+                           detail::IOBufSampleEqual<IOBufSample*>> IOBufRefMap;
+
+    // <iobuf stack, ref count>
+    typedef butil::FlatMap<IOBufRefSampleSharedPtr,
+                           int64_t,
+                           detail::IOBufSampleHash<IOBufRefSampleSharedPtr>,
+                           detail::IOBufSampleEqual<IOBufRefSampleSharedPtr>> 
StackCountMap;
+    // Per-block bookkeeping: accumulated ref and the contribution of each
+    // stack to it.
+    struct BlockInfo {
+        int64_t ref{0};
+        StackCountMap stack_count_map;
+    };
+    typedef butil::FlatMap<IOBuf::Block*, BlockInfo> BlockInfoMap;
+
+    IOBufProfiler();
+    ~IOBufProfiler() override;
+    DISALLOW_COPY_AND_ASSIGN(IOBufProfiler);
+
+    void Run() override;
+    // Consume the IOBuf sample in _sample_queue.
+    void Consume();
+
+    // Stop flag of IOBufProfiler.
+    butil::atomic<bool> _stop;
+    // IOBuf sample queue.
+    MPSCQueue<IOBufSample*> _sample_queue;
+
+    // Temp buf before saving the file.
+    butil::IOBuf _disk_buf;
+    // Combining same samples to make result smaller.
+    IOBufRefMap _stack_map;
+    // Record block info.
+    BlockInfoMap _block_info_map;
+    // Held by Dump() and Flush2Disk() while they access _stack_map
+    // (and, in Dump(), _block_info_map).
+    Mutex _mutex;
+
+    // Sleep when `_sample_queue' is empty.
+    uint32_t _sleep_ms;
+    static const uint32_t MIN_SLEEP_MS;
+    static const uint32_t MAX_SLEEP_MS;
+};
+
+bool IsIOBufProfilerEnabled();
+bool IsIOBufProfilerSamplable();
+
+void SubmitIOBufSample(IOBuf::Block* block, int64_t ref);
+
+bool IOBufProfilerFlush(const char* filename);
+
+}
+#endif //BIGO_BRPC_IOBUF_PROFILER_H


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org


Reply via email to