This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new c033487  KUDU-2612: restrict TxnStatusManager calls to be made by the 
leader only
c033487 is described below

commit c033487fd1765f9b3c5045d7c51719ed7e1c4418
Author: hahao <[email protected]>
AuthorDate: Thu Jan 28 18:31:36 2021 -0800

    KUDU-2612: restrict TxnStatusManager calls to be made by the leader only
    
    Currently, even though a non-leader TxnStatusManager will be unable to
    write to the underlying table (in the Raft subsystem the write would be
    aborted), we may want to restrict calls to be made by the leader
    TxnStatusManagers only. The motivation is to provide a more robust system,
    which avoids cases where, if the request is sent to a laggy follower, we
    may end up failing the request with an error.
    
    This patch introduces ScopedLeaderSharedLock (similar to the one in Catalog
    Manager) to be used to ensure the requests were sent to leaders only and
    to block all other operations while reloading the persistent transaction
    status metadata upon leadership changes. Note that during failover the
    leader replica will wait for in-flight ops from the previous consensus
    term to be applied before reloading the metadata.
    
    Change-Id: I42c1ad095dcb4bdffcbe0ecf9631a60bac208c2a
    Reviewed-on: http://gerrit.cloudera.org:8080/16648
    Tested-by: Hao Hao <[email protected]>
    Reviewed-by: Andrew Wong <[email protected]>
---
 src/kudu/client/client-test.cc                     |   3 +
 .../integration-tests/ts_tablet_manager-itest.cc   |  15 ++
 .../integration-tests/txn_status_manager-itest.cc  |  23 +-
 .../integration-tests/txn_status_table-itest.cc    | 143 ++++++++++-
 src/kudu/master/sys_catalog.cc                     |   1 +
 src/kudu/tablet/tablet_replica-test-base.cc        |   1 +
 src/kudu/tablet/tablet_replica.cc                  | 122 +++++++++-
 src/kudu/tablet/tablet_replica.h                   |  45 ++++
 src/kudu/tablet/txn_coordinator.h                  |   6 +-
 src/kudu/transactions/txn_status_manager-test.cc   | 135 ++++++----
 src/kudu/transactions/txn_status_manager.cc        | 271 +++++++++++++++++++--
 src/kudu/transactions/txn_status_manager.h         | 102 +++++++-
 src/kudu/transactions/txn_status_tablet.h          |   1 -
 .../tserver/tablet_copy_source_session-test.cc     |   3 +-
 src/kudu/tserver/tablet_service.cc                 |  53 ++--
 src/kudu/tserver/ts_tablet_manager.cc              |  55 ++++-
 src/kudu/tserver/ts_tablet_manager.h               |   3 +
 17 files changed, 846 insertions(+), 136 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index ca09c40..a09538a 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -106,6 +106,7 @@
 #include "kudu/tablet/txn_coordinator.h"
 #include "kudu/tablet/txn_participant-test-util.h"
 #include "kudu/transactions/transactions.pb.h"
+#include "kudu/transactions/txn_status_manager.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/scanners.h"
 #include "kudu/tserver/tablet_server.h"
@@ -203,6 +204,7 @@ using kudu::rpc::MessengerBuilder;
 using kudu::security::SignedTokenPB;
 using kudu::client::sp::shared_ptr;
 using kudu::tablet::TabletReplica;
+using kudu::transactions::TxnStatusManager;
 using kudu::transactions::TxnTokenPB;
 using kudu::tserver::MiniTabletServer;
 using kudu::tserver::ParticipantOpPB;
