This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 5405d06ee [rpc] remove last vestiges of chromium Atomics from RPC
5405d06ee is described below
commit 5405d06eeccaf1b0eb559e4a531f48160141ed16
Author: Alexey Serbin <[email protected]>
AuthorDate: Thu Jun 13 12:49:58 2024 -0700
[rpc] remove last vestiges of chromium Atomics from RPC
Change-Id: I6039b4a08615339c8f06a4d215a0b1058a9bacea
Reviewed-on: http://gerrit.cloudera.org:8080/21513
Tested-by: Kudu Jenkins
Reviewed-by: Marton Greber <[email protected]>
---
src/kudu/rpc/proxy.cc | 6 +++---
src/kudu/rpc/proxy.h | 4 ++--
src/kudu/rpc/rpc-bench.cc | 15 +++++++--------
src/kudu/rpc/rpc_stub-test.cc | 17 +++++------------
4 files changed, 17 insertions(+), 25 deletions(-)
diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc
index 2ec1244f0..8033343b8 100644
--- a/src/kudu/rpc/proxy.cc
+++ b/src/kudu/rpc/proxy.cc
@@ -203,7 +203,7 @@ void Proxy::AsyncRequest(const string& method,
RpcController* controller,
const ResponseCallback& callback) {
DCHECK(!controller->call_) << "Controller should be reset";
- base::subtle::NoBarrier_Store(&is_started_, true);
+ is_started_.store(true, std::memory_order_relaxed);
// TODO(awong): it would be great if we didn't have to heap allocate the
// payload.
auto req_payload = RequestPayload::CreateRequestPayload(
@@ -269,13 +269,13 @@ Status Proxy::SyncRequest(const string& method,
}
void Proxy::set_user_credentials(UserCredentials user_credentials) {
- DCHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
+ DCHECK(is_started_.load(std::memory_order_relaxed) == false)
<< "illegal to call set_user_credentials() after request processing has
started";
conn_id_.set_user_credentials(std::move(user_credentials));
}
void Proxy::set_network_plane(string network_plane) {
- DCHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
+ DCHECK(is_started_.load(std::memory_order_relaxed) == false)
<< "illegal to call set_network_plane() after request processing has
started";
conn_id_.set_network_plane(std::move(network_plane));
}
diff --git a/src/kudu/rpc/proxy.h b/src/kudu/rpc/proxy.h
index cb822057d..5e7d93c38 100644
--- a/src/kudu/rpc/proxy.h
+++ b/src/kudu/rpc/proxy.h
@@ -16,12 +16,12 @@
// under the License.
#pragma once
+#include <atomic>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
-#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/macros.h"
#include "kudu/rpc/connection_id.h"
#include "kudu/rpc/outbound_call.h"
@@ -189,7 +189,7 @@ class Proxy {
mutable simple_spinlock lock_;
ConnectionId conn_id_;
- mutable Atomic32 is_started_;
+ std::atomic<bool> is_started_;
DISALLOW_COPY_AND_ASSIGN(Proxy);
};
diff --git a/src/kudu/rpc/rpc-bench.cc b/src/kudu/rpc/rpc-bench.cc
index ca42fd8ef..145a24b2f 100644
--- a/src/kudu/rpc/rpc-bench.cc
+++ b/src/kudu/rpc/rpc-bench.cc
@@ -31,7 +31,6 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
-#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
@@ -96,8 +95,8 @@ class RpcBench : public RpcTestBase {
public:
RpcBench()
: should_run_(true),
- stop_(0)
- {}
+ stop_(0) {
+ }
void SetUp() override {
RpcTestBase::SetUp();
@@ -149,7 +148,7 @@ class RpcBench : public RpcTestBase {
friend class ClientAsyncWorkload;
Sockaddr server_addr_;
- Atomic32 should_run_;
+ atomic<bool> should_run_;
CountDownLatch stop_;
};
@@ -176,7 +175,7 @@ class ClientThread {
AddRequestPB req;
AddResponsePB resp;
- while (Acquire_Load(&bench_->should_run_)) {
+ while (bench_->should_run_) {
req.set_x(request_count_);
req.set_y(request_count_);
RpcController controller;
@@ -205,7 +204,7 @@ TEST_F(RpcBench, BenchmarkCalls) {
}
SleepFor(MonoDelta::FromSeconds(FLAGS_run_seconds));
- Release_Store(&should_run_, false);
+ should_run_ = false;
int total_reqs = 0;
@@ -233,7 +232,7 @@ class ClientAsyncWorkload {
CHECK_OK(controller_.status());
CHECK_EQ(req_.x() + req_.y(), resp_.result());
}
- if (!Acquire_Load(&bench_->should_run_)) {
+ if (!bench_->should_run_) {
bench_->stop_.CountDown();
return;
}
@@ -287,7 +286,7 @@ TEST_F(RpcBench, BenchmarkCallsAsync) {
}
SleepFor(MonoDelta::FromSeconds(FLAGS_run_seconds));
- Release_Store(&should_run_, false);
+ should_run_ = false;
sw.stop();
diff --git a/src/kudu/rpc/rpc_stub-test.cc b/src/kudu/rpc/rpc_stub-test.cc
index 2454d0d2f..aaaa9644c 100644
--- a/src/kudu/rpc/rpc_stub-test.cc
+++ b/src/kudu/rpc/rpc_stub-test.cc
@@ -18,6 +18,7 @@
#include <algorithm>
#include <atomic>
#include <csignal>
+#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
@@ -35,7 +36,6 @@
#include <glog/stl_logging.h>
#include <gtest/gtest.h>
-#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/proxy.h"
#include "kudu/rpc/rpc-test-base.h"
@@ -75,7 +75,6 @@ using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
-using base::subtle::NoBarrier_Load;
namespace kudu {
namespace rpc {
@@ -302,12 +301,6 @@ TEST_F(RpcStubTest, TestCallWithInvalidParam) {
"missing fields: y");
}
-// Wrapper around AtomicIncrement, since AtomicIncrement returns the 'old'
-// value, and our callback needs to be a void function.
-static void DoIncrement(Atomic32* count) {
- base::subtle::Barrier_AtomicIncrement(count, 1);
-}
-
// Test sending a PB parameter with a missing field on the client side.
// This also ensures that the async callback is only called once
// (regression test for a previously-encountered bug).
@@ -319,13 +312,13 @@ TEST_F(RpcStubTest, TestCallWithMissingPBFieldClientSide)
{
req.set_x(10);
// Request is missing the 'y' field.
AddResponsePB resp;
- Atomic32 callback_count = 0;
- p.AddAsync(req, &resp, &controller, [&callback_count]() {
DoIncrement(&callback_count); });
- while (NoBarrier_Load(&callback_count) == 0) {
+ std::atomic<uint32_t> callback_count(0);
+ p.AddAsync(req, &resp, &controller, [&callback_count]() { ++callback_count;
});
+ while (callback_count == 0) {
SleepFor(MonoDelta::FromMicroseconds(10));
}
SleepFor(MonoDelta::FromMicroseconds(100));
- ASSERT_EQ(1, NoBarrier_Load(&callback_count));
+ ASSERT_EQ(1, callback_count);
ASSERT_STR_CONTAINS(controller.status().ToString(),
"Invalid argument: invalid parameter for call "
"kudu.rpc_test.CalculatorService.Add: missing fields:
y");