This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new e8f459284 [logging] simplify and improve LogThrottler
e8f459284 is described below

commit e8f459284164b0d82d6120684b17d2f30b4ddb4a
Author: Alexey Serbin <[email protected]>
AuthorDate: Mon Dec 18 19:16:33 2023 -0800

    [logging] simplify and improve LogThrottler
    
    I've noticed that among all the usages of LogThrottler throughout the
    current codebase, the alternating tags functionality was used only in
    a single place.  That single usage was extremely confusing since its two
    call sites passed different throttling intervals to the former
    KLOG_EVERY_N_SECS_THROTTLER macro while sharing the same LogThrottler
    instance, so the throttling frequency was unpredictable and didn't make
    much sense, especially when trying to analyze the generated logs
    post-mortem.
    Getting rid of the tag alterations allows for simpler logic and cleaner
    code in the LogThrottler::ShouldLog() method as well.
    
    I also noticed that thousands of messages might be suppressed and go
    unreported for the rest of a Kudu server's life-cycle, but for some
    scenarios it could be beneficial to report on the number of suppressions
    when the server shuts down, so that there is a way to assess how massive
    the very last surge in throttled log messages had been.
    
    This patch contains the following updates on LogThrottler:
      * The alternating tags for the LogThrottler are gone along with the
        corresponding macro KLOG_EVERY_N_SECS_THROTTLER(), and that's
        replaced with the new KLOG_THROTTLER() macro.
      * LogThrottler::ShouldLog() is rewritten using std::atomic for
        all the fields that require atomicity.  There is no longer any need
        for the ANNOTATE_BENIGN_RACE_SIZED annotation to cover up
        the sloppiness of the former code.
      * Throttling interval is now a parameter of the LogThrottler's
        constructor, and is an invariant of a LogThrottler class instance.
      * LogThrottler's destructor now logs the number of messages that were
        suppressed but not yet reported since the last logged message.
      * ostream& operator<<(ostream&, const PRIVATE_ThrottleMsg&) is
        optimized to not perform dynamic casting in release builds: if
        there is a call site that uses an output stream not derived from
        LogMessage::LogStream, that should have been caught during the code
        review process or by pre-commit tests running against DEBUG binaries.
    
    Change-Id: I6c26c4fb11c40bfba3e668d210bdaa1d497c51d0
    Reviewed-on: http://gerrit.cloudera.org:8080/20820
    Reviewed-by: Abhishek Chennaka <[email protected]>
    Tested-by: Alexey Serbin <[email protected]>
---
 src/kudu/consensus/consensus_queue.cc |   9 ++-
 src/kudu/consensus/consensus_queue.h  |   7 ++-
 src/kudu/server/server_base.cc        |  14 ++---
 src/kudu/util/logging-test.cc         |  56 +++++++++--------
 src/kudu/util/logging.cc              |  45 ++++++++++----
 src/kudu/util/logging.h               | 111 +++++++++++++++-------------------
 6 files changed, 130 insertions(+), 112 deletions(-)

diff --git a/src/kudu/consensus/consensus_queue.cc 
b/src/kudu/consensus/consensus_queue.cc
index 01368616a..bda268183 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -151,7 +151,10 @@ PeerMessageQueue::TrackedPeer::TrackedPeer(RaftPeerPB 
peer_pb)
       wal_catchup_possible(true),
       remote_server_quiescing(false),
       last_overall_health_status(HealthReportPB::UNKNOWN),
-      status_log_throttler(std::make_shared<logging::LogThrottler>()),
+      status_log_throttler_lag(
+          std::make_shared<logging::LogThrottler>(3, "TrackedPeer Lag")),
+      status_log_throttler_wal_gc(
+          std::make_shared<logging::LogThrottler>(60, "TrackedPeer WAL GC")),
       last_seen_term_(0) {
 }
 
