This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch branch-1.17.x
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/branch-1.17.x by this push:
     new be5c3f520 KUDU-3620 fix heap-use-after-free and undefined behavior in OpDriver
be5c3f520 is described below

commit be5c3f5201da71e9991c4dd4e819ec3d1512bf51
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Oct 16 18:19:39 2024 -0700

    KUDU-3620 fix heap-use-after-free and undefined behavior in OpDriver
    
    This patch fixes a long-standing heap-use-after-free issue in OpDriver.
    The problem is addressed by supplying a valid reference to OpDriver
    objects for callbacks to make sure objects aren't destroyed while
    their methods are being invoked.
    
    The reference-counting wrapper for OpDriver objects has been changed from
    scoped_refptr to std::shared_ptr.  This allows for passing weak pointers
    (std::weak_ptr) instead of raw pointers to callbacks where a
    reference-counting cycle was suspected (clarifying whether such a cycle
    actually exists is left as TODOs), so a valid shared pointer can be
    constructed before invoking any of the required OpDriver methods.
    
    I also updated the signature of OpTracker::GetPendingOps() to get rid of
    its output parameter.
    
    Before this patch, running the following against Kudu bits built in the
    ASAN configuration would trigger ASAN and UBSAN warnings about once in
    every 150 runs:
    
      ./ts_recovery-itest \
          --gtest_filter='DifferentFaultPoints/Kudu969Test.Test/*' \
          --gtest_repeat=10000
    
    With this patch, more than 2K iterations of the same test scenario
    succeeded without producing a single ASAN/UBSAN issue report.
    
    Change-Id: Iadc58f1724bc03373a7e165fd3798b8868dc9d29
    Reviewed-on: http://gerrit.cloudera.org:8080/21948
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Zoltan Martonka <[email protected]>
    Reviewed-by: Abhishek Chennaka <[email protected]>
    (cherry picked from commit c51e1354322a4d2cad268de26f254dd52b0c9df1)
      Conflicts:
        src/kudu/tablet/ops/op_driver.cc
        src/kudu/tablet/ops/op_tracker.cc
        src/kudu/tablet/tablet_replica.cc
        src/kudu/tablet/tablet_replica.h
    Reviewed-on: http://gerrit.cloudera.org:8080/21960
    Tested-by: Kudu Jenkins
---
 src/kudu/tablet/ops/op_driver.cc        | 44 +++++++++++++--------
 src/kudu/tablet/ops/op_driver.h         | 28 ++++++-------
 src/kudu/tablet/ops/op_tracker-test.cc  | 41 ++++++++++----------
 src/kudu/tablet/ops/op_tracker.cc       | 29 ++++++--------
 src/kudu/tablet/ops/op_tracker.h        | 26 +++++++------
 src/kudu/tablet/tablet_replica-test.cc  |  4 +-
 src/kudu/tablet/tablet_replica.cc       | 69 +++++++++++++++++++--------------
 src/kudu/tablet/tablet_replica.h        |  4 +-
 src/kudu/tablet/txn_participant-test.cc |  6 ++-
 9 files changed, 137 insertions(+), 114 deletions(-)

diff --git a/src/kudu/tablet/ops/op_driver.cc b/src/kudu/tablet/ops/op_driver.cc
index b3e1aa740..558f58049 100644
--- a/src/kudu/tablet/ops/op_driver.cc
+++ b/src/kudu/tablet/ops/op_driver.cc
@@ -62,6 +62,8 @@ using kudu::pb_util::SecureShortDebugString;
 using kudu::rpc::RequestIdPB;
 using kudu::rpc::ResultTracker;
 using std::string;
+using std::shared_ptr;
+using std::weak_ptr;
 using std::unique_ptr;
 using strings::Substitute;
 
@@ -141,6 +143,7 @@ Status OpDriver::Init(unique_ptr<Op> op,
   }
   op_ = std::move(op);
 
