This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 5977402e3 [fs] remove chromium Atomics from FS
5977402e3 is described below

commit 5977402e3701b52888f91b7bb1e351f957e3c562
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Jun 14 21:08:28 2024 -0700

    [fs] remove chromium Atomics from FS
    
    Change-Id: Ie7ef778fd816ffa929166c9621f31ba4a2ea2b50
    Reviewed-on: http://gerrit.cloudera.org:8080/21521
    Tested-by: Marton Greber <[email protected]>
    Reviewed-by: Marton Greber <[email protected]>
    Reviewed-by: Zoltan Chovan <[email protected]>
---
 src/kudu/fs/block_manager-stress-test.cc | 32 +++++------
 src/kudu/fs/file_block_manager.cc        | 27 +++++-----
 src/kudu/fs/file_block_manager.h         |  8 +--
 src/kudu/fs/log_block_manager-test.cc    |  8 +--
 src/kudu/fs/log_block_manager.cc         | 93 +++++++++++++++++---------------
 src/kudu/fs/log_block_manager.h          |  3 +-
 src/kudu/util/atomic-utils.h             | 61 +++++++++++++++++++++
 7 files changed, 147 insertions(+), 85 deletions(-)

diff --git a/src/kudu/fs/block_manager-stress-test.cc 
b/src/kudu/fs/block_manager-stress-test.cc
index 14482e8fb..6a8b66e14 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <atomic>
 #include <cmath>
 #include <cstdint>
 #include <cstring>
@@ -47,7 +48,6 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/atomic.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/env.h"
 #include "kudu/util/faststring.h"
@@ -277,13 +277,13 @@ class BlockManagerStressTest : public KuduTest {
 
   // Some performance counters.
 
-  AtomicInt<int64_t> total_blocks_written_;
-  AtomicInt<int64_t> total_bytes_written_;
+  std::atomic<int64_t> total_blocks_written_;
+  std::atomic<int64_t> total_bytes_written_;
 
-  AtomicInt<int64_t> total_blocks_read_;
-  AtomicInt<int64_t> total_bytes_read_;
+  std::atomic<int64_t> total_blocks_read_;
+  std::atomic<int64_t> total_bytes_read_;
 
-  AtomicInt<int64_t> total_blocks_deleted_;
+  std::atomic<int64_t> total_blocks_deleted_;
 };
 
 template <typename T>
@@ -359,8 +359,8 @@ void BlockManagerStressTest<T>::WriterThread() {
     }
   }
 
-  total_blocks_written_.IncrementBy(num_blocks_written);
-  total_bytes_written_.IncrementBy(num_bytes_written);
+  total_blocks_written_.fetch_add(num_blocks_written, 
std::memory_order_relaxed);
+  total_bytes_written_.fetch_add(num_bytes_written, std::memory_order_relaxed);
 }
 
 template <typename T>
@@ -426,8 +426,8 @@ void BlockManagerStressTest<T>::ReaderThread() {
     num_bytes_read += block_size;
   }
 
-  total_blocks_read_.IncrementBy(num_blocks_read);
-  total_bytes_read_.IncrementBy(num_bytes_read);
+  total_blocks_read_.fetch_add(num_blocks_read, std::memory_order_relaxed);
+  total_bytes_read_.fetch_add(num_bytes_read, std::memory_order_relaxed);
 }
 
 template <typename T>
@@ -470,7 +470,7 @@ void BlockManagerStressTest<T>::DeleterThread() {
     num_blocks_deleted += deleted.size();
   }
 
-  total_blocks_deleted_.IncrementBy(num_blocks_deleted);
+  total_blocks_deleted_.fetch_add(num_blocks_deleted, 
std::memory_order_relaxed);
 }
 
 template <>
@@ -558,15 +558,15 @@ TYPED_TEST(BlockManagerStressTest, StressTest) {
   LOG(INFO) << "Printing test totals";
   LOG(INFO) << "--------------------";
   LOG(INFO) << Substitute("Wrote $0 blocks ($1 bytes) via $2 threads",
-                          this->total_blocks_written_.Load(),
-                          this->total_bytes_written_.Load(),
+                          this->total_blocks_written_.load(),
+                          this->total_bytes_written_.load(),
                           FLAGS_num_writer_threads);
   LOG(INFO) << Substitute("Read $0 blocks ($1 bytes) via $2 threads",
-                          this->total_blocks_read_.Load(),
-                          this->total_bytes_read_.Load(),
+                          this->total_blocks_read_.load(),
+                          this->total_bytes_read_.load(),
                           FLAGS_num_reader_threads);
   LOG(INFO) << Substitute("Deleted $0 blocks via $1 threads",
-                          this->total_blocks_deleted_.Load(),
+                          this->total_blocks_deleted_.load(),
                           FLAGS_num_deleter_threads);
 }
 