@@ -734,7 +737,7 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
       // the leader has GCed its logs. The follower replica will hang around
       // for a while until it's evicted.
       if (PREDICT_TRUE(s.IsNotFound())) {
-        KLOG_EVERY_N_SECS_THROTTLER(INFO, 60, *peer_copy.status_log_throttler, 
"logs_gced")
+        KLOG_THROTTLER(INFO, *peer_copy.status_log_throttler_wal_gc)
             << LogPrefixUnlocked()
             << Substitute("The logs necessary to catch up peer $0 have been "
                           "garbage collected. The follower will never be able "
@@ -780,7 +783,7 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
   if (request->ops_size() > 0) {
     int64_t last_op_sent = request->ops(request->ops_size() - 1).id().index();
     if (last_op_sent < request->committed_index()) {
-      KLOG_EVERY_N_SECS_THROTTLER(INFO, 3, *peer_copy.status_log_throttler, 
"lagging")
+      KLOG_THROTTLER(INFO, *peer_copy.status_log_throttler_lag)
           << LogPrefixUnlocked() << "Peer " << uuid << " is lagging by at 
least "
           << (request->committed_index() - last_op_sent)
           << " ops behind the committed index " << THROTTLE_MSG;
diff --git a/src/kudu/consensus/consensus_queue.h 
b/src/kudu/consensus/consensus_queue.h
index bbe68fa0a..68cae6351 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -173,9 +173,10 @@ class PeerMessageQueue {
     // The peer's latest overall health status.
     HealthReportPB::HealthStatus last_overall_health_status;
 
-    // Throttler for how often we will log status messages pertaining to this
-    // peer (eg when it is lagging, etc).
-    std::shared_ptr<logging::LogThrottler> status_log_throttler;
+    // Throttlers for how often we will log status messages pertaining to this
+    // peer: when it is lagging, WAL are GC-ed and it cannot catch up.
+    std::shared_ptr<logging::LogThrottler> status_log_throttler_lag;
+    std::shared_ptr<logging::LogThrottler> status_log_throttler_wal_gc;
 
    private:
     // The last term we saw from a given peer.
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 8b5f554b0..ff3927008 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -48,7 +48,6 @@
 #include "kudu/fs/fs_report.h"
 #include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/map-util.h"
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strcat.h"
@@ -1367,15 +1366,16 @@ void ServerBase::UnregisterAllServices() {
 }
 
 void ServerBase::ServiceQueueOverflowed(rpc::ServicePool* service) {
-  if (!diag_log_) return;
+  if (!diag_log_) {
+    return;
+  }
 
   // Logging all of the stacks is relatively heavy-weight, so if we are in a 
persistent
   // state of overload, it's probably not a good idea to start compounding the 
issue with
-  // a lot of stack-logging activity. So, we limit the frequency of 
stack-dumping.
-  static logging::LogThrottler throttler;
-  const int kStackDumpFrequencySecs = 5;
-  int suppressed = 0;
-  if (PREDICT_TRUE(!throttler.ShouldLog(kStackDumpFrequencySecs, "", 
&suppressed))) {
+  // a lot of stack-logging activity. So, we limit the frequency of 
stack-dumping
+  // to happen no more than once in five (5) seconds.
+  static logging::LogThrottler throttler(5, "Service Queue Overflow");
+  if (int64_t suppressed = 0; !throttler.ShouldLog(&suppressed)) {
     return;
   }
 
diff --git a/src/kudu/util/logging-test.cc b/src/kudu/util/logging-test.cc
index 3f087618d..6855a6d70 100644
--- a/src/kudu/util/logging-test.cc
+++ b/src/kudu/util/logging-test.cc
@@ -125,37 +125,43 @@ TEST(LoggingTest, ThrottledLoggingShortBurst) {
   }
 }
 
-TEST(LoggingTest, TestAdvancedThrottling) {
+TEST(LoggingTest, LogThrottleDestructorReport) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
   StringVectorSink sink;
   ScopedRegisterSink srs(&sink);
 
-  logging::LogThrottler throttle_a;
+  {
+    logging::LogThrottler throttler(5, "in test");
 
-  // First, log only using a single tag and throttler.
-  for (int i = 0; i < 100000; i++) {
-    KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_a") << "test" << 
THROTTLE_MSG;
-    SleepFor(MonoDelta::FromMilliseconds(1));
-    if (sink.logged_msgs().size() >= 2) break;
+    for (int i = 0; i < 100; ++i) {
+      KLOG_THROTTLER(INFO, throttler) << "test " << i << THROTTLE_MSG;
+    }
+    // Sleep for a second to check for the proper timings reported in the
+    // summary output by the LogThrottler's destructor.
+    SleepFor(MonoDelta::FromMilliseconds(1000));
+
+    const auto& msgs = sink.logged_msgs();
+    // Only the very first message in the burst is accounted for.
+    ASSERT_EQ(1, msgs.size());
+    ASSERT_THAT(msgs.front(), testing::ContainsRegex("test 0$"));
+
+    for (const auto& m: msgs) {
+      // No information on suppressed messages yet.
+      ASSERT_STR_NOT_CONTAINS(m, "suppressed");
+    }
   }
-  auto& msgs = sink.logged_msgs();
-  ASSERT_GE(msgs.size(), 2);
 
-  // The first log line shouldn't have a suppression count.
-  EXPECT_THAT(msgs[0], testing::ContainsRegex("test$"));
-  // The second one should have suppressed at least three digits worth of log 
messages.
-  EXPECT_THAT(msgs[1], testing::ContainsRegex("\\[suppressed [0-9]{3,} similar 
messages\\]"));
-  msgs.clear();
-
-  // Now, try logging using two different tags in rapid succession. This 
should not
-  // throttle, because the tag is switching.
-  KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_b") << "test b" << 
THROTTLE_MSG;
-  KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_b") << "test b" << 
THROTTLE_MSG;
-  KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_c") << "test c" << 
THROTTLE_MSG;
-  KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_b") << "test b" << 
THROTTLE_MSG;
-  ASSERT_EQ(msgs.size(), 3);
-  EXPECT_THAT(msgs[0], testing::ContainsRegex("test b$"));
-  EXPECT_THAT(msgs[1], testing::ContainsRegex("test c$"));
-  EXPECT_THAT(msgs[2], testing::ContainsRegex("test b$"));
+  // The throttler should report on the suppressed but not yet logged messages
+  // in the destructor. To avoid flakiness due to scheduling anomalies on busy
+  // test nodes, the expected timing is flexible to accommodate for several
+  // extra seconds between the first message logged and the time when the
+  // LogThrottler's destructor has run.
+  const auto& msgs = sink.logged_msgs();
+  ASSERT_EQ(2, msgs.size());
+  ASSERT_THAT(msgs.back(), testing::ContainsRegex(
+      "suppressed but not reported on 99 messages "
+      "since previous log \\~[1-9] seconds ago"));
 }
 
 // Test Logger implementation that just counts the number of messages
diff --git a/src/kudu/util/logging.cc b/src/kudu/util/logging.cc
index 5e21493f4..215e7999a 100644
--- a/src/kudu/util/logging.cc
+++ b/src/kudu/util/logging.cc
@@ -19,6 +19,8 @@
 
 #include <unistd.h>
 
+#include <atomic>
+#include <cstddef>
 #include <cstdint>
 #include <cstdio>
 #include <cstdlib>
@@ -35,6 +37,8 @@
 #include <glog/logging.h>
 
 #include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/casts.h"
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/spinlock.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -73,6 +77,7 @@ TAG_FLAG(max_log_files, stable);
 
 bool logging_initialized = false;
 
+using strings::Substitute;
 using base::SpinLock;
 using base::SpinLockHolder;
 using boost::uuids::random_generator;
@@ -388,8 +393,8 @@ Status DeleteExcessLogFiles(Env* env) {
   for (int severity = 0; severity < google::NUM_SEVERITIES; ++severity) {
     // Build glob pattern for input
     // e.g. /var/log/kudu/kudu-master.*.INFO.*
-    string pattern = strings::Substitute("$0/$1.*.$2.*", FLAGS_log_dir, 
FLAGS_log_filename,
-                                         google::GetLogSeverityName(severity));
+    string pattern = Substitute("$0/$1.*.$2.*", FLAGS_log_dir, 
FLAGS_log_filename,
+                                google::GetLogSeverityName(severity));
 
     // Keep the 'max_log_files' most recent log files, as compared by
     // modification time. Glog files contain a second-granularity timestamp in
@@ -402,18 +407,34 @@ Status DeleteExcessLogFiles(Env* env) {
   return Status::OK();
 }
 
+namespace logging {
+
+LogThrottler::~LogThrottler() {
+  const auto num_not_reported = 
num_suppressed_.load(std::memory_order_acquire);
+  if (num_not_reported > 0) {
+    const auto ts = GetMonoTimeMicros() - 
last_ts_.load(std::memory_order_acquire);
+    string instance_info = "LogThrottler";
+    if (string_id_) {
+      instance_info += " ";
+      instance_info += string_id_;
+      if (num_id_ > 0) {
+        instance_info += ":" + std::to_string(num_id_);
+      }
+    }
+    LOG(INFO) << Substitute("$0: suppressed but not reported on $1 messages "
+                            "since previous log ~$2 seconds ago",
+                            instance_info, num_not_reported, ts / 1000000);
+  }
+}
+
+} // namespace logging
+
 // Support for the special THROTTLE_MSG token in a log message stream.
-ostream& operator<<(ostream &os, const PRIVATE_ThrottleMsg& /*unused*/) {
+ostream& operator<<(ostream& os, const PRIVATE_ThrottleMsg& /*unused*/) {
   using google::LogMessage;
-#ifdef DISABLE_RTTI
-  LogMessage::LogStream *log = static_cast<LogMessage::LogStream*>(&os);
-#else
-  LogMessage::LogStream *log = dynamic_cast<LogMessage::LogStream*>(&os);
-#endif
-  CHECK(log && log == log->self())
-      << "You must not use COUNTER with non-glog ostream";
-  size_t ctr = log->ctr();
-  if (ctr > 0) {
+  LogMessage::LogStream* log = down_cast<LogMessage::LogStream*>(&os);
+  DCHECK(log && log == log->self()) << "COUNTER is for glog LogStream only";
+  if (auto ctr = log->ctr(); ctr > 0) {
     os << " [suppressed " << ctr << " similar messages]";
   }
   return os;
diff --git a/src/kudu/util/logging.h b/src/kudu/util/logging.h
index b88a094fe..93ba90efd 100644
--- a/src/kudu/util/logging.h
+++ b/src/kudu/util/logging.h
@@ -14,18 +14,16 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_UTIL_LOGGING_H
-#define KUDU_UTIL_LOGGING_H
+#pragma once
 
-#include <cstddef>
+#include <atomic>
+#include <cstdint>
 #include <iosfwd>
 #include <string>
 
 #include <glog/logging.h>
 
-#include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/dynamic_annotations.h"
-#include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/walltime.h"
 #include "kudu/util/logging_callback.h"
@@ -133,49 +131,39 @@ class ScopedDisableRedaction {
 // -----------------------------------
 // For cases where the throttling should be scoped to a given class instance,
 // you may define a logging::LogThrottler object and pass it to the
-// KLOG_EVERY_N_SECS_THROTTLER(...) macro. In addition, you must pass a "tag".
-// Only log messages with equal tags (by pointer equality) will be throttled.
-// For example:
+// KLOG_THROTTLER(...) macro. For example:
 //
 //    struct MyThing {
 //      string name;
-//      LogThrottler throttler;
+//      LogThrottler throttler(10);
 //    };
 //
 //    if (...) {
-//      LOG_EVERY_N_SECS_THROTTLER(INFO, 1, my_thing->throttler, "coffee") <<
-//        my_thing->name << " needs coffee!";
-//    } else {
-//      LOG_EVERY_N_SECS_THROTTLER(INFO, 1, my_thing->throttler, "wine") <<
-//        my_thing->name << " needs wine!";
+//      KLOG_THROTTLER(INFO, my_thing->throttler)
+//          << my_thing->name << " needs coffee!";
 //    }
 //
-// In this example, the "coffee"-related message will be collapsed into other
-// such messages within the prior one second; however, if the state alternates
-// between the "coffee" message and the "wine" message, then each such 
alternation
-// will yield a message.
-
-#define KLOG_EVERY_N_SECS_THROTTLER(severity, n_secs, throttler, tag) \
-  int VARNAME_LINENUM(num_suppressed) = 0;                            \
-  if ((throttler).ShouldLog(n_secs, tag, &VARNAME_LINENUM(num_suppressed)))  \
-    google::LogMessage( \
-      __FILE__, __LINE__, google::GLOG_ ## severity, 
VARNAME_LINENUM(num_suppressed), \
-      &google::LogMessage::SendToLog).stream()
+// In this example, the message will be collapsed into other such messages
+// within the prior ten seconds.
+
+#define KLOG_THROTTLER(severity, throttler)                                   \
+  if (int64_t _nsupp = 0; (throttler).ShouldLog(&_nsupp))                     \
+    google::LogMessage(__FILE__, __LINE__, google::GLOG_ ## severity, _nsupp, \
+                         &google::LogMessage::SendToLog).stream()
 
 #define KLOG_EVERY_N_SECS(severity, n_secs) \
-  static ::kudu::logging::LogThrottler LOG_THROTTLER;  \
-  KLOG_EVERY_N_SECS_THROTTLER(severity, n_secs, LOG_THROTTLER, "no-tag")
+  static ::kudu::logging::LogThrottler LOG_THROTTLER(n_secs, __FILE__, 
__LINE__); \
+  KLOG_THROTTLER(severity, LOG_THROTTLER)
 
-#define WARN_NOT_OK_EVERY_N_SECS(to_call, warning_prefix, n_secs) do {         
        \
-    const ::kudu::Status& _s = (to_call);                                      
        \
-    if (PREDICT_FALSE(!_s.ok())) {                                             
        \
-      KLOG_EVERY_N_SECS(WARNING, n_secs) << (warning_prefix) << ": " << 
_s.ToString()  \
-                                         << THROTTLE_MSG;                      
        \
-    }                                                                          
        \
-  } while (0)
+#define WARN_NOT_OK_EVERY_N_SECS(to_call, warning_prefix, n_secs) do {        \
+    if (const ::kudu::Status& _s = (to_call); PREDICT_FALSE(!_s.ok())) {      \
+      KLOG_EVERY_N_SECS(WARNING, n_secs)                                      \
+          << (warning_prefix) << ": " << _s.ToString() << THROTTLE_MSG;       \
+    }                                                                         \
+  } while (false)
 
 namespace kudu {
-enum PRIVATE_ThrottleMsg {THROTTLE_MSG};
+enum PRIVATE_ThrottleMsg {THROTTLE_MSG}; // 
NOLINT(readability-identifier-naming)
 } // namespace kudu
 
 
////////////////////////////////////////////////////////////////////////////////
@@ -316,41 +304,42 @@ namespace logging {
 // A LogThrottler instance tracks the throttling state for a particular
 // log message.
 //
-// This is used internally by KLOG_EVERY_N_SECS, but can also be used
-// explicitly in conjunction with KLOG_EVERY_N_SECS_THROTTLER. See the
-// macro descriptions above for details.
-class LogThrottler {
+// This is used internally by KLOG_EVERY_N_SECS, but can also be used 
explicitly
+// in conjunction with KLOG_THROTTLER. See the macro descriptions above
+// for details.
+class LogThrottler final {
  public:
-  LogThrottler() : num_suppressed_(0), last_ts_(0), last_tag_(nullptr) {
-    ANNOTATE_BENIGN_RACE_SIZED(this, sizeof(*this), "OK to be sloppy with log 
throttling");
+  explicit LogThrottler(uint32_t n_secs,
+                        const char* string_id = nullptr,
+                        uint32_t num_id = 0)
+      : throttle_interval_us_(n_secs * 1000000LL),
+        string_id_(string_id),
+        num_id_(num_id),
+        last_ts_(0),
+        num_suppressed_(0) {
   }
 
-  bool ShouldLog(size_t n_secs, const char* tag, int* num_suppressed) {
-    MicrosecondsInt64 ts = GetMonoTimeMicros();
-
-    // When we switch tags, we should not show the "suppressed" messages, 
because
-    // in fact it's a different message that we skipped. So, reset it to zero,
-    // and always log the new message.
-    if (tag != last_tag_) {
-      *num_suppressed = num_suppressed_ = 0;
-      last_tag_ = tag;
-      last_ts_ = ts;
-      return true;
-    }
+  ~LogThrottler();
 
-    if (ts - last_ts_ < n_secs * 1000000) {
-      *num_suppressed = 
base::subtle::NoBarrier_AtomicIncrement(&num_suppressed_, 1);
+  bool ShouldLog(int64_t* num_suppressed) {
+    const MicrosecondsInt64 ts = GetMonoTimeMicros();
+    if (ts - last_ts_.load(std::memory_order_acquire) < throttle_interval_us_) 
{
+      *num_suppressed = num_suppressed_.fetch_add(1, 
std::memory_order_relaxed);
       return false;
     }
-    last_ts_ = ts;
-    *num_suppressed = base::subtle::NoBarrier_AtomicExchange(&num_suppressed_, 
0);
+    last_ts_.store(ts, std::memory_order_release);
+    *num_suppressed = num_suppressed_.exchange(0, std::memory_order_acq_rel);
     return true;
   }
+
  private:
-  Atomic32 num_suppressed_;
-  MicrosecondsInt64 last_ts_;
-  const char* last_tag_;
+  const int64_t throttle_interval_us_;
+  const char* const string_id_;
+  const uint32_t num_id_;
+  std::atomic<MicrosecondsInt64> last_ts_;
+  std::atomic<int64_t> num_suppressed_;
 };
+
 } // namespace logging
 
 std::ostream& operator<<(std::ostream &os, const PRIVATE_ThrottleMsg&);
@@ -369,5 +358,3 @@ std::ostream& operator<<(std::ostream &os, const 
PRIVATE_ThrottleMsg&);
   << LogPrefix()
 
 } // namespace kudu
-
-#endif // KUDU_UTIL_LOGGING_H

Reply via email to