This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit e471f58f76d1b7358f3ef58d1c7d517c95884e62
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Nov 6 20:36:13 2024 -0800

    [consensus] replace chromium atomics with STL ones
    
    This patch replaces chromium-derived atomics with STL-based ones,
    but otherwise it doesn't contain any functional modifications.
    
    Change-Id: If8de407e58ddca70097112360c0cdcba97465ce8
    Reviewed-on: http://gerrit.cloudera.org:8080/22038
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Yifan Zhang <[email protected]>
---
 src/kudu/consensus/log.cc      | 27 +++++++++++++++------------
 src/kudu/consensus/log_util.cc | 20 +++++++++++---------
 src/kudu/consensus/log_util.h  | 11 ++++++-----
 3 files changed, 32 insertions(+), 26 deletions(-)

diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index ca93cb94b..42c7cf773 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/consensus/log.h"
 
+#include <atomic>
 #include <cerrno>
 #include <cstdint>
 #include <functional>
@@ -38,8 +39,6 @@
 #include "kudu/consensus/log_reader.h"
 #include "kudu/consensus/log_util.h"
 #include "kudu/fs/fs_manager.h"
-#include "kudu/gutil/atomicops.h"
-#include "kudu/gutil/dynamic_annotations.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -179,6 +178,7 @@ DEFINE_validator(log_min_segments_to_retain, &ValidateLogsToRetain);
 using kudu::consensus::CommitMsg;
 using kudu::consensus::OpId;
 using kudu::consensus::ReplicateRefPtr;
+using std::atomic;
 using std::shared_lock;
 using std::string;
 using std::unique_ptr;