diff --git a/src/kudu/fs/file_block_manager.cc 
b/src/kudu/fs/file_block_manager.cc
index e8524fd94..c9a24eb9e 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -47,7 +47,6 @@
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/array_view.h"
-#include "kudu/util/atomic.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/file_cache.h"
@@ -60,6 +59,7 @@
 #include "kudu/util/status.h"
 
 using std::accumulate;
+using std::atomic;
 using std::set;
 using std::shared_ptr;
 using std::string;
@@ -469,7 +469,7 @@ class FileReadableBlock : public ReadableBlock {
 
   // Whether or not this block has been closed. Close() is thread-safe, so
   // this must be an atomic primitive.
-  AtomicBool closed_;
+  atomic<bool> closed_;
 
   DISALLOW_COPY_AND_ASSIGN(FileReadableBlock);
 };
@@ -500,7 +500,8 @@ FileReadableBlock::~FileReadableBlock() {
 }
 
 Status FileReadableBlock::Close() {
-  if (closed_.CompareAndSet(false, true)) {
+  bool is_closed = false;
+  if (closed_.compare_exchange_strong(is_closed, true)) {
     reader_.reset();
     if (block_manager_->metrics_) {
       block_manager_->metrics_->blocks_open_reading->Decrement();
@@ -519,7 +520,7 @@ const BlockId& FileReadableBlock::id() const {
 }
 
 Status FileReadableBlock::Size(uint64_t* sz) const {
-  DCHECK(!closed_.Load());
+  DCHECK(!closed_);
 
   RETURN_NOT_OK_HANDLE_ERROR(reader_->Size(sz));
   *sz -= reader_->GetEncryptionHeaderSize();
@@ -531,7 +532,7 @@ Status FileReadableBlock::Read(uint64_t offset, Slice 
result) const {
 }
 
 Status FileReadableBlock::ReadV(uint64_t offset, ArrayView<Slice> results) 
const {
-  DCHECK(!closed_.Load());
+  DCHECK(!closed_);
 
   RETURN_NOT_OK_HANDLE_ERROR(reader_->ReadV(offset + 
reader_->GetEncryptionHeaderSize(), results));
 
@@ -730,8 +731,8 @@ FileBlockManager::~FileBlockManager() {
 }
 
 Status FileBlockManager::Open(FsReport* report, MergeReport need_merage,
-                              std::atomic<int>* /* containers_processed */,
-                              std::atomic<int>* /* containers_total */) {
+                              atomic<int>* /* containers_processed */,
+                              atomic<int>* /* containers_total */) {
   // Prepare the filesystem report and either return or log it.
   FsReport local_report;
   set<int> failed_dirs = dd_manager_->GetFailedDirs();
@@ -764,7 +765,7 @@ Status FileBlockManager::Open(FsReport* report, MergeReport 
need_merage,
 
 Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
                                      unique_ptr<WritableBlock>* block) {
-  CHECK(!opts_.read_only);
+  DCHECK(!opts_.read_only);
 
   Dir* dir;
   RETURN_NOT_OK_EVAL(dd_manager_->GetDirAddIfNecessary(opts, &dir),
@@ -788,14 +789,14 @@ Status FileBlockManager::CreateBlock(const 
CreateBlockOptions& opts,
     // If we failed to generate a unique ID, start trying again from a random
     // part of the key space.
     if (attempt_num++ > 0) {
-      next_block_id_.Store(rand_.Next64());
+      next_block_id_ = rand_.Next64();
     }
 
     // Make sure we don't accidentally create a location using the magic
     // invalid ID value.
     BlockId id;
     do {
-      id.SetId(next_block_id_.Increment());
+      id.SetId(next_block_id_++);
     } while (id.IsNull());
 
     location = internal::FileBlockLocation::FromParts(dir, uuid_idx, id);
@@ -869,7 +870,7 @@ Status FileBlockManager::OpenBlock(const BlockId& block_id,
 }
 
 Status FileBlockManager::DeleteBlock(const BlockId& block_id) {
-  CHECK(!opts_.read_only);
+  DCHECK(!opts_.read_only);
 
   // Return early if deleting a block in a failed directory.
   set<int> failed_dirs = dd_manager_->GetFailedDirs();
@@ -906,13 +907,13 @@ Status FileBlockManager::DeleteBlock(const BlockId& 
block_id) {
 }
 
 unique_ptr<BlockCreationTransaction> 
FileBlockManager::NewCreationTransaction() {
-  CHECK(!opts_.read_only);
+  DCHECK(!opts_.read_only);
   return unique_ptr<internal::FileBlockCreationTransaction>(
       new internal::FileBlockCreationTransaction());
 }
 
 shared_ptr<BlockDeletionTransaction> 
FileBlockManager::NewDeletionTransaction() {
-  CHECK(!opts_.read_only);
+  DCHECK(!opts_.read_only);
   return std::make_shared<internal::FileBlockDeletionTransaction>(this);
 }
 
diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index bafee8e3d..27ba6e531 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef KUDU_FS_FILE_BLOCK_MANAGER_H
-#define KUDU_FS_FILE_BLOCK_MANAGER_H
+#pragma once
 
 #include <atomic>
 #include <cstdint>
@@ -29,7 +28,6 @@
 #include "kudu/fs/error_manager.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/util/atomic.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/random.h"
 #include "kudu/util/status.h"
@@ -145,7 +143,7 @@ class FileBlockManager : public BlockManager {
 
   // For generating block IDs.
   ThreadSafeRandom rand_;
-  AtomicInt<uint64_t> next_block_id_;
+  std::atomic<uint64_t> next_block_id_;
 
   // Protects 'dirty_dirs_'.
   mutable simple_spinlock lock_;
@@ -170,5 +168,3 @@ class FileBlockManager : public BlockManager {
 
 } // namespace fs
 } // namespace kudu
-
-#endif
diff --git a/src/kudu/fs/log_block_manager-test.cc 
b/src/kudu/fs/log_block_manager-test.cc
index ce7fd97ee..2d93d19b6 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -18,6 +18,7 @@
 #include "kudu/fs/log_block_manager.h"
 
 #include <algorithm>
+#include <atomic>
 #include <cstdint>
 #include <cstdlib>
 #include <cstring>
@@ -61,7 +62,6 @@
 #include "kudu/gutil/strings/strip.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
-#include "kudu/util/atomic.h"
 #include "kudu/util/env.h"
 #include "kudu/util/file_cache.h"
 #include "kudu/util/metrics.h"
@@ -628,7 +628,7 @@ TEST_P(LogBlockManagerTest, TestBumpBlockIds) {
 
   // Simulate a complete reset of the block manager's block ID record, e.g.
   // from restarting but with all the blocks gone.
-  bm_->next_block_id_.Store(1);
+  bm_->next_block_id_ = 1;
 
   // Now simulate being notified by some other component (e.g. tablet metadata)
   // of the presence of a block ID.
@@ -651,7 +651,7 @@ TEST_P(LogBlockManagerTest, TestBumpBlockIds) {
 TEST_P(LogBlockManagerTest, TestReuseBlockIds) {
   // Typically, the LBM starts with a random block ID when running as a
   // gtest. In this test, we want to control the block IDs.
-  bm_->next_block_id_.Store(1);
+  bm_->next_block_id_ = 1;
 
   vector<BlockId> block_ids;
 
@@ -689,7 +689,7 @@ TEST_P(LogBlockManagerTest, TestReuseBlockIds) {
   // Reset the block ID sequence and re-create new blocks which should reuse 
the same
   // block IDs. This isn't allowed in current versions of Kudu, but older 
versions
   // could produce this situation, and we still need to handle it on startup.
-  bm_->next_block_id_.Store(1);
+  bm_->next_block_id_ = 1;
   for (int i = 0; i < 4; i++) {
     unique_ptr<WritableBlock> writer;
     ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 830bd1e78..e4654e5a1 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -65,6 +65,7 @@
 #include "kudu/gutil/strings/util.h"
 #include "kudu/gutil/walltime.h"
 #include "kudu/util/alignment.h"
+#include "kudu/util/atomic-utils.h"
 #include "kudu/util/array_view.h"
 #include "kudu/util/env.h"
 #include "kudu/util/fault_injection.h"
@@ -226,6 +227,7 @@ using kudu::fs::internal::LogWritableBlock;
 using kudu::pb_util::ReadablePBContainerFile;
 using kudu::pb_util::WritablePBContainerFile;
 using std::accumulate;
+using std::atomic;
 using std::map;
 using std::optional;
 using std::set;
@@ -601,23 +603,23 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
   // Simple accessors.
   LogBlockManager* block_manager() const { return block_manager_; }
   const string& id() const { return id_; }
-  int64_t next_block_offset() const { return next_block_offset_.Load(); }
-  int64_t total_bytes() const { return total_bytes_.Load(); }
-  int64_t total_blocks() const { return total_blocks_.Load(); }
-  int64_t live_bytes() const { return live_bytes_.Load(); }
-  int64_t live_bytes_aligned() const { return live_bytes_aligned_.Load(); }
-  int64_t live_blocks() const { return live_blocks_.Load(); }
-  int32_t blocks_being_written() const { return blocks_being_written_.Load(); }
+  int64_t next_block_offset() const { return next_block_offset_; }
+  int64_t total_bytes() const { return total_bytes_; }
+  int64_t total_blocks() const { return total_blocks_; }
+  int64_t live_bytes() const { return live_bytes_; }
+  int64_t live_bytes_aligned() const { return live_bytes_aligned_; }
+  int64_t live_blocks() const { return live_blocks_; }
+  int32_t blocks_being_written() const { return blocks_being_written_; }
   virtual bool full() const { return data_full(); }
-  bool dead() const { return dead_.Load(); }
+  bool dead() const { return dead_; }
   const LogBlockManagerMetrics* metrics() const { return metrics_; }
   Dir* data_dir() const { return data_dir_; }
   const DirInstanceMetadataPB* instance() const { return 
data_dir_->instance()->metadata(); }
 
   // Adjusts the number of blocks being written.
   // Positive means increase, negative means decrease.
-  int32_t blocks_being_written_incr(int32_t value) {
-    return blocks_being_written_.IncrementBy(value);
+  void blocks_being_written_incr(int32_t value) {
+    blocks_being_written_.fetch_add(value);
   }
 
   // Check that the container meets the death condition.
@@ -641,8 +643,11 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
   //
   // If successful, returns true; otherwise returns false.
   bool TrySetDead() {
-    if (dead()) return false;
-    return dead_.CompareAndSet(false, true);
+    if (dead()) {
+      return false;
+    }
+    bool is_dead = false;
+    return dead_.compare_exchange_strong(is_dead, true);
   }
 
   // Some work will be triggered after blocks have been removed from this 
container successfully.
@@ -718,29 +723,29 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
   shared_ptr<RWFile> data_file_;
 
   // The offset of the next block to be written to the container.
-  AtomicInt<int64_t> next_block_offset_;
+  atomic<int64_t> next_block_offset_;
 
   // The amount of data (post block alignment) written thus far to the 
container.
-  AtomicInt<int64_t> total_bytes_;
+  atomic<int64_t> total_bytes_;
 
   // The number of blocks written thus far in the container.
-  AtomicInt<int64_t> total_blocks_;
+  atomic<int64_t> total_blocks_;
 
   // The amount of data present in not-yet-deleted blocks of the container.
-  AtomicInt<int64_t> live_bytes_;
+  atomic<int64_t> live_bytes_;
 
   // The amount of data (post block alignment) present in not-yet-deleted
   // blocks of the container.
-  AtomicInt<int64_t> live_bytes_aligned_;
+  atomic<int64_t> live_bytes_aligned_;
 
   // The number of not-yet-deleted blocks in the container.
-  AtomicInt<int64_t> live_blocks_;
+  atomic<int64_t> live_blocks_;
 
   // The number of LogWritableBlocks currently open for this container.
-  AtomicInt<int32_t> blocks_being_written_;
+  atomic<int32_t> blocks_being_written_;
 
   // Whether or not this container has been marked as dead.
-  AtomicBool dead_;
+  atomic<bool> dead_;
 
   // The metrics. Not owned by the log container; it has the same lifespan
   // as the block manager.
@@ -895,7 +900,7 @@ class LogBlockContainerNativeMeta final : public 
LogBlockContainer {
   unique_ptr<WritablePBContainerFile> metadata_file_;
 
   // TODO(yingchun): add metadata bytes for metadata.
-  //  AtomicInt<int64_t> metadata_bytes_;
+  //  atomic<int64_t> metadata_bytes_;
 
   DISALLOW_COPY_AND_ASSIGN(LogBlockContainerNativeMeta);
 };
@@ -1021,9 +1026,9 @@ LogBlockContainer::LogBlockContainer(
   if (auto encryption_header_size = data_file_->GetEncryptionHeaderSize();
       encryption_header_size > 0) {
     UpdateNextBlockOffset(0, encryption_header_size);
-    live_bytes_.Store(encryption_header_size);
-    total_bytes_.Store(next_block_offset_.Load());
-    live_bytes_aligned_.Store(next_block_offset_.Load());
+    live_bytes_ = encryption_header_size;
+    total_bytes_ = next_block_offset_.load();
+    live_bytes_aligned_ = next_block_offset_.load();
   }
 }
 
@@ -1132,8 +1137,8 @@ void LogBlockContainerNativeMeta::CompactMetadata() {
   VLOG(1) << "Compacted metadata file " << ToString()
           << " (saved " << file_bytes_delta << " bytes)";
 
-  total_blocks_.Store(live_blocks.size());
-  live_blocks_.Store(live_blocks.size());
+  total_blocks_ = live_blocks.size();
+  live_blocks_ = live_blocks.size();
 }
 
 #define RETURN_NOT_OK_CONTAINER_DISK_FAILURE(status_expr) do { \
@@ -1785,7 +1790,7 @@ void LogBlockContainer::UpdateNextBlockOffset(int64_t 
block_offset, int64_t bloc
   int64_t new_next_block_offset = KUDU_ALIGN_UP(
       block_offset + block_length,
       instance()->filesystem_block_size_bytes());
-  next_block_offset_.StoreMax(new_next_block_offset);
+  AtomicStoreMax(next_block_offset_, new_next_block_offset);
 
   if (data_full()) {
     VLOG(1) << Substitute(
@@ -1833,19 +1838,19 @@ vector<BlockRecordPB> LogBlockContainer::SortRecords(
 void LogBlockContainer::BlockCreated(const LogBlockRefPtr& block) {
   DCHECK_GE(block->offset(), 0);
 
-  total_bytes_.IncrementBy(block->fs_aligned_length());
-  total_blocks_.Increment();
-  live_bytes_.IncrementBy(block->length());
-  live_bytes_aligned_.IncrementBy(block->fs_aligned_length());
-  live_blocks_.Increment();
+  total_bytes_.fetch_add(block->fs_aligned_length());
+  total_blocks_++;
+  live_bytes_.fetch_add(block->length());
+  live_bytes_aligned_.fetch_add(block->fs_aligned_length());
+  live_blocks_++;
 }
 
 void LogBlockContainer::BlockDeleted(const LogBlockRefPtr& block) {
   DCHECK_GE(block->offset(), 0);
 
-  live_bytes_.IncrementBy(-block->length());
-  live_bytes_aligned_.IncrementBy(-block->fs_aligned_length());
-  live_blocks_.IncrementBy(-1);
+  live_bytes_.fetch_sub(block->length());
+  live_bytes_aligned_.fetch_sub(block->fs_aligned_length());
+  live_blocks_--;
 }
 
 void LogBlockContainer::ExecClosure(const std::function<void()>& task) {
@@ -2639,7 +2644,7 @@ class LogReadableBlock : public ReadableBlock {
 
   // Whether or not this block has been closed. Close() is thread-safe, so
   // this must be an atomic primitive.
-  AtomicBool closed_;
+  atomic<bool> closed_;
 
   DISALLOW_COPY_AND_ASSIGN(LogReadableBlock);
 };
@@ -2659,7 +2664,8 @@ LogReadableBlock::~LogReadableBlock() {
 }
 
 Status LogReadableBlock::Close() {
-  if (closed_.CompareAndSet(false, true)) {
+  bool is_closed = false;
+  if (closed_.compare_exchange_strong(is_closed, true)) {
     if (log_block_->container()->metrics()) {
       
log_block_->container()->metrics()->generic_metrics.blocks_open_reading->Decrement();
     }
@@ -2678,7 +2684,7 @@ const BlockId& LogReadableBlock::id() const {
 }
 
 Status LogReadableBlock::Size(uint64_t* sz) const {
-  DCHECK(!closed_.Load());
+  DCHECK(!closed_);
 
   *sz = log_block_->length();
   return Status::OK();
@@ -2689,7 +2695,7 @@ Status LogReadableBlock::Read(uint64_t offset, Slice 
result) const {
 }
 
 Status LogReadableBlock::ReadV(uint64_t offset, ArrayView<Slice> results) 
const {
-  DCHECK(!closed_.Load());
+  DCHECK(!closed_);
 
   size_t read_length = accumulate(results.begin(), results.end(), 
static_cast<size_t>(0),
                                   [&](int sum, const Slice& curr) {
@@ -2780,8 +2786,7 @@ LogBlockManager::LogBlockManager(Env* env,
   // block ID 1, we'll start with a random block ID. A collision is still
   // possible, but exceedingly unlikely.
   if (IsGTest()) {
-    Random r(GetRandomSeed32());
-    next_block_id_.Store(r.Next64());
+    next_block_id_ = Random(GetRandomSeed32()).Next64();
   }
 
   if (opts_.metric_entity) {
@@ -3013,7 +3018,7 @@ Status LogBlockManager::CreateBlock(const 
CreateBlockOptions& opts,
   // and thus we may have to "skip over" some block IDs that are claimed.
   BlockId new_block_id;
   do {
-    new_block_id.SetId(next_block_id_.Increment());
+    new_block_id.SetId(next_block_id_++);
   } while (!TryUseBlockId(new_block_id));
 
   block->reset(new LogWritableBlock(container,
@@ -3078,7 +3083,7 @@ Status LogBlockManager::GetAllBlockIds(vector<BlockId>* 
block_ids) {
 }
 
 void LogBlockManager::NotifyBlockId(BlockId block_id) {
-  next_block_id_.StoreMax(block_id.id() + 1);
+  AtomicStoreMax(next_block_id_, block_id.id() + 1);
 }
 
 void LogBlockManager::AddNewContainerUnlocked(const LogBlockContainerRefPtr& 
container) {
@@ -3583,7 +3588,7 @@ void LogBlockManager::LoadContainer(Dir* dir,
   result->report.stats.live_block_count += container->live_blocks();
   result->report.stats.lbm_container_count++;
 
-  next_block_id_.StoreMax(max_block_id + 1);
+  AtomicStoreMax(next_block_id_, max_block_id + 1);
 
   int64_t mem_usage = 0;
   for (UntrackedBlockMap::value_type& e : live_blocks) {
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 2a6d1ae8c..352c6ad67 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -40,7 +40,6 @@
 #include "kudu/fs/fs.pb.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/util/atomic.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/oid_generator.h"
@@ -517,7 +516,7 @@ class LogBlockManager : public BlockManager {
   ObjectIdGenerator oid_generator_;
 
   // For generating block IDs.
-  AtomicInt<uint64_t> next_block_id_;
+  std::atomic<uint64_t> next_block_id_;
 
   // Metrics for the block manager.
   //
diff --git a/src/kudu/util/atomic-utils.h b/src/kudu/util/atomic-utils.h
new file mode 100644
index 000000000..2b1c51a81
--- /dev/null
+++ b/src/kudu/util/atomic-utils.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <atomic>
+
+namespace kudu {
+
+// TODO(aserbin): remove this and replace its usage at call sites with
+//                std::fetch_max() when switching to C++26
+template<typename T>
+inline void AtomicStoreMax(std::atomic<T>& val, T new_val) {
+  do {
+    T old_val = val.load(std::memory_order_relaxed);
+    T max_val = std::max(old_val, new_val);
+    T prev_val = old_val;
+    if (val.compare_exchange_weak(prev_val,
+                                  max_val,
+                                  std::memory_order_release,
+                                  std::memory_order_relaxed)) {
+      return;
+    }
+    old_val = prev_val;
+  } while (true);
+}
+
+// TODO(aserbin): remove this and replace its usage at call sites with
+//                std::fetch_min() when switching to C++26
+template<typename T>
+inline void AtomicStoreMin(std::atomic<T>& val, T new_val) {
+  do {
+    T old_val = val.load(std::memory_order_relaxed);
+    T min_val = std::min(old_val, new_val);
+    T prev_val = old_val;
+    if (val.compare_exchange_weak(prev_val,
+                                  min_val,
+                                  std::memory_order_release,
+                                  std::memory_order_relaxed)) {
+      return;
+    }
+    old_val = prev_val;
+  } while (true);
+}
+
+} // namespace kudu

Reply via email to