This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 5977402e3 [fs] remove chromium Atomics from FS
5977402e3 is described below
commit 5977402e3701b52888f91b7bb1e351f957e3c562
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Jun 14 21:08:28 2024 -0700
[fs] remove chromium Atomics from FS
Change-Id: Ie7ef778fd816ffa929166c9621f31ba4a2ea2b50
Reviewed-on: http://gerrit.cloudera.org:8080/21521
Tested-by: Marton Greber <[email protected]>
Reviewed-by: Marton Greber <[email protected]>
Reviewed-by: Zoltan Chovan <[email protected]>
---
src/kudu/fs/block_manager-stress-test.cc | 32 +++++------
src/kudu/fs/file_block_manager.cc | 27 +++++-----
src/kudu/fs/file_block_manager.h | 8 +--
src/kudu/fs/log_block_manager-test.cc | 8 +--
src/kudu/fs/log_block_manager.cc | 93 +++++++++++++++++---------------
src/kudu/fs/log_block_manager.h | 3 +-
src/kudu/util/atomic-utils.h | 61 +++++++++++++++++++++
7 files changed, 147 insertions(+), 85 deletions(-)
diff --git a/src/kudu/fs/block_manager-stress-test.cc
b/src/kudu/fs/block_manager-stress-test.cc
index 14482e8fb..6a8b66e14 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <atomic>
#include <cmath>
#include <cstdint>
#include <cstring>
@@ -47,7 +48,6 @@
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/atomic.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/env.h"
#include "kudu/util/faststring.h"
@@ -277,13 +277,13 @@ class BlockManagerStressTest : public KuduTest {
// Some performance counters.
- AtomicInt<int64_t> total_blocks_written_;
- AtomicInt<int64_t> total_bytes_written_;
+ std::atomic<int64_t> total_blocks_written_;
+ std::atomic<int64_t> total_bytes_written_;
- AtomicInt<int64_t> total_blocks_read_;
- AtomicInt<int64_t> total_bytes_read_;
+ std::atomic<int64_t> total_blocks_read_;
+ std::atomic<int64_t> total_bytes_read_;
- AtomicInt<int64_t> total_blocks_deleted_;
+ std::atomic<int64_t> total_blocks_deleted_;
};
template <typename T>
@@ -359,8 +359,8 @@ void BlockManagerStressTest<T>::WriterThread() {
}
}
- total_blocks_written_.IncrementBy(num_blocks_written);
- total_bytes_written_.IncrementBy(num_bytes_written);
+ total_blocks_written_.fetch_add(num_blocks_written,
std::memory_order_relaxed);
+ total_bytes_written_.fetch_add(num_bytes_written, std::memory_order_relaxed);
}
template <typename T>
@@ -426,8 +426,8 @@ void BlockManagerStressTest<T>::ReaderThread() {
num_bytes_read += block_size;
}
- total_blocks_read_.IncrementBy(num_blocks_read);
- total_bytes_read_.IncrementBy(num_bytes_read);
+ total_blocks_read_.fetch_add(num_blocks_read, std::memory_order_relaxed);
+ total_bytes_read_.fetch_add(num_bytes_read, std::memory_order_relaxed);
}
template <typename T>
@@ -470,7 +470,7 @@ void BlockManagerStressTest<T>::DeleterThread() {
num_blocks_deleted += deleted.size();
}
- total_blocks_deleted_.IncrementBy(num_blocks_deleted);
+ total_blocks_deleted_.fetch_add(num_blocks_deleted,
std::memory_order_relaxed);
}
template <>
@@ -558,15 +558,15 @@ TYPED_TEST(BlockManagerStressTest, StressTest) {
LOG(INFO) << "Printing test totals";
LOG(INFO) << "--------------------";
LOG(INFO) << Substitute("Wrote $0 blocks ($1 bytes) via $2 threads",
- this->total_blocks_written_.Load(),
- this->total_bytes_written_.Load(),
+ this->total_blocks_written_.load(),
+ this->total_bytes_written_.load(),
FLAGS_num_writer_threads);
LOG(INFO) << Substitute("Read $0 blocks ($1 bytes) via $2 threads",
- this->total_blocks_read_.Load(),
- this->total_bytes_read_.Load(),
+ this->total_blocks_read_.load(),
+ this->total_bytes_read_.load(),
FLAGS_num_reader_threads);
LOG(INFO) << Substitute("Deleted $0 blocks via $1 threads",
- this->total_blocks_deleted_.Load(),
+ this->total_blocks_deleted_.load(),
FLAGS_num_deleter_threads);
}
diff --git a/src/kudu/fs/file_block_manager.cc
b/src/kudu/fs/file_block_manager.cc
index e8524fd94..c9a24eb9e 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -47,7 +47,6 @@
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/array_view.h"
-#include "kudu/util/atomic.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/file_cache.h"
@@ -60,6 +59,7 @@
#include "kudu/util/status.h"
using std::accumulate;
+using std::atomic;
using std::set;
using std::shared_ptr;
using std::string;
@@ -469,7 +469,7 @@ class FileReadableBlock : public ReadableBlock {
// Whether or not this block has been closed. Close() is thread-safe, so
// this must be an atomic primitive.
- AtomicBool closed_;
+ atomic<bool> closed_;
DISALLOW_COPY_AND_ASSIGN(FileReadableBlock);
};
@@ -500,7 +500,8 @@ FileReadableBlock::~FileReadableBlock() {
}
Status FileReadableBlock::Close() {
- if (closed_.CompareAndSet(false, true)) {
+ bool is_closed = false;
+ if (closed_.compare_exchange_strong(is_closed, true)) {
reader_.reset();
if (block_manager_->metrics_) {
block_manager_->metrics_->blocks_open_reading->Decrement();
@@ -519,7 +520,7 @@ const BlockId& FileReadableBlock::id() const {
}
Status FileReadableBlock::Size(uint64_t* sz) const {
- DCHECK(!closed_.Load());
+ DCHECK(!closed_);
RETURN_NOT_OK_HANDLE_ERROR(reader_->Size(sz));
*sz -= reader_->GetEncryptionHeaderSize();
@@ -531,7 +532,7 @@ Status FileReadableBlock::Read(uint64_t offset, Slice
result) const {
}
Status FileReadableBlock::ReadV(uint64_t offset, ArrayView<Slice> results)
const {
- DCHECK(!closed_.Load());
+ DCHECK(!closed_);
RETURN_NOT_OK_HANDLE_ERROR(reader_->ReadV(offset +
reader_->GetEncryptionHeaderSize(), results));
@@ -730,8 +731,8 @@ FileBlockManager::~FileBlockManager() {
}
Status FileBlockManager::Open(FsReport* report, MergeReport need_merage,
- std::atomic<int>* /* containers_processed */,
- std::atomic<int>* /* containers_total */) {
+ atomic<int>* /* containers_processed */,
+ atomic<int>* /* containers_total */) {
// Prepare the filesystem report and either return or log it.
FsReport local_report;
set<int> failed_dirs = dd_manager_->GetFailedDirs();
@@ -764,7 +765,7 @@ Status FileBlockManager::Open(FsReport* report, MergeReport
need_merage,
Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
unique_ptr<WritableBlock>* block) {
- CHECK(!opts_.read_only);
+ DCHECK(!opts_.read_only);
Dir* dir;
RETURN_NOT_OK_EVAL(dd_manager_->GetDirAddIfNecessary(opts, &dir),
@@ -788,14 +789,14 @@ Status FileBlockManager::CreateBlock(const
CreateBlockOptions& opts,
// If we failed to generate a unique ID, start trying again from a random
// part of the key space.
if (attempt_num++ > 0) {
- next_block_id_.Store(rand_.Next64());
+ next_block_id_ = rand_.Next64();
}
// Make sure we don't accidentally create a location using the magic
// invalid ID value.
BlockId id;
do {
- id.SetId(next_block_id_.Increment());
+ id.SetId(next_block_id_++);
} while (id.IsNull());
location = internal::FileBlockLocation::FromParts(dir, uuid_idx, id);
@@ -869,7 +870,7 @@ Status FileBlockManager::OpenBlock(const BlockId& block_id,
}
Status FileBlockManager::DeleteBlock(const BlockId& block_id) {
- CHECK(!opts_.read_only);
+ DCHECK(!opts_.read_only);
// Return early if deleting a block in a failed directory.
set<int> failed_dirs = dd_manager_->GetFailedDirs();
@@ -906,13 +907,13 @@ Status FileBlockManager::DeleteBlock(const BlockId&
block_id) {
}
unique_ptr<BlockCreationTransaction>
FileBlockManager::NewCreationTransaction() {
- CHECK(!opts_.read_only);
+ DCHECK(!opts_.read_only);
return unique_ptr<internal::FileBlockCreationTransaction>(
new internal::FileBlockCreationTransaction());
}
shared_ptr<BlockDeletionTransaction>
FileBlockManager::NewDeletionTransaction() {
- CHECK(!opts_.read_only);
+ DCHECK(!opts_.read_only);
return std::make_shared<internal::FileBlockDeletionTransaction>(this);
}
diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index bafee8e3d..27ba6e531 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef KUDU_FS_FILE_BLOCK_MANAGER_H
-#define KUDU_FS_FILE_BLOCK_MANAGER_H
+#pragma once
#include <atomic>
#include <cstdint>
@@ -29,7 +28,6 @@
#include "kudu/fs/error_manager.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
-#include "kudu/util/atomic.h"
#include "kudu/util/locks.h"
#include "kudu/util/random.h"
#include "kudu/util/status.h"
@@ -145,7 +143,7 @@ class FileBlockManager : public BlockManager {
// For generating block IDs.
ThreadSafeRandom rand_;
- AtomicInt<uint64_t> next_block_id_;
+ std::atomic<uint64_t> next_block_id_;
// Protects 'dirty_dirs_'.
mutable simple_spinlock lock_;
@@ -170,5 +168,3 @@ class FileBlockManager : public BlockManager {
} // namespace fs
} // namespace kudu
-
-#endif
diff --git a/src/kudu/fs/log_block_manager-test.cc
b/src/kudu/fs/log_block_manager-test.cc
index ce7fd97ee..2d93d19b6 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -18,6 +18,7 @@
#include "kudu/fs/log_block_manager.h"
#include <algorithm>
+#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <cstring>
@@ -61,7 +62,6 @@
#include "kudu/gutil/strings/strip.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
-#include "kudu/util/atomic.h"
#include "kudu/util/env.h"
#include "kudu/util/file_cache.h"
#include "kudu/util/metrics.h"
@@ -628,7 +628,7 @@ TEST_P(LogBlockManagerTest, TestBumpBlockIds) {
// Simulate a complete reset of the block manager's block ID record, e.g.
// from restarting but with all the blocks gone.
- bm_->next_block_id_.Store(1);
+ bm_->next_block_id_ = 1;
// Now simulate being notified by some other component (e.g. tablet metadata)
// of the presence of a block ID.
@@ -651,7 +651,7 @@ TEST_P(LogBlockManagerTest, TestBumpBlockIds) {
TEST_P(LogBlockManagerTest, TestReuseBlockIds) {
// Typically, the LBM starts with a random block ID when running as a
// gtest. In this test, we want to control the block IDs.
- bm_->next_block_id_.Store(1);
+ bm_->next_block_id_ = 1;
vector<BlockId> block_ids;
@@ -689,7 +689,7 @@ TEST_P(LogBlockManagerTest, TestReuseBlockIds) {
// Reset the block ID sequence and re-create new blocks which should reuse
the same
// block IDs. This isn't allowed in current versions of Kudu, but older
versions
// could produce this situation, and we still need to handle it on startup.
- bm_->next_block_id_.Store(1);
+ bm_->next_block_id_ = 1;
for (int i = 0; i < 4; i++) {
unique_ptr<WritableBlock> writer;
ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 830bd1e78..e4654e5a1 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -65,6 +65,7 @@
#include "kudu/gutil/strings/util.h"
#include "kudu/gutil/walltime.h"
#include "kudu/util/alignment.h"
+#include "kudu/util/atomic-utils.h"
#include "kudu/util/array_view.h"
#include "kudu/util/env.h"
#include "kudu/util/fault_injection.h"
@@ -226,6 +227,7 @@ using kudu::fs::internal::LogWritableBlock;
using kudu::pb_util::ReadablePBContainerFile;
using kudu::pb_util::WritablePBContainerFile;
using std::accumulate;
+using std::atomic;
using std::map;
using std::optional;
using std::set;
@@ -601,23 +603,23 @@ class LogBlockContainer: public
RefCountedThreadSafe<LogBlockContainer> {
// Simple accessors.
LogBlockManager* block_manager() const { return block_manager_; }
const string& id() const { return id_; }
- int64_t next_block_offset() const { return next_block_offset_.Load(); }
- int64_t total_bytes() const { return total_bytes_.Load(); }
- int64_t total_blocks() const { return total_blocks_.Load(); }
- int64_t live_bytes() const { return live_bytes_.Load(); }
- int64_t live_bytes_aligned() const { return live_bytes_aligned_.Load(); }
- int64_t live_blocks() const { return live_blocks_.Load(); }
- int32_t blocks_being_written() const { return blocks_being_written_.Load(); }
+ int64_t next_block_offset() const { return next_block_offset_; }
+ int64_t total_bytes() const { return total_bytes_; }
+ int64_t total_blocks() const { return total_blocks_; }
+ int64_t live_bytes() const { return live_bytes_; }
+ int64_t live_bytes_aligned() const { return live_bytes_aligned_; }
+ int64_t live_blocks() const { return live_blocks_; }
+ int32_t blocks_being_written() const { return blocks_being_written_; }
virtual bool full() const { return data_full(); }
- bool dead() const { return dead_.Load(); }
+ bool dead() const { return dead_; }
const LogBlockManagerMetrics* metrics() const { return metrics_; }
Dir* data_dir() const { return data_dir_; }
const DirInstanceMetadataPB* instance() const { return
data_dir_->instance()->metadata(); }
// Adjusts the number of blocks being written.
// Positive means increase, negative means decrease.
- int32_t blocks_being_written_incr(int32_t value) {
- return blocks_being_written_.IncrementBy(value);
+ void blocks_being_written_incr(int32_t value) {
+ blocks_being_written_.fetch_add(value);
}
// Check that the container meets the death condition.
@@ -641,8 +643,11 @@ class LogBlockContainer: public
RefCountedThreadSafe<LogBlockContainer> {
//
// If successful, returns true; otherwise returns false.
bool TrySetDead() {
- if (dead()) return false;
- return dead_.CompareAndSet(false, true);
+ if (dead()) {
+ return false;
+ }
+ bool is_dead = false;
+ return dead_.compare_exchange_strong(is_dead, true);
}
// Some work will be triggered after blocks have been removed from this
container successfully.
@@ -718,29 +723,29 @@ class LogBlockContainer: public
RefCountedThreadSafe<LogBlockContainer> {
shared_ptr<RWFile> data_file_;
// The offset of the next block to be written to the container.
- AtomicInt<int64_t> next_block_offset_;
+ atomic<int64_t> next_block_offset_;
// The amount of data (post block alignment) written thus far to the
container.
- AtomicInt<int64_t> total_bytes_;
+ atomic<int64_t> total_bytes_;
// The number of blocks written thus far in the container.
- AtomicInt<int64_t> total_blocks_;
+ atomic<int64_t> total_blocks_;
// The amount of data present in not-yet-deleted blocks of the container.
- AtomicInt<int64_t> live_bytes_;
+ atomic<int64_t> live_bytes_;
// The amount of data (post block alignment) present in not-yet-deleted
// blocks of the container.
- AtomicInt<int64_t> live_bytes_aligned_;
+ atomic<int64_t> live_bytes_aligned_;
// The number of not-yet-deleted blocks in the container.
- AtomicInt<int64_t> live_blocks_;
+ atomic<int64_t> live_blocks_;
// The number of LogWritableBlocks currently open for this container.
- AtomicInt<int32_t> blocks_being_written_;
+ atomic<int32_t> blocks_being_written_;
// Whether or not this container has been marked as dead.
- AtomicBool dead_;
+ atomic<bool> dead_;
// The metrics. Not owned by the log container; it has the same lifespan
// as the block manager.
@@ -895,7 +900,7 @@ class LogBlockContainerNativeMeta final : public
LogBlockContainer {
unique_ptr<WritablePBContainerFile> metadata_file_;
// TODO(yingchun): add metadata bytes for metadata.
- // AtomicInt<int64_t> metadata_bytes_;
+ // atomic<int64_t> metadata_bytes_;
DISALLOW_COPY_AND_ASSIGN(LogBlockContainerNativeMeta);
};
@@ -1021,9 +1026,9 @@ LogBlockContainer::LogBlockContainer(
if (auto encryption_header_size = data_file_->GetEncryptionHeaderSize();
encryption_header_size > 0) {
UpdateNextBlockOffset(0, encryption_header_size);
- live_bytes_.Store(encryption_header_size);
- total_bytes_.Store(next_block_offset_.Load());
- live_bytes_aligned_.Store(next_block_offset_.Load());
+ live_bytes_ = encryption_header_size;
+ total_bytes_ = next_block_offset_.load();
+ live_bytes_aligned_ = next_block_offset_.load();
}
}
@@ -1132,8 +1137,8 @@ void LogBlockContainerNativeMeta::CompactMetadata() {
VLOG(1) << "Compacted metadata file " << ToString()
<< " (saved " << file_bytes_delta << " bytes)";
- total_blocks_.Store(live_blocks.size());
- live_blocks_.Store(live_blocks.size());
+ total_blocks_ = live_blocks.size();
+ live_blocks_ = live_blocks.size();
}
#define RETURN_NOT_OK_CONTAINER_DISK_FAILURE(status_expr) do { \
@@ -1785,7 +1790,7 @@ void LogBlockContainer::UpdateNextBlockOffset(int64_t
block_offset, int64_t bloc
int64_t new_next_block_offset = KUDU_ALIGN_UP(
block_offset + block_length,
instance()->filesystem_block_size_bytes());
- next_block_offset_.StoreMax(new_next_block_offset);
+ AtomicStoreMax(next_block_offset_, new_next_block_offset);
if (data_full()) {
VLOG(1) << Substitute(
@@ -1833,19 +1838,19 @@ vector<BlockRecordPB> LogBlockContainer::SortRecords(
void LogBlockContainer::BlockCreated(const LogBlockRefPtr& block) {
DCHECK_GE(block->offset(), 0);
- total_bytes_.IncrementBy(block->fs_aligned_length());
- total_blocks_.Increment();
- live_bytes_.IncrementBy(block->length());
- live_bytes_aligned_.IncrementBy(block->fs_aligned_length());
- live_blocks_.Increment();
+ total_bytes_.fetch_add(block->fs_aligned_length());
+ total_blocks_++;
+ live_bytes_.fetch_add(block->length());
+ live_bytes_aligned_.fetch_add(block->fs_aligned_length());
+ live_blocks_++;
}
void LogBlockContainer::BlockDeleted(const LogBlockRefPtr& block) {
DCHECK_GE(block->offset(), 0);
- live_bytes_.IncrementBy(-block->length());
- live_bytes_aligned_.IncrementBy(-block->fs_aligned_length());
- live_blocks_.IncrementBy(-1);
+ live_bytes_.fetch_sub(block->length());
+ live_bytes_aligned_.fetch_sub(block->fs_aligned_length());
+ live_blocks_--;
}
void LogBlockContainer::ExecClosure(const std::function<void()>& task) {
@@ -2639,7 +2644,7 @@ class LogReadableBlock : public ReadableBlock {
// Whether or not this block has been closed. Close() is thread-safe, so
// this must be an atomic primitive.
- AtomicBool closed_;
+ atomic<bool> closed_;
DISALLOW_COPY_AND_ASSIGN(LogReadableBlock);
};
@@ -2659,7 +2664,8 @@ LogReadableBlock::~LogReadableBlock() {
}
Status LogReadableBlock::Close() {
- if (closed_.CompareAndSet(false, true)) {
+ bool is_closed = false;
+ if (closed_.compare_exchange_strong(is_closed, true)) {
if (log_block_->container()->metrics()) {
log_block_->container()->metrics()->generic_metrics.blocks_open_reading->Decrement();
}
@@ -2678,7 +2684,7 @@ const BlockId& LogReadableBlock::id() const {
}
Status LogReadableBlock::Size(uint64_t* sz) const {
- DCHECK(!closed_.Load());
+ DCHECK(!closed_);
*sz = log_block_->length();
return Status::OK();
@@ -2689,7 +2695,7 @@ Status LogReadableBlock::Read(uint64_t offset, Slice
result) const {
}
Status LogReadableBlock::ReadV(uint64_t offset, ArrayView<Slice> results)
const {
- DCHECK(!closed_.Load());
+ DCHECK(!closed_);
size_t read_length = accumulate(results.begin(), results.end(),
static_cast<size_t>(0),
[&](int sum, const Slice& curr) {
@@ -2780,8 +2786,7 @@ LogBlockManager::LogBlockManager(Env* env,
// block ID 1, we'll start with a random block ID. A collision is still
// possible, but exceedingly unlikely.
if (IsGTest()) {
- Random r(GetRandomSeed32());
- next_block_id_.Store(r.Next64());
+ next_block_id_ = Random(GetRandomSeed32()).Next64();
}
if (opts_.metric_entity) {
@@ -3013,7 +3018,7 @@ Status LogBlockManager::CreateBlock(const
CreateBlockOptions& opts,
// and thus we may have to "skip over" some block IDs that are claimed.
BlockId new_block_id;
do {
- new_block_id.SetId(next_block_id_.Increment());
+ new_block_id.SetId(next_block_id_++);
} while (!TryUseBlockId(new_block_id));
block->reset(new LogWritableBlock(container,
@@ -3078,7 +3083,7 @@ Status LogBlockManager::GetAllBlockIds(vector<BlockId>*
block_ids) {
}
void LogBlockManager::NotifyBlockId(BlockId block_id) {
- next_block_id_.StoreMax(block_id.id() + 1);
+ AtomicStoreMax(next_block_id_, block_id.id() + 1);
}
void LogBlockManager::AddNewContainerUnlocked(const LogBlockContainerRefPtr&
container) {
@@ -3583,7 +3588,7 @@ void LogBlockManager::LoadContainer(Dir* dir,
result->report.stats.live_block_count += container->live_blocks();
result->report.stats.lbm_container_count++;
- next_block_id_.StoreMax(max_block_id + 1);
+ AtomicStoreMax(next_block_id_, max_block_id + 1);
int64_t mem_usage = 0;
for (UntrackedBlockMap::value_type& e : live_blocks) {
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 2a6d1ae8c..352c6ad67 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -40,7 +40,6 @@
#include "kudu/fs/fs.pb.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
-#include "kudu/util/atomic.h"
#include "kudu/util/locks.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/oid_generator.h"
@@ -517,7 +516,7 @@ class LogBlockManager : public BlockManager {
ObjectIdGenerator oid_generator_;
// For generating block IDs.
- AtomicInt<uint64_t> next_block_id_;
+ std::atomic<uint64_t> next_block_id_;
// Metrics for the block manager.
//
diff --git a/src/kudu/util/atomic-utils.h b/src/kudu/util/atomic-utils.h
new file mode 100644
index 000000000..2b1c51a81
--- /dev/null
+++ b/src/kudu/util/atomic-utils.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <atomic>
+
+namespace kudu {
+
// TODO(aserbin): remove this and replace its usage at call sites with
// std::atomic<T>::fetch_max() (P0493) when switching to C++26
//
// Atomically update 'val' to the maximum of its current value and 'new_val'.
template<typename T>
inline void AtomicStoreMax(std::atomic<T>& val, T new_val) {
  T old_val = val.load(std::memory_order_relaxed);
  // Nothing to store if the current value is already >= 'new_val'; otherwise
  // try to install 'new_val'. On failure, compare_exchange_weak() refreshes
  // 'old_val' with the freshly observed value, so no explicit re-load (and no
  // extra 'prev_val' copy) is needed before re-checking the condition.
  while (old_val < new_val &&
         !val.compare_exchange_weak(old_val,
                                    new_val,
                                    std::memory_order_release,
                                    std::memory_order_relaxed)) {
  }
}
+
// TODO(aserbin): remove this and replace its usage at call sites with
// std::atomic<T>::fetch_min() (P0493) when switching to C++26
//
// Atomically update 'val' to the minimum of its current value and 'new_val'.
template<typename T>
inline void AtomicStoreMin(std::atomic<T>& val, T new_val) {
  T old_val = val.load(std::memory_order_relaxed);
  // Nothing to store if the current value is already <= 'new_val'; otherwise
  // try to install 'new_val'. On failure, compare_exchange_weak() refreshes
  // 'old_val' with the freshly observed value, so no explicit re-load (and no
  // extra 'prev_val' copy) is needed before re-checking the condition.
  while (old_val > new_val &&
         !val.compare_exchange_weak(old_val,
                                    new_val,
                                    std::memory_order_release,
                                    std::memory_order_relaxed)) {
  }
}
+
+} // namespace kudu