This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch branch-1.17.x
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/branch-1.17.x by this push:
new be5c3f520 KUDU-3620 fix heap-use-after-free and undefined behavior in
OpDriver
be5c3f520 is described below
commit be5c3f5201da71e9991c4dd4e819ec3d1512bf51
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Oct 16 18:19:39 2024 -0700
KUDU-3620 fix heap-use-after-free and undefined behavior in OpDriver
This patch fixes a long-standing heap-use-after-free issue in OpDriver.
The problem is addressed by supplying a valid reference to OpDriver
objects for callbacks to make sure objects aren't destroyed while
their methods are being invoked.
The reference-counting wrapper for OpDriver objects has been changed from
scoped_refptr to std::shared_ptr. This allows for passing weak pointers
(std::weak_ptr) instead of raw pointers to callbacks where a
reference-counting cycle was suspected (clarifying whether such a cycle
actually exists is left as TODOs), so a valid shared pointer can be
constructed before invoking any of the required OpDriver methods.
I also updated the signature of OpTracker::GetPendingOps() to get rid of
the output parameter: the method now returns the vector of pending ops.
Before this patch, running the following against Kudu bits built in the
ASAN configuration would trigger ASAN and UBSAN warnings about once in
every 150 runs:
./ts_recovery-itest \
--gtest_filter='DifferentFaultPoints/Kudu969Test.Test/*' \
--gtest_repeat=10000
With this patch, more than 2K iterations of the same succeeded without
producing a single ASAN/UBSAN issue report.
Change-Id: Iadc58f1724bc03373a7e165fd3798b8868dc9d29
Reviewed-on: http://gerrit.cloudera.org:8080/21948
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Zoltan Martonka <[email protected]>
Reviewed-by: Abhishek Chennaka <[email protected]>
(cherry picked from commit c51e1354322a4d2cad268de26f254dd52b0c9df1)
Conflicts:
src/kudu/tablet/ops/op_driver.cc
src/kudu/tablet/ops/op_tracker.cc
src/kudu/tablet/tablet_replica.cc
src/kudu/tablet/tablet_replica.h
Reviewed-on: http://gerrit.cloudera.org:8080/21960
Tested-by: Kudu Jenkins
---
src/kudu/tablet/ops/op_driver.cc | 44 +++++++++++++--------
src/kudu/tablet/ops/op_driver.h | 28 ++++++-------
src/kudu/tablet/ops/op_tracker-test.cc | 41 ++++++++++----------
src/kudu/tablet/ops/op_tracker.cc | 29 ++++++--------
src/kudu/tablet/ops/op_tracker.h | 26 +++++++------
src/kudu/tablet/tablet_replica-test.cc | 4 +-
src/kudu/tablet/tablet_replica.cc | 69 +++++++++++++++++++--------------
src/kudu/tablet/tablet_replica.h | 4 +-
src/kudu/tablet/txn_participant-test.cc | 6 ++-
9 files changed, 137 insertions(+), 114 deletions(-)
diff --git a/src/kudu/tablet/ops/op_driver.cc b/src/kudu/tablet/ops/op_driver.cc
index b3e1aa740..558f58049 100644
--- a/src/kudu/tablet/ops/op_driver.cc
+++ b/src/kudu/tablet/ops/op_driver.cc
@@ -62,6 +62,8 @@ using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::RequestIdPB;
using kudu::rpc::ResultTracker;
using std::string;
+using std::shared_ptr;
+using std::weak_ptr;
using std::unique_ptr;
using strings::Substitute;
@@ -141,6 +143,7 @@ Status OpDriver::Init(unique_ptr<Op> op,
}
op_ = std::move(op);
+ auto self = shared_from_this();
if (type == consensus::FOLLOWER) {
std::lock_guard<simple_spinlock> lock(opid_lock_);
op_id_copy_ = op_->state()->op_id();
@@ -171,14 +174,26 @@ Status OpDriver::Init(unique_ptr<Op> op,
unique_ptr<ReplicateMsg> replicate_msg;
op_->NewReplicateMsg(&replicate_msg);
if (consensus_) { // sometimes NULL in tests
- // A raw pointer is required to avoid a refcount cycle.
+ // A weak pointer is required to avoid a refcount cycle.
+ // TODO(aserbin): is the comment above still relevant?
+ weak_ptr<OpDriver> wp(self);
mutable_state()->set_consensus_round(
- consensus_->NewRound(std::move(replicate_msg),
- [this](const Status& s) {
this->ReplicationFinished(s); }));
+ consensus_->NewRound(
+ std::move(replicate_msg),
+ [wp = std::move(wp)](const Status& s) {
+ shared_ptr<OpDriver> sp(wp.lock());
+ if (PREDICT_TRUE(sp)) {
+ sp->ReplicationFinished(s);
+ } else {
+ // TODO(aserbin): is this even possible?
+ LOG(DFATAL) << "OpDriver isn't around: has
ReplicationFinished() "
+ "been called already?";
+ }
+ }));
}
}
- return op_tracker_->Add(this);
+ return op_tracker_->Add(std::move(self));
}
consensus::OpId OpDriver::GetOpId() {
@@ -226,10 +241,12 @@ void OpDriver::ExecuteAsync() {
s =
consensus_->CheckLeadershipAndBindTerm(mutable_state()->consensus_round());
}
- if (s.ok()) {
- s = prepare_pool_token_->Submit([this]() { this->PrepareTask(); });
+ if (PREDICT_TRUE(s.ok())) {
+ // Provide a reference to the object to be able to call its methods
+ // regardless of what happens with other references around.
+ auto self = shared_from_this();
+ s = prepare_pool_token_->Submit([self = std::move(self)]() {
self->PrepareTask(); });
}
-
if (PREDICT_FALSE(!s.ok())) {
HandleFailure(s);
}
@@ -498,7 +515,10 @@ Status OpDriver::ApplyAsync() {
}
TRACE_EVENT_FLOW_BEGIN0("op", "ApplyTask", this);
- return apply_pool_->Submit([this]() { this->ApplyTask(); });
+ // Provide a reference to the object to make sure it's safe to call its
+ // methods regardless of what happens with other references around.
+ auto self = shared_from_this();
+ return apply_pool_->Submit([self = std::move(self)]() { self->ApplyTask();
});
}
void OpDriver::ApplyTask() {
@@ -518,10 +538,6 @@ void OpDriver::ApplyTask() {
}
#endif // #if DCHECK_IS_ON() ...
- // We need to ref-count ourself, since FinishApplying() may run very quickly
- // and end up calling Finalize() while we're still in this code.
- scoped_refptr<OpDriver> ref(this);
-
{
CommitMsg* commit_msg;
Status s = op_->Apply(&commit_msg);
@@ -581,16 +597,12 @@ Status OpDriver::CommitWait() {
void OpDriver::Finalize() {
ADOPT_TRACE(trace());
- // TODO: this is an ugly hack so that the Release() call doesn't delete the
- // object while we still hold the lock.
- scoped_refptr<OpDriver> ref(this);
std::lock_guard<simple_spinlock> lock(lock_);
op_->Finish(Op::APPLIED);
mutable_state()->completion_callback()->OpCompleted();
op_tracker_->Release(this);
}
-
std::string OpDriver::StateString(ReplicationState repl_state,
PrepareState prep_state) {
string state_str;
diff --git a/src/kudu/tablet/ops/op_driver.h b/src/kudu/tablet/ops/op_driver.h
index 9fba85303..3da26bd3d 100644
--- a/src/kudu/tablet/ops/op_driver.h
+++ b/src/kudu/tablet/ops/op_driver.h
@@ -28,6 +28,7 @@
#include "kudu/gutil/walltime.h"
#include "kudu/tablet/ops/op.h"
#include "kudu/util/locks.h"
+#include "kudu/util/make_shared.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/trace.h"
@@ -219,18 +220,10 @@ class OpTracker;
// still be ok, as it would get aborted because the replica wasn't a leader
yet (constraints 1/2).
//
// This class is thread safe.
-class OpDriver : public RefCountedThreadSafe<OpDriver> {
-
+class OpDriver : public std::enable_shared_from_this<OpDriver>,
+ public enable_make_shared<OpDriver> {
public:
- // Construct OpDriver. OpDriver does not take ownership
- // of any of the objects pointed to in the constructor's arguments.
- OpDriver(OpTracker* op_tracker,
- consensus::RaftConsensus* consensus,
- log::Log* log,
- ThreadPoolToken* prepare_pool_token,
- ThreadPool* apply_pool,
- OpOrderVerifier* order_verifier,
- MonoTime deadline = MonoTime::Max());
+ ~OpDriver() = default;
// Perform any non-constructor initialization. Sets the op that will be
// executed.
@@ -276,6 +269,17 @@ class OpDriver : public RefCountedThreadSafe<OpDriver> {
Trace* trace() { return trace_.get(); }
+ protected:
+ // Construct OpDriver. OpDriver does not take ownership
+ // of any of the objects pointed to in the constructor's arguments.
+ OpDriver(OpTracker* op_tracker,
+ consensus::RaftConsensus* consensus,
+ log::Log* log,
+ ThreadPoolToken* prepare_pool_token,
+ ThreadPool* apply_pool,
+ OpOrderVerifier* order_verifier,
+ MonoTime deadline = MonoTime::Max());
+
private:
FRIEND_TEST(TabletReplicaTest, TestShuttingDownMVCC);
friend class RefCountedThreadSafe<OpDriver>;
@@ -301,8 +305,6 @@ class OpDriver : public RefCountedThreadSafe<OpDriver> {
PREPARED
};
- ~OpDriver() {}
-
// The task submitted to the prepare threadpool to prepare the op. If
Prepare() fails,
// calls HandleFailure.
void PrepareTask();
diff --git a/src/kudu/tablet/ops/op_tracker-test.cc
b/src/kudu/tablet/ops/op_tracker-test.cc
index c948041a9..b956e87d3 100644
--- a/src/kudu/tablet/ops/op_tracker-test.cc
+++ b/src/kudu/tablet/ops/op_tracker-test.cc
@@ -18,6 +18,7 @@
#include "kudu/tablet/ops/op_tracker.h"
#include <cstdint>
+#include <iterator>
#include <memory>
#include <ostream>
#include <string>
@@ -107,24 +108,23 @@ class OpTrackerTest : public KuduTest,
void RunOpsThread(CountDownLatch* finish_latch);
Status AddDrivers(int num_drivers,
- vector<scoped_refptr<OpDriver> >* drivers) {
- vector<scoped_refptr<OpDriver> > local_drivers;
+ vector<shared_ptr<OpDriver>>* drivers) {
+ vector<shared_ptr<OpDriver>> local_drivers;
+ local_drivers.reserve(num_drivers);
for (int i = 0; i < num_drivers; i++) {
- scoped_refptr<OpDriver> driver(
- new OpDriver(&tracker_,
- nullptr,
- nullptr,
- nullptr,
- nullptr,
- nullptr));
+ auto driver(OpDriver::make_shared(
+ &tracker_,
+ nullptr,
+ nullptr,
+ nullptr,
+ nullptr,
+ nullptr));
unique_ptr<NoOpOp> op(new NoOpOp(new NoOpOpState));
RETURN_NOT_OK(driver->Init(std::move(op), consensus::LEADER));
- local_drivers.push_back(driver);
+ local_drivers.emplace_back(std::move(driver));
}
- for (const scoped_refptr<OpDriver>& d : local_drivers) {
- drivers->push_back(d);
- }
+ std::move(local_drivers.begin(), local_drivers.end(),
std::back_inserter(*drivers));
return Status::OK();
}
@@ -135,13 +135,12 @@ class OpTrackerTest : public KuduTest,
TEST_F(OpTrackerTest, TestGetPending) {
ASSERT_EQ(0, tracker_.GetNumPendingForTests());
- vector<scoped_refptr<OpDriver> > drivers;
+ vector<shared_ptr<OpDriver> > drivers;
ASSERT_OK(AddDrivers(1, &drivers));
- scoped_refptr<OpDriver> driver = drivers[0];
+ shared_ptr<OpDriver>& driver(drivers[0]);
ASSERT_EQ(1, tracker_.GetNumPendingForTests());
- vector<scoped_refptr<OpDriver> > pending_ops;
- tracker_.GetPendingOps(&pending_ops);
+ const auto pending_ops = tracker_.GetPendingOps();
ASSERT_EQ(1, pending_ops.size());
ASSERT_EQ(driver.get(), pending_ops.front().get());
@@ -155,7 +154,7 @@ TEST_F(OpTrackerTest, TestGetPending) {
void OpTrackerTest::RunOpsThread(CountDownLatch* finish_latch) {
const int kNumOps = 100;
// Start a bunch of ops.
- vector<scoped_refptr<OpDriver> > drivers;
+ vector<shared_ptr<OpDriver>> drivers;
ASSERT_OK(AddDrivers(kNumOps, &drivers));
// Wait for the main thread to tell us to proceed.
@@ -166,7 +165,7 @@ void OpTrackerTest::RunOpsThread(CountDownLatch*
finish_latch) {
SleepFor(MonoDelta::FromMilliseconds(1));
// Finish all the ops
- for (const scoped_refptr<OpDriver>& driver : drivers) {
+ for (const auto& driver : drivers) {
// And mark the op as failed, which will cause it to unregister itself.
driver->Abort(Status::Aborted(""));
}
@@ -212,7 +211,7 @@ static void CheckMetrics(const scoped_refptr<MetricEntity>&
entity,
TEST_F(OpTrackerTest, TestMetrics) {
NO_FATALS(CheckMetrics(entity_, 0, 0, 0, 0));
- vector<scoped_refptr<OpDriver> > drivers;
+ vector<shared_ptr<OpDriver> > drivers;
ASSERT_OK(AddDrivers(3, &drivers));
NO_FATALS(CheckMetrics(entity_, 3, 0, 0, 0));
@@ -250,7 +249,7 @@ TEST_P(OpTrackerTest, TestTooManyOps) {
// carries an empty ReplicateMsg), so we'll just add as many as possible
// and check that when we fail, it's because we've hit the limit.
Status s;
- vector<scoped_refptr<OpDriver>> drivers;
+ vector<shared_ptr<OpDriver>> drivers;
SCOPED_CLEANUP({
for (const auto &d : drivers) {
d->Abort(Status::Aborted(""));
diff --git a/src/kudu/tablet/ops/op_tracker.cc
b/src/kudu/tablet/ops/op_tracker.cc
index e0575e6f8..69b493b2f 100644
--- a/src/kudu/tablet/ops/op_tracker.cc
+++ b/src/kudu/tablet/ops/op_tracker.cc
@@ -130,10 +130,6 @@ OpTracker::Metrics::Metrics(const
scoped_refptr<MetricEntity>& entity)
#undef GINIT
#undef MINIT
-OpTracker::State::State()
- : memory_footprint(0) {
-}
-
OpTracker::OpTracker() {
}
@@ -144,7 +140,7 @@ OpTracker::~OpTracker() {
#endif
}
-Status OpTracker::Add(OpDriver* driver) {
+Status OpTracker::Add(shared_ptr<OpDriver> driver) {
size_t driver_mem_footprint = driver->state()->request()->SpaceUsedLong();
if (mem_tracker_ && !mem_tracker_->TryConsume(driver_mem_footprint)) {
if (metrics_) {
@@ -174,10 +170,11 @@ Status OpTracker::Add(OpDriver* driver) {
// Cache the op memory footprint so we needn't refer to the request
// again, as it may disappear between now and then.
- State st;
- st.memory_footprint = driver_mem_footprint;
std::lock_guard<simple_spinlock> l(lock_);
- InsertOrDie(&pending_ops_, driver, st);
+ const auto* driver_ptr = driver.get();
+ EmplaceOrDie(&pending_ops_,
+ driver_ptr,
+ State(std::move(driver), driver_mem_footprint));
return Status::OK();
}
@@ -237,14 +234,14 @@ void OpTracker::Release(OpDriver* driver) {
}
}
-void OpTracker::GetPendingOps(
- vector<scoped_refptr<OpDriver> >* pending_out) const {
- DCHECK(pending_out->empty());
+vector<shared_ptr<OpDriver>> OpTracker::GetPendingOps() const {
+ vector<shared_ptr<OpDriver>> ret;
std::lock_guard<simple_spinlock> l(lock_);
- for (const TxnMap::value_type& e : pending_ops_) {
- // Increments refcount of each op.
- pending_out->push_back(e.first);
+ ret.reserve(pending_ops_.size());
+ for (const auto& [_, s]: pending_ops_) {
+ ret.emplace_back(s.driver);
}
+ return ret;
}
int OpTracker::GetNumPendingForTests() const {
@@ -265,9 +262,7 @@ Status OpTracker::WaitForAllToFinish(const MonoDelta&
timeout) const {
MonoTime next_log_time = start_time + MonoDelta::FromSeconds(1);
while (1) {
- vector<scoped_refptr<OpDriver> > ops;
- GetPendingOps(&ops);
-
+ const auto ops = GetPendingOps();
if (ops.empty()) {
break;
}
diff --git a/src/kudu/tablet/ops/op_tracker.h b/src/kudu/tablet/ops/op_tracker.h
index 96b05cd39..3f51d38d2 100644
--- a/src/kudu/tablet/ops/op_tracker.h
+++ b/src/kudu/tablet/ops/op_tracker.h
@@ -19,11 +19,11 @@
#include <cstdint>
#include <memory>
#include <unordered_map>
+#include <utility>
#include <vector>
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
-#include "kudu/tablet/ops/op_driver.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
#include "kudu/util/status.h"
@@ -35,6 +35,8 @@ class MonoDelta;
namespace tablet {
+class OpDriver;
+
// Each TabletReplica has a OpTracker which keeps track of pending ops.
// Each "LeaderOp" will register itself by calling Add().
// It will remove itself by calling Release().
@@ -47,14 +49,14 @@ class OpTracker {
//
// In the event that the tracker's memory limit is exceeded, returns a
// ServiceUnavailable status.
- Status Add(OpDriver* driver);
+ Status Add(std::shared_ptr<OpDriver> driver);
// Removes the op from the pending list.
// Also triggers the deletion of the Op object, if its refcount == 0.
void Release(OpDriver* driver);
- // Populates list of currently-running ops into 'pending_out' vector.
- void GetPendingOps(std::vector<scoped_refptr<OpDriver> >* pending_out) const;
+ // Returns currently-running ops.
+ std::vector<std::shared_ptr<OpDriver>> GetPendingOps() const;
// Returns number of pending ops.
int GetNumPendingForTests() const;
@@ -87,18 +89,20 @@ class OpTracker {
// Per-op state that is tracked along with the op itself.
struct State {
- State();
+ State(std::shared_ptr<OpDriver> driver, int64_t memory_footprint)
+ : driver(std::move(driver)),
+ memory_footprint(memory_footprint) {
+ }
+
+ // The reference to the driver.
+ std::shared_ptr<OpDriver> driver;
// Approximate memory footprint of the op.
- int64_t memory_footprint;
+ const int64_t memory_footprint;
};
// Protected by 'lock_'.
- typedef std::unordered_map<scoped_refptr<OpDriver>,
- State,
- ScopedRefPtrHashFunctor<OpDriver>,
- ScopedRefPtrEqualToFunctor<OpDriver> > TxnMap;
- TxnMap pending_ops_;
+ std::unordered_map<const OpDriver*, State> pending_ops_;
std::unique_ptr<Metrics> metrics_;
diff --git a/src/kudu/tablet/tablet_replica-test.cc
b/src/kudu/tablet/tablet_replica-test.cc
index 998b27b2f..7007e9980 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -51,7 +51,7 @@
#include "kudu/tablet/lock_manager.h"
#include "kudu/tablet/ops/alter_schema_op.h"
#include "kudu/tablet/ops/op.h"
-#include "kudu/tablet/ops/op_driver.h"
+#include "kudu/tablet/ops/op_driver.h" // IWYU pragma: keep
#include "kudu/tablet/ops/op_tracker.h"
#include "kudu/tablet/ops/write_op.h"
#include "kudu/tablet/tablet.h"
@@ -429,7 +429,7 @@ TEST_F(TabletReplicaTest, TestActiveOpPreventsLogGC) {
&apply_continue,
std::move(op_state)));
- scoped_refptr<OpDriver> driver;
+ shared_ptr<OpDriver> driver;
ASSERT_OK(tablet_replica_->NewLeaderOpDriver(std::move(op),
&driver,
MonoTime::Max()));
diff --git a/src/kudu/tablet/tablet_replica.cc
b/src/kudu/tablet/tablet_replica.cc
index a5fe99a5d..5cec4bec3 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -149,6 +149,7 @@ using kudu::tserver::ParticipantRequestPB;
using kudu::tserver::TabletServerErrorPB;
using std::deque;
using std::map;
+using std::make_shared;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
@@ -597,7 +598,7 @@ Status TabletReplica::SubmitWrite(unique_ptr<WriteOpState>
op_state,
op_state->SetResultTracker(result_tracker_);
unique_ptr<WriteOp> op(new WriteOp(std::move(op_state), consensus::LEADER));
- scoped_refptr<OpDriver> driver;
+ shared_ptr<OpDriver> driver;
RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver, deadline));
driver->ExecuteAsync();
return Status::OK();
@@ -608,7 +609,7 @@ Status
TabletReplica::SubmitTxnParticipantOp(std::unique_ptr<ParticipantOpState>
op_state->SetResultTracker(result_tracker_);
unique_ptr<ParticipantOp> op(new ParticipantOp(std::move(op_state),
consensus::LEADER));
- scoped_refptr<OpDriver> driver;
+ shared_ptr<OpDriver> driver;
RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver, MonoTime::Max()));
driver->ExecuteAsync();
return Status::OK();
@@ -620,7 +621,7 @@ Status
TabletReplica::SubmitAlterSchema(unique_ptr<AlterSchemaOpState> state,
unique_ptr<AlterSchemaOp> op(
new AlterSchemaOp(std::move(state), consensus::LEADER));
- scoped_refptr<OpDriver> driver;
+ shared_ptr<OpDriver> driver;
RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver, deadline));
driver->ExecuteAsync();
return Status::OK();
@@ -708,9 +709,8 @@ string TabletReplica::HumanReadableState() const {
void TabletReplica::GetInFlightOps(Op::TraceType trace_type,
vector<consensus::OpStatusPB>* out) const {
- vector<scoped_refptr<OpDriver> > pending_ops;
- op_tracker_.GetPendingOps(&pending_ops);
- for (const scoped_refptr<OpDriver>& driver : pending_ops) {
+ const auto pending_ops = op_tracker_.GetPendingOps();
+ for (const auto& driver : pending_ops) {
if (driver->state() != nullptr) {
consensus::OpStatusPB status_pb;
status_pb.mutable_op_id()->CopyFrom(driver->GetOpId());
@@ -763,9 +763,8 @@ log::RetentionIndexes TabletReplica::GetRetentionIndexes()
const {
<< Substitute("{dur: $0, peers: $1}",
ret.for_durability, ret.for_peers);
// Next, interrogate the OpTracker.
- vector<scoped_refptr<OpDriver> > pending_ops;
- op_tracker_.GetPendingOps(&pending_ops);
- for (const scoped_refptr<OpDriver>& driver : pending_ops) {
+ const auto pending_ops = op_tracker_.GetPendingOps();
+ for (const auto& driver : pending_ops) {
OpId op_id = driver->GetOpId();
// A op which doesn't have an opid hasn't been submitted for replication
yet and
// thus has no need to anchor the log.
@@ -848,13 +847,23 @@ Status TabletReplica::StartFollowerOp(const
scoped_refptr<ConsensusRound>& round
OpState* state = op->state();
state->set_consensus_round(round);
- scoped_refptr<OpDriver> driver;
+ shared_ptr<OpDriver> driver;
RETURN_NOT_OK(NewReplicaOpDriver(std::move(op), &driver));
- // A raw pointer is required to avoid a refcount cycle.
- auto* driver_raw = driver.get();
+ // A weak pointer is required to avoid a refcount cycle.
+ // TODO(aserbin): is the comment above still relevant?
+ std::weak_ptr<OpDriver> wp(driver);
state->consensus_round()->SetConsensusReplicatedCallback(
- [driver_raw](const Status& s) { driver_raw->ReplicationFinished(s); });
+ [wp = std::move(wp)](const Status& s) {
+ shared_ptr<OpDriver> sp(wp.lock());
+ if (PREDICT_TRUE(sp)) {
+ sp->ReplicationFinished(s);
+ } else {
+ // TODO(aserbin): is this even possible?
+ LOG(DFATAL) << "OpDriver isn't around: has ReplicationFinished() "
+ "been called already?";
+ }
+ });
driver->ExecuteAsync();
return Status::OK();
@@ -899,16 +908,16 @@ void
TabletReplica::FinishConsensusOnlyRound(ConsensusRound* round) {
}
Status TabletReplica::NewLeaderOpDriver(unique_ptr<Op> op,
- scoped_refptr<OpDriver>* driver,
+ shared_ptr<OpDriver>* driver,
MonoTime deadline) {
- scoped_refptr<OpDriver> op_driver = new OpDriver(
- &op_tracker_,
- consensus_.get(),
- log_.get(),
- prepare_pool_token_.get(),
- apply_pool_,
- &op_order_verifier_,
- deadline);
+ auto op_driver = OpDriver::make_shared(
+ &op_tracker_,
+ consensus_.get(),
+ log_.get(),
+ prepare_pool_token_.get(),
+ apply_pool_,
+ &op_order_verifier_,
+ deadline);
RETURN_NOT_OK(op_driver->Init(std::move(op), consensus::LEADER));
*driver = std::move(op_driver);
@@ -916,14 +925,14 @@ Status TabletReplica::NewLeaderOpDriver(unique_ptr<Op> op,
}
Status TabletReplica::NewReplicaOpDriver(unique_ptr<Op> op,
- scoped_refptr<OpDriver>*
driver) {
- scoped_refptr<OpDriver> op_driver = new OpDriver(
- &op_tracker_,
- consensus_.get(),
- log_.get(),
- prepare_pool_token_.get(),
- apply_pool_,
- &op_order_verifier_);
+ shared_ptr<OpDriver>* driver) {
+ auto op_driver = OpDriver::make_shared(
+ &op_tracker_,
+ consensus_.get(),
+ log_.get(),
+ prepare_pool_token_.get(),
+ apply_pool_,
+ &op_order_verifier_);
RETURN_NOT_OK(op_driver->Init(std::move(op), consensus::FOLLOWER));
*driver = std::move(op_driver);
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index d44f47429..b5fd71ce9 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -316,11 +316,11 @@ class TabletReplica : public
RefCountedThreadSafe<TabletReplica>,
// that have timed out while in the prepare queue. For most call sites,
// the deadline is naturally defined by the corresponding RPC.
Status NewLeaderOpDriver(std::unique_ptr<Op> op,
- scoped_refptr<OpDriver>* driver,
+ std::shared_ptr<OpDriver>* driver,
MonoTime deadline);
Status NewReplicaOpDriver(std::unique_ptr<Op> op,
- scoped_refptr<OpDriver>* driver);
+ std::shared_ptr<OpDriver>* driver);
// Tells the tablet's log to garbage collect.
Status RunLogGC();
diff --git a/src/kudu/tablet/txn_participant-test.cc
b/src/kudu/tablet/txn_participant-test.cc
index 920fb0039..fea0a9b57 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -26,6 +26,7 @@
#include <ostream>
#include <string>
#include <thread>
+#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
@@ -54,7 +55,7 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/ops/op.h"
-#include "kudu/tablet/ops/op_driver.h"
+#include "kudu/tablet/ops/op_driver.h" // IWYU pragma: keep
#include "kudu/tablet/ops/op_tracker.h"
#include "kudu/tablet/ops/participant_op.h"
#include "kudu/tablet/tablet-test-util.h"
@@ -86,6 +87,7 @@ using kudu::tserver::WriteRequestPB;
using std::map;
using std::nullopt;
using std::optional;
+using std::shared_ptr;
using std::string;
using std::thread;
using std::unique_ptr;
@@ -997,9 +999,9 @@ TEST_F(TxnParticipantTest,
TestActiveParticipantOpsAnchorWALs) {
op_state->set_completion_callback(std::unique_ptr<OpCompletionCallback>(
new LatchOpCompletionCallback<tserver::ParticipantResponsePB>(&latch,
&resp)));
- scoped_refptr<OpDriver> driver;
unique_ptr<DelayedParticipantOp> op(
new DelayedParticipantOp(&apply_start, &apply_continue,
std::move(op_state)));
+ shared_ptr<OpDriver> driver;
ASSERT_OK(tablet_replica_->NewLeaderOpDriver(std::move(op), &driver,
MonoTime::Max()));
driver->ExecuteAsync();
// Wait for the apply to start, indicating that we have persisted and