This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit e471f58f76d1b7358f3ef58d1c7d517c95884e62 Author: Alexey Serbin <[email protected]> AuthorDate: Wed Nov 6 20:36:13 2024 -0800 [consensus] replace chromium atomics with STL ones This patch replaces chromium-derived atomics with STL-based ones, but otherwise it doesn't contain any functional modifications. Change-Id: If8de407e58ddca70097112360c0cdcba97465ce8 Reviewed-on: http://gerrit.cloudera.org:8080/22038 Tested-by: Alexey Serbin <[email protected]> Reviewed-by: Yifan Zhang <[email protected]> --- src/kudu/consensus/log.cc | 27 +++++++++++++++------------ src/kudu/consensus/log_util.cc | 20 +++++++++++--------- src/kudu/consensus/log_util.h | 11 ++++++----- 3 files changed, 32 insertions(+), 26 deletions(-) diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc index ca93cb94b..42c7cf773 100644 --- a/src/kudu/consensus/log.cc +++ b/src/kudu/consensus/log.cc @@ -17,6 +17,7 @@ #include "kudu/consensus/log.h" +#include <atomic> #include <cerrno> #include <cstdint> #include <functional> @@ -38,8 +39,6 @@ #include "kudu/consensus/log_reader.h" #include "kudu/consensus/log_util.h" #include "kudu/fs/fs_manager.h" -#include "kudu/gutil/atomicops.h" -#include "kudu/gutil/dynamic_annotations.h" #include "kudu/gutil/port.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/substitute.h" @@ -179,6 +178,7 @@ DEFINE_validator(log_min_segments_to_retain, &ValidateLogsToRetain); using kudu::consensus::CommitMsg; using kudu::consensus::OpId; using kudu::consensus::ReplicateRefPtr; +using std::atomic; using std::shared_lock; using std::string; using std::unique_ptr; @@ -235,7 +235,7 @@ class Log::AppendThread { void Wake(); bool active() const { - return base::subtle::NoBarrier_Load(&thread_state_) == ACTIVE; + return thread_state_.load(std::memory_order_relaxed) == ACTIVE; } private: @@ -264,7 +264,7 @@ class Log::AppendThread { // A task is queued or running. ACTIVE }; - Atomic32 thread_state_ = IDLE; + atomic<uint32_t> thread_state_ = IDLE; // Pool with a single thread, which handles shutting down the thread // when idle. @@ -293,9 +293,11 @@ Status Log::AppendThread::Init() { void Log::AppendThread::Wake() { DCHECK(append_pool_); - auto old_status = base::subtle::NoBarrier_CompareAndSwap( - &thread_state_, IDLE, ACTIVE); - if (old_status == IDLE) { + uint32_t old_state = IDLE; + if (thread_state_.compare_exchange_strong(old_state, + ACTIVE, + std::memory_order_release, + std::memory_order_relaxed)) { CHECK_OK(append_pool_->Submit([this]() { this->ProcessQueue(); })); } } @@ -321,8 +323,8 @@ bool Log::AppendThread::GoIdle() { // So, we first transition back to STOPPED state, and then re-check to see // if there has been something enqueued in the meantime. - auto old_state = base::subtle::NoBarrier_AtomicExchange(&thread_state_, IDLE); - DCHECK_EQ(old_state, ACTIVE); + const auto old_state = thread_state_.exchange(IDLE, std::memory_order_relaxed); + DCHECK_EQ(ACTIVE, old_state); if (log_->entry_queue()->empty()) { // Nothing got enqueued, which means there must not have been any missed wakeup. // We are now in IDLE state. @@ -332,8 +334,9 @@ bool Log::AppendThread::GoIdle() { MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms); // Someone enqueued something. We don't know whether their wakeup was successful // or not, but we can just try to transition back to ACTIVE mode here. - if (base::subtle::NoBarrier_CompareAndSwap(&thread_state_, IDLE, ACTIVE) - == IDLE) { + uint32_t old = IDLE; + if (thread_state_.compare_exchange_strong( + old, ACTIVE, std::memory_order_release, std::memory_order_relaxed)) { // Their wake-up was lost, but we've now marked ourselves as running. MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms); return false; @@ -346,7 +349,7 @@ bool Log::AppendThread::GoIdle() { } void Log::AppendThread::ProcessQueue() { - DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(thread_state_), ACTIVE); + DCHECK_EQ(ACTIVE, thread_state_.load(std::memory_order_relaxed)); VLOG_WITH_PREFIX(2) << "WAL Appender going active"; while (true) { MonoTime deadline = MonoTime::Now() + diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc index abcc0eea3..27f0f0d1d 100644 --- a/src/kudu/consensus/log_util.cc +++ b/src/kudu/consensus/log_util.cc @@ -33,6 +33,7 @@ #include "kudu/gutil/strings/substitute.h" #include "kudu/gutil/strings/util.h" #include "kudu/util/array_view.h" // IWYU pragma: keep +#include "kudu/util/atomic-utils.h" #include "kudu/util/coding.h" #include "kudu/util/coding-inl.h" #include "kudu/util/compression/compression.pb.h" @@ -116,7 +117,8 @@ LogEntryReader::LogEntryReader(const ReadableLogSegment* seg) num_entries_read_(0), offset_(seg_->first_entry_offset()) { - int64_t readable_to_offset = seg_->readable_to_offset_.Load(); + const int64_t readable_to_offset = + seg_->readable_to_offset_.load(std::memory_order_relaxed); // If we have a footer we only read up to it. If we don't we likely crashed // and always read to the end. @@ -307,7 +309,7 @@ Status ReadableLogSegment::Init(const LogSegmentHeaderPB& header, footer_.CopyFrom(footer); first_entry_offset_ = first_entry_offset; is_initialized_ = true; - readable_to_offset_.Store(file_size()); + readable_to_offset_.store(file_size(), std::memory_order_relaxed); return Status::OK(); } @@ -325,7 +327,7 @@ Status ReadableLogSegment::Init(const LogSegmentHeaderPB& header, is_initialized_ = true; // On a new segment, we don't expect any readable entries yet. - readable_to_offset_.Store(first_entry_offset); + readable_to_offset_.store(first_entry_offset, std::memory_order_relaxed); return Status::OK(); } @@ -352,7 +354,7 @@ Status ReadableLogSegment::Init() { is_initialized_ = true; - readable_to_offset_.Store(file_size()); + readable_to_offset_.store(file_size(), std::memory_order_relaxed); return Status::OK(); } @@ -367,12 +369,12 @@ Status ReadableLogSegment::InitCompressionCodec() { } int64_t ReadableLogSegment::readable_up_to() const { - return readable_to_offset_.Load(); + return readable_to_offset_.load(std::memory_order_relaxed); } void ReadableLogSegment::UpdateReadableToOffset(int64_t readable_to_offset) { - readable_to_offset_.Store(readable_to_offset); - file_size_.StoreMax(readable_to_offset); + readable_to_offset_.store(readable_to_offset, std::memory_order_relaxed); + AtomicStoreMax(file_size_, readable_to_offset); } Status ReadableLogSegment::RebuildFooterByScanning() { @@ -402,7 +404,7 @@ Status ReadableLogSegment::RebuildFooterByScanning() { footer_ = new_footer; DCHECK(footer_.IsInitialized()); footer_was_rebuilt_ = true; - readable_to_offset_.Store(reader.offset()); + readable_to_offset_.store(reader.offset(), std::memory_order_relaxed); VLOG(1) << "Successfully rebuilt footer for segment: " << path_ << " (valid entries through byte offset " << reader.offset() << ")"; @@ -415,7 +417,7 @@ Status ReadableLogSegment::ReadFileSize() { // underflow bugs. Use a local to convert. uint64_t size; RETURN_NOT_OK_PREPEND(file_->Size(&size), "Unable to read file size"); - file_size_.Store(size); + file_size_.store(size, std::memory_order_relaxed); if (size == 0) { VLOG(1) << "Log segment file $0 is zero-length: " << path(); return Status::OK(); diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h index 296412697..679e623e6 100644 --- a/src/kudu/consensus/log_util.h +++ b/src/kudu/consensus/log_util.h @@ -16,11 +16,13 @@ // under the License. #pragma once +#include <atomic> #include <cstddef> #include <cstdint> #include <deque> #include <memory> #include <string> +#include <type_traits> #include <vector> #include <gflags/gflags_declare.h> @@ -31,8 +33,7 @@ #include "kudu/consensus/opid.pb.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/ref_counted.h" -#include "kudu/util/atomic.h" -#include "kudu/util/env.h" +#include "kudu/util/env.h" // IWYU pragma: keep #include "kudu/util/faststring.h" #include "kudu/util/slice.h" #include "kudu/util/status.h" @@ -258,7 +259,7 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> { } int64_t file_size() const { - return file_size_.Load(); + return file_size_.load(std::memory_order_relaxed); } int64_t first_entry_offset() const { @@ -375,7 +376,7 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> { // The size of the file. // This is set by Init(). In the case of a log being written to, // this may be increased by UpdateReadableToOffset() - AtomicInt<int64_t> file_size_; + std::atomic<int64_t> file_size_; // The offset up to which we can read the file. // For already written segments this is fixed and equal to the file size @@ -383,7 +384,7 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> { // we can read without the fear of reading garbage/zeros. // This is atomic because the Log thread might be updating the segment's readable // offset while an async reader is reading the segment's entries. - AtomicInt<int64_t> readable_to_offset_; + std::atomic<int64_t> readable_to_offset_; // File handle for a log segment (used on replay). //