+  auto self = shared_from_this();
   if (type == consensus::FOLLOWER) {
     std::lock_guard<simple_spinlock> lock(opid_lock_);
     op_id_copy_ = op_->state()->op_id();
@@ -171,14 +174,26 @@ Status OpDriver::Init(unique_ptr<Op> op,
     unique_ptr<ReplicateMsg> replicate_msg;
     op_->NewReplicateMsg(&replicate_msg);
     if (consensus_) { // sometimes NULL in tests
-      // A raw pointer is required to avoid a refcount cycle.
+      // A weak pointer is required to avoid a refcount cycle.
+      // TODO(aserbin): is the comment above still relevant?
+      weak_ptr<OpDriver> wp(self);
       mutable_state()->set_consensus_round(
-        consensus_->NewRound(std::move(replicate_msg),
-                             [this](const Status& s) { 
this->ReplicationFinished(s); }));
+        consensus_->NewRound(
+            std::move(replicate_msg),
+            [wp = std::move(wp)](const Status& s) {
+              shared_ptr<OpDriver> sp(wp.lock());
+              if (PREDICT_TRUE(sp)) {
+                sp->ReplicationFinished(s);
+              } else {
+                // TODO(aserbin): is this even possible?
+                LOG(DFATAL) << "OpDriver isn't around: has 
ReplicationFinished() "
+                               "been called already?";
+              }
+            }));
     }
   }
 
