This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch branch-1.18.x in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 8c11d588c4be8f02d1babdfc0840b394271cc857 Author: Alexey Serbin <[email protected]> AuthorDate: Wed Oct 16 18:19:39 2024 -0700 KUDU-3620 fix heap-use-after-free and undefined behavior in OpDriver This patch fixes a long-standing heap-use-after-free issue in OpDriver. The problem is addressed by supplying a valid reference to OpDriver objects for callbacks to make sure objects aren't destroyed while their methods are being invoked. The reference-counting wrapper for OpDriver objects was changed from scoped_refptr to std::shared_ptr. It allows for passing weak pointers (std::weak_ptr) instead of raw pointers to callbacks where a reference-counting cycle was suspected (left the clarification of the latter as TODOs), so a valid shared pointer can be constructed before invoking any of the required OpDriver methods. I also updated the signature of OpTracker::GetPendingOps() to get rid of the output parameter. Before this patch, running the following against Kudu bits built in the ASAN configuration would trigger ASAN and UBSAN warnings once in about 150 runs: ./ts_recovery-itest \ --gtest_filter='DifferentFaultPoints/Kudu969Test.Test/*' \ --gtest_repeat=10000 With this patch, more than 2K iterations of the same test succeeded without producing a single ASAN/UBSAN issue report. 
Change-Id: Iadc58f1724bc03373a7e165fd3798b8868dc9d29 Reviewed-on: http://gerrit.cloudera.org:8080/21948 Tested-by: Alexey Serbin <[email protected]> Reviewed-by: Zoltan Martonka <[email protected]> Reviewed-by: Abhishek Chennaka <[email protected]> (cherry picked from commit c51e1354322a4d2cad268de26f254dd52b0c9df1) Reviewed-on: http://gerrit.cloudera.org:8080/22001 --- src/kudu/tablet/ops/op_driver.cc | 44 +++++++++++++-------- src/kudu/tablet/ops/op_driver.h | 28 ++++++------- src/kudu/tablet/ops/op_tracker-test.cc | 41 ++++++++++---------- src/kudu/tablet/ops/op_tracker.cc | 29 ++++++-------- src/kudu/tablet/ops/op_tracker.h | 26 +++++++------ src/kudu/tablet/tablet_replica-test.cc | 4 +- src/kudu/tablet/tablet_replica.cc | 69 +++++++++++++++++++-------------- src/kudu/tablet/tablet_replica.h | 4 +- src/kudu/tablet/txn_participant-test.cc | 6 ++- 9 files changed, 137 insertions(+), 114 deletions(-) diff --git a/src/kudu/tablet/ops/op_driver.cc b/src/kudu/tablet/ops/op_driver.cc index 87c5bbaa1..41cae488a 100644 --- a/src/kudu/tablet/ops/op_driver.cc +++ b/src/kudu/tablet/ops/op_driver.cc @@ -62,6 +62,8 @@ using kudu::pb_util::SecureShortDebugString; using kudu::rpc::RequestIdPB; using kudu::rpc::ResultTracker; using std::string; +using std::shared_ptr; +using std::weak_ptr; using std::unique_ptr; using strings::Substitute; @@ -141,6 +143,7 @@ Status OpDriver::Init(unique_ptr<Op> op, } op_ = std::move(op); + auto self = shared_from_this(); if (type == consensus::FOLLOWER) { std::lock_guard lock(opid_lock_); op_id_copy_ = op_->state()->op_id(); @@ -171,14 +174,26 @@ Status OpDriver::Init(unique_ptr<Op> op, unique_ptr<ReplicateMsg> replicate_msg; op_->NewReplicateMsg(&replicate_msg); if (consensus_) { // sometimes NULL in tests - // A raw pointer is required to avoid a refcount cycle. + // A weak pointer is required to avoid a refcount cycle. + // TODO(aserbin): is the comment above still relevant? 
+ weak_ptr<OpDriver> wp(self); mutable_state()->set_consensus_round( - consensus_->NewRound(std::move(replicate_msg), - [this](const Status& s) { this->ReplicationFinished(s); })); + consensus_->NewRound( + std::move(replicate_msg), + [wp = std::move(wp)](const Status& s) { + shared_ptr<OpDriver> sp(wp.lock()); + if (PREDICT_TRUE(sp)) { + sp->ReplicationFinished(s); + } else { + // TODO(aserbin): is this even possible? + LOG(DFATAL) << "OpDriver isn't around: has ReplicationFinished() " + "been called already?"; + } + })); } } - return op_tracker_->Add(this); + return op_tracker_->Add(std::move(self)); } consensus::OpId OpDriver::GetOpId() { @@ -226,10 +241,12 @@ void OpDriver::ExecuteAsync() { s = consensus_->CheckLeadershipAndBindTerm(mutable_state()->consensus_round()); } - if (s.ok()) { - s = prepare_pool_token_->Submit([this]() { this->PrepareTask(); }); + if (PREDICT_TRUE(s.ok())) { + // Provide a reference to the object to be able to call its methods + // regardless of what happens with other references around. + auto self = shared_from_this(); + s = prepare_pool_token_->Submit([self = std::move(self)]() { self->PrepareTask(); }); } - if (PREDICT_FALSE(!s.ok())) { HandleFailure(s); } @@ -501,7 +518,10 @@ Status OpDriver::ApplyAsync() { } TRACE_EVENT_FLOW_BEGIN0("op", "ApplyTask", this); - return apply_pool_->Submit([this]() { this->ApplyTask(); }); + // Provide a reference to the object to make sure it's safe to call its + // methods regardless of what happens with other references around. + auto self = shared_from_this(); + return apply_pool_->Submit([self = std::move(self)]() { self->ApplyTask(); }); } void OpDriver::ApplyTask() { @@ -521,10 +541,6 @@ void OpDriver::ApplyTask() { } #endif // #if DCHECK_IS_ON() ... - // We need to ref-count ourself, since FinishApplying() may run very quickly - // and end up calling Finalize() while we're still in this code. 
- scoped_refptr<OpDriver> ref(this); - { CommitMsg* commit_msg; Status s = op_->Apply(&commit_msg); @@ -584,16 +600,12 @@ Status OpDriver::CommitWait() { void OpDriver::Finalize() { ADOPT_TRACE(trace()); - // TODO: this is an ugly hack so that the Release() call doesn't delete the - // object while we still hold the lock. - scoped_refptr<OpDriver> ref(this); std::lock_guard lock(lock_); op_->Finish(Op::APPLIED); mutable_state()->completion_callback()->OpCompleted(); op_tracker_->Release(this); } - std::string OpDriver::StateString(ReplicationState repl_state, PrepareState prep_state) { string state_str; diff --git a/src/kudu/tablet/ops/op_driver.h b/src/kudu/tablet/ops/op_driver.h index 9fba85303..3da26bd3d 100644 --- a/src/kudu/tablet/ops/op_driver.h +++ b/src/kudu/tablet/ops/op_driver.h @@ -28,6 +28,7 @@ #include "kudu/gutil/walltime.h" #include "kudu/tablet/ops/op.h" #include "kudu/util/locks.h" +#include "kudu/util/make_shared.h" #include "kudu/util/monotime.h" #include "kudu/util/status.h" #include "kudu/util/trace.h" @@ -219,18 +220,10 @@ class OpTracker; // still be ok, as it would get aborted because the replica wasn't a leader yet (constraints 1/2). // // This class is thread safe. -class OpDriver : public RefCountedThreadSafe<OpDriver> { - +class OpDriver : public std::enable_shared_from_this<OpDriver>, + public enable_make_shared<OpDriver> { public: - // Construct OpDriver. OpDriver does not take ownership - // of any of the objects pointed to in the constructor's arguments. - OpDriver(OpTracker* op_tracker, - consensus::RaftConsensus* consensus, - log::Log* log, - ThreadPoolToken* prepare_pool_token, - ThreadPool* apply_pool, - OpOrderVerifier* order_verifier, - MonoTime deadline = MonoTime::Max()); + ~OpDriver() = default; // Perform any non-constructor initialization. Sets the op that will be // executed. 
@@ -276,6 +269,17 @@ class OpDriver : public RefCountedThreadSafe<OpDriver> { Trace* trace() { return trace_.get(); } + protected: + // Construct OpDriver. OpDriver does not take ownership + // of any of the objects pointed to in the constructor's arguments. + OpDriver(OpTracker* op_tracker, + consensus::RaftConsensus* consensus, + log::Log* log, + ThreadPoolToken* prepare_pool_token, + ThreadPool* apply_pool, + OpOrderVerifier* order_verifier, + MonoTime deadline = MonoTime::Max()); + private: FRIEND_TEST(TabletReplicaTest, TestShuttingDownMVCC); friend class RefCountedThreadSafe<OpDriver>; @@ -301,8 +305,6 @@ class OpDriver : public RefCountedThreadSafe<OpDriver> { PREPARED }; - ~OpDriver() {} - // The task submitted to the prepare threadpool to prepare the op. If Prepare() fails, // calls HandleFailure. void PrepareTask(); diff --git a/src/kudu/tablet/ops/op_tracker-test.cc b/src/kudu/tablet/ops/op_tracker-test.cc index c3c15d938..914e0d873 100644 --- a/src/kudu/tablet/ops/op_tracker-test.cc +++ b/src/kudu/tablet/ops/op_tracker-test.cc @@ -18,6 +18,7 @@ #include "kudu/tablet/ops/op_tracker.h" #include <cstdint> +#include <iterator> #include <memory> #include <ostream> #include <string> @@ -108,24 +109,23 @@ class OpTrackerTest : public KuduTest, void RunOpsThread(CountDownLatch* finish_latch); Status AddDrivers(int num_drivers, - vector<scoped_refptr<OpDriver> >* drivers) { - vector<scoped_refptr<OpDriver> > local_drivers; + vector<shared_ptr<OpDriver>>* drivers) { + vector<shared_ptr<OpDriver>> local_drivers; + local_drivers.reserve(num_drivers); for (int i = 0; i < num_drivers; i++) { - scoped_refptr<OpDriver> driver( - new OpDriver(&tracker_, - nullptr, - nullptr, - nullptr, - nullptr, - nullptr)); + auto driver(OpDriver::make_shared( + &tracker_, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr)); unique_ptr<NoOpOp> op(new NoOpOp(new NoOpOpState)); RETURN_NOT_OK(driver->Init(std::move(op), consensus::LEADER)); - local_drivers.push_back(driver); + 
local_drivers.emplace_back(std::move(driver)); } - for (const scoped_refptr<OpDriver>& d : local_drivers) { - drivers->push_back(d); - } + std::move(local_drivers.begin(), local_drivers.end(), std::back_inserter(*drivers)); return Status::OK(); } @@ -136,13 +136,12 @@ class OpTrackerTest : public KuduTest, TEST_F(OpTrackerTest, TestGetPending) { ASSERT_EQ(0, tracker_.GetNumPendingForTests()); - vector<scoped_refptr<OpDriver> > drivers; + vector<shared_ptr<OpDriver> > drivers; ASSERT_OK(AddDrivers(1, &drivers)); - scoped_refptr<OpDriver> driver = drivers[0]; + shared_ptr<OpDriver>& driver(drivers[0]); ASSERT_EQ(1, tracker_.GetNumPendingForTests()); - vector<scoped_refptr<OpDriver> > pending_ops; - tracker_.GetPendingOps(&pending_ops); + const auto pending_ops = tracker_.GetPendingOps(); ASSERT_EQ(1, pending_ops.size()); ASSERT_EQ(driver.get(), pending_ops.front().get()); @@ -156,7 +155,7 @@ TEST_F(OpTrackerTest, TestGetPending) { void OpTrackerTest::RunOpsThread(CountDownLatch* finish_latch) { const int kNumOps = 100; // Start a bunch of ops. - vector<scoped_refptr<OpDriver> > drivers; + vector<shared_ptr<OpDriver>> drivers; ASSERT_OK(AddDrivers(kNumOps, &drivers)); // Wait for the main thread to tell us to proceed. @@ -167,7 +166,7 @@ void OpTrackerTest::RunOpsThread(CountDownLatch* finish_latch) { SleepFor(MonoDelta::FromMilliseconds(1)); // Finish all the ops - for (const scoped_refptr<OpDriver>& driver : drivers) { + for (const auto& driver : drivers) { // And mark the op as failed, which will cause it to unregister itself. 
driver->Abort(Status::Aborted("")); } @@ -213,7 +212,7 @@ static void CheckMetrics(const scoped_refptr<MetricEntity>& entity, TEST_F(OpTrackerTest, TestMetrics) { NO_FATALS(CheckMetrics(entity_, 0, 0, 0, 0)); - vector<scoped_refptr<OpDriver> > drivers; + vector<shared_ptr<OpDriver> > drivers; ASSERT_OK(AddDrivers(3, &drivers)); NO_FATALS(CheckMetrics(entity_, 3, 0, 0, 0)); @@ -251,7 +250,7 @@ TEST_P(OpTrackerTest, TestTooManyOps) { // carries an empty ReplicateMsg), so we'll just add as many as possible // and check that when we fail, it's because we've hit the limit. Status s; - vector<scoped_refptr<OpDriver>> drivers; + vector<shared_ptr<OpDriver>> drivers; SCOPED_CLEANUP({ for (const auto &d : drivers) { d->Abort(Status::Aborted("")); diff --git a/src/kudu/tablet/ops/op_tracker.cc b/src/kudu/tablet/ops/op_tracker.cc index 058d16429..141965ad2 100644 --- a/src/kudu/tablet/ops/op_tracker.cc +++ b/src/kudu/tablet/ops/op_tracker.cc @@ -131,10 +131,6 @@ OpTracker::Metrics::Metrics(const scoped_refptr<MetricEntity>& entity) #undef GINIT #undef MINIT -OpTracker::State::State() - : memory_footprint(0) { -} - OpTracker::OpTracker() { } @@ -145,7 +141,7 @@ OpTracker::~OpTracker() { #endif } -Status OpTracker::Add(OpDriver* driver) { +Status OpTracker::Add(shared_ptr<OpDriver> driver) { size_t driver_mem_footprint = driver->state()->request()->SpaceUsedLong(); if (mem_tracker_ && !mem_tracker_->TryConsume(driver_mem_footprint)) { if (metrics_) { @@ -175,10 +171,11 @@ Status OpTracker::Add(OpDriver* driver) { // Cache the op memory footprint so we needn't refer to the request // again, as it may disappear between now and then. 
- State st; - st.memory_footprint = driver_mem_footprint; std::lock_guard l(lock_); - InsertOrDie(&pending_ops_, driver, st); + const auto* driver_ptr = driver.get(); + EmplaceOrDie(&pending_ops_, + driver_ptr, + State(std::move(driver), driver_mem_footprint)); return Status::OK(); } @@ -243,14 +240,14 @@ void OpTracker::Release(OpDriver* driver) { } } -void OpTracker::GetPendingOps( - vector<scoped_refptr<OpDriver> >* pending_out) const { - DCHECK(pending_out->empty()); +vector<shared_ptr<OpDriver>> OpTracker::GetPendingOps() const { + vector<shared_ptr<OpDriver>> ret; std::lock_guard l(lock_); - for (const TxnMap::value_type& e : pending_ops_) { - // Increments refcount of each op. - pending_out->push_back(e.first); + ret.reserve(pending_ops_.size()); + for (const auto& [_, s]: pending_ops_) { + ret.emplace_back(s.driver); } + return ret; } int OpTracker::GetNumPendingForTests() const { @@ -271,9 +268,7 @@ Status OpTracker::WaitForAllToFinish(const MonoDelta& timeout) const { MonoTime next_log_time = start_time + MonoDelta::FromSeconds(1); while (1) { - vector<scoped_refptr<OpDriver> > ops; - GetPendingOps(&ops); - + const auto ops = GetPendingOps(); if (ops.empty()) { break; } diff --git a/src/kudu/tablet/ops/op_tracker.h b/src/kudu/tablet/ops/op_tracker.h index 96b05cd39..3f51d38d2 100644 --- a/src/kudu/tablet/ops/op_tracker.h +++ b/src/kudu/tablet/ops/op_tracker.h @@ -19,11 +19,11 @@ #include <cstdint> #include <memory> #include <unordered_map> +#include <utility> #include <vector> #include "kudu/gutil/macros.h" #include "kudu/gutil/ref_counted.h" -#include "kudu/tablet/ops/op_driver.h" #include "kudu/util/locks.h" #include "kudu/util/metrics.h" #include "kudu/util/status.h" @@ -35,6 +35,8 @@ class MonoDelta; namespace tablet { +class OpDriver; + // Each TabletReplica has a OpTracker which keeps track of pending ops. // Each "LeaderOp" will register itself by calling Add(). // It will remove itself by calling Release(). 
@@ -47,14 +49,14 @@ class OpTracker { // // In the event that the tracker's memory limit is exceeded, returns a // ServiceUnavailable status. - Status Add(OpDriver* driver); + Status Add(std::shared_ptr<OpDriver> driver); // Removes the op from the pending list. // Also triggers the deletion of the Op object, if its refcount == 0. void Release(OpDriver* driver); - // Populates list of currently-running ops into 'pending_out' vector. - void GetPendingOps(std::vector<scoped_refptr<OpDriver> >* pending_out) const; + // Returns currently-running ops. + std::vector<std::shared_ptr<OpDriver>> GetPendingOps() const; // Returns number of pending ops. int GetNumPendingForTests() const; @@ -87,18 +89,20 @@ class OpTracker { // Per-op state that is tracked along with the op itself. struct State { - State(); + State(std::shared_ptr<OpDriver> driver, int64_t memory_footprint) + : driver(std::move(driver)), + memory_footprint(memory_footprint) { + } + + // The reference to the driver. + std::shared_ptr<OpDriver> driver; // Approximate memory footprint of the op. - int64_t memory_footprint; + const int64_t memory_footprint; }; // Protected by 'lock_'. 
- typedef std::unordered_map<scoped_refptr<OpDriver>, - State, - ScopedRefPtrHashFunctor<OpDriver>, - ScopedRefPtrEqualToFunctor<OpDriver> > TxnMap; - TxnMap pending_ops_; + std::unordered_map<const OpDriver*, State> pending_ops_; std::unique_ptr<Metrics> metrics_; diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc index 453309330..be8a83a41 100644 --- a/src/kudu/tablet/tablet_replica-test.cc +++ b/src/kudu/tablet/tablet_replica-test.cc @@ -51,7 +51,7 @@ #include "kudu/tablet/lock_manager.h" #include "kudu/tablet/ops/alter_schema_op.h" #include "kudu/tablet/ops/op.h" -#include "kudu/tablet/ops/op_driver.h" +#include "kudu/tablet/ops/op_driver.h" // IWYU pragma: keep #include "kudu/tablet/ops/op_tracker.h" #include "kudu/tablet/ops/write_op.h" #include "kudu/tablet/tablet.h" @@ -451,7 +451,7 @@ TEST_F(TabletReplicaTest, TestActiveOpPreventsLogGC) { &apply_continue, std::move(op_state))); - scoped_refptr<OpDriver> driver; + shared_ptr<OpDriver> driver; ASSERT_OK(tablet_replica_->NewLeaderOpDriver(std::move(op), &driver, MonoTime::Max())); diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc index 30ec21ae3..bc90e1169 100644 --- a/src/kudu/tablet/tablet_replica.cc +++ b/src/kudu/tablet/tablet_replica.cc @@ -149,6 +149,7 @@ using kudu::tserver::ParticipantRequestPB; using kudu::tserver::TabletServerErrorPB; using std::deque; using std::map; +using std::make_shared; using std::shared_ptr; using std::string; using std::unique_ptr; @@ -597,7 +598,7 @@ Status TabletReplica::SubmitWrite(unique_ptr<WriteOpState> op_state, op_state->SetResultTracker(result_tracker_); unique_ptr<WriteOp> op(new WriteOp(std::move(op_state), consensus::LEADER)); - scoped_refptr<OpDriver> driver; + shared_ptr<OpDriver> driver; RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver, deadline)); driver->ExecuteAsync(); return Status::OK(); @@ -608,7 +609,7 @@ Status 
TabletReplica::SubmitTxnParticipantOp(std::unique_ptr<ParticipantOpState> op_state->SetResultTracker(result_tracker_); unique_ptr<ParticipantOp> op(new ParticipantOp(std::move(op_state), consensus::LEADER)); - scoped_refptr<OpDriver> driver; + shared_ptr<OpDriver> driver; RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver, MonoTime::Max())); driver->ExecuteAsync(); return Status::OK(); @@ -620,7 +621,7 @@ Status TabletReplica::SubmitAlterSchema(unique_ptr<AlterSchemaOpState> state, unique_ptr<AlterSchemaOp> op( new AlterSchemaOp(std::move(state), consensus::LEADER)); - scoped_refptr<OpDriver> driver; + shared_ptr<OpDriver> driver; RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver, deadline)); driver->ExecuteAsync(); return Status::OK(); @@ -708,9 +709,8 @@ string TabletReplica::HumanReadableState() const { void TabletReplica::GetInFlightOps(Op::TraceType trace_type, vector<consensus::OpStatusPB>* out) const { - vector<scoped_refptr<OpDriver> > pending_ops; - op_tracker_.GetPendingOps(&pending_ops); - for (const scoped_refptr<OpDriver>& driver : pending_ops) { + const auto pending_ops = op_tracker_.GetPendingOps(); + for (const auto& driver : pending_ops) { if (driver->state() != nullptr) { consensus::OpStatusPB status_pb; status_pb.mutable_op_id()->CopyFrom(driver->GetOpId()); @@ -763,9 +763,8 @@ log::RetentionIndexes TabletReplica::GetRetentionIndexes() const { << Substitute("{dur: $0, peers: $1}", ret.for_durability, ret.for_peers); // Next, interrogate the OpTracker. - vector<scoped_refptr<OpDriver> > pending_ops; - op_tracker_.GetPendingOps(&pending_ops); - for (const scoped_refptr<OpDriver>& driver : pending_ops) { + const auto pending_ops = op_tracker_.GetPendingOps(); + for (const auto& driver : pending_ops) { OpId op_id = driver->GetOpId(); // A op which doesn't have an opid hasn't been submitted for replication yet and // thus has no need to anchor the log. 
@@ -848,13 +847,23 @@ Status TabletReplica::StartFollowerOp(const scoped_refptr<ConsensusRound>& round OpState* state = op->state(); state->set_consensus_round(round); - scoped_refptr<OpDriver> driver; + shared_ptr<OpDriver> driver; RETURN_NOT_OK(NewFollowerOpDriver(std::move(op), &driver)); - // A raw pointer is required to avoid a refcount cycle. - auto* driver_raw = driver.get(); + // A weak pointer is required to avoid a refcount cycle. + // TODO(aserbin): is the comment above still relevant? + std::weak_ptr<OpDriver> wp(driver); state->consensus_round()->SetConsensusReplicatedCallback( - [driver_raw](const Status& s) { driver_raw->ReplicationFinished(s); }); + [wp = std::move(wp)](const Status& s) { + shared_ptr<OpDriver> sp(wp.lock()); + if (PREDICT_TRUE(sp)) { + sp->ReplicationFinished(s); + } else { + // TODO(aserbin): is this even possible? + LOG(DFATAL) << "OpDriver isn't around: has ReplicationFinished() " + "been called already?"; + } + }); driver->ExecuteAsync(); return Status::OK(); @@ -899,16 +908,16 @@ void TabletReplica::FinishConsensusOnlyRound(ConsensusRound* round) { } Status TabletReplica::NewLeaderOpDriver(unique_ptr<Op> op, - scoped_refptr<OpDriver>* driver, + shared_ptr<OpDriver>* driver, MonoTime deadline) { - scoped_refptr<OpDriver> op_driver = new OpDriver( - &op_tracker_, - consensus_.get(), - log_.get(), - prepare_pool_token_.get(), - apply_pool_, - &op_order_verifier_, - deadline); + auto op_driver = OpDriver::make_shared( + &op_tracker_, + consensus_.get(), + log_.get(), + prepare_pool_token_.get(), + apply_pool_, + &op_order_verifier_, + deadline); RETURN_NOT_OK(op_driver->Init(std::move(op), consensus::LEADER)); *driver = std::move(op_driver); @@ -916,14 +925,14 @@ Status TabletReplica::NewLeaderOpDriver(unique_ptr<Op> op, } Status TabletReplica::NewFollowerOpDriver(unique_ptr<Op> op, - scoped_refptr<OpDriver>* driver) { - scoped_refptr<OpDriver> op_driver = new OpDriver( - &op_tracker_, - consensus_.get(), - log_.get(), - 
prepare_pool_token_.get(), - apply_pool_, - &op_order_verifier_); + shared_ptr<OpDriver>* driver) { + auto op_driver = OpDriver::make_shared( + &op_tracker_, + consensus_.get(), + log_.get(), + prepare_pool_token_.get(), + apply_pool_, + &op_order_verifier_); RETURN_NOT_OK(op_driver->Init(std::move(op), consensus::FOLLOWER)); *driver = std::move(op_driver); diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h index 2f7faa82f..8b9c69012 100644 --- a/src/kudu/tablet/tablet_replica.h +++ b/src/kudu/tablet/tablet_replica.h @@ -315,11 +315,11 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>, // that have timed out while in the prepare queue. For most call sites, // the deadline is naturally defined by the corresponding RPC. Status NewLeaderOpDriver(std::unique_ptr<Op> op, - scoped_refptr<OpDriver>* driver, + std::shared_ptr<OpDriver>* driver, MonoTime deadline); Status NewFollowerOpDriver(std::unique_ptr<Op> op, - scoped_refptr<OpDriver>* driver); + std::shared_ptr<OpDriver>* driver); // Tells the tablet's log to garbage collect. 
Status RunLogGC(); diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc index 920fb0039..fea0a9b57 100644 --- a/src/kudu/tablet/txn_participant-test.cc +++ b/src/kudu/tablet/txn_participant-test.cc @@ -26,6 +26,7 @@ #include <ostream> #include <string> #include <thread> +#include <type_traits> #include <unordered_map> #include <utility> #include <vector> @@ -54,7 +55,7 @@ #include "kudu/gutil/strings/substitute.h" #include "kudu/tablet/metadata.pb.h" #include "kudu/tablet/ops/op.h" -#include "kudu/tablet/ops/op_driver.h" +#include "kudu/tablet/ops/op_driver.h" // IWYU pragma: keep #include "kudu/tablet/ops/op_tracker.h" #include "kudu/tablet/ops/participant_op.h" #include "kudu/tablet/tablet-test-util.h" @@ -86,6 +87,7 @@ using kudu::tserver::WriteRequestPB; using std::map; using std::nullopt; using std::optional; +using std::shared_ptr; using std::string; using std::thread; using std::unique_ptr; @@ -997,9 +999,9 @@ TEST_F(TxnParticipantTest, TestActiveParticipantOpsAnchorWALs) { op_state->set_completion_callback(std::unique_ptr<OpCompletionCallback>( new LatchOpCompletionCallback<tserver::ParticipantResponsePB>(&latch, &resp))); - scoped_refptr<OpDriver> driver; unique_ptr<DelayedParticipantOp> op( new DelayedParticipantOp(&apply_start, &apply_continue, std::move(op_state))); + shared_ptr<OpDriver> driver; ASSERT_OK(tablet_replica_->NewLeaderOpDriver(std::move(op), &driver, MonoTime::Max())); driver->ExecuteAsync(); // Wait for the apply to start, indicating that we have persisted and