@@ -235,7 +235,7 @@ class Log::AppendThread {
   void Wake();
 
   bool active() const {
-    return base::subtle::NoBarrier_Load(&thread_state_) == ACTIVE;
+    return thread_state_.load(std::memory_order_relaxed) == ACTIVE;
   }
 
  private:
@@ -264,7 +264,7 @@ class Log::AppendThread {
     // A task is queued or running.
     ACTIVE
   };
-  Atomic32 thread_state_ = IDLE;
+  atomic<uint32_t> thread_state_ = IDLE;
 
   // Pool with a single thread, which handles shutting down the thread
   // when idle.
@@ -293,9 +293,11 @@ Status Log::AppendThread::Init() {
 
 void Log::AppendThread::Wake() {
   DCHECK(append_pool_);
-  auto old_status = base::subtle::NoBarrier_CompareAndSwap(
-      &thread_state_, IDLE, ACTIVE);
-  if (old_status == IDLE) {
+  uint32_t old_state = IDLE;
+  if (thread_state_.compare_exchange_strong(old_state,
+                                            ACTIVE,
+                                            std::memory_order_release,
+                                            std::memory_order_relaxed)) {
     CHECK_OK(append_pool_->Submit([this]() { this->ProcessQueue(); }));
   }
 }
@@ -321,8 +323,8 @@ bool Log::AppendThread::GoIdle() {
 
   // So, we first transition back to STOPPED state, and then re-check to see
   // if there has been something enqueued in the meantime.
-  auto old_state = base::subtle::NoBarrier_AtomicExchange(&thread_state_, IDLE);
-  DCHECK_EQ(old_state, ACTIVE);
+  const auto old_state = thread_state_.exchange(IDLE, std::memory_order_relaxed);
+  DCHECK_EQ(ACTIVE, old_state);
   if (log_->entry_queue()->empty()) {
     // Nothing got enqueued, which means there must not have been any missed wakeup.
     // We are now in IDLE state.
@@ -332,8 +334,9 @@ bool Log::AppendThread::GoIdle() {
   MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
   // Someone enqueued something. We don't know whether their wakeup was successful
   // or not, but we can just try to transition back to ACTIVE mode here.
-  if (base::subtle::NoBarrier_CompareAndSwap(&thread_state_, IDLE, ACTIVE)
-      == IDLE) {
+  uint32_t old = IDLE;
+  if (thread_state_.compare_exchange_strong(
+          old, ACTIVE, std::memory_order_release, std::memory_order_relaxed)) {
     // Their wake-up was lost, but we've now marked ourselves as running.
     MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
     return false;
@@ -346,7 +349,7 @@ bool Log::AppendThread::GoIdle() {
 }
 
 void Log::AppendThread::ProcessQueue() {
-  DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(thread_state_), ACTIVE);
+  DCHECK_EQ(ACTIVE, thread_state_.load(std::memory_order_relaxed));
   VLOG_WITH_PREFIX(2) << "WAL Appender going active";
   while (true) {
     MonoTime deadline = MonoTime::Now() +
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index abcc0eea3..27f0f0d1d 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -33,6 +33,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
 #include "kudu/util/array_view.h" // IWYU pragma: keep
+#include "kudu/util/atomic-utils.h"
 #include "kudu/util/coding.h"
 #include "kudu/util/coding-inl.h"
 #include "kudu/util/compression/compression.pb.h"
@@ -116,7 +117,8 @@ LogEntryReader::LogEntryReader(const ReadableLogSegment* seg)
       num_entries_read_(0),
       offset_(seg_->first_entry_offset()) {
 
-  int64_t readable_to_offset = seg_->readable_to_offset_.Load();
+  const int64_t readable_to_offset =
+      seg_->readable_to_offset_.load(std::memory_order_relaxed);
 
   // If we have a footer we only read up to it. If we don't we likely crashed
   // and always read to the end.
@@ -307,7 +309,7 @@ Status ReadableLogSegment::Init(const LogSegmentHeaderPB& header,
   footer_.CopyFrom(footer);
   first_entry_offset_ = first_entry_offset;
   is_initialized_ = true;
-  readable_to_offset_.Store(file_size());
+  readable_to_offset_.store(file_size(), std::memory_order_relaxed);
 
   return Status::OK();
 }
@@ -325,7 +327,7 @@ Status ReadableLogSegment::Init(const LogSegmentHeaderPB& header,
   is_initialized_ = true;
 
   // On a new segment, we don't expect any readable entries yet.
-  readable_to_offset_.Store(first_entry_offset);
+  readable_to_offset_.store(first_entry_offset, std::memory_order_relaxed);
 
   return Status::OK();
 }
@@ -352,7 +354,7 @@ Status ReadableLogSegment::Init() {
 
   is_initialized_ = true;
 
-  readable_to_offset_.Store(file_size());
+  readable_to_offset_.store(file_size(), std::memory_order_relaxed);
 
   return Status::OK();
 }
@@ -367,12 +369,12 @@ Status ReadableLogSegment::InitCompressionCodec() {
 }
 
 int64_t ReadableLogSegment::readable_up_to() const {
-  return readable_to_offset_.Load();
+  return readable_to_offset_.load(std::memory_order_relaxed);
 }
 
 void ReadableLogSegment::UpdateReadableToOffset(int64_t readable_to_offset) {
-  readable_to_offset_.Store(readable_to_offset);
-  file_size_.StoreMax(readable_to_offset);
+  readable_to_offset_.store(readable_to_offset, std::memory_order_relaxed);
+  AtomicStoreMax(file_size_, readable_to_offset);
 }
 
 Status ReadableLogSegment::RebuildFooterByScanning() {
@@ -402,7 +404,7 @@ Status ReadableLogSegment::RebuildFooterByScanning() {
   footer_ = new_footer;
   DCHECK(footer_.IsInitialized());
   footer_was_rebuilt_ = true;
-  readable_to_offset_.Store(reader.offset());
+  readable_to_offset_.store(reader.offset(), std::memory_order_relaxed);
 
   VLOG(1) << "Successfully rebuilt footer for segment: " << path_
           << " (valid entries through byte offset " << reader.offset() << ")";
@@ -415,7 +417,7 @@ Status ReadableLogSegment::ReadFileSize() {
   // underflow bugs. Use a local to convert.
   uint64_t size;
   RETURN_NOT_OK_PREPEND(file_->Size(&size), "Unable to read file size");
-  file_size_.Store(size);
+  file_size_.store(size, std::memory_order_relaxed);
   if (size == 0) {
     VLOG(1) << "Log segment file $0 is zero-length: " << path();
     return Status::OK();
diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h
index 296412697..679e623e6 100644
--- a/src/kudu/consensus/log_util.h
+++ b/src/kudu/consensus/log_util.h
@@ -16,11 +16,13 @@
 // under the License.
 #pragma once
 
+#include <atomic>
 #include <cstddef>
 #include <cstdint>
 #include <deque>
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <vector>
 
 #include <gflags/gflags_declare.h>
@@ -31,8 +33,7 @@
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/util/atomic.h"
-#include "kudu/util/env.h"
+#include "kudu/util/env.h"  // IWYU pragma: keep
 #include "kudu/util/faststring.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
@@ -258,7 +259,7 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
   }
 
   int64_t file_size() const {
-    return file_size_.Load();
+    return file_size_.load(std::memory_order_relaxed);
   }
 
   int64_t first_entry_offset() const {
@@ -375,7 +376,7 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
   // The size of the file.
   // This is set by Init(). In the case of a log being written to,
   // this may be increased by UpdateReadableToOffset()
-  AtomicInt<int64_t> file_size_;
+  std::atomic<int64_t> file_size_;
 
   // The offset up to which we can read the file.
   // For already written segments this is fixed and equal to the file size
@@ -383,7 +384,7 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
   // we can read without the fear of reading garbage/zeros.
   // This is atomic because the Log thread might be updating the segment's readable
   // offset while an async reader is reading the segment's entries.
-  AtomicInt<int64_t> readable_to_offset_;
+  std::atomic<int64_t> readable_to_offset_;
 
   // File handle for a log segment (used on replay).
   //

Reply via email to