@@ -421,6 +423,7 @@ class ClientTest : public KuduTest {
         if (!c) {
           continue;
         }
+        TxnStatusManager::ScopedLeaderSharedLock l(c);
         auto highest_txn_id = c->highest_txn_id();
         if (txn_id > highest_txn_id) {
           continue;
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc 
b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index 2ffefe7..a6a0d50 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -68,6 +68,7 @@
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tablet/txn_coordinator.h"
+#include "kudu/transactions/txn_status_manager.h"
 #include "kudu/transactions/txn_status_tablet.h"
 #include "kudu/tserver/heartbeater.h"
 #include "kudu/tserver/mini_tablet_server.h"
@@ -140,6 +141,7 @@ using kudu::rpc::RpcController;
 using kudu::tablet::TabletReplica;
 using kudu::tablet::ParticipantIdsByTxnId;
 using kudu::tablet::TxnCoordinator;
+using kudu::transactions::TxnStatusManager;
 using kudu::transactions::TxnStatusTablet;
 using kudu::tserver::MiniTabletServer;
 using kudu::ClusterVerifier;
@@ -1110,6 +1112,9 @@ class TxnStatusTabletManagementTest : public 
TsTabletManagerITest {
   }
 
   static Status StartTransactions(const ParticipantIdsByTxnId& txns, 
TxnCoordinator* coordinator) {
+    TxnStatusManager* txn_status_manager =
+        dynamic_cast<TxnStatusManager*>(coordinator);
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_status_manager);
     TabletServerErrorPB ts_error;
     for (const auto& txn_id_and_prt_ids : txns) {
       const auto& txn_id = txn_id_and_prt_ids.first;
@@ -1212,8 +1217,18 @@ TEST_F(TxnStatusTabletManagementTest, 
TestCopyTransactionStatusTablet) {
     ASSERT_OK(replica->WaitUntilConsensusRunning(MonoDelta::FromSeconds(3)));
     TxnCoordinator* coordinator = replica->txn_coordinator();
     ASSERT_NE(nullptr, coordinator);
+  }
+  // Restart the tserver and ensure the transactions information is
+  // correctly reloaded.
+  {
+    ts0->Shutdown();
+    ASSERT_OK(ts0->Restart());
     // Wait for the contents of the tablet to be loaded into memory.
     ASSERT_EVENTUALLY([&] {
+      scoped_refptr<TabletReplica> replica;
+      ASSERT_TRUE(ts0->server()->tablet_manager()->LookupTablet(
+          kTxnStatusTabletId, &replica));
+      TxnCoordinator* coordinator = replica->txn_coordinator();
       ASSERT_EQ(kExpectedTxns, coordinator->GetParticipantsByTxnIdForTests());
     });
   }
diff --git a/src/kudu/integration-tests/txn_status_manager-itest.cc 
b/src/kudu/integration-tests/txn_status_manager-itest.cc
index 38e060f..6768d2c 100644
--- a/src/kudu/integration-tests/txn_status_manager-itest.cc
+++ b/src/kudu/integration-tests/txn_status_manager-itest.cc
@@ -238,28 +238,11 @@ class TxnStatusManagerITest : public 
ExternalMiniClusterITestBase {
 
 const MonoDelta TxnStatusManagerITest::kTimeout = MonoDelta::FromSeconds(15);
 
-// TODO(aserbin): enable all scenarios below once [1] is committed. Without 
[1],
-//                these scenarios sometimes fails upon calling GetTxnState():
-//
-//   Bad status: Not found: Failed to write to server:
-//   7c968757cc19497a93b15b6c6a48e446 (127.13.78.3:33027):
-//   transaction ID 0 not found, current highest txn ID: -1
-//
-//                The issue here is that a non-leader replica might load
-//                the information from tablet that's lagging behind the
-//                leader, and once the replica becomes a new leader later on,
-//                the information is stale because TxnStatusManager's data
-//                isn't yet reloaded upon the becoming a leader. Once the
-//                patch above is merged, remove this TODO and remove '#if 0'
-//                for the code below.
-//
-//                [1] https://gerrit.cloudera.org/#/c/16648/
-
 // The test to verify basic functionality of the transaction tracker: it should
 // detect transactions that haven't received KeepTransactionAlive() requests
 // for longer than the transaction's keepalive interval and automatically abort
 // those.
-TEST_F(TxnStatusManagerITest, DISABLED_StaleTransactionsCleanup) {
+TEST_F(TxnStatusManagerITest, StaleTransactionsCleanup) {
   SKIP_IF_SLOW_NOT_ALLOWED();
 
   // Check that the transaction staleness is detected and the stale transaction
@@ -300,7 +283,7 @@ TEST_F(TxnStatusManagerITest, 
DISABLED_StaleTransactionsCleanup) {
 // Make sure it's possible to disable and enable back the transaction
 // staleness tracking in run-time without restarting the processes hosting
 // TxnStatusManager instances (i.e. tablet servers).
-TEST_F(TxnStatusManagerITest, DISABLED_ToggleStaleTxnTrackerInRuntime) {
+TEST_F(TxnStatusManagerITest, ToggleStaleTxnTrackerInRuntime) {
   SKIP_IF_SLOW_NOT_ALLOWED();
 
   // Disable txn transaction tracking in run-time.
@@ -473,7 +456,7 @@ TEST_F(TxnStatusManagerITest, 
DISABLED_TxnKeepAliveMultiTxnStatusManagerInstance
 // accessible for some time, and the txn keepalive messages reach the
 // destination after TxnStatusManager is back online. So, the txn should not be
 // auto-aborted when its KuduTransaction objects is kept in the scope.
-TEST_F(TxnStatusManagerITest, 
DISABLED_TxnKeptAliveByClientIfStatusManagerRestarted) {
+TEST_F(TxnStatusManagerITest, TxnKeptAliveByClientIfStatusManagerRestarted) {
   SKIP_IF_SLOW_NOT_ALLOWED();
   shared_ptr<KuduClient> c;
   ASSERT_OK(cluster_->CreateClient(nullptr, &c));
diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc 
b/src/kudu/integration-tests/txn_status_table-itest.cc
index 271e643..0311050 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -62,7 +62,13 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+DECLARE_bool(raft_enable_pre_election);
+DECLARE_bool(txn_status_tablet_failover_inject_timeout_error);
+DECLARE_bool(txn_status_tablet_inject_load_failure_error);
+DECLARE_bool(txn_status_tablet_inject_uninitialized_leader_status_error);
 DECLARE_double(leader_failure_max_missed_heartbeat_periods);
+DECLARE_int32(consensus_inject_latency_ms_in_notifications);
+DECLARE_int32(raft_heartbeat_interval_ms);
 DECLARE_string(superuser_acl);
 DECLARE_string(user_acl);
 DECLARE_uint32(txn_keepalive_interval_ms);
@@ -740,14 +746,141 @@ TEST_F(MultiServerTxnStatusTableITest, 
TestSystemClientCrashedNodes) {
   ASSERT_FALSE(leader_uuid.empty());
   FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
   cluster_->mini_tablet_server_by_uuid(leader_uuid)->Shutdown();
-  // We have to wait for a leader to be elected. Until that happens, the system
-  // client may try to start transactions on followers, and in doing so use up
-  // transaction IDs. Have the system client try again with a higher
-  // transaction ID until a leader is elected.
   int txn_id = 2;
+  ASSERT_OK(txn_sys_client_->BeginTransaction(++txn_id, kUser));
+}
+
+enum InjectedErrorType {
+  kFailoverTimeoutError,
+  kLeaderStatusError,
+  kLoadFromTabletError
+};
+class TxnStatusTableRetryITest : public MultiServerTxnStatusTableITest,
+                                 public 
::testing::WithParamInterface<InjectedErrorType> {
+};
+
+TEST_P(TxnStatusTableRetryITest, TestRetryOnError) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  auto error_type = GetParam();
+  switch (error_type) {
+    case kFailoverTimeoutError:
+      FLAGS_txn_status_tablet_failover_inject_timeout_error = true;
+      break;
+    case kLeaderStatusError:
+      FLAGS_txn_status_tablet_inject_uninitialized_leader_status_error = true;
+      break;
+    case kLoadFromTabletError:
+      FLAGS_txn_status_tablet_inject_load_failure_error = true;
+      break;
+  }
+  // Find the leader and force it to step down. The system client should be
+  // able to find the new leader.
+  const string& tablet_id = GetTabletId();
+  ASSERT_FALSE(tablet_id.empty());
+  string orig_leader_uuid;
+  ASSERT_OK(FindLeaderId(tablet_id, &orig_leader_uuid));
+  ASSERT_FALSE(orig_leader_uuid.empty());
+  cluster_->mini_tablet_server_by_uuid(
+      orig_leader_uuid)->server()->mutable_quiescing()->store(true);
+
+  string new_leader_uuid;
+  TxnStatusEntryPB txn_status;
   ASSERT_EVENTUALLY([&] {
-    ASSERT_OK(txn_sys_client_->BeginTransaction(++txn_id, kUser));
+    ASSERT_OK(FindLeaderId(tablet_id, &new_leader_uuid));
+    ASSERT_NE(new_leader_uuid, orig_leader_uuid);
+    Status s = txn_sys_client_->GetTransactionStatus(1, kUser, &txn_status);
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
   });
+  orig_leader_uuid = new_leader_uuid;
+  // After disabling fault injection flag, calls should succeed given
+  // the failed ones will be retried.
+  switch (error_type) {
+    case kFailoverTimeoutError:
+      FLAGS_txn_status_tablet_failover_inject_timeout_error = false;
+      break;
+    case kLeaderStatusError:
+      FLAGS_txn_status_tablet_inject_uninitialized_leader_status_error = false;
+      break;
+    case kLoadFromTabletError:
+      FLAGS_txn_status_tablet_inject_load_failure_error = false;
+      break;
+  }
+  cluster_->mini_tablet_server_by_uuid(
+      orig_leader_uuid)->server()->mutable_quiescing()->store(true);
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(FindLeaderId(tablet_id, &new_leader_uuid));
+    ASSERT_NE(new_leader_uuid, orig_leader_uuid);
+    ASSERT_OK(txn_sys_client_->GetTransactionStatus(1, kUser, &txn_status));
+  });
+}
+
+// Test that calls to transaction status tablets will retry when:
+//   1) there is a timeout while waiting for the replica to catch up with
+//      all replicated operations from the previous term;
+//   2) the leader status is not initialized yet;
+//   3) loading the metadata from the tablet fails.
+INSTANTIATE_TEST_CASE_P(TestTxnStatusTableRetryOnError,
+                        TxnStatusTableRetryITest,
+                        ::testing::Values(kFailoverTimeoutError,
+                                          kLeaderStatusError,
+                                          kLoadFromTabletError));
+
+class TxnStatusTableElectionStormITest : public TxnStatusTableITest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+
+    // Make leader elections more frequent to get through this test a bit more
+    // quickly.
+    FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
+    FLAGS_raft_heartbeat_interval_ms = 30;
+    InternalMiniClusterOptions opts;
+    opts.num_tablet_servers = 3;
+    cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
+    ASSERT_OK(cluster_->Start());
+    ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), 
&txn_sys_client_));
+
+    // Create the initial transaction status table partitions.
+    ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100, 3));
+    ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
+
+   // Inject latency so elections become more frequent and wait a bit for our
+   // latency injection to kick in.
+   FLAGS_raft_enable_pre_election = false;
+   FLAGS_consensus_inject_latency_ms_in_notifications = 1.5 * 
FLAGS_raft_heartbeat_interval_ms;
+   SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 2));
+  }
+};
+
+TEST_F(TxnStatusTableElectionStormITest, TestFrequentElections) {
+  // Ensure concurrent transaction read and write work as expected under 
frequent
+  // leader elections.
+  const int kNumTxnsPerThread = 5;
+  const int kNumThreads = 5;
+  vector<thread> threads;
+  for (int t = 0; t < kNumThreads; t++) {
+    threads.emplace_back([&, t] {
+      for (int i = 0; i < kNumTxnsPerThread; i++) {
+        int txn_id = t * kNumTxnsPerThread + i;
+        TxnStatusEntryPB txn_status;
+        Status s = txn_sys_client_->BeginTransaction(txn_id, kUser);
+        // As we have no guarantee of the threads' execution order,
+        // transactions created later can have a lower txn ID than previously
+        // created ones, which is not allowed.
+        if (s.ok()) {
+          // Make sure a re-election happens before the following read.
+          SleepFor(MonoDelta::FromMilliseconds(3 * 
FLAGS_raft_heartbeat_interval_ms));
+
+          CHECK_OK(txn_sys_client_->GetTransactionStatus(txn_id, kUser, 
&txn_status));
+          CHECK(txn_status.has_user());
+          CHECK_STREQ(kUser, txn_status.user().c_str());
+          CHECK(txn_status.has_state());
+          CHECK_EQ(TxnStatePB::OPEN, txn_status.state());
+        }
+      }
+    });
+  }
+  std::for_each(threads.begin(), threads.end(), [&] (thread& t) { t.join(); });
 }
 
 } // namespace itest
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 1d5d004..ebb3c3d 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -471,6 +471,7 @@ Status SysCatalogTable::SetupTablet(
       cmeta_manager_,
       local_peer_pb_,
       master_->tablet_apply_pool(),
+      /*reload_txn_status_tablet_pool*/nullptr,
       /*txn_coordinator_factory*/ nullptr,
       [this, tablet_id](const string& reason) {
         this->SysCatalogStateChanged(tablet_id, reason);
diff --git a/src/kudu/tablet/tablet_replica-test-base.cc 
b/src/kudu/tablet/tablet_replica-test-base.cc
index 5bb89e2..d9e7329 100644
--- a/src/kudu/tablet/tablet_replica-test-base.cc
+++ b/src/kudu/tablet/tablet_replica-test-base.cc
@@ -144,6 +144,7 @@ Status TabletReplicaTestBase::SetUpReplica(bool 
new_replica) {
                       cmeta_manager_,
                       *config_peer,
                       apply_pool_.get(),
+                      /*reload_txn_status_tablet_pool*/nullptr,
                       /*txn_coordinator_factory*/nullptr,
                       [tablet_id] (const string& reason) {
                         LOG(INFO) << Substitute(
diff --git a/src/kudu/tablet/tablet_replica.cc 
b/src/kudu/tablet/tablet_replica.cc
index d40a8c5..105b9f5 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -23,6 +23,7 @@
 #include <mutex>
 #include <ostream>
 #include <string>
+#include <type_traits>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -36,7 +37,9 @@
 #include "kudu/consensus/consensus_peers.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log_anchor_registry.h"
+#include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/opid.pb.h"
+#include "kudu/consensus/quorum_util.h"
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/consensus/time_manager.h"
 #include "kudu/fs/data_dirs.h"
@@ -58,6 +61,7 @@
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
@@ -104,6 +108,7 @@ using kudu::consensus::ALTER_SCHEMA_OP;
 using kudu::consensus::ConsensusBootstrapInfo;
 using kudu::consensus::ConsensusOptions;
 using kudu::consensus::ConsensusRound;
+using kudu::consensus::ConsensusStatePB;
 using kudu::consensus::MarkDirtyCallback;
 using kudu::consensus::OpId;
 using kudu::consensus::PARTICIPANT_OP;
@@ -118,6 +123,7 @@ using kudu::consensus::WRITE_OP;
 using kudu::log::Log;
 using kudu::log::LogAnchorRegistry;
 using kudu::pb_util::SecureDebugString;
+using kudu::pb_util::SecureShortDebugString;
 using kudu::rpc::Messenger;
 using kudu::rpc::ResultTracker;
 using std::map;
@@ -135,6 +141,7 @@ TabletReplica::TabletReplica(
     scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager,
     consensus::RaftPeerPB local_peer_pb,
     ThreadPool* apply_pool,
+    ThreadPool* reload_txn_status_tablet_pool,
     TxnCoordinatorFactory* txn_coordinator_factory,
     MarkDirtyCallback cb)
     : meta_(DCHECK_NOTNULL(std::move(meta))),
@@ -142,10 +149,17 @@ TabletReplica::TabletReplica(
       local_peer_pb_(std::move(local_peer_pb)),
       log_anchor_registry_(new LogAnchorRegistry()),
       apply_pool_(apply_pool),
+      reload_txn_status_tablet_pool_(reload_txn_status_tablet_pool),
       txn_coordinator_(meta_->table_type() &&
                        *meta_->table_type() == TableTypePB::TXN_STATUS_TABLE ?
                        DCHECK_NOTNULL(txn_coordinator_factory)->Create(this) : 
nullptr),
-      mark_dirty_clbk_(std::move(cb)),
+      txn_coordinator_task_counter_(0),
+      mark_dirty_clbk_(meta_->table_type() &&
+                       *meta_->table_type() == TableTypePB::TXN_STATUS_TABLE ?
+                       [this, cb](const string& reason) {
+                         cb(reason);
+                         this->TxnStatusReplicaStateChanged(this->tablet_id(), 
reason);
+                       } : std::move(cb)),
       state_(NOT_INITIALIZED),
       last_status_("Tablet initializing...") {
 }
@@ -261,11 +275,6 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& 
bootstrap_info,
     CHECK_EQ(BOOTSTRAPPING, state_); // We are still protected by 
'state_change_lock_'.
     set_state(RUNNING);
   }
-  // TODO(awong): hook a callback into the TxnStatusManager that runs this when
-  // we become leader such that only leaders load the tablet into memory.
-  if (txn_coordinator_) {
-    RETURN_NOT_OK(txn_coordinator_->LoadFromTablet());
-  }
 
   return Status::OK();
 }
@@ -292,7 +301,9 @@ void TabletReplica::Stop() {
     set_state(STOPPING);
   }
 
+  WaitUntilTxnCoordinatorTasksFinished();
   std::lock_guard<simple_spinlock> l(state_change_lock_);
+
   // Even though Tablet::Shutdown() also unregisters its ops, we have to do it 
here
   // to ensure that any currently running operation finishes before we proceed 
with
   // the rest of the shutdown sequence. In particular, a maintenance operation 
could
@@ -354,6 +365,60 @@ void TabletReplica::WaitUntilStopped() {
   }
 }
 
+void TabletReplica::WaitUntilTxnCoordinatorTasksFinished() {
+  if (!txn_coordinator_) {
+    return;
+  }
+
+  while (true) {
+    {
+      std::lock_guard<simple_spinlock> lock(lock_);
+      if (txn_coordinator_task_counter_ == 0) {
+        return;
+      }
+    }
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+}
+
+void TabletReplica::TxnStatusReplicaStateChanged(const string& tablet_id, 
const string& reason) {
+  if (PREDICT_FALSE(!ShouldRunTxnCoordinatorStateChangedTask())) {
+    return;
+  }
+  auto decrement_on_failure = MakeScopedCleanup([&] {
+    DecreaseTxnCoordinatorTaskCounter();
+  });
+  CHECK_EQ(tablet_id, this->tablet_id());
+  shared_ptr<RaftConsensus> consensus = shared_consensus();
+  if (!consensus) {
+    LOG_WITH_PREFIX(WARNING) << "Received notification of TxnStatusTablet 
state change "
+                             << "but the raft consensus is not initialized. 
Tablet ID: "
+                             << tablet_id << ". Reason: " << reason;
+    return;
+  }
+  ConsensusStatePB cstate;
+  Status s = consensus->ConsensusState(&cstate);
+  if (PREDICT_FALSE(!s.ok())) {
+    LOG_WITH_PREFIX(WARNING) << "Consensus state is not available. " << 
s.ToString();
+    return;
+  }
+  LOG_WITH_PREFIX(INFO) << "TxnStatusTablet state changed. Reason: " << reason 
<< ". "
+                        << "Latest consensus state: " << 
SecureShortDebugString(cstate);
+  RaftPeerPB::Role new_role = GetConsensusRole(permanent_uuid(), cstate);
+  LOG_WITH_PREFIX(INFO) << "This TxnStatusTablet replica's current role is: "
+                        << RaftPeerPB::Role_Name(new_role);
+
+  if (new_role == RaftPeerPB::LEADER) {
+    // If we're going to schedule a task, only decrement our task count when
+    // that task finishes.
+    decrement_on_failure.cancel();
+    CHECK_OK(reload_txn_status_tablet_pool_->Submit([this] {
+      txn_coordinator_->PrepareLeadershipTask();
+      DecreaseTxnCoordinatorTaskCounter();
+    }));
+  }
+}
+
 string TabletReplica::LogPrefix() const {
   return meta_->LogPrefix();
 }
@@ -400,6 +465,14 @@ Status TabletReplica::CheckRunning() const {
   return Status::OK();
 }
 
+bool TabletReplica::IsShuttingDown() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  if (state_ == STOPPING || state_ == STOPPED) {
+    return true;
+  }
+  return false;
+}
+
 Status TabletReplica::WaitUntilConsensusRunning(const MonoDelta& timeout) {
   MonoTime start(MonoTime::Now());
 
@@ -902,6 +975,43 @@ ReportedTabletStatsPB TabletReplica::GetTabletStats() 
const {
   return stats_pb_;
 }
 
+bool TabletReplica::ShouldRunTxnCoordinatorStalenessTask() {
+  if (!txn_coordinator_) {
+    return false;
+  }
+  std::lock_guard<simple_spinlock> l(lock_);
+  if (state_ != RUNNING) {
+    LOG(WARNING) << Substitute("The tablet is not running. State: $0",
+                               TabletStatePB_Name(state_));
+    return false;
+  }
+  txn_coordinator_task_counter_++;
+  return true;
+}
+
+bool TabletReplica::ShouldRunTxnCoordinatorStateChangedTask() {
+  if (!txn_coordinator_) {
+    return false;
+  }
+  std::lock_guard<simple_spinlock> l(lock_);
+  // We only check if the tablet is shutting down here, since replica
+  // state change can happen even when the tablet is not running yet.
+  if (state_ == STOPPING || state_ == STOPPED) {
+    LOG(WARNING) << Substitute("The tablet is already shutting down or 
shutdown. State: $0",
+                               TabletStatePB_Name(state_));
+    return false;
+  }
+  txn_coordinator_task_counter_++;
+  return true;
+}
+
+void TabletReplica::DecreaseTxnCoordinatorTaskCounter() {
+  DCHECK(txn_coordinator_);
+  std::lock_guard<simple_spinlock> l(lock_);
+  txn_coordinator_task_counter_--;
+  DCHECK_GE(txn_coordinator_task_counter_, 0);
+}
+
 void TabletReplica::MakeUnavailable(const Status& error) {
   std::shared_ptr<Tablet> tablet;
   {
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index d33a2bb..7121c48 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -91,6 +91,7 @@ class TabletReplica : public 
RefCountedThreadSafe<TabletReplica>,
                 scoped_refptr<consensus::ConsensusMetadataManager> 
cmeta_manager,
                 consensus::RaftPeerPB local_peer_pb,
                 ThreadPool* apply_pool,
+                ThreadPool* reload_txn_status_tablet_pool,
                 TxnCoordinatorFactory* txn_coordinator_factory,
                 consensus::MarkDirtyCallback cb);
 
@@ -128,6 +129,9 @@ class TabletReplica : public 
RefCountedThreadSafe<TabletReplica>,
   // Check that the tablet is in a RUNNING state.
   Status CheckRunning() const;
 
+  // Whether the tablet is already shutting down or shutdown.
+  bool IsShuttingDown() const;
+
   // Wait until the tablet is in a RUNNING state or if there's a timeout.
   // TODO have a way to wait for any state?
   Status WaitUntilConsensusRunning(const MonoDelta& timeout);
@@ -321,6 +325,31 @@ class TabletReplica : public 
RefCountedThreadSafe<TabletReplica>,
     return txn_coordinator_.get();
   }
 
+  // Whether or not to run a new staleness transactions aborting task.
+  // If the tablet is part of a transaction status table and is in
+  // RUNNING state, register the task by increasing the transaction status
+  // manager task counter. Once registered, the tablet will wait for the
+  // task to be executed before shutting down. More concretely, if this returns
+  // 'true', callers must call DecreaseTxnCoordinatorTaskCounter().
+  //
+  // Return true if the caller should run the staleness task. Otherwise
+  // return false.
+  bool ShouldRunTxnCoordinatorStalenessTask();
+
+  // Whether or not to run a new transaction status table reloading
+  // metadata task. If the tablet is part of a transaction status table
+  // and is not shutting down, register the task by increasing the
+  // transaction status manager task counter. Similar to
+  // 'ShouldRunTxnCoordinatorStalenessTask' above, if
+  // this returns 'true', callers must call 
DecreaseTxnCoordinatorTaskCounter().
+  //
+  // Return true if the caller should run the metadata reloading task.
+  // Otherwise return false.
+  bool ShouldRunTxnCoordinatorStateChangedTask();
+
+  // Decrease the task counter of the transaction status manager.
+  void DecreaseTxnCoordinatorTaskCounter();
+
  private:
   friend class kudu::AlterTableTest;
   friend class RefCountedThreadSafe<TabletReplica>;
@@ -336,6 +365,15 @@ class TabletReplica : public 
RefCountedThreadSafe<TabletReplica>,
   // state.
   void WaitUntilStopped();
 
+  // Wait until all on-going tasks for transaction status manager, if any,
+  // to finish.
+  void WaitUntilTxnCoordinatorTasksFinished();
+
+  // Handle the state change accordingly if this tablet is a part of the
+  // transaction status table.
+  void TxnStatusReplicaStateChanged(const std::string& tablet_id,
+                                    const std::string& reason);
+
   std::string LogPrefix() const;
   // Transition to another state. Requires that the caller hold 'lock_' if the
   // object has already published to other threads. See tablet/metadata.proto
@@ -353,11 +391,18 @@ class TabletReplica : public 
RefCountedThreadSafe<TabletReplica>,
   // Tablet server.
   ThreadPool* const apply_pool_;
 
+  // Pool that executes txn status tablet in memory state reloading. This is
+  // a multi-threaded pool, constructor-injected by the tablet server.
+  ThreadPool* const reload_txn_status_tablet_pool_;
+
   // If this tablet is a part of the transaction status table, this is the
   // entity responsible for accepting and managing requests to coordinate
   // transactions.
   const std::unique_ptr<TxnCoordinator> txn_coordinator_;
 
+  // Track the number of on-going tasks of the transaction status manager.
+  int txn_coordinator_task_counter_;
+
   // Function to mark this TabletReplica's tablet as dirty in the 
TSTabletManager.
   //
   // Must be called whenever cluster membership or leadership changes, or when
diff --git a/src/kudu/tablet/txn_coordinator.h 
b/src/kudu/tablet/txn_coordinator.h
index 39e6ae1..c679906 100644
--- a/src/kudu/tablet/txn_coordinator.h
+++ b/src/kudu/tablet/txn_coordinator.h
@@ -46,7 +46,11 @@ class TxnCoordinator {
  public:
   virtual ~TxnCoordinator() {}
 
-  virtual Status LoadFromTablet() = 0;
+  // Perform necessary work to prepare for running in the leader role.
+  // This reloads the tablet metadata into memory and does other work
+  // to update the internal state of the coordinator upon becoming
+  // the leader.
+  virtual void PrepareLeadershipTask() = 0;
 
   // Starts a transaction with the given ID as the given user.
   //
diff --git a/src/kudu/transactions/txn_status_manager-test.cc 
b/src/kudu/transactions/txn_status_manager-test.cc
index 960a742..0d104c3 100644
--- a/src/kudu/transactions/txn_status_manager-test.cc
+++ b/src/kudu/transactions/txn_status_manager-test.cc
@@ -119,60 +119,62 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
     { 3, { kParticipant1, kParticipant2 } },
   };
 
-  ASSERT_TRUE(txn_manager_->GetParticipantsByTxnIdForTests().empty());
+  {
+    TxnStatusManager::ScopedLeaderSharedLock lock(txn_manager_.get());
+    ASSERT_TRUE(txn_manager_->GetParticipantsByTxnIdForTests().empty());
 
-  TabletServerErrorPB ts_error;
-  for (const auto& txn_id_and_prts : expected_prts_by_txn_id) {
-    const auto& txn_id = txn_id_and_prts.first;
-    int64_t highest_seen_txn_id = -1;
-    ASSERT_OK(txn_manager_->BeginTransaction(
-        txn_id, kOwner, &highest_seen_txn_id, &ts_error));
-    ASSERT_GE(highest_seen_txn_id, 0);
-    ASSERT_EQ(highest_seen_txn_id, txn_id);
-    for (const auto& prt : txn_id_and_prts.second) {
-      ASSERT_OK(txn_manager_->RegisterParticipant(txn_id, prt, kOwner, 
&ts_error));
+    TabletServerErrorPB ts_error;
+    for (const auto& txn_id_and_prts : expected_prts_by_txn_id) {
+      const auto& txn_id = txn_id_and_prts.first;
+      int64_t highest_seen_txn_id = -1;
+      ASSERT_OK(txn_manager_->BeginTransaction(
+          txn_id, kOwner, &highest_seen_txn_id, &ts_error));
+      ASSERT_GE(highest_seen_txn_id, 0);
+      ASSERT_EQ(highest_seen_txn_id, txn_id);
+      for (const auto& prt : txn_id_and_prts.second) {
+        ASSERT_OK(txn_manager_->RegisterParticipant(txn_id, prt, kOwner, 
&ts_error));
+      }
     }
-  }
-  // Registering a participant that's already open is harmless, presuming the
-  // participant is still open.
-  ASSERT_OK(txn_manager_->RegisterParticipant(3, kParticipant1, kOwner, 
&ts_error));
+    // Registering a participant that's already open is harmless, presuming the
+    // participant is still open.
+    ASSERT_OK(txn_manager_->RegisterParticipant(3, kParticipant1, kOwner, 
&ts_error));
 
-  // Starting a transaction that's already been started should result in an
-  // error, even if it's not currently in flight.
-  {
-    int64_t highest_seen_txn_id = -1;
-    auto s = txn_manager_->BeginTransaction(
-        1, kOwner, &highest_seen_txn_id, &ts_error);
-    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
-    ASSERT_EQ(3, highest_seen_txn_id);
-  }
-  {
-    int64_t highest_seen_txn_id = -1;
-    auto s = txn_manager_->BeginTransaction(
-        2, kOwner, &highest_seen_txn_id, &ts_error);
-    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
-    ASSERT_EQ(3, highest_seen_txn_id);
-  }
+    // Starting a transaction that's already been started should result in an
+    // error, even if it's not currently in flight.
+    {
+      int64_t highest_seen_txn_id = -1;
+      auto s = txn_manager_->BeginTransaction(
+          1, kOwner, &highest_seen_txn_id, &ts_error);
+      ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+      ASSERT_EQ(3, highest_seen_txn_id);
+    }
+    {
+      int64_t highest_seen_txn_id = -1;
+      auto s = txn_manager_->BeginTransaction(
+          2, kOwner, &highest_seen_txn_id, &ts_error);
+      ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+      ASSERT_EQ(3, highest_seen_txn_id);
+    }
 
-  // Registering participants to transactions that don't exist should also
-  // result in errors.
-  auto s = txn_manager_->RegisterParticipant(2, kParticipant1, kOwner, 
&ts_error);
-  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    // Registering participants to transactions that don't exist should also
+    // result in errors.
+    auto s = txn_manager_->RegisterParticipant(2, kParticipant1, kOwner, 
&ts_error);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
 
-  // The underlying participants map should only reflect the successful
-  // operations.
-  ASSERT_EQ(expected_prts_by_txn_id,
-            txn_manager_->GetParticipantsByTxnIdForTests());
-  ASSERT_EQ(3, txn_manager_->highest_txn_id());
-  {
-    // Reload the TxnStatusManager from disk and verify the state.
-    TxnStatusManager txn_manager_reloaded(tablet_replica_.get());
-    ASSERT_OK(txn_manager_reloaded.LoadFromTablet());
+    // The underlying participants map should only reflect the successful
+    // operations.
     ASSERT_EQ(expected_prts_by_txn_id,
-              txn_manager_reloaded.GetParticipantsByTxnIdForTests());
-    ASSERT_EQ(3, txn_manager_reloaded.highest_txn_id());
+              txn_manager_->GetParticipantsByTxnIdForTests());
+    ASSERT_EQ(3, txn_manager_->highest_txn_id());
+    {
+      // Reload the TxnStatusManager from disk and verify the state.
+      TxnStatusManager txn_manager_reloaded(tablet_replica_.get());
+      ASSERT_OK(txn_manager_reloaded.LoadFromTablet());
+      ASSERT_EQ(expected_prts_by_txn_id,
+                txn_manager_reloaded.GetParticipantsByTxnIdForTests());
+      ASSERT_EQ(3, txn_manager_reloaded.highest_txn_id());
+    }
   }
-
   // Now rebuild the underlying replica and rebuild the TxnStatusManager.
   ASSERT_OK(RestartReplica());
   NO_FATALS(ResetTxnStatusManager());
@@ -185,6 +187,7 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
   ASSERT_OK(RestartReplica());
   {
     TxnStatusManager tsm(tablet_replica_.get());
+    TxnStatusManager::ScopedLeaderSharedLock lock(&tsm);
     // Check for the special value of the highest_txn_id when the data from
     // the transaction status tablet isn't loaded yet.
     ASSERT_EQ(-2, tsm.highest_txn_id());
@@ -261,6 +264,7 @@ TEST_F(TxnStatusManagerTest, 
TestStartTransactionsConcurrently) {
         // Synchronize the threads so we're inserting to a single range at a
         // time.
         barriers[b]->Wait();
+        TxnStatusManager::ScopedLeaderSharedLock 
leader_shared_lock(txn_manager_.get());
         auto txn_id = txns_to_insert[b][i];
         TabletServerErrorPB ts_error;
         int64_t highest_seen_txn_id = -1;
@@ -308,6 +312,7 @@ TEST_F(TxnStatusManagerTest, 
TestRegisterParticipantsConcurrently) {
   CountDownLatch begun_txn(1);
   threads.reserve(1 + kParticipantsInParallel);
   threads.emplace_back([&] {
+    TxnStatusManager::ScopedLeaderSharedLock 
leader_shared_lock(txn_manager_.get());
     TabletServerErrorPB ts_error;
     CHECK_OK(txn_manager_->BeginTransaction(kTxnId, kOwner, nullptr, 
&ts_error));
     begun_txn.CountDown();
@@ -322,6 +327,7 @@ TEST_F(TxnStatusManagerTest, 
TestRegisterParticipantsConcurrently) {
         // at least some of the participant registrations succeed.
         begun_txn.Wait();
       }
+      TxnStatusManager::ScopedLeaderSharedLock 
leader_shared_lock(txn_manager_.get());
       string prt = ParticipantId(i % kUniqueParticipantIds);
       TabletServerErrorPB ts_error;
       Status s = txn_manager_->RegisterParticipant(kTxnId, prt, kOwner, 
&ts_error);
@@ -357,6 +363,7 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
   const int kNumTransactions = 10;
   const int kNumUpdatesInParallel = 20;
   for (int i = 0; i < kNumTransactions; i++) {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
     ASSERT_OK(txn_manager_->BeginTransaction(i, kOwner, nullptr, &ts_error));
   }
@@ -376,6 +383,7 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
   // Start a bunch of threads that update transaction states.
   for (int i = 0; i < kNumUpdatesInParallel; i++) {
     threads.emplace_back([&, i] {
+      TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
       const auto& txn_id = updates[i].first;
       TabletServerErrorPB ts_error;
       switch (updates[i].second) {
@@ -438,6 +446,7 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
 // TxnStatusManager::GetTransactionStatus() method.
 TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
   {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TxnStatusEntryPB txn_status;
     TabletServerErrorPB ts_error;
     auto s = txn_manager_->GetTransactionStatus(
@@ -448,6 +457,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
     ASSERT_FALSE(txn_status.has_user());
   }
   {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
     ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, nullptr, &ts_error));
 
@@ -477,6 +487,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
   }
 
   {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
     ASSERT_OK(txn_manager_->BeginTransaction(2, kOwner, nullptr, &ts_error));
     ASSERT_OK(txn_manager_->AbortTransaction(2, kOwner, &ts_error));
@@ -490,13 +501,16 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
     ASSERT_EQ(kOwner, txn_status.user());
   }
 
-  // Start another transaction and start its commit phase.
-  TabletServerErrorPB ts_error;
-  ASSERT_OK(txn_manager_->BeginTransaction(3, kOwner, nullptr, &ts_error));
-  ASSERT_OK(txn_manager_->BeginCommitTransaction(3, kOwner, &ts_error));
+  {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
+    // Start another transaction and start its commit phase.
+    TabletServerErrorPB ts_error;
+    ASSERT_OK(txn_manager_->BeginTransaction(3, kOwner, nullptr, &ts_error));
+    ASSERT_OK(txn_manager_->BeginCommitTransaction(3, kOwner, &ts_error));
 
-  // Start just another transaction.
-  ASSERT_OK(txn_manager_->BeginTransaction(4, kOwner, nullptr, &ts_error));
+    // Start just another transaction.
+    ASSERT_OK(txn_manager_->BeginTransaction(4, kOwner, nullptr, &ts_error));
+  }
 
   // Make the TxnStatusManager start from scratch.
   ASSERT_OK(RestartReplica());
@@ -506,6 +520,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
   // TxnStatusManager even after restarting the underlying replica and
   // rebuilding the TxnStatusManager from scratch.
   {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TxnStatusEntryPB txn_status;
     TabletServerErrorPB ts_error;
     ASSERT_OK(txn_manager_->GetTransactionStatus(
@@ -539,6 +554,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
 
   // Supplying wrong user.
   {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TxnStatusEntryPB txn_status;
     TabletServerErrorPB ts_error;
     auto s = txn_manager_->GetTransactionStatus(
@@ -548,6 +564,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
 
   // Supplying not-yet-used transaction ID.
   {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TxnStatusEntryPB txn_status;
     TabletServerErrorPB ts_error;
     auto s = txn_manager_->GetTransactionStatus(
@@ -557,6 +574,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
 
   // Supplying wrong user and not-yet-used transaction ID.
   {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TxnStatusEntryPB txn_status;
     TabletServerErrorPB ts_error;
     auto s = txn_manager_->GetTransactionStatus(
@@ -571,6 +589,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
 TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
   // Supplying not-yet-registered transaction ID.
   {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
     auto s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
     ASSERT_TRUE(s.IsNotFound()) << s.ToString();
@@ -579,6 +598,7 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
 
   // OPEN --> COMMIT_IN_PROGRESS --> COMMITTED
   {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
     ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, nullptr, &ts_error));
     ASSERT_OK(txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error));
@@ -619,6 +639,7 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
 
   // OPEN --> COMMIT_IN_PROGRESS --> ABORTED
   {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
     ASSERT_OK(txn_manager_->BeginTransaction(2, kOwner, nullptr, &ts_error));
     ASSERT_OK(txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error));
@@ -644,6 +665,7 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
 
   // OPEN --> ABORTED
   {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
     ASSERT_OK(txn_manager_->BeginTransaction(3, kOwner, nullptr, &ts_error));
     ASSERT_OK(txn_manager_->KeepTransactionAlive(3, kOwner, &ts_error));
@@ -657,6 +679,7 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
 
   // Open a new transaction just before restarting the TxnStatusManager.
   {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
     ASSERT_OK(txn_manager_->BeginTransaction(4, kOwner, nullptr, &ts_error));
   }
@@ -670,6 +693,7 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
   // rebuilding the TxnStatusManager from scratch, so KeepTransactionAlive()
   // should behave the same as if no restart has happened.
   {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
     auto s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
@@ -684,6 +708,7 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
     }
   }
   {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
     auto s = txn_manager_->KeepTransactionAlive(2, kOwner, &ts_error);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
@@ -697,6 +722,7 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
     }
   }
   {
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
     ASSERT_OK(txn_manager_->KeepTransactionAlive(4, kOwner, &ts_error));
     // Supplying wrong user for transaction in OPEN state.
@@ -711,6 +737,7 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
 
 // Test that performing actions as the wrong user will return errors.
 TEST_F(TxnStatusManagerTest, TestWrongUser) {
+  TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
   const string kWrongUser = "stranger";
   int64_t highest_seen_txn_id = -1;
   TabletServerErrorPB ts_error;
@@ -744,6 +771,7 @@ TEST_F(TxnStatusManagerTest, TestWrongUser) {
 // Test that we can only update a transaction's state when it's in an
 // appropriate state.
 TEST_F(TxnStatusManagerTest, TestUpdateTransactionState) {
+  TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
   const int64_t kTxnId1 = 1;
   TabletServerErrorPB ts_error;
   ASSERT_OK(txn_manager_->BeginTransaction(kTxnId1, kOwner, nullptr, 
&ts_error));
@@ -784,6 +812,7 @@ TEST_F(TxnStatusManagerTest, TestUpdateTransactionState) {
 // Test that we can only add participants to a transaction when it's in an
 // appropriate state.
 TEST_F(TxnStatusManagerTest, TestRegisterParticipantsWithStates) {
+  TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
   TabletServerErrorPB ts_error;
   const int64_t kTxnId1 = 1;
 
diff --git a/src/kudu/transactions/txn_status_manager.cc 
b/src/kudu/transactions/txn_status_manager.cc
index 27ed3d8..fbd085e 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -18,6 +18,7 @@
 #include "kudu/transactions/txn_status_manager.h"
 
 #include <algorithm>
+#include <functional>
 #include <mutex>
 #include <ostream>
 #include <string>
@@ -31,10 +32,13 @@
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/raft_consensus.h"
+#include "kudu/gutil/casts.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/tablet/ops/op_tracker.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/transactions/transactions.pb.h"
 #include "kudu/tserver/tserver.pb.h"
@@ -45,6 +49,7 @@
 #include "kudu/util/pb_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
 
 DEFINE_uint32(txn_keepalive_interval_ms, 30000,
               "Maximum interval (in milliseconds) between subsequent "
@@ -83,10 +88,35 @@ DEFINE_bool(txn_status_manager_finalize_commit_on_begin, 
false,
 TAG_FLAG(txn_status_manager_finalize_commit_on_begin, hidden);
 TAG_FLAG(txn_status_manager_finalize_commit_on_begin, unsafe);
 
+DEFINE_int32(txn_status_tablet_failover_catchup_timeout_ms, 30 * 1000, // 30 
sec
+             "Amount of time to give a newly-elected leader tserver of 
transaction "
+             "status tablet to load the metadata containing all operations 
replicated "
+             "by the previous leader and become active.");
+TAG_FLAG(txn_status_tablet_failover_catchup_timeout_ms, advanced);
+TAG_FLAG(txn_status_tablet_failover_catchup_timeout_ms, experimental);
+
+DEFINE_bool(txn_status_tablet_failover_inject_timeout_error, false,
+            "If true, inject timeout error when waiting the replica to catch 
up with "
+            "all replicated operations in previous term.");
+TAG_FLAG(txn_status_tablet_failover_inject_timeout_error, unsafe);
+
+DEFINE_bool(txn_status_tablet_inject_load_failure_error, false,
+            "If true, inject error when loading data from the transaction 
status "
+            "tablet replica");
+TAG_FLAG(txn_status_tablet_inject_load_failure_error, unsafe);
+
+DEFINE_bool(txn_status_tablet_inject_uninitialized_leader_status_error, false,
+            "If true, inject uninitialized leader status error");
+TAG_FLAG(txn_status_tablet_inject_uninitialized_leader_status_error, unsafe);
+
+using kudu::consensus::ConsensusStatePB;
+using kudu::consensus::RaftConsensus;
+using kudu::consensus::RaftPeerPB;
 using kudu::pb_util::SecureShortDebugString;
+using kudu::rpc::RpcContext;
+using kudu::tablet::TabletReplica;
 using kudu::tablet::ParticipantIdsByTxnId;
 using kudu::tserver::TabletServerErrorPB;
-using kudu::consensus::RaftPeerPB;
 using std::string;
 using std::vector;
 using strings::Substitute;
@@ -106,6 +136,10 @@ namespace {
 constexpr int64_t kIdStatusDataNotLoaded = -2;
 constexpr int64_t kIdStatusDataReady = -1;
 
+// Value to represent uninitialized 'leader_ready_term_' assigned at the
+// transaction status manager construction time.
+constexpr int64_t kUninitializedLeaderTerm = -1;
+
 Status ReportIllegalTxnState(const string& errmsg,
                              TabletServerErrorPB* ts_error) {
   DCHECK(ts_error);
@@ -160,10 +194,97 @@ void TxnStatusManagerBuildingVisitor::Release(
 
 TxnStatusManager::TxnStatusManager(tablet::TabletReplica* tablet_replica)
     : highest_txn_id_(kIdStatusDataNotLoaded),
-      status_tablet_(tablet_replica) {
+      status_tablet_(tablet_replica),
+      leader_ready_term_(kUninitializedLeaderTerm),
+      leader_lock_(RWMutex::Priority::PREFER_WRITING) {
 }
 
-Status TxnStatusManager::LoadFromTablet() {
+////////////////////////////////////////////////////////////
+// TxnStatusManager::ScopedLeaderSharedLock
+////////////////////////////////////////////////////////////
+TxnStatusManager::ScopedLeaderSharedLock::ScopedLeaderSharedLock(
+    TxnCoordinator* txn_coordinator)
+    : 
txn_status_manager_(DCHECK_NOTNULL(down_cast<TxnStatusManager*>(txn_coordinator))),
+      leader_shared_lock_(txn_status_manager_->leader_lock_, std::try_to_lock),
+      replica_status_(Status::Uninitialized(
+          "Transaction status tablet replica is not initialized")),
+      leader_status_(Status::Uninitialized(
+          "Leader status is not initialized")),
+      initial_term_(kUninitializedLeaderTerm) {
+
+  int64_t leader_ready_term;
+  {
+    std::lock_guard<simple_spinlock> l(txn_status_manager_->leader_term_lock_);
+    replica_status_ = 
txn_status_manager_->status_tablet_.tablet_replica_->CheckRunning();
+    if (PREDICT_FALSE(!replica_status_.ok() ||
+                      
FLAGS_txn_status_tablet_inject_uninitialized_leader_status_error)) {
+      return;
+    }
+    leader_ready_term = txn_status_manager_->leader_ready_term_;
+  }
+
+  ConsensusStatePB cstate;
+  Status s =
+      
txn_status_manager_->status_tablet_.tablet_replica_->consensus()->ConsensusState(&cstate);
+  if (PREDICT_FALSE(!s.ok())) {
+    DCHECK(s.IsIllegalState()) << s.ToString();
+    replica_status_ = s.CloneAndPrepend("ConsensusState is not available");
+    return;
+  }
+  DCHECK(replica_status_.ok());
+
+  // Check if the transaction status manager is the leader.
+  initial_term_ = cstate.current_term();
+  const string& uuid = 
txn_status_manager_->status_tablet_.tablet_replica_->permanent_uuid();
+  if (PREDICT_FALSE(cstate.leader_uuid() != uuid)) {
+    leader_status_ = Status::IllegalState(
+        Substitute("Not the leader. Local UUID: $0, Raft Consensus state: $1",
+                   uuid, SecureShortDebugString(cstate)));
+    return;
+  }
+  if (PREDICT_FALSE(leader_ready_term != initial_term_ ||
+                    !leader_shared_lock_.owns_lock())) {
+    leader_status_ = Status::ServiceUnavailable(
+        "Leader not yet ready to serve requests or the leadership has 
changed");
+    return;
+  }
+  leader_status_ = Status::OK();
+}
+
+template<typename RespClass>
+bool 
TxnStatusManager::ScopedLeaderSharedLock::CheckIsInitializedAndIsLeaderOrRespond(
+    RespClass* resp, RpcContext* rpc) {
+  const Status& s = first_failed_status();
+  if (PREDICT_TRUE(s.ok())) {
+    return true;
+  }
+
+  StatusToPB(s, resp->mutable_error()->mutable_status());
+  if (!leader_status_.IsUninitialized()) {
+    resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER);
+  } else {
+    resp->mutable_error()->set_code(TabletServerErrorPB::TABLET_NOT_RUNNING);
+  }
+  rpc->RespondSuccess();
+  return false;
+}
+
+// Explicit specialization for callers outside this compilation unit.
+#define INITTED_AND_LEADER_OR_RESPOND(RespClass) \
+  template bool \
+  
TxnStatusManager::ScopedLeaderSharedLock::CheckIsInitializedAndIsLeaderOrRespond(
 \
+      RespClass* resp, RpcContext* rpc) /* NOLINT */
+
+INITTED_AND_LEADER_OR_RESPOND(tserver::CoordinateTransactionResponsePB);
+#undef INITTED_AND_LEADER_OR_RESPOND
+
+Status TxnStatusManager::LoadFromTabletUnlocked() {
+  leader_lock_.AssertAcquiredForWriting();
+
+  if (PREDICT_FALSE(FLAGS_txn_status_tablet_inject_load_failure_error)) {
+    return Status::IllegalState("Injected transaction status tablet reload 
error");
+  }
+
   TxnStatusManagerBuildingVisitor v;
   RETURN_NOT_OK(status_tablet_.VisitTransactions(&v));
   int64_t highest_txn_id;
@@ -180,6 +301,14 @@ Status TxnStatusManager::LoadFromTablet() {
   return Status::OK();
 }
 
+Status TxnStatusManager::LoadFromTablet() {
+  // Block new transaction status manager operations, and wait
+  // for existing operations to finish.
+  std::lock_guard<RWMutex> leader_lock_guard(leader_lock_);
+  leader_lock_.AssertAcquiredForWriting();
+  return LoadFromTabletUnlocked();
+}
+
 Status TxnStatusManager::CheckTxnStatusDataLoadedUnlocked(
     TabletServerErrorPB* ts_error) const {
   DCHECK(ts_error);
@@ -193,6 +322,9 @@ Status TxnStatusManager::CheckTxnStatusDataLoadedUnlocked(
   //                status of transactions which it is no longer aware of 
should
   //                be handled separately.
   if (PREDICT_FALSE(highest_txn_id_ <= kIdStatusDataNotLoaded)) {
+    // The records from the tablet may be not-yet-loaded only if the
+    // leadership status has not been initialized.
+    CHECK(leader_ready_term_ == kUninitializedLeaderTerm);
     return Status::ServiceUnavailable("transaction status data is not loaded");
   }
   auto* consensus = status_tablet_.tablet_replica_->consensus();
@@ -209,10 +341,26 @@ Status TxnStatusManager::CheckTxnStatusDataLoadedUnlocked(
   return Status::OK();
 }
 
+Status TxnStatusManager::WaitUntilCaughtUpAsLeader(const MonoDelta& timeout) {
+  // Verify the current node is indeed the leader.
+  ConsensusStatePB cstate;
+  
RETURN_NOT_OK(status_tablet_.tablet_replica_->consensus()->ConsensusState(&cstate));
+  const string& uuid = status_tablet_.tablet_replica_->permanent_uuid();
+  if (cstate.leader_uuid() != uuid) {
+    return Status::IllegalState(
+        Substitute("Node $0 not leader. Raft Consensus state: $1",
+                   uuid, SecureShortDebugString(cstate)));
+  }
+
+  // Wait for all ops to be committed.
+  return 
status_tablet_.tablet_replica_->op_tracker()->WaitForAllToFinish(timeout);
+}
+
 Status TxnStatusManager::GetTransaction(int64_t txn_id,
                                         const boost::optional<string>& user,
                                         scoped_refptr<TransactionEntry>* txn,
                                         TabletServerErrorPB* ts_error) const {
+  leader_lock_.AssertAcquiredForReading();
   std::lock_guard<simple_spinlock> l(lock_);
 
   // First, make sure the transaction status data has been loaded. If not, then
@@ -234,6 +382,96 @@ Status TxnStatusManager::GetTransaction(int64_t txn_id,
   return Status::OK();
 }
 
+void TxnStatusManager::PrepareLeadershipTask() {
+  // Return early if the tablet is already not running.
+  if (PREDICT_FALSE(status_tablet_.tablet_replica_->IsShuttingDown())) {
+    LOG(WARNING) << "Not reloading transaction status tablet metadata, because 
"
+                 << "the tablet is already shutting down or shutdown. ";
+    return;
+  }
+  const RaftConsensus* consensus = status_tablet_.tablet_replica_->consensus();
+  const int64_t term_before_wait = consensus->CurrentTerm();
+  {
+    std::lock_guard<simple_spinlock> l(leader_term_lock_);
+    if (leader_ready_term_ == term_before_wait) {
+      // The term hasn't changed since the last time this replica was the
+      // leader. It's not possible for another replica to be leader for the 
same
+      // term, so there hasn't been any actual leadership change and thus
+      // there's no reason to reload the on-disk metadata.
+      VLOG(2) << Substitute("Term $0 hasn't changed, ignoring dirty callback",
+                            term_before_wait);
+      return;
+    }
+  }
+  LOG(INFO) << "Waiting until node catch up with all replicated operations in 
previous term...";
+  Status s = WaitUntilCaughtUpAsLeader(
+      
MonoDelta::FromMilliseconds(FLAGS_txn_status_tablet_failover_catchup_timeout_ms));
+  if (PREDICT_FALSE(!s.ok() || 
FLAGS_txn_status_tablet_failover_inject_timeout_error)) {
+    WARN_NOT_OK(s, "Failed waiting for node to catch up after leader 
election");
+    // Even when we get a timeout error here, it is OK to return, since the 
client
+    // will get a ServiceUnavailable error and retry.
+    return;
+  }
+
+  const int64_t term = consensus->CurrentTerm();
+  if (term_before_wait != term) {
+    // If we got elected leader again while waiting to catch up then we will
+    // get another callback to reload the metadata, so bail.
+    LOG(INFO) << Substitute("Term changed from $0 to $1 while waiting for 
replica "
+                            "leader catchup. Not loading transaction status 
manager "
+                            "metadata", term_before_wait, term);
+    return;
+  }
+
+  {
+    // This lambda returns the result of calling the 'func'. If the returned 
status
+    // is non-OK, the caller should bail on the leadership preparation task. 
Non-OK
+    // status is not considered fatal, because errors on preparing transaction 
status
+    // table only affect transactional operations and clients can retry in 
such case.
+    const auto check = [this](
+        const std::function<Status()> func,
+        const RaftConsensus& consensus,
+        int64_t start_term,
+        const char* op_description) {
+
+      leader_lock_.AssertAcquiredForWriting();
+      const Status s = func();
+      if (s.ok()) {
+        // Not an error at all.
+        return s;
+      }
+
+      const int64_t term = consensus.CurrentTerm();
+      // If the term has changed we assume the new leader is about to do the
+      // necessary work in its leadership preparation task. Otherwise, log
+      // a warning.
+      if (term != start_term) {
+        LOG(INFO) << Substitute("$0 interrupted; change in term detected: $1 
vs $2: $3",
+                                op_description, start_term, term, 
s.ToString());
+      } else {
+        LOG(WARNING) << Substitute("$0 failed: $1", op_description, 
s.ToString());
+      }
+      return s;
+    };
+
+    // Block new operations, and wait for existing operations to finish.
+    std::lock_guard<RWMutex> leader_lock_guard(leader_lock_);
+
+    static const char* const kLoadMetaOpDescription =
+        "Loading transaction status metadata into memory";
+    LOG(INFO) << kLoadMetaOpDescription << "...";
+    LOG_SLOW_EXECUTION(WARNING, 1000, kLoadMetaOpDescription) {
+      if (!check([this]() { return this->LoadFromTabletUnlocked(); },
+                 *consensus, term, kLoadMetaOpDescription).ok()) {
+        return;
+      }
+    }
+  }
+
+  std::lock_guard<simple_spinlock> l(leader_term_lock_);
+  leader_ready_term_ = term;
+}
+
 // NOTE: In this method, the idea is to try setting the 'highest_seen_txn_id'
 //       on return in most cases. Sending back the most recent highest
 //       transaction identifier helps to avoid extra RPC calls from
@@ -247,6 +485,7 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id,
                                           const string& user,
                                           int64_t* highest_seen_txn_id,
                                           TabletServerErrorPB* ts_error) {
+  leader_lock_.AssertAcquiredForReading();
   {
     std::lock_guard<simple_spinlock> l(lock_);
 
@@ -268,9 +507,6 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id,
           Substitute("transaction ID $0 is not higher than the highest ID so 
far: $1",
                      txn_id, highest_txn_id_));
     }
-    // TODO(awong): reduce the "damage" from followers getting requests by
-    // checking for leadership before doing anything. As is, if this replica
-    // isn't the leader, we may aggressively burn through transaction IDs.
     highest_txn_id_ = txn_id;
   }
 
@@ -311,6 +547,7 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id,
 
 Status TxnStatusManager::BeginCommitTransaction(int64_t txn_id, const string& 
user,
                                                 TabletServerErrorPB* ts_error) 
{
+  leader_lock_.AssertAcquiredForReading();
   scoped_refptr<TransactionEntry> txn;
   RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
 
@@ -342,6 +579,7 @@ Status TxnStatusManager::BeginCommitTransaction(int64_t 
txn_id, const string& us
 Status TxnStatusManager::FinalizeCommitTransaction(
     int64_t txn_id,
     TabletServerErrorPB* ts_error) {
+  leader_lock_.AssertAcquiredForReading();
   scoped_refptr<TransactionEntry> txn;
   RETURN_NOT_OK(GetTransaction(txn_id, boost::none, &txn, ts_error));
 
@@ -368,6 +606,7 @@ Status TxnStatusManager::FinalizeCommitTransaction(
 Status TxnStatusManager::AbortTransaction(int64_t txn_id,
                                           const std::string& user,
                                           TabletServerErrorPB* ts_error) {
+  leader_lock_.AssertAcquiredForReading();
   scoped_refptr<TransactionEntry> txn;
   RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
 
@@ -397,6 +636,7 @@ Status TxnStatusManager::GetTransactionStatus(
     transactions::TxnStatusEntryPB* txn_status,
     TabletServerErrorPB* ts_error) {
   DCHECK(txn_status);
+  leader_lock_.AssertAcquiredForReading();
   scoped_refptr<TransactionEntry> txn;
   RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
 
@@ -451,6 +691,7 @@ Status TxnStatusManager::RegisterParticipant(
     const string& tablet_id,
     const string& user,
     TabletServerErrorPB* ts_error) {
+  leader_lock_.AssertAcquiredForReading();
   scoped_refptr<TransactionEntry> txn;
   RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
 
@@ -489,24 +730,9 @@ Status TxnStatusManager::RegisterParticipant(
 }
 
 void TxnStatusManager::AbortStaleTransactions() {
+  leader_lock_.AssertAcquiredForReading();
   const MonoDelta max_staleness_interval =
       MonoDelta::FromMilliseconds(FLAGS_txn_keepalive_interval_ms);
-
-  auto* consensus = status_tablet_.tablet_replica_->consensus();
-  DCHECK(consensus);
-  if (consensus->role() != RaftPeerPB::LEADER) {
-    // Only leader replicas abort stale transactions registered with them.
-    // As of now, keep-alive requests are sent only to leader replicas, so only
-    // they have up-to-date information about the liveliness of corresponding
-    // transactions.
-    //
-    // If a non-leader replica errorneously (due to a network partition and
-    // the absence of leader leases) tried to abort a transaction, it would fail
-    // because aborting a transaction means writing into the transaction status
-    // tablet, so a non-leader replica's write attempt would be rejected by
-    // the Raft consensus protocol.
-    return;
-  }
   TransactionsMap txns_by_id;
   {
     std::lock_guard<simple_spinlock> l(lock_);
@@ -543,6 +769,7 @@ void TxnStatusManager::AbortStaleTransactions() {
             "last keepalive heartbeat (effective timeout is $2): $3",
             txn_id, staleness_interval.ToString(),
             max_staleness_interval.ToString(), s.ToString());
+        auto* consensus = DCHECK_NOTNULL(status_tablet_.tablet_replica_->consensus());
         if (consensus->role() != RaftPeerPB::LEADER ||
             !status_tablet_.tablet_replica()->CheckRunning().ok()) {
           // If the replica is no longer a leader at this point, there is
diff --git a/src/kudu/transactions/txn_status_manager.h b/src/kudu/transactions/txn_status_manager.h
index cc4f27e..4a4b4a8 100644
--- a/src/kudu/transactions/txn_status_manager.h
+++ b/src/kudu/transactions/txn_status_manager.h
@@ -24,16 +24,24 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
+#include <gtest/gtest_prod.h>
 
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/tablet/txn_coordinator.h"
 #include "kudu/transactions/txn_status_entry.h"
 #include "kudu/transactions/txn_status_tablet.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/rw_mutex.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
 
+class MonoDelta;
+namespace rpc {
+class RpcContext;
+}  // namespace rpc
+
 namespace tablet {
 class TabletReplica;
 } // namespace tablet
@@ -76,8 +84,56 @@ class TxnStatusManager final : public tablet::TxnCoordinator {
   explicit TxnStatusManager(tablet::TabletReplica* tablet_replica);
   ~TxnStatusManager() = default;
 
-  // Loads the contents of the status tablet into memory.
-  Status LoadFromTablet() override;
+  // Scoped "shared lock" to serialize replica leader elections.
+  //
+  // While in scope, blocks the transaction status manager in the event that
+  // it becomes the leader of its Raft configuration and needs to reload its
+  // persistent metadata. Once destroyed, the transaction status manager is
+  // unblocked.
+  class ScopedLeaderSharedLock {
+   public:
+    // Creates a new shared lock, trying to acquire the transaction status
+    // manager's leader_lock_ for reading in the process. If acquired, the
+    // lock is released when this object is destroyed.
+    //
+    // The object pointed by the 'txn_coordinator' parameter must outlive this
+    // object. 'txn_coordinator' must be a 'TxnStatusManager*' because of the
+    // downcast in the initialization.
+    explicit ScopedLeaderSharedLock(TxnCoordinator* txn_coordinator);
+
+    // First non-OK status of the transaction status manager, adhering to
+    // the checking order of replica_status_ first and then leader_status_.
+    const Status& first_failed_status() const {
+      RETURN_NOT_OK(replica_status_);
+      return leader_status_;
+    }
+
+    // Check that the transaction status manager is initialized and that it is
+    // the leader of its Raft configuration. Initialization status takes precedence
+    // over leadership status.
+    //
+    // If not initialized or if not the leader, writes the corresponding error
+    // to 'resp', responds to 'rpc', and returns false.
+    template<typename RespClass>
+    bool CheckIsInitializedAndIsLeaderOrRespond(RespClass* resp, rpc::RpcContext* rpc);
+
+   private:
+    TxnStatusManager* txn_status_manager_;
+    shared_lock<RWMutex> leader_shared_lock_;
+
+    // General status of the transaction status manager. If not OK (e.g. the tablet
+    // replica is still being initialized), all operations are illegal.
+    Status replica_status_;
+
+    // Leadership status of the transaction status manager. If not OK, the
+    // transaction status manager is not the leader.
+    Status leader_status_;
+    int64_t initial_term_;
+
+    DISALLOW_COPY_AND_ASSIGN(ScopedLeaderSharedLock);
+  };
+
+  void PrepareLeadershipTask() override;
 
   // Writes an entry to the status tablet and creates a transaction in memory.
   // Returns an error if a higher transaction ID has already been attempted
@@ -152,6 +208,16 @@ class TxnStatusManager final : public 
tablet::TxnCoordinator {
   tablet::ParticipantIdsByTxnId GetParticipantsByTxnIdForTests() const 
override;
 
  private:
+  // This test class calls LoadFromTablet() directly.
+  FRIEND_TEST(TxnStatusManagerTest, TestStartTransactions);
+  FRIEND_TEST(TxnStatusManagerTest, GetTransactionStatus);
+  friend class TxnStatusManagerTest;
+
+  // Loads the contents of the transaction status tablet into memory.
+  Status LoadFromTabletUnlocked();
+  // This is called by tests only.
+  Status LoadFromTablet();
+
   // Verifies that the transaction status data has already been loaded from the
   // underlying tablet and the replica is a leader. Returns Status::OK() if the
   // data is loaded and the replica is a leader. Otherwise, if the data hasn't
@@ -162,6 +228,17 @@ class TxnStatusManager final : public 
tablet::TxnCoordinator {
   Status CheckTxnStatusDataLoadedUnlocked(
       tserver::TabletServerErrorPB* ts_error) const;
 
+  // Loops and sleeps until one of the following conditions occurs:
+  // 1. The current node is the leader in the current term
+  //    and at least one op from the current term is committed. Returns OK.
+  // 2. The current node is not the leader. Returns IllegalState.
+  // 3. The provided timeout expires. Returns TimedOut.
+  //
+  // This method is intended to ensure that all operations replicated by
+  // previous leader are committed and visible to the local node before
+  // reading the data, to ensure consistency across failovers.
+  Status WaitUntilCaughtUpAsLeader(const MonoDelta& timeout);
+
   // Returns the transaction entry, returning an error if the transaction ID
   // doesn't exist or if 'user' is specified but isn't the owner of the
   // transaction. In addition, if the underlying replica isn't a leader,
@@ -183,6 +260,27 @@ class TxnStatusManager final : public 
tablet::TxnCoordinator {
 
   // The access to underlying storage.
   TxnStatusTablet status_tablet_;
+
+  // This field is updated when a node becomes the leader, waits for all outstanding
+  // uncommitted metadata in the transaction status manager to commit, and then
+  // reads that metadata into in-memory data structures. This is used to "fence"
+  // requests that depend on the in-memory state until the node can respond
+  // correctly.
+  int64_t leader_ready_term_;
+
+  // Lock protecting 'leader_ready_term_'.
+  mutable simple_spinlock leader_term_lock_;
+
+  // Lock used to fence operations and leader elections. All logical operations
+  // (i.e. begin transaction, get transaction, etc.) should acquire this lock for
+  // reading. Following an election where this replica is elected leader, it
+  // should acquire this lock for writing before reloading the metadata.
+  //
+  // Readers should not acquire this lock directly; use ScopedLeaderSharedLock
+  // instead.
+  //
+  // Always acquire this lock before 'leader_term_lock_'.
+  RWMutex leader_lock_;
 };
 
 class TxnStatusManagerFactory : public tablet::TxnCoordinatorFactory {
diff --git a/src/kudu/transactions/txn_status_tablet.h b/src/kudu/transactions/txn_status_tablet.h
index a112d3f..024244c 100644
--- a/src/kudu/transactions/txn_status_tablet.h
+++ b/src/kudu/transactions/txn_status_tablet.h
@@ -78,7 +78,6 @@ class TransactionsVisitor {
 // Expected usage of this class is to have a management layer that reads and
 // writes to the underlying replica only if it is leader.
 //
-// TODO(awong): ensure that only the leader TxnStatusManager can call these.
 // TODO(awong): delete transactions that are entirely aborted or committed.
 // TODO(awong): consider batching writes.
 class TxnStatusTablet {
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 45d327e..68471f4 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -164,7 +164,8 @@ class TabletCopyTest : public KuduTabletTest {
                           cmeta_manager,
                           *config_peer,
                           apply_pool_.get(),
-                          nullptr,
+                          /*reload_txn_status_tablet_pool*/nullptr,
+                          /*txn_coordinator_factory*/nullptr,
                           [this, tablet_id](const string& reason) {
                             this->TabletReplicaStateChangedCallback(tablet_id, reason);
                           }));
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 7224aad..611340d 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -88,6 +88,7 @@
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tablet/txn_coordinator.h"
 #include "kudu/transactions/transactions.pb.h"
+#include "kudu/transactions/txn_status_manager.h"
 #include "kudu/tserver/scanners.h"
 #include "kudu/tserver/tablet_replica_lookup.h"
 #include "kudu/tserver/tablet_server.h"
@@ -1256,29 +1257,35 @@ void TabletServiceAdminImpl::CoordinateTransaction(const CoordinateTransactionRe
   const auto& user = op.user();
   const auto& txn_id = op.txn_id();
   int64_t highest_seen_txn_id = -1;
-  switch (op.type()) {
-    case CoordinatorOpPB::BEGIN_TXN:
-      s = txn_coordinator->BeginTransaction(
-          txn_id, user, &highest_seen_txn_id, &ts_error);
-      break;
-    case CoordinatorOpPB::REGISTER_PARTICIPANT:
-      s = txn_coordinator->RegisterParticipant(txn_id, op.txn_participant_id(), user, &ts_error);
-      break;
-    case CoordinatorOpPB::BEGIN_COMMIT_TXN:
-      s = txn_coordinator->BeginCommitTransaction(txn_id, user, &ts_error);
-      break;
-    case CoordinatorOpPB::ABORT_TXN:
-      s = txn_coordinator->AbortTransaction(txn_id, user, &ts_error);
-      break;
-    case CoordinatorOpPB::GET_TXN_STATUS:
-      s = txn_coordinator->GetTransactionStatus(
-          txn_id, user, &txn_status, &ts_error);
-      break;
-    case CoordinatorOpPB::KEEP_TXN_ALIVE:
-      s = txn_coordinator->KeepTransactionAlive(txn_id, user, &ts_error);
-      break;
-    default:
-      s = Status::InvalidArgument(Substitute("Unknown op type: $0", op.type()));
+  {
+    transactions::TxnStatusManager::ScopedLeaderSharedLock l(txn_coordinator);
+    if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, context)) {
+      return;
+    }
+    switch (op.type()) {
+      case CoordinatorOpPB::BEGIN_TXN:
+        s = txn_coordinator->BeginTransaction(
+            txn_id, user, &highest_seen_txn_id, &ts_error);
+        break;
+      case CoordinatorOpPB::REGISTER_PARTICIPANT:
+        s = txn_coordinator->RegisterParticipant(txn_id, op.txn_participant_id(), user, &ts_error);
+        break;
+      case CoordinatorOpPB::BEGIN_COMMIT_TXN:
+        s = txn_coordinator->BeginCommitTransaction(txn_id, user, &ts_error);
+        break;
+      case CoordinatorOpPB::ABORT_TXN:
+        s = txn_coordinator->AbortTransaction(txn_id, user, &ts_error);
+        break;
+      case CoordinatorOpPB::GET_TXN_STATUS:
+        s = txn_coordinator->GetTransactionStatus(
+            txn_id, user, &txn_status, &ts_error);
+        break;
+      case CoordinatorOpPB::KEEP_TXN_ALIVE:
+        s = txn_coordinator->KeepTransactionAlive(txn_id, user, &ts_error);
+        break;
+      default:
+        s = Status::InvalidArgument(Substitute("Unknown op type: $0", op.type()));
+    }
   }
   if (ts_error.has_status() && ts_error.status().code() != AppStatusPB::OK) {
     *resp->mutable_error() = std::move(ts_error);
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 11f8a28..cee4476 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -91,6 +91,14 @@ DEFINE_int32(num_tablets_to_delete_simultaneously, 0,
              "device such as SSD or a RAID array, it may make sense to manually tune this.");
 TAG_FLAG(num_tablets_to_delete_simultaneously, advanced);
 
+DEFINE_int32(num_txn_status_tablets_to_reload_simultaneously, 0,
+             "Number of threads available to reload transaction status tablets in memory "
+             "metadata. If this is set to 0 (the default), then the number of reload threads "
+             "will be set based on the number of data directories. If the data directories "
+             "are on some very fast storage device such as SSD or a RAID array, it may make "
+             "sense to manually tune this.");
+TAG_FLAG(num_txn_status_tablets_to_reload_simultaneously, advanced);
+
 DEFINE_int32(tablet_start_warn_threshold_ms, 500,
              "If a tablet takes more than this number of millis to start, issue "
              "a warning with a trace.");
@@ -229,6 +237,7 @@ using kudu::tablet::TABLET_DATA_TOMBSTONED;
 using kudu::tablet::TabletDataState;
 using kudu::tablet::TabletMetadata;
 using kudu::tablet::TabletReplica;
+using kudu::transactions::TxnStatusManager;
 using kudu::transactions::TxnStatusManagerFactory;
 using kudu::tserver::TabletCopyClient;
 using std::make_shared;
@@ -382,6 +391,15 @@ Status TSTabletManager::Init() {
                 .set_max_threads(max_delete_threads)
                 .Build(&delete_tablet_pool_));
 
+  int max_reload_threads = FLAGS_num_txn_status_tablets_to_reload_simultaneously;
+  if (max_reload_threads == 0) {
+    // Default to the number of data directories.
+    max_reload_threads = fs_manager_->GetDataRootDirs().size();
+  }
+  RETURN_NOT_OK(ThreadPoolBuilder("txn-status-tablet-reload")
+                .set_max_threads(max_reload_threads)
+                .Build(&reload_txn_status_tablet_pool_));
+
   // TODO(aserbin): if better parallelism is needed to serve higher txn volume,
   //                consider using multiple threads in this pool and schedule
   //                per-tablet-replica clean-up tasks via threadpool serial
@@ -862,6 +880,7 @@ Status TSTabletManager::CreateAndRegisterTabletReplica(
                         cmeta_manager_,
                         local_peer_pb_,
                         server_->tablet_apply_pool(),
+                        reload_txn_status_tablet_pool_.get(),
                         &tsm_factory,
                         [this, tablet_id](const string& reason) {
                           this->MarkTabletDirty(tablet_id, reason);
@@ -1262,6 +1281,12 @@ void TSTabletManager::Shutdown() {
     replica->Shutdown();
   }
 
+  // Shut down the reload pool, so no new tablets are reloaded after this point.
+  // The shutdown takes place after the replicas are fully shutdown, to ensure
+  // on-going reloading metadata tasks of the transaction status managers are
+  // properly executed to unblock the shutdown process of replicas.
+  reload_txn_status_tablet_pool_->Shutdown();
+
   {
     std::lock_guard<RWMutex> l(lock_);
     // We don't expect anyone else to be modifying the map after we start the
@@ -1398,17 +1423,43 @@ void TSTabletManager::TxnStalenessTrackerTask() {
       shared_lock<RWMutex> l(lock_);
       for (const auto& elem : tablet_map_) {
         auto r = elem.second;
-        // Find txn status tablet replicas.
-        if (r->txn_coordinator()) {
+        // Find the running txn status tablet replicas.
+        if (r->txn_coordinator() && r->CheckRunning().ok()) {
           replicas.emplace_back(std::move(r));
         }
       }
     }
     for (auto& r : replicas) {
+      // Stop the task if the tserver is shutting down.
       if (shutdown_latch_.count() == 0) {
         return;
       }
+
+      // Only enable the staleness aborting task if the tablet replica
+      // is running.
+      if (!r->ShouldRunTxnCoordinatorStalenessTask()) {
+        continue;
+      }
+      SCOPED_CLEANUP({
+        r->DecreaseTxnCoordinatorTaskCounter();
+      });
       auto* coordinator = DCHECK_NOTNULL(r->txn_coordinator());
+      // Only leader replicas abort stale transactions registered with them.
+      // As of now, keep-alive requests are sent only to leader replicas, so only
+      // they have up-to-date information about the liveliness of corresponding
+      // transactions.
+      //
+      // If a non-leader replica erroneously (due to a network partition and
+      // the absence of leader leases) tried to abort a transaction, it would fail
+      // because aborting a transaction means writing into the transaction status
+      // tablet, so a non-leader replica's write attempt would be rejected by
+      // the Raft consensus protocol.
+      TxnStatusManager::ScopedLeaderSharedLock l(coordinator);
+      if (!l.first_failed_status().ok()) {
+        VLOG(1) << "Skipping transaction staleness track task: "
+                << l.first_failed_status().ToString();
+        continue;
+      }
       coordinator->AbortStaleTransactions();
     }
   }
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 3bacffa..a04458d 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -412,6 +412,9 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
   // Thread pool used to delete tablets asynchronously.
   std::unique_ptr<ThreadPool> delete_tablet_pool_;
 
+  // Thread pool used to reload transaction status tablets asynchronously.
+  std::unique_ptr<ThreadPool> reload_txn_status_tablet_pool_;
+
   // Thread pool to run TxnStatusManager tasks. As of now, this pool is
   // to run a long-running single periodic task to abort stale transactions
   // registered with corresponding transaction status tablets.

Reply via email to