-  return op_tracker_->Add(this);
+  return op_tracker_->Add(std::move(self));
 }
 
 consensus::OpId OpDriver::GetOpId() {
@@ -226,10 +241,12 @@ void OpDriver::ExecuteAsync() {
     s = 
consensus_->CheckLeadershipAndBindTerm(mutable_state()->consensus_round());
   }
 
-  if (s.ok()) {
-    s = prepare_pool_token_->Submit([this]() { this->PrepareTask(); });
+  if (PREDICT_TRUE(s.ok())) {
+    // Provide a reference to the object to be able to call its methods
+    // regardless of what happens with other references around.
+    auto self = shared_from_this();
+    s = prepare_pool_token_->Submit([self = std::move(self)]() { 
self->PrepareTask(); });
   }
-
   if (PREDICT_FALSE(!s.ok())) {
     HandleFailure(s);
   }
@@ -498,7 +515,10 @@ Status OpDriver::ApplyAsync() {
   }
 
   TRACE_EVENT_FLOW_BEGIN0("op", "ApplyTask", this);
-  return apply_pool_->Submit([this]() { this->ApplyTask(); });
+  // Provide a reference to the object to make sure it's safe to call its
+  // methods regardless of what happens with other references around.
+  auto self = shared_from_this();
+  return apply_pool_->Submit([self = std::move(self)]() { self->ApplyTask(); 
});
 }
 
 void OpDriver::ApplyTask() {
@@ -518,10 +538,6 @@ void OpDriver::ApplyTask() {
   }
 #endif // #if DCHECK_IS_ON() ...
 
-  // We need to ref-count ourself, since FinishApplying() may run very quickly
-  // and end up calling Finalize() while we're still in this code.
-  scoped_refptr<OpDriver> ref(this);
-
   {
     CommitMsg* commit_msg;
     Status s = op_->Apply(&commit_msg);
@@ -581,16 +597,12 @@ Status OpDriver::CommitWait() {
 
 void OpDriver::Finalize() {
   ADOPT_TRACE(trace());
-  // TODO: this is an ugly hack so that the Release() call doesn't delete the
-  // object while we still hold the lock.
-  scoped_refptr<OpDriver> ref(this);
   std::lock_guard<simple_spinlock> lock(lock_);
   op_->Finish(Op::APPLIED);
   mutable_state()->completion_callback()->OpCompleted();
   op_tracker_->Release(this);
 }
 
-
 std::string OpDriver::StateString(ReplicationState repl_state,
                                   PrepareState prep_state) {
   string state_str;
diff --git a/src/kudu/tablet/ops/op_driver.h b/src/kudu/tablet/ops/op_driver.h
index 9fba85303..3da26bd3d 100644
--- a/src/kudu/tablet/ops/op_driver.h
+++ b/src/kudu/tablet/ops/op_driver.h
@@ -28,6 +28,7 @@
 #include "kudu/gutil/walltime.h"
 #include "kudu/tablet/ops/op.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/make_shared.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "kudu/util/trace.h"
@@ -219,18 +220,10 @@ class OpTracker;
 // still be ok, as it would get aborted because the replica wasn't a leader 
yet (constraints 1/2).
 //
 // This class is thread safe.
-class OpDriver : public RefCountedThreadSafe<OpDriver> {
-
+class OpDriver : public std::enable_shared_from_this<OpDriver>,
+                 public enable_make_shared<OpDriver> {
  public:
-  // Construct OpDriver. OpDriver does not take ownership
-  // of any of the objects pointed to in the constructor's arguments.
-  OpDriver(OpTracker* op_tracker,
-           consensus::RaftConsensus* consensus,
-           log::Log* log,
-           ThreadPoolToken* prepare_pool_token,
-           ThreadPool* apply_pool,
-           OpOrderVerifier* order_verifier,
-           MonoTime deadline = MonoTime::Max());
+  ~OpDriver() = default;
 
   // Perform any non-constructor initialization. Sets the op that will be
   // executed.
@@ -276,6 +269,17 @@ class OpDriver : public RefCountedThreadSafe<OpDriver> {
 
   Trace* trace() { return trace_.get(); }
 
+ protected:
+  // Construct OpDriver. OpDriver does not take ownership
+  // of any of the objects pointed to in the constructor's arguments.
+  OpDriver(OpTracker* op_tracker,
+           consensus::RaftConsensus* consensus,
+           log::Log* log,
+           ThreadPoolToken* prepare_pool_token,
+           ThreadPool* apply_pool,
+           OpOrderVerifier* order_verifier,
+           MonoTime deadline = MonoTime::Max());
+
  private:
   FRIEND_TEST(TabletReplicaTest, TestShuttingDownMVCC);
   friend class RefCountedThreadSafe<OpDriver>;
@@ -301,8 +305,6 @@ class OpDriver : public RefCountedThreadSafe<OpDriver> {
     PREPARED
   };
 
-  ~OpDriver() {}
-
   // The task submitted to the prepare threadpool to prepare the op. If 
Prepare() fails,
   // calls HandleFailure.
   void PrepareTask();
diff --git a/src/kudu/tablet/ops/op_tracker-test.cc 
b/src/kudu/tablet/ops/op_tracker-test.cc
index c948041a9..b956e87d3 100644
--- a/src/kudu/tablet/ops/op_tracker-test.cc
+++ b/src/kudu/tablet/ops/op_tracker-test.cc
@@ -18,6 +18,7 @@
 #include "kudu/tablet/ops/op_tracker.h"
 
 #include <cstdint>
+#include <iterator>
 #include <memory>
 #include <ostream>
 #include <string>
@@ -107,24 +108,23 @@ class OpTrackerTest : public KuduTest,
   void RunOpsThread(CountDownLatch* finish_latch);
 
   Status AddDrivers(int num_drivers,
-                    vector<scoped_refptr<OpDriver> >* drivers) {
-    vector<scoped_refptr<OpDriver> > local_drivers;
+                    vector<shared_ptr<OpDriver>>* drivers) {
+    vector<shared_ptr<OpDriver>> local_drivers;
+    local_drivers.reserve(num_drivers);
     for (int i = 0; i < num_drivers; i++) {
-      scoped_refptr<OpDriver> driver(
-          new OpDriver(&tracker_,
-                       nullptr,
-                       nullptr,
-                       nullptr,
-                       nullptr,
-                       nullptr));
+      auto driver(OpDriver::make_shared(
+          &tracker_,
+          nullptr,
+          nullptr,
+          nullptr,
+          nullptr,
+          nullptr));
       unique_ptr<NoOpOp> op(new NoOpOp(new NoOpOpState));
       RETURN_NOT_OK(driver->Init(std::move(op), consensus::LEADER));
-      local_drivers.push_back(driver);
+      local_drivers.emplace_back(std::move(driver));
     }
 
-    for (const scoped_refptr<OpDriver>& d : local_drivers) {
-      drivers->push_back(d);
-    }
+    std::move(local_drivers.begin(), local_drivers.end(), 
std::back_inserter(*drivers));
     return Status::OK();
   }
 
@@ -135,13 +135,12 @@ class OpTrackerTest : public KuduTest,
 
 TEST_F(OpTrackerTest, TestGetPending) {
   ASSERT_EQ(0, tracker_.GetNumPendingForTests());
-  vector<scoped_refptr<OpDriver> > drivers;
+  vector<shared_ptr<OpDriver> > drivers;
   ASSERT_OK(AddDrivers(1, &drivers));
-  scoped_refptr<OpDriver> driver = drivers[0];
+  shared_ptr<OpDriver>& driver(drivers[0]);
   ASSERT_EQ(1, tracker_.GetNumPendingForTests());
 
-  vector<scoped_refptr<OpDriver> > pending_ops;
-  tracker_.GetPendingOps(&pending_ops);
+  const auto pending_ops = tracker_.GetPendingOps();
   ASSERT_EQ(1, pending_ops.size());
   ASSERT_EQ(driver.get(), pending_ops.front().get());
 
@@ -155,7 +154,7 @@ TEST_F(OpTrackerTest, TestGetPending) {
 void OpTrackerTest::RunOpsThread(CountDownLatch* finish_latch) {
   const int kNumOps = 100;
   // Start a bunch of ops.
-  vector<scoped_refptr<OpDriver> > drivers;
+  vector<shared_ptr<OpDriver>> drivers;
   ASSERT_OK(AddDrivers(kNumOps, &drivers));
 
   // Wait for the main thread to tell us to proceed.
@@ -166,7 +165,7 @@ void OpTrackerTest::RunOpsThread(CountDownLatch* 
finish_latch) {
   SleepFor(MonoDelta::FromMilliseconds(1));
 
   // Finish all the ops
-  for (const scoped_refptr<OpDriver>& driver : drivers) {
+  for (const auto& driver : drivers) {
     // And mark the op as failed, which will cause it to unregister itself.
     driver->Abort(Status::Aborted(""));
   }
@@ -212,7 +211,7 @@ static void CheckMetrics(const scoped_refptr<MetricEntity>& 
entity,
 TEST_F(OpTrackerTest, TestMetrics) {
   NO_FATALS(CheckMetrics(entity_, 0, 0, 0, 0));
 
-  vector<scoped_refptr<OpDriver> > drivers;
+  vector<shared_ptr<OpDriver> > drivers;
   ASSERT_OK(AddDrivers(3, &drivers));
   NO_FATALS(CheckMetrics(entity_, 3, 0, 0, 0));
 
@@ -250,7 +249,7 @@ TEST_P(OpTrackerTest, TestTooManyOps) {
   // carries an empty ReplicateMsg), so we'll just add as many as possible
   // and check that when we fail, it's because we've hit the limit.
   Status s;
-  vector<scoped_refptr<OpDriver>> drivers;
+  vector<shared_ptr<OpDriver>> drivers;
   SCOPED_CLEANUP({
                    for (const auto &d : drivers) {
                      d->Abort(Status::Aborted(""));
diff --git a/src/kudu/tablet/ops/op_tracker.cc 
b/src/kudu/tablet/ops/op_tracker.cc
index e0575e6f8..69b493b2f 100644
--- a/src/kudu/tablet/ops/op_tracker.cc
+++ b/src/kudu/tablet/ops/op_tracker.cc
@@ -130,10 +130,6 @@ OpTracker::Metrics::Metrics(const 
scoped_refptr<MetricEntity>& entity)
 #undef GINIT
 #undef MINIT
 
-OpTracker::State::State()
-    : memory_footprint(0) {
-}
-
 OpTracker::OpTracker() {
 }
 
@@ -144,7 +140,7 @@ OpTracker::~OpTracker() {
 #endif
 }
 
-Status OpTracker::Add(OpDriver* driver) {
+Status OpTracker::Add(shared_ptr<OpDriver> driver) {
   size_t driver_mem_footprint = driver->state()->request()->SpaceUsedLong();
   if (mem_tracker_ && !mem_tracker_->TryConsume(driver_mem_footprint)) {
     if (metrics_) {
@@ -174,10 +170,11 @@ Status OpTracker::Add(OpDriver* driver) {
 
   // Cache the op memory footprint so we needn't refer to the request
   // again, as it may disappear between now and then.
-  State st;
-  st.memory_footprint = driver_mem_footprint;
   std::lock_guard<simple_spinlock> l(lock_);
-  InsertOrDie(&pending_ops_, driver, st);
+  const auto* driver_ptr = driver.get();
+  EmplaceOrDie(&pending_ops_,
+               driver_ptr,
+               State(std::move(driver), driver_mem_footprint));
   return Status::OK();
 }
 
@@ -237,14 +234,14 @@ void OpTracker::Release(OpDriver* driver) {
   }
 }
 
-void OpTracker::GetPendingOps(
-    vector<scoped_refptr<OpDriver> >* pending_out) const {
-  DCHECK(pending_out->empty());
+vector<shared_ptr<OpDriver>> OpTracker::GetPendingOps() const {
+  vector<shared_ptr<OpDriver>> ret;
   std::lock_guard<simple_spinlock> l(lock_);
-  for (const TxnMap::value_type& e : pending_ops_) {
-    // Increments refcount of each op.
-    pending_out->push_back(e.first);
+  ret.reserve(pending_ops_.size());
+  for (const auto& [_, s]: pending_ops_) {
+    ret.emplace_back(s.driver);
   }
+  return ret;
 }
 
 int OpTracker::GetNumPendingForTests() const {
@@ -265,9 +262,7 @@ Status OpTracker::WaitForAllToFinish(const MonoDelta& 
timeout) const {
   MonoTime next_log_time = start_time + MonoDelta::FromSeconds(1);
 
   while (1) {
-    vector<scoped_refptr<OpDriver> > ops;
-    GetPendingOps(&ops);
-
+    const auto ops = GetPendingOps();
     if (ops.empty()) {
       break;
     }
diff --git a/src/kudu/tablet/ops/op_tracker.h b/src/kudu/tablet/ops/op_tracker.h
index 96b05cd39..3f51d38d2 100644
--- a/src/kudu/tablet/ops/op_tracker.h
+++ b/src/kudu/tablet/ops/op_tracker.h
@@ -19,11 +19,11 @@
 #include <cstdint>
 #include <memory>
 #include <unordered_map>
+#include <utility>
 #include <vector>
 
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/tablet/ops/op_driver.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
@@ -35,6 +35,8 @@ class MonoDelta;
 
 namespace tablet {
 
+class OpDriver;
+
 // Each TabletReplica has a OpTracker which keeps track of pending ops.
 // Each "LeaderOp" will register itself by calling Add().
 // It will remove itself by calling Release().
@@ -47,14 +49,14 @@ class OpTracker {
   //
   // In the event that the tracker's memory limit is exceeded, returns a
   // ServiceUnavailable status.
-  Status Add(OpDriver* driver);
+  Status Add(std::shared_ptr<OpDriver> driver);
 
   // Removes the op from the pending list.
   // Also triggers the deletion of the Op object, if its refcount == 0.
   void Release(OpDriver* driver);
 
-  // Populates list of currently-running ops into 'pending_out' vector.
-  void GetPendingOps(std::vector<scoped_refptr<OpDriver> >* pending_out) const;
+  // Returns currently-running ops.
+  std::vector<std::shared_ptr<OpDriver>> GetPendingOps() const;
 
   // Returns number of pending ops.
   int GetNumPendingForTests() const;
@@ -87,18 +89,20 @@ class OpTracker {
 
   // Per-op state that is tracked along with the op itself.
   struct State {
-    State();
+    State(std::shared_ptr<OpDriver> driver, int64_t memory_footprint)
+        : driver(std::move(driver)),
+          memory_footprint(memory_footprint) {
+    }
+
+    // The reference to the driver.
+    std::shared_ptr<OpDriver> driver;
 
     // Approximate memory footprint of the op.
-    int64_t memory_footprint;
+    const int64_t memory_footprint;
   };
 
   // Protected by 'lock_'.
-  typedef std::unordered_map<scoped_refptr<OpDriver>,
-      State,
-      ScopedRefPtrHashFunctor<OpDriver>,
-      ScopedRefPtrEqualToFunctor<OpDriver> > TxnMap;
-  TxnMap pending_ops_;
+  std::unordered_map<const OpDriver*, State> pending_ops_;
 
   std::unique_ptr<Metrics> metrics_;
 
diff --git a/src/kudu/tablet/tablet_replica-test.cc 
b/src/kudu/tablet/tablet_replica-test.cc
index 998b27b2f..7007e9980 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -51,7 +51,7 @@
 #include "kudu/tablet/lock_manager.h"
 #include "kudu/tablet/ops/alter_schema_op.h"
 #include "kudu/tablet/ops/op.h"
-#include "kudu/tablet/ops/op_driver.h"
+#include "kudu/tablet/ops/op_driver.h"  // IWYU pragma: keep
 #include "kudu/tablet/ops/op_tracker.h"
 #include "kudu/tablet/ops/write_op.h"
 #include "kudu/tablet/tablet.h"
@@ -429,7 +429,7 @@ TEST_F(TabletReplicaTest, TestActiveOpPreventsLogGC) {
                            &apply_continue,
                            std::move(op_state)));
 
-    scoped_refptr<OpDriver> driver;
+    shared_ptr<OpDriver> driver;
     ASSERT_OK(tablet_replica_->NewLeaderOpDriver(std::move(op),
                                                  &driver,
                                                  MonoTime::Max()));
diff --git a/src/kudu/tablet/tablet_replica.cc 
b/src/kudu/tablet/tablet_replica.cc
index a5fe99a5d..5cec4bec3 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -149,6 +149,7 @@ using kudu::tserver::ParticipantRequestPB;
 using kudu::tserver::TabletServerErrorPB;
 using std::deque;
 using std::map;
+using std::make_shared;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
@@ -597,7 +598,7 @@ Status TabletReplica::SubmitWrite(unique_ptr<WriteOpState> 
op_state,
 
   op_state->SetResultTracker(result_tracker_);
   unique_ptr<WriteOp> op(new WriteOp(std::move(op_state), consensus::LEADER));
-  scoped_refptr<OpDriver> driver;
+  shared_ptr<OpDriver> driver;
   RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver, deadline));
   driver->ExecuteAsync();
   return Status::OK();
@@ -608,7 +609,7 @@ Status 
TabletReplica::SubmitTxnParticipantOp(std::unique_ptr<ParticipantOpState>
 
   op_state->SetResultTracker(result_tracker_);
   unique_ptr<ParticipantOp> op(new ParticipantOp(std::move(op_state), 
consensus::LEADER));
-  scoped_refptr<OpDriver> driver;
+  shared_ptr<OpDriver> driver;
   RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver, MonoTime::Max()));
   driver->ExecuteAsync();
   return Status::OK();
@@ -620,7 +621,7 @@ Status 
TabletReplica::SubmitAlterSchema(unique_ptr<AlterSchemaOpState> state,
 
   unique_ptr<AlterSchemaOp> op(
       new AlterSchemaOp(std::move(state), consensus::LEADER));
-  scoped_refptr<OpDriver> driver;
+  shared_ptr<OpDriver> driver;
   RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver, deadline));
   driver->ExecuteAsync();
   return Status::OK();
@@ -708,9 +709,8 @@ string TabletReplica::HumanReadableState() const {
 
 void TabletReplica::GetInFlightOps(Op::TraceType trace_type,
                                    vector<consensus::OpStatusPB>* out) const {
-  vector<scoped_refptr<OpDriver> > pending_ops;
-  op_tracker_.GetPendingOps(&pending_ops);
-  for (const scoped_refptr<OpDriver>& driver : pending_ops) {
+  const auto pending_ops = op_tracker_.GetPendingOps();
+  for (const auto& driver : pending_ops) {
     if (driver->state() != nullptr) {
       consensus::OpStatusPB status_pb;
       status_pb.mutable_op_id()->CopyFrom(driver->GetOpId());
@@ -763,9 +763,8 @@ log::RetentionIndexes TabletReplica::GetRetentionIndexes() 
const {
                       << Substitute("{dur: $0, peers: $1}", 
ret.for_durability, ret.for_peers);
 
   // Next, interrogate the OpTracker.
-  vector<scoped_refptr<OpDriver> > pending_ops;
-  op_tracker_.GetPendingOps(&pending_ops);
-  for (const scoped_refptr<OpDriver>& driver : pending_ops) {
+  const auto pending_ops = op_tracker_.GetPendingOps();
+  for (const auto& driver : pending_ops) {
     OpId op_id = driver->GetOpId();
     // A op which doesn't have an opid hasn't been submitted for replication 
yet and
     // thus has no need to anchor the log.
@@ -848,13 +847,23 @@ Status TabletReplica::StartFollowerOp(const 
scoped_refptr<ConsensusRound>& round
   OpState* state = op->state();
   state->set_consensus_round(round);
 
-  scoped_refptr<OpDriver> driver;
+  shared_ptr<OpDriver> driver;
   RETURN_NOT_OK(NewReplicaOpDriver(std::move(op), &driver));
 
-  // A raw pointer is required to avoid a refcount cycle.
-  auto* driver_raw = driver.get();
+  // A weak pointer is required to avoid a refcount cycle.
+  // TODO(aserbin): is the comment above still relevant?
+  std::weak_ptr<OpDriver> wp(driver);
   state->consensus_round()->SetConsensusReplicatedCallback(
-      [driver_raw](const Status& s) { driver_raw->ReplicationFinished(s); });
+      [wp = std::move(wp)](const Status& s) {
+        shared_ptr<OpDriver> sp(wp.lock());
+        if (PREDICT_TRUE(sp)) {
+          sp->ReplicationFinished(s);
+        } else {
+          // TODO(aserbin): is this even possible?
+          LOG(DFATAL) << "OpDriver isn't around: has ReplicationFinished() "
+                         "been called already?";
+        }
+      });
 
   driver->ExecuteAsync();
   return Status::OK();
@@ -899,16 +908,16 @@ void 
TabletReplica::FinishConsensusOnlyRound(ConsensusRound* round) {
 }
 
 Status TabletReplica::NewLeaderOpDriver(unique_ptr<Op> op,
-                                        scoped_refptr<OpDriver>* driver,
+                                        shared_ptr<OpDriver>* driver,
                                         MonoTime deadline) {
-  scoped_refptr<OpDriver> op_driver = new OpDriver(
-    &op_tracker_,
-    consensus_.get(),
-    log_.get(),
-    prepare_pool_token_.get(),
-    apply_pool_,
-    &op_order_verifier_,
-    deadline);
+  auto op_driver = OpDriver::make_shared(
+      &op_tracker_,
+      consensus_.get(),
+      log_.get(),
+      prepare_pool_token_.get(),
+      apply_pool_,
+      &op_order_verifier_,
+      deadline);
   RETURN_NOT_OK(op_driver->Init(std::move(op), consensus::LEADER));
   *driver = std::move(op_driver);
 
@@ -916,14 +925,14 @@ Status TabletReplica::NewLeaderOpDriver(unique_ptr<Op> op,
 }
 
 Status TabletReplica::NewReplicaOpDriver(unique_ptr<Op> op,
-                                                  scoped_refptr<OpDriver>* 
driver) {
-  scoped_refptr<OpDriver> op_driver = new OpDriver(
-    &op_tracker_,
-    consensus_.get(),
-    log_.get(),
-    prepare_pool_token_.get(),
-    apply_pool_,
-    &op_order_verifier_);
+                                         shared_ptr<OpDriver>* driver) {
+  auto op_driver = OpDriver::make_shared(
+      &op_tracker_,
+      consensus_.get(),
+      log_.get(),
+      prepare_pool_token_.get(),
+      apply_pool_,
+      &op_order_verifier_);
   RETURN_NOT_OK(op_driver->Init(std::move(op), consensus::FOLLOWER));
   *driver = std::move(op_driver);
 
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index d44f47429..b5fd71ce9 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -316,11 +316,11 @@ class TabletReplica : public 
RefCountedThreadSafe<TabletReplica>,
   // that have timed out while in the prepare queue. For most call sites,
   // the deadline is naturally defined by the corresponding RPC.
   Status NewLeaderOpDriver(std::unique_ptr<Op> op,
-                           scoped_refptr<OpDriver>* driver,
+                           std::shared_ptr<OpDriver>* driver,
                            MonoTime deadline);
 
   Status NewReplicaOpDriver(std::unique_ptr<Op> op,
-                            scoped_refptr<OpDriver>* driver);
+                            std::shared_ptr<OpDriver>* driver);
 
   // Tells the tablet's log to garbage collect.
   Status RunLogGC();
diff --git a/src/kudu/tablet/txn_participant-test.cc 
b/src/kudu/tablet/txn_participant-test.cc
index 920fb0039..fea0a9b57 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -26,6 +26,7 @@
 #include <ostream>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -54,7 +55,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/ops/op.h"
-#include "kudu/tablet/ops/op_driver.h"
+#include "kudu/tablet/ops/op_driver.h"  // IWYU pragma: keep
 #include "kudu/tablet/ops/op_tracker.h"
 #include "kudu/tablet/ops/participant_op.h"
 #include "kudu/tablet/tablet-test-util.h"
@@ -86,6 +87,7 @@ using kudu::tserver::WriteRequestPB;
 using std::map;
 using std::nullopt;
 using std::optional;
+using std::shared_ptr;
 using std::string;
 using std::thread;
 using std::unique_ptr;
@@ -997,9 +999,9 @@ TEST_F(TxnParticipantTest, 
TestActiveParticipantOpsAnchorWALs) {
   op_state->set_completion_callback(std::unique_ptr<OpCompletionCallback>(
       new LatchOpCompletionCallback<tserver::ParticipantResponsePB>(&latch, 
&resp)));
 
-  scoped_refptr<OpDriver> driver;
   unique_ptr<DelayedParticipantOp> op(
       new DelayedParticipantOp(&apply_start, &apply_continue, 
std::move(op_state)));
+  shared_ptr<OpDriver> driver;
   ASSERT_OK(tablet_replica_->NewLeaderOpDriver(std::move(op), &driver, 
MonoTime::Max()));
   driver->ExecuteAsync();
   // Wait for the apply to start, indicating that we have persisted and

Reply via email to