This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 83e5a73e2 [util] introduce ScopedCleanup::run()
83e5a73e2 is described below
commit 83e5a73e2f6ac42925a7a97a203644438cffe6da
Author: Alexey Serbin <[email protected]>
AuthorDate: Thu Jan 30 12:22:33 2025 -0800
[util] introduce ScopedCleanup::run()
In many test scenarios, the code run by ScopedCleanup instance was
duplicated at call sites of the ScopedCleanup::cancel() method. This
patch introduces ScopedCleanup::run() method that invokes the registered
cleanup functor to use in such cases and also updates corresponding
code to call this new method instead. Unit tests to cover the newly
introduced functionality are added as well.
Change-Id: Ibfd24323f35e3504f05f3cf875098637853c8c3c
Reviewed-on: http://gerrit.cloudera.org:8080/22426
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Yifan Zhang <[email protected]>
---
src/kudu/client/client-test.cc | 3 +-
.../integration-tests/exactly_once_writes-itest.cc | 3 +-
src/kudu/integration-tests/raft_consensus-itest.cc | 3 +-
.../integration-tests/txn_participant-itest.cc | 10 ++--
.../integration-tests/txn_status_manager-itest.cc | 7 ++-
src/kudu/integration-tests/txn_write_ops-itest.cc | 12 ++---
src/kudu/master/dynamic_multi_master-test.cc | 6 +--
src/kudu/rpc/rpc-test.cc | 3 +-
src/kudu/tablet/diff_scan-test.cc | 5 +-
src/kudu/tools/rebalancer_tool-test.cc | 16 ++----
src/kudu/util/net/dns_resolver-test.cc | 4 --
src/kudu/util/net/socket-test.cc | 3 +-
src/kudu/util/scoped_cleanup-test.cc | 60 +++++++++++++++++++++-
src/kudu/util/scoped_cleanup.h | 13 ++++-
14 files changed, 93 insertions(+), 55 deletions(-)
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 9f4162230..e2ce4aebc 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -3665,8 +3665,7 @@ TEST_F(ClientTest, TestWriteWhileRestarting) {
ts->Shutdown();
ASSERT_OK(ts->Restart());
stop.CountDown();
- thread_joiner.cancel();
- t.join();
+ thread_joiner.run();
// The writer thread should have hit no issues.
ASSERT_OK(writer_error);
diff --git a/src/kudu/integration-tests/exactly_once_writes-itest.cc
b/src/kudu/integration-tests/exactly_once_writes-itest.cc
index 9dece4513..5c5e89da8 100644
--- a/src/kudu/integration-tests/exactly_once_writes-itest.cc
+++ b/src/kudu/integration-tests/exactly_once_writes-itest.cc
@@ -237,8 +237,7 @@ void
ExactlyOnceSemanticsITest::DoTestWritesWithExactlyOnceSemantics(
while (true) {
if (!threads_running.count()) {
- thread_join_func();
- thread_cleanup.cancel();
+ thread_cleanup.run();
break;
}
if (allow_crashes) {
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc
b/src/kudu/integration-tests/raft_consensus-itest.cc
index 1d5c95185..d6e92f5c5 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -1758,8 +1758,7 @@ TEST_F(RaftConsensusITest, TestConfigChangeUnderLoad) {
LOG(INFO) << "Joining writer threads...";
finish.Store(true);
- thread_join_func();
- thread_joiner.cancel();
+ thread_joiner.run();
LOG(INFO) << "Waiting for replicas to agree...";
// Wait for all servers to replicate everything up through the last write op.
diff --git a/src/kudu/integration-tests/txn_participant-itest.cc
b/src/kudu/integration-tests/txn_participant-itest.cc
index 6a9ea7c21..ab07674e8 100644
--- a/src/kudu/integration-tests/txn_participant-itest.cc
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -23,6 +23,7 @@
#include <optional>
#include <string>
#include <thread>
+#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
@@ -47,7 +48,6 @@
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
-#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/mvcc.h"
@@ -1223,9 +1223,7 @@ TEST_F(TxnParticipantITest,
TestTxnSystemClientSucceedsOnBootstrap) {
ASSERT_OK(ts->server()->tablet_manager()->WaitForAllBootstrapsToFinish());
SleepFor(MonoDelta::FromMilliseconds(500));
}
- stop.CountDown();
- thread_joiner.cancel();
- t.join();
+ thread_joiner.run();
// None of our transactions should have failed.
ASSERT_OK(client_error);
@@ -1276,9 +1274,7 @@ TEST_F(TxnParticipantITest,
TestTxnSystemClientRetriesWhenReplicaNotFound) {
ASSERT_OK(r->WaitUntilConsensusRunning(kDefaultTimeout));
});
- stop.CountDown();
- thread_joiner.cancel();
- t.join();
+ thread_joiner.run();
// None of our transactions should have failed.
ASSERT_OK(client_error);
diff --git a/src/kudu/integration-tests/txn_status_manager-itest.cc
b/src/kudu/integration-tests/txn_status_manager-itest.cc
index d0019e1e3..5735bc3e2 100644
--- a/src/kudu/integration-tests/txn_status_manager-itest.cc
+++ b/src/kudu/integration-tests/txn_status_manager-itest.cc
@@ -22,13 +22,14 @@
#include <ostream>
#include <string>
#include <thread>
+#include <type_traits>
#include <unordered_map>
#include <vector>
#include <glog/logging.h>
#include <gtest/gtest.h>
-#include "kudu/client/client.h"
+#include "kudu/client/client.h" // IWYU pragma: keep
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/gutil/map-util.h"
@@ -440,9 +441,7 @@ TEST_F(TxnStatusManagerITest,
TxnKeepAliveMultiTxnStatusManagerInstances) {
NO_FATALS(CheckTxnState(txn_id, TxnStatePB::OPEN));
- latch.CountDown();
- txn_keepalive_sender.join();
- cleanup.cancel();
+ cleanup.run();
// An extra sanity check: make sure the recent keepalive requests were sent
// successfully, as expected.
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc
b/src/kudu/integration-tests/txn_write_ops-itest.cc
index 608fbdb22..5a4bf76d2 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -660,8 +660,7 @@ TEST_F(TxnWriteOpsITest, FrequentElections) {
max_sleep_ms = std::min(max_sleep_ms * 1.1, 1000.0);
}
}
- std::for_each(writers.begin(), writers.end(), [](thread& t) { t.join(); });
- cleanup.cancel();
+ cleanup.run();
NO_FATALS(cluster_->AssertNoCrashes());
for (auto& txn : transactions) {
@@ -1939,8 +1938,7 @@ TEST_F(TxnOpDispatcherITest,
RollbackWriteOpPendingParticipantNotYetRegistered)
const auto row_status = GetSingleRowError(session.get());
ASSERT_TRUE(row_status.IsIllegalState()) << s.ToString();
- rollback.join();
- cleanup.cancel();
+ cleanup.run();
ASSERT_OK(rollback_status);
bool is_complete = false;
@@ -1993,8 +1991,7 @@ TEST_F(TxnOpDispatcherITest,
RollbackWriteOpPendingParticipantRegistered) {
Status s = InsertRows(txn.get(), 1, &key, &session);
ASSERT_TRUE(s.IsIOError()) << s.ToString();
- rollback.join();
- cleanup.cancel();
+ cleanup.run();
ASSERT_OK(rollback_status);
bool is_complete = false;
@@ -2055,8 +2052,7 @@ TEST_F(TxnOpDispatcherITest, TxnWriteWhileReplicaDeleted)
{
// registered.
ASSERT_GE(1, GetTxnOpDispatchersTotalCount(replicas));
- cleanup.cancel();
- tablet_deleter.join();
+ cleanup.run();
ASSERT_OK(tablet_delete_status);
}
diff --git a/src/kudu/master/dynamic_multi_master-test.cc
b/src/kudu/master/dynamic_multi_master-test.cc
index d3c9e0a55..2a866e323 100644
--- a/src/kudu/master/dynamic_multi_master-test.cc
+++ b/src/kudu/master/dynamic_multi_master-test.cc
@@ -1689,11 +1689,7 @@ TEST_F(AutoAddMasterTest, TestAddWithOnGoingDdl) {
ASSERT_OK(cluster_->WaitForTabletServerCount(cluster_->num_tablet_servers(),
MonoDelta::FromSeconds(5)));
}
- proceed = false;
- thread_joiner.cancel();
- for (auto& t : threads) {
- t.join();
- }
+ thread_joiner.run();
for (const auto& e : errors) {
// NOTE: the table may exist if the CreateTable call is retried.
if (e.ok() || e.IsTimedOut() || e.IsAlreadyPresent()) {
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index c14de85a6..ba5b04623 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -631,8 +631,7 @@ TEST_P(TestRpc, TestClientConnectionMetrics) {
#endif
// Unblock all of the calls and wait for them to finish.
- latch.Wait();
- cleanup.cancel();
+ cleanup.run();
// Verify that all the RPCs have finished.
for (const auto& controller : controllers) {
diff --git a/src/kudu/tablet/diff_scan-test.cc
b/src/kudu/tablet/diff_scan-test.cc
index 903bb0ef4..f6c848d26 100644
--- a/src/kudu/tablet/diff_scan-test.cc
+++ b/src/kudu/tablet/diff_scan-test.cc
@@ -23,6 +23,7 @@
#include <string>
#include <thread>
#include <tuple>
+#include <type_traits>
#include <utility>
#include <vector>
@@ -38,7 +39,6 @@
#include "kudu/tablet/tablet-harness.h"
#include "kudu/tablet/tablet-test-base.h"
#include "kudu/tablet/tablet-test-util.h"
-#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/util/monotime.h"
#include "kudu/util/scoped_cleanup.h"
@@ -235,8 +235,7 @@ TEST_F(OrderedDiffScanWithDeletesTest,
TestDiffScanAfterDeltaFlushRacesWithBatch
t.join();
});
ASSERT_OK(UpdateTestRow(&writer, kRowKey, 0, 10000));
- t.join();
- thread_joiner.cancel();
+ thread_joiner.run();
ASSERT_OK(s);
ASSERT_OK(tablet->mvcc_manager()->WaitForApplyingOpsToApply());
diff --git a/src/kudu/tools/rebalancer_tool-test.cc
b/src/kudu/tools/rebalancer_tool-test.cc
index 938a07852..b065bdee7 100644
--- a/src/kudu/tools/rebalancer_tool-test.cc
+++ b/src/kudu/tools/rebalancer_tool-test.cc
@@ -1027,9 +1027,7 @@ TEST_F(IgnoredTserverGoesDownDuringRebalancingTest,
TserverDown) {
}
}
- run = false;
- stopper.join();
- stopper_cleanup.cancel();
+ stopper_cleanup.run();
NO_FATALS(cluster_->AssertNoCrashes());
}
@@ -1389,9 +1387,7 @@ TEST_P(TserverGoesDownDuringRebalancingTest, TserverDown)
{
"unacceptable health status UNAVAILABLE");
}
- run = false;
- stopper.join();
- stopper_cleanup.cancel();
+ stopper_cleanup.run();
ASSERT_OK(cluster_->tablet_server(shutdown_tserver_idx)->Restart());
NO_FATALS(cluster_->AssertNoCrashes());
@@ -1450,9 +1446,7 @@ TEST_P(TserverAddedDuringRebalancingTest, TserverStarts) {
// It's time to sneak in and add new tablet server.
ASSERT_OK(cluster_->AddTabletServer());
- run = false;
- runner.join();
- runner_cleanup.cancel();
+ runner_cleanup.run();
// The rebalancer should not fail, and eventually, after a new tablet server
// is added, the cluster should become balanced.
@@ -1584,9 +1578,7 @@ TEST_P(RebalancingDuringElectionStormTest, RoundRobin) {
}
}
- elector_run = false;
- elector.join();
- elector_cleanup.cancel();
+ elector_cleanup.run();
#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
for (auto& workload : workloads) {
workload->StopAndJoin();
diff --git a/src/kudu/util/net/dns_resolver-test.cc
b/src/kudu/util/net/dns_resolver-test.cc
index b861c68e1..80f17423f 100644
--- a/src/kudu/util/net/dns_resolver-test.cc
+++ b/src/kudu/util/net/dns_resolver-test.cc
@@ -143,10 +143,6 @@ TEST(DnsResolverTest, ConcurrentRefreshesAndResolutions) {
NO_FATALS(validate_addrs(addrs));
}
});
- for (auto& t : threads) {
- t.join();
- }
- cancel_threads.cancel();
}
TEST(DnsResolverTest, CachingVsNonCachingResolver) {
diff --git a/src/kudu/util/net/socket-test.cc b/src/kudu/util/net/socket-test.cc
index e67f956fa..30e8947c8 100644
--- a/src/kudu/util/net/socket-test.cc
+++ b/src/kudu/util/net/socket-test.cc
@@ -190,8 +190,7 @@ class SocketTest : public KuduTest {
char buf[kData.size()];
ASSERT_OK(client.BlockingRecv(reinterpret_cast<uint8_t*>(buf),
kData.size(), &n,
MonoTime::Now() +
MonoDelta::FromSeconds(5)));
- cleanup.cancel();
- t.join();
+ cleanup.run();
ASSERT_OK(client.Close());
diff --git a/src/kudu/util/scoped_cleanup-test.cc
b/src/kudu/util/scoped_cleanup-test.cc
index 2e7770547..3599971a2 100644
--- a/src/kudu/util/scoped_cleanup-test.cc
+++ b/src/kudu/util/scoped_cleanup-test.cc
@@ -41,7 +41,6 @@ TEST(ScopedCleanup, TestCleanupMacro) {
ASSERT_EQ(0, var);
}
-
TEST(ScopedCleanup, TestCancelCleanup) {
int var = 0;
{
@@ -53,4 +52,63 @@ TEST(ScopedCleanup, TestCancelCleanup) {
ASSERT_EQ(42, var);
}
+TEST(ScopedCleanup, ExplicitRun) {
+ int var = 0;
+ {
+ auto saved = var;
+ auto cleanup = MakeScopedCleanup([&] () { var = saved; });
+ var = 42;
+ cleanup.run();
+ ASSERT_EQ(0, var);
+
+ // Set 'saved' to 100 to distinguish between invoking the cleanup function
+ // by the destructor and the explicit 'run' call.
+ saved = 100;
+ }
+ // The function call in the destructor of the 'cleanup' shouldn't fire
+ // after explicitly calling ScopedCleanup::run().
+ ASSERT_EQ(0, var);
+}
+
+TEST(ScopedCleanup, CancelAndRun) {
+ int var = 0;
+ {
+ auto saved = var;
+ auto cleanup = MakeScopedCleanup([&] () { var = saved; });
+ var = 42;
+ cleanup.cancel();
+ cleanup.run();
+ ASSERT_EQ(42, var);
+
+ // Set 'saved' to 100 to distinguish between invoking the cleanup function
+ // by the destructor and the explicit 'run' call.
+ saved = 100;
+ }
+ // The function call in the destructor of the 'cleanup' shouldn't have fired.
+ ASSERT_EQ(42, var);
+}
+
+TEST(ScopedCleanup, ExplicitRunTwice) {
+ int var = 0;
+ {
+ auto saved = var;
+ auto cleanup = MakeScopedCleanup([&] () { var = saved; });
+ var = 42;
+ cleanup.run();
+ ASSERT_EQ(0, var);
+
+ // After explicitly running once, the cleanup function isn't called
+ // upon subsequent invocations of ScopedCleanup::run().
+ saved = 1;
+ cleanup.run();
+ ASSERT_EQ(0, var);
+
+ // Set 'saved' to 100 to distinguish between invoking the cleanup function
+ // by the destructor and the explicit 'run' call.
+ saved = 100;
+ }
+ // The function call in the destructor of the 'cleanup' shouldn't have fired.
+ ASSERT_EQ(0, var);
+}
+
} // namespace kudu
diff --git a/src/kudu/util/scoped_cleanup.h b/src/kudu/util/scoped_cleanup.h
index ea94acef9..4456d5175 100644
--- a/src/kudu/util/scoped_cleanup.h
+++ b/src/kudu/util/scoped_cleanup.h
@@ -40,7 +40,7 @@ namespace kudu {
//
// Use 'MakeScopedCleanup()' below to instantiate.
template<typename F>
-class ScopedCleanup {
+class ScopedCleanup final {
public:
explicit ScopedCleanup(F f)
: cancelled_(false),
@@ -51,8 +51,19 @@ class ScopedCleanup {
f_();
}
}
+
+ // Cancel the cleanup.
void cancel() { cancelled_ = true; }
+ // Explicitly run the cleanup function. Once run, no cleanup is performed
+ // in the destructor.
+ void run() {
+ if (!cancelled_) {
+ f_();
+ cancelled_ = true;
+ }
+ }
+
private:
bool cancelled_;
F f_;