This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new b9a8f2e  KUDU-2612: background task to commit transaction
b9a8f2e is described below

commit b9a8f2e633af891b6e2b268a71e18a8e7e1ff34d
Author: Andrew Wong <[email protected]>
AuthorDate: Sun Jan 31 02:30:25 2021 -0800

    KUDU-2612: background task to commit transaction
    
    This patch introduces background tasks that get run when
    KuduTransaction::Commit() is called. The typical workflow is as follows:
    1. Commit() is called, resulting in a BeginCommitTransaction() call on
       the TxnStatusManager.
    2. An update is made to the transaction status table, marking the
       transaction's state as COMMIT_IN_PROGRESS.
    3. The commit tasks are initiated -- BEGIN_COMMIT ops are sent
       asynchronously to every participant of the transaction.
    4. Once all responses are received from the participants, a commit
       timestamp is determined, and FINALIZE_COMMIT ops are sent
       asynchronously to every participant.
    5. Once all responses are received from the participants, an update is
       made to the transaction status table, marking the transaction's state
       as COMMITTED.
    
    There are some nuances here around error handling. Namely, what do we do
    if there are errors in sending the above requests? Well, it depends on
    the error. Transient errors (e.g. timeouts) are simply retried. More
    permanent errors need a bit more thought though:
    - If a participant has been deleted, what do we do? This patch makes a
      best effort attempt to abort the transaction if so.
    - Any other kinds of errors (e.g. illegal state errors from a
      participant) aren't expected in normal operation of a cluster. For
      this, we stop the commit task and log a warning. Hopefully an operator
      can intervene.
    
    Some follow-ups to expect:
    - This isn't as robust to failures as an approach that writes an
      intermediate state to the TxnStatusManager in between steps 3 and 4. A
      follow-up patch will implement that.
    - A separate patch will implement aborting transactions.
    - I disabled the background tasks in some tests that assume state
      changes are entirely controlled by clients. A follow-up change will
      address these to account for the state changes more organically.
    
    Change-Id: Ie2258dded3ab3d527cb5d0abdc7d5e7deb4da15e
    Reviewed-on: http://gerrit.cloudera.org:8080/16952
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
---
 .../apache/kudu/client/TestKuduTransaction.java    |  15 +-
 src/kudu/client/client-test.cc                     |  69 +-
 src/kudu/integration-tests/CMakeLists.txt          |   1 +
 src/kudu/integration-tests/txn_commit-itest.cc     | 796 +++++++++++++++++++++
 .../integration-tests/txn_status_manager-itest.cc  |   5 +
 .../integration-tests/txn_status_table-itest.cc    |   4 +
 src/kudu/master/txn_manager-test.cc                |   5 +
 src/kudu/tablet/tablet_replica.cc                  |   4 +
 src/kudu/tablet/txn_coordinator.h                  |   9 +-
 src/kudu/transactions/transactions.proto           |   7 +-
 src/kudu/transactions/txn_status_entry.cc          |   1 -
 src/kudu/transactions/txn_status_manager-test.cc   |  32 +-
 src/kudu/transactions/txn_status_manager.cc        | 311 +++++++-
 src/kudu/transactions/txn_status_manager.h         | 163 ++++-
 src/kudu/transactions/txn_system_client.cc         |  19 +
 src/kudu/transactions/txn_system_client.h          |  16 +-
 src/kudu/tserver/ts_tablet_manager.cc              |  19 +-
 src/kudu/tserver/ts_tablet_manager.h               |   3 +
 18 files changed, 1366 insertions(+), 113 deletions(-)

diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
index 7e18dfa..37c7772 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
@@ -155,6 +155,9 @@ public class TestKuduTransaction {
   @MasterServerConfig(flags = {
       "--txn_manager_enabled",
   })
+  @TabletServerConfig(flags = {
+      "--txn_schedule_background_tasks=false"
+  })
   public void testCommitAnEmptyTransaction() throws Exception {
     KuduTransaction txn = client.newTransaction();
     txn.commit(false);
@@ -226,6 +229,9 @@ public class TestKuduTransaction {
   @MasterServerConfig(flags = {
       "--txn_manager_enabled",
   })
+  @TabletServerConfig(flags = {
+      "--txn_schedule_background_tasks=false"
+  })
   public void testIsCommitComplete() throws Exception {
     KuduTransaction txn = client.newTransaction();
 
@@ -319,9 +325,6 @@ public class TestKuduTransaction {
   @MasterServerConfig(flags = {
       "--txn_manager_enabled",
   })
-  @TabletServerConfig(flags = {
-      "--txn_status_manager_finalize_commit_on_begin",
-  })
   public void testCommitAnEmptyTransactionWaitFake2PCO() throws Exception {
     KuduTransaction txn = client.newTransaction();
     txn.commit(true);
@@ -442,6 +445,9 @@ public class TestKuduTransaction {
   @MasterServerConfig(flags = {
       "--txn_manager_enabled=true",
   })
+  @TabletServerConfig(flags = {
+      "--txn_schedule_background_tasks=false"
+  })
   public void testAutoclosableUsage() throws Exception {
     byte[] buf = null;
 
@@ -623,7 +629,8 @@ public class TestKuduTransaction {
   })
   @TabletServerConfig(flags = {
       "--txn_keepalive_interval_ms=200",
-      "--txn_staleness_tracker_interval_ms=50",
+      "--txn_schedule_background_tasks=false",
+      "--txn_staleness_tracker_interval_ms=50"
   })
   public void testKeepaliveForDeserializedHandle() throws Exception {
     // Check the keepalive behavior when serializing/deserializing with default
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index a09538a..91ae430 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -429,7 +429,7 @@ class ClientTest : public KuduTest {
           continue;
         }
         tserver::TabletServerErrorPB ts_error;
-        auto s = c->FinalizeCommitTransaction(txn_id, &ts_error);
+        auto s = c->FinalizeCommitTransaction(txn_id, 
Timestamp::kInitialTimestamp, &ts_error);
         if (s.IsNotFound()) {
           continue;
         }
@@ -7096,10 +7096,6 @@ TEST_F(ClientTest, 
TestClientLocationNoLocationMappingCmd) {
 }
 
 // Check basic operations of the transaction-related API.
-// TODO(aserbin): add more scenarios and update existing ones to remove 
explicit
-//                FinalizeCommitTransaction() call when transaction
-//                orchestration is ready (i.e. FinalizeCommitTransaction() is
-//                called for all registered participants by the backend 
itself).
 TEST_F(ClientTest, TxnBasicOperations) {
   // KuduClient::NewTransaction() populates the output parameter on success.
   {
@@ -7122,22 +7118,12 @@ TEST_F(ClientTest, TxnBasicOperations) {
     ASSERT_OK(txn->Rollback());
   }
 
-  // It's possible to rollback a transaction that hasn't yet finalized
-  // its commit phase.
-  {
-    shared_ptr<KuduTransaction> txn;
-    ASSERT_OK(client_->NewTransaction(&txn));
-    ASSERT_OK(txn->Commit(false /* wait */));
-    ASSERT_OK(txn->Rollback());
-  }
-
   // It's impossible to rollback a transaction that has finalized
   // its commit phase.
   {
     shared_ptr<KuduTransaction> txn;
     ASSERT_OK(client_->NewTransaction(&txn));
-    ASSERT_OK(txn->Commit(false /* wait */));
-    ASSERT_OK(FinalizeCommitTransaction(txn));
+    ASSERT_OK(txn->Commit());
     auto cs = Status::Incomplete("other than Status::OK() initial status");
     bool is_complete = false;
     ASSERT_OK(txn->IsCommitComplete(&is_complete, &cs));
@@ -7214,17 +7200,12 @@ TEST_F(ClientTest, TxnCommit) {
     {
       shared_ptr<KuduTransaction> txn;
       ASSERT_OK(client_->NewTransaction(&txn));
-      ASSERT_OK(txn->Commit(false /* wait */));
-      // TODO(aserbin): when txn lifecycle is properly implemented, inject a
-      //                delay into the txn finalizing code to make sure
-      //                the transaction stays in the COMMIT_IN_PROGRESS state
-      //                for a while
-      bool is_complete = true;
+      ASSERT_OK(txn->Commit());
+      bool is_complete = false;
       Status cs;
       ASSERT_OK(txn->IsCommitComplete(&is_complete, &cs));
-      ASSERT_FALSE(is_complete);
-      ASSERT_TRUE(cs.IsIncomplete()) << cs.ToString();
-      ASSERT_STR_CONTAINS(cs.ToString(), "commit is still in progress");
+      ASSERT_TRUE(is_complete);
+      ASSERT_OK(cs);
       ASSERT_OK(txn->Serialize(&txn_token));
     }
 
@@ -7232,35 +7213,13 @@ TEST_F(ClientTest, TxnCommit) {
     // goes out of scope.
     shared_ptr<KuduTransaction> serdes_txn;
     ASSERT_OK(KuduTransaction::Deserialize(client_, txn_token, &serdes_txn));
-    bool is_complete = true;
+    bool is_complete = false;
     Status cs;
     ASSERT_OK(serdes_txn->IsCommitComplete(&is_complete, &cs));
-    ASSERT_FALSE(is_complete);
-    ASSERT_TRUE(cs.IsIncomplete()) << cs.ToString();
-    ASSERT_STR_CONTAINS(cs.ToString(), "commit is still in progress");
-  }
-
-  {
-    shared_ptr<KuduTransaction> txn;
-    ASSERT_OK(client_->NewTransaction(&txn));
-    ASSERT_OK(txn->Commit(false /* wait */));
-    bool is_complete = true;
-    Status cs;
-    ASSERT_OK(txn->IsCommitComplete(&is_complete, &cs));
-    ASSERT_FALSE(is_complete);
-    ASSERT_TRUE(cs.IsIncomplete()) << cs.ToString();
-    ASSERT_OK(FinalizeCommitTransaction(txn));
-    {
-      bool is_complete = false;
-      auto cs = Status::Incomplete("other than Status::OK() initial status");
-      ASSERT_OK(txn->IsCommitComplete(&is_complete, &cs));
-      ASSERT_TRUE(is_complete);
-      ASSERT_OK(cs);
-    }
+    ASSERT_TRUE(is_complete);
+    ASSERT_OK(cs);
   }
 
-  // TODO(aserbin): uncomment this when txn lifecycle is properly implemented
-#if 0
   {
     shared_ptr<KuduTransaction> txn;
     ASSERT_OK(client_->NewTransaction(&txn));
@@ -7284,7 +7243,6 @@ TEST_F(ClientTest, TxnCommit) {
     ASSERT_TRUE(is_complete);
     ASSERT_OK(cs);
   }
-#endif
 }
 
 // This test verifies the behavior of KuduTransaction instance when the bound
@@ -7526,8 +7484,7 @@ TEST_F(ClientTest, TxnKeepAlive) {
 
     SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_txn_keepalive_interval_ms));
 
-    ASSERT_OK(txn->Commit(false /* wait */));
-    ASSERT_OK(FinalizeCommitTransaction(txn));
+    ASSERT_OK(txn->Commit());
   }
 
   // Begin a transaction and move its KuduTransaction object out of the
@@ -7598,8 +7555,7 @@ TEST_F(ClientTest, TxnKeepAlive) {
 
     SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_txn_keepalive_interval_ms));
 
-    ASSERT_OK(serdes_txn->Commit(false /* wait */));
-    ASSERT_OK(FinalizeCommitTransaction(serdes_txn));
+    ASSERT_OK(serdes_txn->Commit());
   }
 }
 
@@ -7655,8 +7611,7 @@ TEST_F(ClientTest, 
TxnKeepAliveAndUnavailableTxnManagerShortTime) {
 
   // Now, when masters are back and running, the client should be able
   // to commit the transaction. It should not be automatically aborted.
-  ASSERT_OK(txn->Commit(false /* wait */));
-  ASSERT_OK(FinalizeCommitTransaction(txn));
+  ASSERT_OK(txn->Commit());
 }
 
 // A scenario to explicitly show that long-running transactions are
diff --git a/src/kudu/integration-tests/CMakeLists.txt 
b/src/kudu/integration-tests/CMakeLists.txt
index 8ff6d03..8ac1c3d 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -132,6 +132,7 @@ ADD_KUDU_TEST(tombstoned_voting-imc-itest)
 ADD_KUDU_TEST(tombstoned_voting-itest)
 ADD_KUDU_TEST(tombstoned_voting-stress-test RUN_SERIAL true)
 ADD_KUDU_TEST(token_signer-itest)
+ADD_KUDU_TEST(txn_commit-itest)
 ADD_KUDU_TEST(txn_participant-itest)
 ADD_KUDU_TEST(txn_status_table-itest)
 ADD_KUDU_TEST(txn_status_manager-itest)
diff --git a/src/kudu/integration-tests/txn_commit-itest.cc 
b/src/kudu/integration-tests/txn_commit-itest.cc
new file mode 100644
index 0000000..084435e
--- /dev/null
+++ b/src/kudu/integration-tests/txn_commit-itest.cc
@@ -0,0 +1,796 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <atomic>
+#include <functional>
+#include <map>
+#include <memory>
+#include <string>
+#include <thread>
+#include <unordered_set>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client.h"
+#include "kudu/client/client.pb.h"
+#include "kudu/client/scan_batch.h"
+#include "kudu/client/schema.h"
+#include "kudu/client/write_op.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/common/txn_id.h"
+#include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_metadata.h"
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tablet/txn_coordinator.h"
+#include "kudu/tablet/txn_participant-test-util.h"
+#include "kudu/tablet/txn_participant.h"
+#include "kudu/transactions/transactions.pb.h"
+#include "kudu/transactions/txn_system_client.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(txn_manager_enabled);
+DECLARE_bool(txn_manager_lazily_initialized);
+DECLARE_bool(txn_schedule_background_tasks);
+DECLARE_int32(txn_status_manager_inject_latency_finalize_commit_ms);
+DECLARE_uint32(txn_background_rpc_timeout_ms);
+DECLARE_uint32(txn_keepalive_interval_ms);
+DECLARE_uint32(txn_manager_status_table_num_replicas);
+DECLARE_uint32(txn_staleness_tracker_interval_ms);
+
+using kudu::client::KuduClient;
+using kudu::client::KuduSession;
+using kudu::client::KuduTable;
+using kudu::client::KuduTransaction;
+using kudu::client::KuduScanner;
+using kudu::client::KuduScanBatch;
+using kudu::client::sp::shared_ptr;
+using kudu::cluster::InternalMiniCluster;
+using kudu::cluster::InternalMiniClusterOptions;
+using kudu::tablet::TabletReplica;
+using kudu::tablet::TxnParticipant;
+using kudu::transactions::TxnStatePB;
+using kudu::transactions::TxnSystemClient;
+using kudu::transactions::TxnTokenPB;
+using kudu::tserver::MiniTabletServer;
+using kudu::tserver::ParticipantOpPB;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace itest {
+
+class TxnCommitITest : public KuduTest {
+ public:
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  const int kNumRowsPerTxn = 10;
+
+  void SetUp() override {
+    KuduTest::SetUp();
+
+    // Shrink the staleness tracker and keepalive intervals to speed up the
+    // staleness checks, which helps stress cases where they might race with
+    // commits.
+    FLAGS_txn_staleness_tracker_interval_ms = 100;
+    FLAGS_txn_keepalive_interval_ms = 300;
+
+    NO_FATALS(SetUpClusterAndTable(/*num_tservers=*/1));
+  }
+
+  // Starts a cluster with 'num_tservers' tablet servers and the TxnManager
+  // enabled, then creates a two-tablet user-defined table and inserts at least
+  // one row into it. 'num_replicas' is used as the replication factor of both
+  // the transaction status table and the user-defined table.
+  //
+  // Also populates the fixture's 'tsm_id_', 'txn_client_', 'client_',
+  // 'client_user_', 'table_name_', 'initial_row_count_', and
+  // 'participant_ids_' members.
+  void SetUpClusterAndTable(int num_tservers, int num_replicas = 1) {
+    FLAGS_txn_manager_enabled = true;
+    FLAGS_txn_manager_lazily_initialized = false;
+    FLAGS_txn_manager_status_table_num_replicas = num_replicas;
+
+    InternalMiniClusterOptions opts;
+    opts.num_tablet_servers = num_tservers;
+    cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
+    ASSERT_OK(cluster_->Start());
+    ASSERT_EVENTUALLY([&] {
+      // Find the TxnStatusManager's tablet ID.
+      // NOTE(review): the user-defined table is only created further below, so
+      // presumably any tablet found at this point belongs to the transaction
+      // status table -- confirm.
+      for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+        auto* ts = cluster_->mini_tablet_server(i);
+        ASSERT_OK(ts->WaitStarted());
+        const auto tablet_ids = ts->ListTablets();
+        if (tablet_ids.empty()) continue;
+        tsm_id_ = tablet_ids[0];
+      }
+      ASSERT_FALSE(tsm_id_.empty());
+    });
+    // Check the result of creating the system client rather than dropping it:
+    // 'txn_client_' is dereferenced right below.
+    ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client_));
+    ASSERT_OK(txn_client_->OpenTxnStatusTable());
+
+    // Remember the user the client runs as; it is needed when registering
+    // participants manually.
+    ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+    string authn_creds;
+    ASSERT_OK(client_->ExportAuthenticationCredentials(&authn_creds));
+    client::AuthenticationCredentialsPB pb;
+    ASSERT_TRUE(pb.ParseFromString(authn_creds));
+    ASSERT_TRUE(pb.has_real_user());
+    client_user_ = pb.real_user();
+
+    // Create the user-defined table and wait until it has at least one row.
+    TestWorkload w(cluster_.get());
+    w.set_num_replicas(num_replicas);
+    w.set_num_tablets(2);
+    w.Setup();
+    w.Start();
+    while (w.rows_inserted() < 1) {
+      SleepFor(MonoDelta::FromMilliseconds(50));
+    }
+    w.StopAndJoin();
+    table_name_ = w.table_name();
+    initial_row_count_ = w.rows_inserted();
+
+    // TODO(awong): until we start registering participants automatically, we
+    // need to manually register them, so keep track of what tablets exist.
+    unordered_set<string> tablet_ids;
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      auto* ts = cluster_->mini_tablet_server(i);
+      for (const auto& tablet_id : ts->ListTablets()) {
+        if (tablet_id != tsm_id_) {
+          tablet_ids.emplace(tablet_id);
+        }
+      }
+    }
+    // Elements of a set are const, so they can only be copied out of it.
+    for (const auto& t : tablet_ids) {
+      participant_ids_.emplace_back(t);
+    }
+  }
+
+  // Registers each tablet in 'tablet_ids' as a participant of the transaction
+  // 'txn_id', passing 'client_user_' as the user, and sends a BEGIN_TXN
+  // participant op to each of them. Returns the first non-OK status
+  // encountered, or OK if every call succeeded.
+  //
+  // TODO(awong): register participants automatically as a part of writing to
+  // tablets for the first time.
+  Status RegisterParticipants(const TxnId& txn_id, const vector<string>& tablet_ids) {
+    for (const auto& prt_id : tablet_ids) {
+      RETURN_NOT_OK(txn_client_->RegisterParticipant(txn_id.value(), prt_id, client_user_));
+      RETURN_NOT_OK(txn_client_->ParticipateInTransaction(
+          prt_id,
+          tablet::MakeParticipantOp(txn_id.value(), tserver::ParticipantOpPB::BEGIN_TXN),
+          kTimeout));
+    }
+    return Status::OK();
+  }
+
+  // Starts a new transaction through 'client_', creates a session bound to it,
+  // and manually registers every tablet in 'participant_ids' as a participant
+  // of the transaction (see RegisterParticipants()).
+  //
+  // On success, the transaction and session handles are returned via 'txn' and
+  // 'session'. If a non-OK status is returned, neither output is modified.
+  Status BeginTransaction(const vector<string>& participant_ids,
+                          shared_ptr<KuduTransaction>* txn,
+                          shared_ptr<KuduSession>* session) {
+    shared_ptr<KuduTransaction> txn_local;
+    RETURN_NOT_OK(client_->NewTransaction(&txn_local));
+    shared_ptr<KuduSession> txn_session_local;
+    RETURN_NOT_OK(txn_local->CreateSession(&txn_session_local));
+
+    // The transaction ID is only reachable through the serialized token.
+    string txn_token;
+    RETURN_NOT_OK(txn_local->Serialize(&txn_token));
+    TxnTokenPB token;
+    CHECK(token.ParseFromString(txn_token));
+    CHECK(token.has_txn_id());
+
+    // Register the participants requested by the caller, not the fixture-wide
+    // 'participant_ids_' list: callers may pass a different set of tablets.
+    RETURN_NOT_OK(RegisterParticipants(token.txn_id(), participant_ids));
+    *txn = std::move(txn_local);
+    *session = std::move(txn_session_local);
+    return Status::OK();
+  }
+
+  // Inserts 'num_rows' rows through the given session, flushing after every
+  // row. Columns 0 and 1 of each row are set to the same value, with values
+  // running consecutively from 'start_row'.
+  Status InsertToSession(
+      const shared_ptr<KuduSession>& txn_session, int start_row, int num_rows) {
+    shared_ptr<KuduTable> table;
+    RETURN_NOT_OK(client_->OpenTable(table_name_, &table));
+    for (int row_id = start_row, end_row_id = start_row + num_rows;
+         row_id < end_row_id; row_id++) {
+      auto* insert = table->NewInsert();
+      auto* row = insert->mutable_row();
+      RETURN_NOT_OK(row->SetInt32(0, row_id));
+      RETURN_NOT_OK(row->SetInt32(1, row_id));
+      RETURN_NOT_OK(txn_session->Apply(insert));
+      RETURN_NOT_OK(txn_session->Flush());
+    }
+    return Status::OK();
+  }
+
+  // Scans the test table and, on success, stores the total number of rows
+  // returned by the scan in 'num_rows'. 'num_rows' is left untouched if a
+  // non-OK status is returned.
+  Status CountRows(int* num_rows) {
+    shared_ptr<KuduTable> table;
+    RETURN_NOT_OK(client_->OpenTable(table_name_, &table));
+
+    KuduScanner scanner(table.get());
+    RETURN_NOT_OK(scanner.Open());
+
+    // Sum up the sizes of all batches returned by the scanner.
+    int total_rows = 0;
+    while (scanner.HasMoreRows()) {
+      KuduScanBatch cur_batch;
+      RETURN_NOT_OK(scanner.NextBatch(&cur_batch));
+      total_rows += cur_batch.NumRows();
+    }
+
+    *num_rows = total_rows;
+    return Status::OK();
+  }
+
+ protected:
+  unique_ptr<InternalMiniCluster> cluster_;
+  unique_ptr<TxnSystemClient> txn_client_;
+
+  shared_ptr<KuduClient> client_;
+  string client_user_;
+
+  string table_name_;
+  int initial_row_count_;
+
+  // TODO(awong): Only needed until we start registering participants
+  // automatically.
+  string tsm_id_;
+  vector<string> participant_ids_;
+  int cur_txn_id_ = 0;
+};
+
+// Test that rows inserted through a transactional session only show up in
+// scans once the transaction has been committed, and that IsCommitComplete()
+// reports the commit as complete afterwards.
+TEST_F(TxnCommitITest, TestBasicCommits) {
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> txn_session;
+  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
+
+  // Even though we've inserted, we shouldn't be able to see any new rows until
+  // after we commit.
+  int num_rows = 0;
+  ASSERT_OK(CountRows(&num_rows));
+  ASSERT_EQ(initial_row_count_, num_rows);
+
+  // Once Commit() has returned, a scan should see the transaction's rows.
+  ASSERT_OK(txn->Commit());
+  ASSERT_OK(CountRows(&num_rows));
+  ASSERT_EQ(initial_row_count_ + kNumRowsPerTxn, num_rows);
+
+  // IsCommitComplete() should verify that the transaction is in the right
+  // state.
+  Status completion_status;
+  bool is_complete;
+  ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+  ASSERT_OK(completion_status);
+  ASSERT_TRUE(is_complete);
+}
+
+// Test that if we delete the TxnStatusManager while tasks are on-going,
+// nothing goes catastrophically wrong (i.e. no crashes).
+TEST_F(TxnCommitITest, TestCommitWhileDeletingTxnStatusManager) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> txn_session;
+  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
+
+  // Begin committing without waiting for completion, then tombstone the
+  // TxnStatusManager's tablet right away.
+  ASSERT_OK(txn->Commit(/*wait*/false));
+  
ASSERT_OK(cluster_->mini_tablet_server(0)->server()->tablet_manager()->DeleteTablet(
+      tsm_id_, tablet::TABLET_DATA_TOMBSTONED, boost::none));
+
+  // With the TxnStatusManager's tablet deleted, checking on the commit is
+  // expected to fail with a timeout.
+  Status completion_status;
+  bool is_complete;
+  Status s = txn->IsCommitComplete(&is_complete, &completion_status);
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+}
+
+// Test that a transaction can still be committed after the table it wrote to
+// has been deleted.
+TEST_F(TxnCommitITest, TestCommitAfterDeletingParticipant) {
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> txn_session;
+  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
+  // Delete the table before the commit is even started.
+  ASSERT_OK(client_->DeleteTable(table_name_));
+  ASSERT_OK(txn->Commit(/*wait*/false));
+
+  // The transaction should eventually succeed, treating the deleted
+  // participant as committed.
+  ASSERT_EVENTUALLY([&] {
+    Status completion_status;
+    bool is_complete = false;
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_OK(completion_status);
+    ASSERT_TRUE(is_complete);
+  });
+}
+
+// Test that a transaction can still be committed after its participants have
+// gone away.
+//
+// NOTE(review): despite the test's name, the participants are removed by
+// deleting the whole table; see the note on the alterer below.
+TEST_F(TxnCommitITest, TestCommitAfterDroppingRangeParticipant) {
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> txn_session;
+  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
+  ASSERT_OK(client_->DeleteTable(table_name_));
+  // NOTE(review): the alterer is configured to drop a range partition, but it
+  // is destroyed without any visible call that would apply the alteration
+  // (e.g. Alter()), and the table has already been deleted above. This looks
+  // like it doesn't exercise dropping a range partition at all -- confirm.
+  const auto& schema = client::KuduSchema::FromSchema(GetSimpleTestSchema());
+  unique_ptr<client::KuduTableAlterer> 
alterer(client_->NewTableAlterer(table_name_));
+  alterer->DropRangePartition(schema.NewRow(), schema.NewRow());
+  alterer.reset();
+
+  ASSERT_OK(txn->Commit(/*wait*/false));
+
+  // The transaction should eventually succeed, treating the deleted
+  // participant as committed.
+  ASSERT_EVENTUALLY([&] {
+    Status completion_status;
+    bool is_complete = false;
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_OK(completion_status);
+    ASSERT_TRUE(is_complete);
+  });
+}
+
+// Test that a commit interrupted by a tablet server restart stays incomplete
+// for as long as background tasks are disabled, and is completed once the
+// tablet server is restarted with background tasks enabled.
+TEST_F(TxnCommitITest, TestRestartingWhileCommitting) {
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> txn_session;
+  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  // Inject latency so the commit can't be finalized before the shutdown below.
+  FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 2000;
+  ASSERT_OK(txn->Commit(/*wait*/false));
+  // Stop the tserver without allowing the finalize commit to complete.
+  cluster_->mini_tablet_server(0)->Shutdown();
+
+  FLAGS_txn_schedule_background_tasks = false;
+  ASSERT_OK(cluster_->mini_tablet_server(0)->Restart());
+
+  // The transaction should be incomplete, as background tasks are disabled,
+  // and since we shut down before allowing to finish committing.
+  Status completion_status;
+  bool is_complete = false;
+  ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+  ASSERT_TRUE(completion_status.IsIncomplete()) << 
completion_status.ToString();
+  ASSERT_FALSE(is_complete);
+
+  // If we re-enable background tasks, background tasks should be scheduled to
+  // commit the transaction.
+  FLAGS_txn_schedule_background_tasks = true;
+  cluster_->mini_tablet_server(0)->Shutdown();
+  ASSERT_OK(cluster_->mini_tablet_server(0)->Restart());
+  ASSERT_EVENTUALLY([&] {
+    Status completion_status;
+    bool is_complete = false;
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_OK(completion_status);
+    ASSERT_TRUE(is_complete);
+  });
+}
+
+// Test restarting while commit tasks are on-going, while at the same time,
+// some participants are deleted. There should be no inconsistencies in
+// assigned commit timestamps across participants.
+TEST_F(TxnCommitITest, TestRestartingWhileCommittingAndDeleting) {
+  // First, create another table that we'll delete later on.
+  const string kSecondTableName = "default.second_table";
+  TestWorkload w(cluster_.get());
+  w.set_num_replicas(1);
+  w.set_num_tablets(2);
+  w.set_table_name(kSecondTableName);
+  w.Setup();
+  w.Start();
+  while (w.rows_inserted() < 1) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  w.StopAndJoin();
+
+  // Collect the tablets of both tables, i.e. every tablet on the tserver other
+  // than the TxnStatusManager's tablet.
+  unordered_set<string> participant_ids;
+  auto* mts = cluster_->mini_tablet_server(0);
+  for (const auto& tablet_id : mts->ListTablets()) {
+    if (tablet_id != tsm_id_) {
+      participant_ids.emplace(tablet_id);
+    }
+  }
+  vector<string> both_tables_participant_ids(participant_ids.begin(), participant_ids.end());
+
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> txn_session;
+  ASSERT_OK(BeginTransaction(both_tables_participant_ids, &txn, &txn_session));
+  FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 2000;
+  ASSERT_OK(txn->Commit(/*wait*/false));
+
+  // Wait a bit to let the commit tasks start.
+  SleepFor(MonoDelta::FromMilliseconds(1000));
+
+  // Shut down without giving time for the commit to complete.
+  mts->Shutdown();
+  ASSERT_OK(mts->Restart());
+  ASSERT_OK(mts->server()->tablet_manager()->WaitForAllBootstrapsToFinish());
+
+  // Delete some of the participants. Despite this, the commit process should
+  // complete.
+  ASSERT_OK(client_->DeleteTable(kSecondTableName));
+  ASSERT_EVENTUALLY([&] {
+    Status completion_status;
+    bool is_complete = false;
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_OK(completion_status);
+    ASSERT_TRUE(is_complete);
+  });
+
+  // Let's confirm that all remaining participants see the same transaction
+  // metadata.
+  vector<scoped_refptr<TabletReplica>> replicas;
+  mts->server()->tablet_manager()->GetTabletReplicas(&replicas);
+  vector<vector<TxnParticipant::TxnEntry>> txn_entries_per_replica;
+  for (const auto& r : replicas) {
+    if (r->tablet_id() != tsm_id_ && r->tablet_metadata()->table_name() != kSecondTableName) {
+      txn_entries_per_replica.emplace_back(r->tablet()->txn_participant()->GetTxnsForTests());
+    }
+  }
+  ASSERT_GT(txn_entries_per_replica.size(), 1);
+  // Every replica must have transaction entries with a commit timestamp set,
+  // and they must all match the first replica's entries.
+  const auto& expected_txns = txn_entries_per_replica[0];
+  for (const auto& txns : txn_entries_per_replica) {
+    ASSERT_FALSE(txns.empty());
+    for (const auto& txn_entry : txns) {
+      ASSERT_NE(-1, txn_entry.commit_timestamp);
+    }
+    EXPECT_EQ(expected_txns, txns);
+  }
+}
+
+// Test that when loading the TxnStatusManagers, nothing catastrophic happens
+// if we can't connect to the masters.
+TEST_F(TxnCommitITest, TestLoadTxnStatusManagerWhenNoMasters) {
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> txn_session;
+  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+
+  // Restart the tserver while the master is down.
+  cluster_->mini_master()->Shutdown();
+  cluster_->mini_tablet_server(0)->Shutdown();
+  ASSERT_OK(cluster_->mini_tablet_server(0)->Restart());
+
+  // While the master is down, we can't contact the TxnManager.
+  Status s = BeginTransaction(participant_ids_, &txn, &txn_session);
+  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+
+  // Once restarted, it should be business as usual.
+  ASSERT_OK(cluster_->mini_master()->Restart());
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  });
+  // Two transactions are expected to be tracked by the coordinator:
+  // presumably the one begun before the restart and the one begun after the
+  // master came back -- the attempt made while the master was down failed.
+  scoped_refptr<tablet::TabletReplica> tsm_replica;
+  auto* tablet_manager = 
cluster_->mini_tablet_server(0)->server()->tablet_manager();
+  ASSERT_OK(tablet_manager->GetTabletReplica(tsm_id_, &tsm_replica));
+  auto participants_by_txn_id =
+      
DCHECK_NOTNULL(tsm_replica->txn_coordinator())->GetParticipantsByTxnIdForTests();
+  ASSERT_EQ(2, participants_by_txn_id.size());
+}
+
+// Test what happens if a participant is aborted somehow, and we try to commit.
+// We don't expect this to happen since aborts should start with writing an
+// abort record to the TxnStatusManager, but let's at least make sure we
+// understand what happens if this does occur.
+TEST_F(TxnCommitITest, TestCommitAfterParticipantAbort) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> txn_session;
+  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
+
+  // Send an ABORT_TXN op to the participant.
+  // NOTE(review): the transaction ID is hard-coded to 0, presumably the ID
+  // assigned to the transaction begun above -- confirm.
+  ParticipantOpPB op_pb;
+  op_pb.set_txn_id(0);
+  op_pb.set_type(ParticipantOpPB::ABORT_TXN);
+  ASSERT_OK(txn_client_->ParticipateInTransaction(
+      participant_ids_[0], op_pb, MonoDelta::FromSeconds(3)));
+
+  // When we try to commit, we should end up not completing.
+  ASSERT_OK(txn->Commit(/*wait*/false));
+
+  // Give the commit some time before checking that it is still reported as
+  // incomplete.
+  SleepFor(MonoDelta::FromSeconds(3));
+  Status completion_status;
+  bool is_complete;
+  ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+  ASSERT_TRUE(completion_status.IsIncomplete()) << 
completion_status.ToString();
+}
+
+// Try concurrently beginning to commit a bunch of different transactions.
+TEST_F(TxnCommitITest, TestConcurrentCommitCalls) {
+  const int kNumTxns = 4;
+
+  // Begin every transaction and stage a disjoint range of rows in each one.
+  vector<shared_ptr<KuduTransaction>> txns(kNumTxns);
+  int next_row = initial_row_count_;
+  for (auto& txn : txns) {
+    shared_ptr<KuduSession> session;
+    ASSERT_OK(BeginTransaction(participant_ids_, &txn, &session));
+    ASSERT_OK(InsertToSession(session, next_row, kNumRowsPerTxn));
+    next_row += kNumRowsPerTxn;
+  }
+
+  // Before any commit is requested, the row count must be unchanged.
+  int num_rows = 0;
+  ASSERT_OK(CountRows(&num_rows));
+  ASSERT_EQ(initial_row_count_, num_rows);
+
+  // Request each commit from its own thread and collect the results.
+  vector<Status> results(kNumTxns);
+  vector<thread> committers;
+  for (int idx = 0; idx < kNumTxns; idx++) {
+    committers.emplace_back([&txns, &results, idx] {
+      results[idx] = txns[idx]->Commit(/*wait*/false);
+    });
+  }
+  for (auto& committer : committers) {
+    committer.join();
+  }
+  for (int idx = 0; idx < kNumTxns; idx++) {
+    EXPECT_OK(results[idx]);
+  }
+
+  // Every transaction should eventually report a successful, completed commit.
+  ASSERT_EVENTUALLY([&] {
+    for (int idx = 0; idx < kNumTxns; idx++) {
+      bool done;
+      Status commit_status;
+      ASSERT_OK(txns[idx]->IsCommitComplete(&done, &commit_status));
+      ASSERT_OK(commit_status);
+      ASSERT_TRUE(done);
+    }
+  });
+
+  // All of the staged rows should now be counted.
+  ASSERT_OK(CountRows(&num_rows));
+  ASSERT_EQ(initial_row_count_ + kNumRowsPerTxn * kNumTxns, num_rows);
+}
+
+// Test that committing the same transaction concurrently doesn't lead to any
+// issues.
+TEST_F(TxnCommitITest, TestConcurrentRepeatedCommitCalls) {
+  // Stage rows in a single transaction and check that the row count is
+  // unchanged before any commit is requested.
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> session;
+  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &session));
+  ASSERT_OK(InsertToSession(session, initial_row_count_, kNumRowsPerTxn));
+  int num_rows = 0;
+  ASSERT_OK(CountRows(&num_rows));
+  ASSERT_EQ(initial_row_count_, num_rows);
+
+  // Race several Commit() calls on the very same transaction handle.
+  const int kNumThreads = 4;
+  vector<Status> results(kNumThreads);
+  vector<thread> committers;
+  for (int idx = 0; idx < kNumThreads; idx++) {
+    committers.emplace_back([&txn, &results, idx] {
+      results[idx] = txn->Commit(/*wait*/false);
+    });
+  }
+  for (auto& committer : committers) {
+    committer.join();
+  }
+  for (int idx = 0; idx < kNumThreads; idx++) {
+    EXPECT_OK(results[idx]);
+  }
+
+  // The transaction should eventually report a successful, completed commit.
+  ASSERT_EVENTUALLY([&] {
+    bool done;
+    Status commit_status;
+    ASSERT_OK(txn->IsCommitComplete(&done, &commit_status));
+    ASSERT_OK(commit_status);
+    ASSERT_TRUE(done);
+  });
+
+  // The rows must have been applied exactly once.
+  ASSERT_OK(CountRows(&num_rows));
+  ASSERT_EQ(initial_row_count_ + kNumRowsPerTxn, num_rows);
+}
+
+TEST_F(TxnCommitITest, TestDontAbortIfCommitInProgress) {  // A commit that has begun must not turn into an abort.
+  FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 1000;  // Up to 1s of random delay before the finalize write.
+  string serialized_txn;
+  {  // The transaction handle goes out of scope right after the commit is requested.
+    shared_ptr<KuduTransaction> txn;
+    shared_ptr<KuduSession> txn_session;
+    ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+    ASSERT_OK(InsertToSession(txn_session, initial_row_count_, 
kNumRowsPerTxn));
+    ASSERT_OK(txn->Commit(/*wait*/false));
+    ASSERT_OK(txn->Serialize(&serialized_txn));  // Saved so a fresh handle can be built below.
+  }
+  // Wait a bit to allow would-be background aborts to happen.
+  SleepFor(MonoDelta::FromSeconds(1));
+
+  // Since we've already begun committing, we shouldn't abort. On the contrary,
+  // we should eventually successfully fully commit the transaction.
+  bool is_complete = false;
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(KuduTransaction::Deserialize(client_, serialized_txn, &txn));
+  Status completion_status;
+  ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+  ASSERT_FALSE(completion_status.IsAborted()) << completion_status.ToString();  // Must not be aborted at this point.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_OK(completion_status);
+    ASSERT_TRUE(is_complete);
+  });
+}
+
+// Test that has two nodes so we can place the TxnStatusManager and transaction
+// participant on separate nodes. This can be useful for testing when some
+// nodes are down.
+class TwoNodeTxnCommitITest : public TxnCommitITest {
+ public:
+  // Starts a two-tserver cluster, then records which tserver lists the
+  // TxnStatusManager tablet ('tsm_ts_') and picks a different tserver as
+  // 'prt_ts_'. Fails the test if either cannot be determined.
+  void SetUp() override {
+    KuduTest::SetUp();
+    NO_FATALS(SetUpClusterAndTable(2));
+
+    // Figure out which tserver has the participant and which has the
+    // TxnStatusManager.
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      auto* ts = cluster_->mini_tablet_server(i);
+      const auto tablet_ids = ts->ListTablets();
+      for (const auto& tablet_id : tablet_ids) {
+        if (tablet_id == tsm_id_) {
+          tsm_ts_ = ts;
+          break;
+        }
+      }
+    }
+    // The members start out as nullptr, so these checks reliably catch the
+    // case where no matching tserver was found, in release builds too.
+    ASSERT_TRUE(tsm_ts_ != nullptr)
+        << "no tablet server hosts the TxnStatusManager tablet";
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      if (cluster_->mini_tablet_server(i) != tsm_ts_) {
+        prt_ts_ = cluster_->mini_tablet_server(i);
+        break;
+      }
+    }
+    ASSERT_TRUE(prt_ts_ != nullptr)
+        << "no tablet server other than the TxnStatusManager's was found";
+  }
+ protected:
+  // The tablet server that has a TxnStatusManager.
+  MiniTabletServer* tsm_ts_ = nullptr;
+
+  // A tablet server that has at least one participant.
+  MiniTabletServer* prt_ts_ = nullptr;
+};
+
+// Test that nothing goes wrong when participants are down, and that we'll
+// retry until they become available again.
+TEST_F(TwoNodeTxnCommitITest, TestCommitWhenParticipantsAreDown) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> txn_session;
+  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
+  prt_ts_->Shutdown();  // Take the participant's tserver down before requesting the commit.
+  ASSERT_OK(txn->Commit(/*wait*/false));
+
+  // Since our participant is down, we can't proceed with the commit.
+  Status completion_status;
+  bool is_complete;
+  ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+  ASSERT_FALSE(is_complete);
+  ASSERT_TRUE(completion_status.IsIncomplete()) << 
completion_status.ToString();
+  SleepFor(MonoDelta::FromSeconds(5));  // Presumably leaves time for the commit task to retry at least once.
+
+  ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));  // Still incomplete after the wait.
+  ASSERT_FALSE(is_complete);
+  ASSERT_TRUE(completion_status.IsIncomplete()) << 
completion_status.ToString();
+
+  // Once we start the tserver with the participant, the commit should complete
+  // automatically.
+  ASSERT_OK(prt_ts_->Restart());
+  ASSERT_EVENTUALLY([&] {
+    Status completion_status;  // NOTE(review): shadows the outer variable and is never checked here.
+    bool is_complete;
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_TRUE(is_complete);
+  });
+}
+
+// Test that when we start up, pending commits will start background tasks to
+// commit.
+TEST_F(TwoNodeTxnCommitITest, TestStartTasksDuringStartup) {
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> txn_session;
+  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
+
+  // Shut down our participant's tserver so our commit task keeps retrying.
+  prt_ts_->Shutdown();
+  ASSERT_OK(txn->Commit(/*wait*/false));
+
+  Status completion_status;
+  bool is_complete;
+  ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));  // Commit cannot finish while the participant is down.
+  ASSERT_FALSE(is_complete);
+  ASSERT_TRUE(completion_status.IsIncomplete()) << 
completion_status.ToString();
+
+  // Shut down the TxnStatusManager to stop our tasks.
+  tsm_ts_->Shutdown();
+
+  // Restart both tservers. The commit task should be restarted and eventually
+  // succeed.
+  ASSERT_OK(prt_ts_->Restart());
+  ASSERT_OK(tsm_ts_->Restart());
+  ASSERT_EVENTUALLY([&] {
+    Status completion_status;  // Shadows the outer variable; a fresh value is checked on every attempt.
+    bool is_complete;
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_OK(completion_status);
+    ASSERT_TRUE(is_complete);
+  });
+}
+
+// Abruptly shut down the tablet server while running commit tasks, ensuring
+// nothing bad happens.
+TEST_F(TwoNodeTxnCommitITest, TestCommitWhileShuttingDownTxnStatusManager) {
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> txn_session;
+  ASSERT_OK(BeginTransaction({}, &txn, &txn_session));  // Begun with an empty participant list.
+
+  ASSERT_OK(txn->Commit(/*wait*/false));
+  tsm_ts_->Shutdown();  // Stop the TxnStatusManager's tserver right after the commit is requested.
+  ASSERT_OK(tsm_ts_->Restart());
+
+  Status completion_status;
+  bool is_complete = false;
+  ASSERT_EVENTUALLY([&] {  // After the restart the commit should still run to completion.
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_TRUE(is_complete);
+  });
+}
+
+// Test that has three nodes so we can test leadership.
+class ThreeNodeTxnCommitITest : public TxnCommitITest {
+ public:
+  // Starts a three-tserver cluster and steers leadership to the first tserver
+  // by quiescing the others, then waits until it reports three Raft leaders.
+  void SetUp() override {
+    KuduTest::SetUp();
+    NO_FATALS(SetUpClusterAndTable(3, 3));
+
+    // Quiesce all but 'leader_idx', so it becomes the leader.
+    const int leader_idx = 0;
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() =
+          (i != leader_idx);
+    }
+    leader_ts_ = cluster_->mini_tablet_server(leader_idx);
+    non_leader_ts_ = cluster_->mini_tablet_server(leader_idx + 1);
+    // We should have two leaders for our table, and one for the
+    // TxnStatusManager.
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_EQ(3, leader_ts_->server()->num_raft_leaders()->value());
+    });
+  }
+ protected:
+  // The tablet server that is left un-quiesced by SetUp().
+  MiniTabletServer* leader_ts_ = nullptr;
+
+  // One of the tablet servers quiesced by SetUp().
+  MiniTabletServer* non_leader_ts_ = nullptr;
+};
+
+TEST_F(ThreeNodeTxnCommitITest, TestCommitTasksReloadOnLeadershipChange) {
+  FLAGS_txn_schedule_background_tasks = false;  // Keep the first leader from running the commit task.
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> txn_session;
+  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
+
+  ASSERT_OK(txn->Commit(/*wait*/ false));
+  Status completion_status;
+  bool is_complete = false;
+  ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));  // With background tasks off, the commit stays incomplete.
+  ASSERT_TRUE(completion_status.IsIncomplete()) << 
completion_status.ToString();
+  ASSERT_FALSE(is_complete);
+
+  FLAGS_txn_schedule_background_tasks = true;  // Re-enable tasks before leadership moves.
+  // Change our quiescing state and bring the previous leader down so a new
+  // leader can be elected.
+  auto* new_leader_ts = non_leader_ts_;
+  *new_leader_ts->server()->mutable_quiescing() = false;
+  *leader_ts_->server()->mutable_quiescing() = true;
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(3, new_leader_ts->server()->num_raft_leaders()->value());
+  });
+  // Upon becoming leader, we should have started our commit task and completed
+  // the commit.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_TRUE(is_complete);
+  });
+}
+
+} // namespace itest
+} // namespace kudu
diff --git a/src/kudu/integration-tests/txn_status_manager-itest.cc 
b/src/kudu/integration-tests/txn_status_manager-itest.cc
index 3ae3b86..b2b4df3 100644
--- a/src/kudu/integration-tests/txn_status_manager-itest.cc
+++ b/src/kudu/integration-tests/txn_status_manager-itest.cc
@@ -111,6 +111,11 @@ class TxnStatusManagerITest : public 
ExternalMiniClusterITestBase {
         "--raft_heartbeat_interval_ms=$0", kRaftHbIntervalMs));
     cluster_opts_.extra_tserver_flags.emplace_back(
         "--leader_failure_max_missed_heartbeat_periods=1.25");
+
+    // Some of these tests rely on checking state assuming no background tasks.
+    // For simplicity, disable the background commits.
+    cluster_opts_.extra_tserver_flags.emplace_back(
+        "--txn_schedule_background_tasks=false");
   }
 
   void SetUp() override {
diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc 
b/src/kudu/integration-tests/txn_status_table-itest.cc
index eeb469a..8ceb450 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -63,6 +63,7 @@
 #include "kudu/util/test_util.h"
 
 DECLARE_bool(raft_enable_pre_election);
+DECLARE_bool(txn_schedule_background_tasks);
 DECLARE_bool(txn_status_tablet_failover_inject_timeout_error);
 DECLARE_bool(txn_status_tablet_inject_load_failure_error);
 DECLARE_bool(txn_status_tablet_inject_uninitialized_leader_status_error);
@@ -109,6 +110,9 @@ class TxnStatusTableITest : public KuduTest {
 
   void SetUp() override {
     KuduTest::SetUp();
+    // Several of these tests rely on checking transaction state, which is
+    // easier to work with without committing in the background.
+    FLAGS_txn_schedule_background_tasks = false;
     cluster_.reset(new InternalMiniCluster(env_, {}));
     ASSERT_OK(cluster_->Start());
 
diff --git a/src/kudu/master/txn_manager-test.cc 
b/src/kudu/master/txn_manager-test.cc
index 79049f3..51e81e4 100644
--- a/src/kudu/master/txn_manager-test.cc
+++ b/src/kudu/master/txn_manager-test.cc
@@ -61,6 +61,7 @@ using strings::Substitute;
 
 DECLARE_bool(txn_manager_enabled);
 DECLARE_bool(txn_manager_lazily_initialized);
+DECLARE_bool(txn_schedule_background_tasks);
 DECLARE_int32(rpc_service_queue_length);
 DECLARE_int64(txn_manager_status_table_range_partition_span);
 DECLARE_uint32(txn_manager_status_table_num_replicas);
@@ -85,6 +86,10 @@ class TxnManagerTest : public KuduTest {
     FLAGS_txn_manager_enabled = true;
     FLAGS_txn_manager_lazily_initialized = true;
 
+    // To facilitate testing just the TxnManager, switch off
+    // transaction-related background tasks.
+    FLAGS_txn_schedule_background_tasks = false;
+
     // In this test, there is just a single tablet servers in the cluster.
     FLAGS_txn_manager_status_table_num_replicas = 1;
 
diff --git a/src/kudu/tablet/tablet_replica.cc 
b/src/kudu/tablet/tablet_replica.cc
index 105b9f5..3aa9aca 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -331,6 +331,10 @@ void TabletReplica::Stop() {
     tablet_->Shutdown();
   }
 
+  if (txn_coordinator_) {
+    txn_coordinator_->Shutdown();
+  }
+
   // Only mark the peer as STOPPED when all other components have shut down.
   {
     std::lock_guard<simple_spinlock> lock(lock_);
diff --git a/src/kudu/tablet/txn_coordinator.h 
b/src/kudu/tablet/txn_coordinator.h
index c679906..665ba52 100644
--- a/src/kudu/tablet/txn_coordinator.h
+++ b/src/kudu/tablet/txn_coordinator.h
@@ -21,6 +21,7 @@
 #include <string>
 #include <vector>
 
+#include "kudu/common/timestamp.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -46,6 +47,9 @@ class TxnCoordinator {
  public:
   virtual ~TxnCoordinator() {}
 
+  // Shut down the TxnCoordinator.
+  virtual void Shutdown() = 0;
+
   // Perform necessary work to prepare for running in the leader role.
   // It's about reload tablet metadata into memory and do other work
   // to update the internal state of the coordinator upon becoming
@@ -77,10 +81,9 @@ class TxnCoordinator {
   // Returns any replication-layer errors (e.g. not-the-leader errors) in
   // 'ts_error'. If there was otherwise a logical error with the request (e.g.
   // no such transaction), returns an error without populating 'ts_error'.
-  //
-  // TODO(awong): add a commit timestamp.
   virtual Status FinalizeCommitTransaction(
-      int64_t txn_id, tserver::TabletServerErrorPB* ts_error) = 0;
+      int64_t txn_id, Timestamp commit_timestamp,
+      tserver::TabletServerErrorPB* ts_error) = 0;
 
   // Aborts the given transaction as the given user.
   //
diff --git a/src/kudu/transactions/transactions.proto 
b/src/kudu/transactions/transactions.proto
index 042ab9c..7eb13c0 100644
--- a/src/kudu/transactions/transactions.proto
+++ b/src/kudu/transactions/transactions.proto
@@ -27,14 +27,13 @@ enum TxnStatePB {
   COMMITTED = 4;
 }
 
-// TODO(awong): this is a bare-bones implementation so far. We'll certainly
-// need more fields as we build out the rest of the transaction workflow (e.g.
-// commmit timestamps, etc).
-//
 // Metadata encapsulating the status of a transaction.
 message TxnStatusEntryPB {
   optional TxnStatePB state = 1;
   optional string user = 2;
+
+  // Commit timestamp associated with this transaction.
+  optional fixed64 commit_timestamp = 3;
 }
 
 // Metadata encapsulating the existence of a transaction participant.
diff --git a/src/kudu/transactions/txn_status_entry.cc 
b/src/kudu/transactions/txn_status_entry.cc
index 48f74d2..9d77d81 100644
--- a/src/kudu/transactions/txn_status_entry.cc
+++ b/src/kudu/transactions/txn_status_entry.cc
@@ -42,7 +42,6 @@ TransactionEntry::TransactionEntry(int64_t txn_id, 
std::string user)
 scoped_refptr<ParticipantEntry> TransactionEntry::GetOrCreateParticipant(
     const string& tablet_id) {
   DCHECK(metadata_.IsReadLocked());
-  DCHECK_EQ(TxnStatePB::OPEN, metadata_.state().pb.state());
 
   // In the expected case, this participant hasn't been added; add it.
   std::lock_guard<simple_spinlock> l(lock_);
diff --git a/src/kudu/transactions/txn_status_manager-test.cc 
b/src/kudu/transactions/txn_status_manager-test.cc
index 0d104c3..7dead36 100644
--- a/src/kudu/transactions/txn_status_manager-test.cc
+++ b/src/kudu/transactions/txn_status_manager-test.cc
@@ -36,6 +36,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/common/timestamp.h"
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
@@ -68,6 +69,7 @@ using std::unique_ptr;
 using std::unordered_set;
 using std::vector;
 
+DECLARE_bool(txn_schedule_background_tasks);
 DECLARE_uint32(txn_keepalive_interval_ms);
 DECLARE_uint32(txn_staleness_tracker_interval_ms);
 METRIC_DECLARE_entity(tablet);
@@ -93,6 +95,7 @@ class TxnStatusManagerTest : public TabletReplicaTestBase {
     // test scenarios verifying related functionality.
     FLAGS_txn_keepalive_interval_ms = 200;
     FLAGS_txn_staleness_tracker_interval_ms = 50;
+    FLAGS_txn_schedule_background_tasks = false;
 
     NO_FATALS(TabletReplicaTestBase::SetUp());
     ConsensusBootstrapInfo info;
@@ -101,7 +104,7 @@ class TxnStatusManagerTest : public TabletReplicaTestBase {
   }
 
   Status ResetTxnStatusManager() {
-    txn_manager_.reset(new TxnStatusManager(tablet_replica_.get()));
+    txn_manager_.reset(new TxnStatusManager(tablet_replica_.get(), nullptr, 
nullptr));
     return txn_manager_->LoadFromTablet();
   }
 
@@ -168,7 +171,7 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
     ASSERT_EQ(3, txn_manager_->highest_txn_id());
     {
       // Reload the TxnStatusManager from disk and verify the state.
-      TxnStatusManager txn_manager_reloaded(tablet_replica_.get());
+      TxnStatusManager txn_manager_reloaded(tablet_replica_.get(), nullptr, 
nullptr);
       ASSERT_OK(txn_manager_reloaded.LoadFromTablet());
       ASSERT_EQ(expected_prts_by_txn_id,
                 txn_manager_reloaded.GetParticipantsByTxnIdForTests());
@@ -186,7 +189,7 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
   // if the transaction status tablet's data is not loaded yet.
   ASSERT_OK(RestartReplica());
   {
-    TxnStatusManager tsm(tablet_replica_.get());
+    TxnStatusManager tsm(tablet_replica_.get(), nullptr, nullptr);
     TxnStatusManager::ScopedLeaderSharedLock lock(&tsm);
     // Check for the special value of the highest_txn_id when the data from
     // the transaction status tablet isn't loaded yet.
@@ -206,7 +209,7 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
       ASSERT_TRUE(s.IsServiceUnavailable());
       ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
 
-      s = tsm.FinalizeCommitTransaction(txn_id, &ts_error);
+      s = tsm.FinalizeCommitTransaction(txn_id, Timestamp::kInitialTimestamp, 
&ts_error);
       ASSERT_TRUE(s.IsServiceUnavailable());
       ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
 
@@ -394,7 +397,9 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
           statuses[i] = txn_manager_->BeginCommitTransaction(txn_id, kOwner, 
&ts_error);
           break;
         case TxnStatePB::COMMITTED:
-          statuses[i] = txn_manager_->FinalizeCommitTransaction(txn_id, 
&ts_error);
+          statuses[i] = txn_manager_->FinalizeCommitTransaction(txn_id,
+                                                                
Timestamp::kInitialTimestamp,
+                                                                &ts_error);
           break;
         default:
           FAIL() << "bad update";
@@ -477,7 +482,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
     ASSERT_TRUE(txn_status.has_user());
     ASSERT_EQ(kOwner, txn_status.user());
 
-    ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1, &ts_error));
+    ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1, 
Timestamp::kInitialTimestamp, &ts_error));
     ASSERT_OK(txn_manager_->GetTransactionStatus(
         1, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
@@ -623,7 +628,7 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
                           "transaction ID 1 not owned by stranger");
     }
 
-    ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1, &ts_error));
+    ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1, 
Timestamp::kInitialTimestamp, &ts_error));
     s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(),
@@ -785,23 +790,25 @@ TEST_F(TxnStatusManagerTest, TestUpdateTransactionState) {
   // We can't begin or finalize a commit if we've aborted.
   Status s = txn_manager_->BeginCommitTransaction(kTxnId1, kOwner, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
-  s = txn_manager_->FinalizeCommitTransaction(kTxnId1, &ts_error);
+  s = txn_manager_->FinalizeCommitTransaction(kTxnId1, 
Timestamp::kInitialTimestamp, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
   // We can't finalize a commit that hasn't begun committing.
   const int64_t kTxnId2 = 2;
   ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner, nullptr, 
&ts_error));
-  s = txn_manager_->FinalizeCommitTransaction(kTxnId2, &ts_error);
+  s = txn_manager_->FinalizeCommitTransaction(kTxnId2, 
Timestamp::kInitialTimestamp, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
   // We can't abort a transaction that has finished committing.
   ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId2, kOwner, &ts_error));
-  ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId2, &ts_error));
+  ASSERT_OK(txn_manager_->FinalizeCommitTransaction(
+      kTxnId2, Timestamp::kInitialTimestamp, &ts_error));
   s = txn_manager_->AbortTransaction(kTxnId2, kOwner, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
   // Redundant finalize calls are also benign.
-  ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId2, &ts_error));
+  ASSERT_OK(txn_manager_->FinalizeCommitTransaction(
+      kTxnId2, Timestamp::kInitialTimestamp, &ts_error));
 
   // Calls to begin committing should return an error if we've already
   // finalized the commit.
@@ -832,7 +839,8 @@ TEST_F(TxnStatusManagerTest, 
TestRegisterParticipantsWithStates) {
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
   // We can't register participants when we've finished committnig.
-  ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId1, &ts_error));
+  ASSERT_OK(txn_manager_->FinalizeCommitTransaction(
+      kTxnId1, Timestamp::kInitialTimestamp, &ts_error));
   s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner, 
&ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
diff --git a/src/kudu/transactions/txn_status_manager.cc 
b/src/kudu/transactions/txn_status_manager.cc
index fbd085e..b2d7a1f 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -22,6 +22,7 @@
 #include <mutex>
 #include <ostream>
 #include <string>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
@@ -41,7 +42,9 @@
 #include "kudu/tablet/ops/op_tracker.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/transactions/transactions.pb.h"
+#include "kudu/transactions/txn_system_client.h"
 #include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_admin.pb.h"
 #include "kudu/util/cow_object.h"
 #include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
@@ -50,6 +53,7 @@
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
+#include "kudu/util/threadpool.h"
 
 DEFINE_uint32(txn_keepalive_interval_ms, 30000,
               "Maximum interval (in milliseconds) between subsequent "
@@ -69,6 +73,12 @@ 
DEFINE_int32(txn_status_manager_inject_latency_load_from_tablet_ms, 0,
 TAG_FLAG(txn_status_manager_inject_latency_load_from_tablet_ms, hidden);
 TAG_FLAG(txn_status_manager_inject_latency_load_from_tablet_ms, unsafe);
 
+DEFINE_int32(txn_status_manager_inject_latency_finalize_commit_ms, 0,
+             "Injects a random latency between 0 and this many milliseconds "
+             "before finalizing commits.");
+TAG_FLAG(txn_status_manager_inject_latency_finalize_commit_ms, hidden);
+TAG_FLAG(txn_status_manager_inject_latency_finalize_commit_ms, unsafe);
+
 DEFINE_uint32(txn_staleness_tracker_interval_ms, 10000,
               "Period (in milliseconds) of the task that tracks and aborts "
               "stale/abandoned transactions. If this flag is set to 0, "
@@ -109,15 +119,34 @@ 
DEFINE_bool(txn_status_tablet_inject_uninitialized_leader_status_error, false,
             "If true, inject uninitialized leader status error");
 TAG_FLAG(txn_status_tablet_inject_uninitialized_leader_status_error, unsafe);
 
+DEFINE_uint32(txn_background_rpc_timeout_ms, 5000,
+              "Timeout (in milliseconds) for each transaction-related "
+              "background request");
+TAG_FLAG(txn_background_rpc_timeout_ms, experimental);
+TAG_FLAG(txn_background_rpc_timeout_ms, runtime);
+
+DEFINE_uint32(txn_client_initialization_timeout_ms, 10000,
+              "Amount of time Kudu will try to initialize a client with "
+              "which to perform transaction commit tasks.");
+TAG_FLAG(txn_client_initialization_timeout_ms, runtime);
+
+DEFINE_bool(txn_schedule_background_tasks, true,
+            "Whether or not instances of the TxnStatusManager should schedule "
+            "background tasks to operate on transactions (e.g. commit, abort)");
+TAG_FLAG(txn_schedule_background_tasks, unsafe);
+
 using kudu::consensus::ConsensusStatePB;
 using kudu::consensus::RaftConsensus;
 using kudu::consensus::RaftPeerPB;
+
 using kudu::pb_util::SecureShortDebugString;
 using kudu::rpc::RpcContext;
 using kudu::tablet::TabletReplica;
 using kudu::tablet::ParticipantIdsByTxnId;
+using kudu::tserver::ParticipantOpPB;
 using kudu::tserver::TabletServerErrorPB;
 using std::string;
+using std::unordered_map;
 using std::vector;
 using strings::Substitute;
 
@@ -153,6 +182,170 @@ Status ReportIllegalTxnState(const string& errmsg,
 
 } // anonymous namespace
 
+// Returns false if the task should keep running. Otherwise (the task was
+// asked to stop or the TxnStatusManager is shutting down) counts this op as
+// finished, removes the task from the TxnStatusManager if it was the last op
+// in flight, and returns true.
+bool CommitTasks::IsShuttingDownCleanupIfLastOp() {
+  if (!stop_task_ && !txn_status_manager_->shutting_down()) {
+    return false;
+  }
+  const auto remaining_ops = --ops_in_flight_;
+  if (remaining_ops == 0) {
+    txn_status_manager_->RemoveCommitTask(txn_id_.value(), this);
+  }
+  return true;
+}
+
+// Must only be called once no ops are in flight. If the task was asked to
+// stop or the TxnStatusManager is shutting down, removes the task from the
+// TxnStatusManager and returns true; otherwise returns false.
+bool CommitTasks::IsShuttingDownCleanup() const {
+  DCHECK_EQ(0, ops_in_flight_);
+  const bool should_stop =
+      stop_task_ || txn_status_manager_->shutting_down();
+  if (should_stop) {
+    txn_status_manager_->RemoveCommitTask(txn_id_.value(), this);
+  }
+  return should_stop;
+}
+
+void CommitTasks::BeginCommitAsyncTask(int participant_idx) {  // Sends a BEGIN_COMMIT op to one participant.
+  DCHECK_LT(participant_idx, participant_ids_.size());
+  // Status callback called with the result from the participant op. This is
+  // used to collect the participants' highest timestamps, with which we can
+  // schedule the finalize commit task.
+  //
+  // The Status is the result returned from ParticipantRpc::AnalyzeResponse.
+  scoped_refptr<CommitTasks> scoped_this(this);  // Moved into the callback, which thereby holds a reference to this task.
+  auto participated_cb = [this, scoped_this = std::move(scoped_this),
+                          participant_idx] (const Status& s) {
+    if (IsShuttingDownCleanupIfLastOp()) {
+      return;
+    }
+    if (PREDICT_FALSE(s.IsTimedOut())) {
+      // Retry timeout errors. Other transient errors should be retried by the
+      // client until timeout.
+      BeginCommitAsyncTask(participant_idx);  // 'ops_in_flight_' is left untouched: the op is still pending.
+      return;
+    }
+    if (PREDICT_FALSE(s.IsNotFound())) {
+      // If the participant has been deleted, treat it as though it's already
+      // been committed, rather than attempting to abort or something. This is
+      // important to ensure retries of the commit tasks reliably result in the
+      // same operations being performed.
+      LOG(INFO) << Substitute("Participant $0 was not found: $1",
+                              participant_ids_[participant_idx], s.ToString());
+    } else if (PREDICT_FALSE(!s.ok())) {
+      // For any other kind of error, just exit without completing.
+      // TODO(awong): we're presuming that such errors wouldn't benefit from
+      // just retrying.
+      LOG(WARNING) << Substitute("Participant $0 BEGIN_COMMIT op returned $1",
+                                 participant_ids_[participant_idx], 
s.ToString());
+      stop_task_ = true;  // Makes the cleanup checks below and in the other callbacks return true.
+    }
+
+    // If this was the last participant op for this task, we have some cleanup
+    // to do.
+    if (--ops_in_flight_ == 0) {
+      if (IsShuttingDownCleanup()) {
+        return;
+      }
+      Timestamp max_timestamp(Timestamp::kInitialTimestamp);
+      for (const auto& ts : begin_commit_timestamps_) {
+        max_timestamp = std::max(ts, max_timestamp);
+      }
+      DCHECK_NE(Timestamp::kInitialTimestamp, max_timestamp);
+      FinalizeCommitAsync(max_timestamp);  // The highest collected timestamp becomes the commit timestamp.
+    }
+  };
+  ParticipantOpPB op_pb;
+  op_pb.set_txn_id(txn_id_.value());
+  op_pb.set_type(ParticipantOpPB::BEGIN_COMMIT);
+  txn_client_->ParticipateInTransactionAsync(
+      participant_ids_[participant_idx],
+      std::move(op_pb),
+      MonoDelta::FromMilliseconds(FLAGS_txn_background_rpc_timeout_ms),
+      std::move(participated_cb),
+      &begin_commit_timestamps_[participant_idx]);  // Presumably filled in with this participant's timestamp.
+}
+
+void CommitTasks::FinalizeCommitAsyncTask(int participant_idx, const 
Timestamp& commit_timestamp) {
+  DCHECK_LT(participant_idx, participant_ids_.size());
+  // Sends a FINALIZE_COMMIT op carrying 'commit_timestamp' to one participant;
+  // the status callback below is called with the result from the participant op.
+  scoped_refptr<CommitTasks> scoped_this(this);
+  auto participated_cb = [this, scoped_this = std::move(scoped_this),
+                          participant_idx, commit_timestamp] (const Status& s) 
{
+    if (IsShuttingDownCleanupIfLastOp()) {
+      return;
+    }
+    if (PREDICT_FALSE(s.IsTimedOut())) {
+      LOG(WARNING) << Substitute("Retrying FINALIZE_COMMIT op for txn $0: $1",
+                                 txn_id_.ToString(), s.ToString());
+      FinalizeCommitAsyncTask(participant_idx, commit_timestamp);
+      return;
+    } else if (PREDICT_FALSE(!s.ok())) {
+      // Presumably the error is not transient (e.g. not found) so retrying
+      // won't help. But we've already begun sending out FINALIZE_COMMIT ops,
+      // so we must complete the transaction.
+      // TODO(awong): revisit this; if we include an intermediate state between
+      // kCommitInProgress and kCommitted, that might be an opportune moment to
+      // abort.
+      LOG(WARNING) << Substitute("Participant $0 FINALIZE_COMMIT op returned 
$1",
+                                 participant_ids_[participant_idx], 
s.ToString());
+    }
+    // If this was the last participant op for this task, write the finalized
+    // commit timestamp to the tablet.
+    if (--ops_in_flight_ == 0) {
+      if (IsShuttingDownCleanup()) {
+        return;
+      }
+      ScheduleFinalizeCommitWrite(commit_timestamp);
+    }
+  };
+  ParticipantOpPB op_pb;
+  op_pb.set_txn_id(txn_id_.value());
+  op_pb.set_type(ParticipantOpPB::FINALIZE_COMMIT);
+  op_pb.set_finalized_commit_timestamp(commit_timestamp.value());
+  txn_client_->ParticipateInTransactionAsync(
+      participant_ids_[participant_idx],
+      std::move(op_pb),
+      MonoDelta::FromMilliseconds(FLAGS_txn_background_rpc_timeout_ms),
+      std::move(participated_cb));
+}
+
+// Sends a FINALIZE_COMMIT op with 'commit_timestamp' to every participant.
+// Must only be called once all previously scheduled ops have finished.
+void CommitTasks::FinalizeCommitAsync(Timestamp commit_timestamp) {
+  // Reset the in-flight counter to indicate we're waiting for this new set of
+  // tasks to complete. The exchange both sets the new count and lets us
+  // verify that nothing was still in flight, so no separate store is needed.
+  auto old_val = ops_in_flight_.exchange(participant_ids_.size());
+  DCHECK_EQ(0, old_val);
+  // NOTE(review): with no participants nothing is sent here, so no callback
+  // would schedule the finalize write -- presumably the caller handles that
+  // case; confirm.
+  for (int i = 0; i < participant_ids_.size(); i++) {
+    FinalizeCommitAsyncTask(i, commit_timestamp);
+  }
+}
+
+void CommitTasks::ScheduleFinalizeCommitWrite(Timestamp commit_timestamp) {
+  // Submit the task to a threadpool.
+  // NOTE: This is called from the callback of the last FINALIZE_COMMIT
+  // response (presumably on a reactor thread), so we can't do IO in this thread.
+  DCHECK_EQ(0, ops_in_flight_);
+  scoped_refptr<CommitTasks> scoped_this(this);
+  CHECK_OK(commit_pool_->Submit([this, scoped_this = std::move(scoped_this),
+                                 tsm = this->txn_status_manager_,
+                                 txn_id = this->txn_id_, commit_timestamp] {
+    MAYBE_INJECT_RANDOM_LATENCY(
+        FLAGS_txn_status_manager_inject_latency_finalize_commit_ms);
+
+    if (IsShuttingDownCleanup()) {
+      return;
+    }
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_status_manager_);  // Same object as the captured 'tsm'.
+    if (PREDICT_TRUE(l.first_failed_status().ok())) {
+      TabletServerErrorPB error_pb;
+      WARN_NOT_OK(tsm->FinalizeCommitTransaction(txn_id.value(), 
commit_timestamp, &error_pb),
+                  "Error writing to transaction status table");
+    }
+
+    // Regardless of whether we succeed or fail, remove the commit task.
+    // Presumably we failed either because the replica is being shut down, or
+    // because we're no longer leader. In either case, the task will be retried
+    // once a new leader is elected.
+    tsm->RemoveCommitTask(txn_id, this);
+  }));
+}
+
 TxnStatusManagerBuildingVisitor::TxnStatusManagerBuildingVisitor()
     : highest_txn_id_(kIdStatusDataReady) {
 }
@@ -180,6 +373,7 @@ void 
TxnStatusManagerBuildingVisitor::VisitTransactionEntries(
       l.Commit();
     }
   }
+
   // NOTE: this method isn't meant to be thread-safe, hence the lack of
   // locking.
   EmplaceOrDie(&txns_by_id_, txn_id, std::move(txn));
@@ -192,13 +386,6 @@ void TxnStatusManagerBuildingVisitor::Release(
   *txns_by_id = std::move(txns_by_id_);
 }
 
-TxnStatusManager::TxnStatusManager(tablet::TabletReplica* tablet_replica)
-    : highest_txn_id_(kIdStatusDataNotLoaded),
-      status_tablet_(tablet_replica),
-      leader_ready_term_(kUninitializedLeaderTerm),
-      leader_lock_(RWMutex::Priority::PREFER_WRITING) {
-}
-
 ////////////////////////////////////////////////////////////
 // TxnStatusManager::ScopedLeaderSharedLock
 ////////////////////////////////////////////////////////////
@@ -294,18 +481,82 @@ Status TxnStatusManager::LoadFromTabletUnlocked() {
   MAYBE_INJECT_RANDOM_LATENCY(
       FLAGS_txn_status_manager_inject_latency_load_from_tablet_ms);
 
-  std::lock_guard<simple_spinlock> l(lock_);
-  highest_txn_id_ = std::max(highest_txn_id, highest_txn_id_);
-  txns_by_id_ = std::move(txns_by_id);
+  // TODO(awong): if we can't connect to the masters, consider retrying later.
+  // For now, just load the table without starting any background tasks.
+  TxnSystemClient* txn_client = nullptr;
+  if (PREDICT_TRUE(client_initializer_)) {
+    WARN_NOT_OK(client_initializer_->WaitForClient(
+        
MonoDelta::FromMilliseconds(FLAGS_txn_client_initialization_timeout_ms), 
&txn_client),
+                "Unable to initialize TxnSystemClient");
+  }
 
+  unordered_map<int64_t, scoped_refptr<CommitTasks>> commits_in_flight;
+  unordered_map<int64_t, scoped_refptr<CommitTasks>> new_tasks;
+  if (txn_client) {
+    for (const auto& txn_id_and_entry : txns_by_id) {
+      const auto& txn_entry = txn_id_and_entry.second;
+      if (txn_id_and_entry.second->state() == TxnStatePB::COMMIT_IN_PROGRESS) {
+        const auto& txn_id = txn_id_and_entry.first;
+        new_tasks.emplace(txn_id,
+            new CommitTasks(txn_id, txn_entry->GetParticipantIds(),
+                            txn_client, commit_pool_, this));
+      }
+    }
+  }
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    highest_txn_id_ = std::max(highest_txn_id, highest_txn_id_);
+    txns_by_id_ = std::move(txns_by_id);
+    commits_in_flight = std::move(commits_in_flight_);
+    // Stop any previously on-going tasks.
+    for (const auto& txn_id_and_tasks : commits_in_flight) {
+      txn_id_and_tasks.second->stop();
+    }
+    commits_in_flight_ = std::move(new_tasks);
+    if (!commits_in_flight_.empty()) {
+      LOG(INFO) << Substitute("Starting $0 commit tasks", 
commits_in_flight_.size());
+    }
+    for (const auto& id_and_commit_task : commits_in_flight_) {
+      id_and_commit_task.second->BeginCommitAsync();
+    }
+  }
   return Status::OK();
 }
 
+// Constructs a manager on top of 'tablet_replica'. The client initializer and
+// the commit pool are stored as raw pointers. The manager starts out not
+// shutting down, with 'highest_txn_id_' set to kIdStatusDataNotLoaded and the
+// leader-ready term uninitialized.
+TxnStatusManager::TxnStatusManager(tablet::TabletReplica* tablet_replica,
+                                   TxnSystemClientInitializer* 
txn_client_initializer,
+                                   ThreadPool* commit_pool)
+    : client_initializer_(txn_client_initializer),
+      commit_pool_(commit_pool),
+      shutting_down_(false),
+      highest_txn_id_(kIdStatusDataNotLoaded),
+      status_tablet_(tablet_replica),
+      leader_ready_term_(kUninitializedLeaderTerm),
+      leader_lock_(RWMutex::Priority::PREFER_WRITING) {
+}
+
+void TxnStatusManager::Shutdown() {
+  // Raise the flag first so that anything consulting shutting_down() sees it.
+  shutting_down_ = true;
+
+  // Poll until no commit tasks remain in flight. The lock is held only while
+  // inspecting the map, never across the sleep.
+  const auto tasks_drained = [this] {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return commits_in_flight_.empty();
+  };
+  while (!tasks_drained()) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+}
+
+// Runs Shutdown(), so destruction waits until no commit tasks remain in
+// 'commits_in_flight_'.
+TxnStatusManager::~TxnStatusManager() {
+  Shutdown();
+}
+
 Status TxnStatusManager::LoadFromTablet() {
   // Block new transaction status manager operations, and wait
   // for existing operations to finish.
   std::lock_guard<RWMutex> leader_lock_guard(leader_lock_);
-  leader_lock_.AssertAcquiredForWriting();
   return LoadFromTabletUnlocked();
 }
 
@@ -545,9 +796,32 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id,
   return Status::OK();
 }
 
+void CommitTasks::BeginCommitAsync() {
+  // With no participants there is nobody to ask for a timestamp, so go
+  // straight to scheduling the commit record with an invalid timestamp.
+  if (participant_ids_.empty()) {
+    ScheduleFinalizeCommitWrite(Timestamp::kInvalidTimestamp);
+    return;
+  }
+
+  // Otherwise, start one BEGIN_COMMIT task per participant so that a
+  // finalized commit timestamp can be determined.
+  //
+  // TODO(awong): consider an approach in which clients propagate
+  // timestamps in such a way that the client's call to begin commit
+  // includes the expected finalized commit timestamp.
+  const int num_participants = participant_ids_.size();
+  for (int idx = 0; idx < num_participants; ++idx) {
+    BeginCommitAsyncTask(idx);
+  }
+}
+
 Status TxnStatusManager::BeginCommitTransaction(int64_t txn_id, const string& 
user,
                                                 TabletServerErrorPB* ts_error) 
{
   leader_lock_.AssertAcquiredForReading();
+  TxnSystemClient* txn_client;
+  if (PREDICT_TRUE(FLAGS_txn_schedule_background_tasks)) {
+    RETURN_NOT_OK(client_initializer_->GetClient(&txn_client));
+  }
+
   scoped_refptr<TransactionEntry> txn;
   RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
 
@@ -565,12 +839,24 @@ Status TxnStatusManager::BeginCommitTransaction(int64_t 
txn_id, const string& us
   auto* mutable_data = txn_lock.mutable_data();
   mutable_data->pb.set_state(TxnStatePB::COMMIT_IN_PROGRESS);
   RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb, 
ts_error));
+
+  if (PREDICT_TRUE(FLAGS_txn_schedule_background_tasks)) {
+    auto participant_ids = txn->GetParticipantIds();
+    std::unique_lock<simple_spinlock> l(lock_);
+    auto iter_and_emplaced = commits_in_flight_.emplace(txn_id,
+        new CommitTasks(txn_id, std::move(participant_ids),
+                        txn_client, commit_pool_, this));
+    l.unlock();
+    if (iter_and_emplaced.second) {
+      iter_and_emplaced.first->second->BeginCommitAsync();
+    }
+  }
   txn_lock.Commit();
 
   // TODO(aserbin): remove this test-only crutch once the orchestration of
   //                the two phase commit is implemented
   if (PREDICT_FALSE(FLAGS_txn_status_manager_finalize_commit_on_begin)) {
-    RETURN_NOT_OK(FinalizeCommitTransaction(txn_id, ts_error));
+    RETURN_NOT_OK(FinalizeCommitTransaction(txn_id, 
Timestamp::kInitialTimestamp, ts_error));
   }
 
   return Status::OK();
@@ -578,6 +864,7 @@ Status TxnStatusManager::BeginCommitTransaction(int64_t 
txn_id, const string& us
 
 Status TxnStatusManager::FinalizeCommitTransaction(
     int64_t txn_id,
+    Timestamp commit_timestamp,
     TabletServerErrorPB* ts_error) {
   leader_lock_.AssertAcquiredForReading();
   scoped_refptr<TransactionEntry> txn;
diff --git a/src/kudu/transactions/txn_status_manager.h 
b/src/kudu/transactions/txn_status_manager.h
index 4a4b4a8..b9d94c7 100644
--- a/src/kudu/transactions/txn_status_manager.h
+++ b/src/kudu/transactions/txn_status_manager.h
@@ -16,16 +16,20 @@
 // under the License.
 #pragma once
 
+#include <atomic>
 #include <cstdint>
 #include <memory>
 #include <mutex>
 #include <string>
 #include <unordered_map>
+#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
 #include <gtest/gtest_prod.h>
 
+#include "kudu/common/timestamp.h"
+#include "kudu/common/txn_id.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/tablet/txn_coordinator.h"
@@ -36,6 +40,7 @@
 #include "kudu/util/status.h"
 
 namespace kudu {
+class ThreadPool;
 
 class MonoDelta;
 namespace rpc {
@@ -51,8 +56,112 @@ class TabletServerErrorPB;
 } // namespace tserver
 
 namespace transactions {
-
 class TxnStatusEntryPB;
+class TxnStatusManager;
+class TxnSystemClient;
+class TxnSystemClientInitializer;
+
+// A handle for background tasks associated with a single transaction.
+class CommitTasks : public RefCountedThreadSafe<CommitTasks> {
+ public:
+  // Creates the task handle for 'txn_id'. The in-flight op count starts at the
+  // number of participants, and every participant's BEGIN_COMMIT timestamp
+  // starts out invalid.
+  CommitTasks(TxnId txn_id,
+              std::vector<std::string> participant_ids,
+              TxnSystemClient* txn_client,
+              ThreadPool* commit_pool,
+              TxnStatusManager* txn_status_manager)
+     : txn_id_(std::move(txn_id)),
+       participant_ids_(std::move(participant_ids)),
+       txn_client_(txn_client),
+       commit_pool_(commit_pool),
+       txn_status_manager_(txn_status_manager),
+       ops_in_flight_(participant_ids_.size()),
+       begin_commit_timestamps_(participant_ids_.size(), 
Timestamp::kInvalidTimestamp),
+       stop_task_(false) {
+  }
+
+  // Asynchronously sends a BEGIN_COMMIT participant op to each participant,
+  // and on completion, schedules FINALIZE_COMMIT ops to be sent.
+  //
+  // If there are no participants for the given transaction, a commit record is
+  // written to the tablet.
+  void BeginCommitAsync();
+
+  // Asynchronously sends a BEGIN_COMMIT participant op to the participant at
+  // the given index. If this was the last one to complete, schedules
+  // FINALIZE_COMMIT ops to be sent.
+  void BeginCommitAsyncTask(int participant_idx);
+
+  // Asynchronously sends a FINALIZE_COMMIT participant op to the participant
+  // at the given index. If this was the last one to complete, schedules a
+  // commit record with the given timestamp to be written to the tablet.
+  void FinalizeCommitAsyncTask(int participant_idx, const Timestamp& 
commit_timestamp);
+
+  // Asynchronously sends a FINALIZE_COMMIT participant op to each
+  // participant in the transaction, and upon completion, schedules a commit
+  // record to be written to the tablet.
+  void FinalizeCommitAsync(Timestamp commit_timestamp);
+
+  // Schedule calls to the TxnStatusManager to be made on the commit pool.
+  // NOTE: this may be called on reactor threads and thus must not
+  // synchronously do any IO.
+  void ScheduleFinalizeCommitWrite(Timestamp commit_timestamp);
+
+  // Stops further tasks from being run. Once called, calls to the above
+  // methods should effectively no-op.
+  void stop() {
+    stop_task_ = true;
+  }
+
+ private:
+  friend class RefCountedThreadSafe<CommitTasks>;
+  ~CommitTasks() = default;
+
+  // Returns true if the task has been stopped or if the TxnStatusManager is
+  // being shut down. Cleans up the task state if the caller was the last op in
+  // flight.
+  //
+  // This is useful in participant op callbacks to determine whether the
+  // callback can return immediately without doing work.
+  bool IsShuttingDownCleanupIfLastOp();
+
+  // Returns true if the task has been stopped or if the TxnStatusManager is
+  // being shut down. Cleans up the task state, expecting that the caller was
+  // the last op in flight.
+  //
+  // This is useful when there are no ops in flight and the caller is about to
+  // schedule work, and needs to determine if it should opt out because of the
+  // shutdown.
+  bool IsShuttingDownCleanup() const;
+
+  // The ID of the transaction being committed.
+  const TxnId txn_id_;
+
+  // Tablet IDs of the participants of this transaction.
+  const std::vector<std::string> participant_ids_;
+
+  // Client with which to send requests.
+  TxnSystemClient* txn_client_;
+
+  // Threadpool on which to schedule tasks.
+  ThreadPool* commit_pool_;
+
+  // The TxnStatusManager that created these tasks.
+  TxnStatusManager* txn_status_manager_;
+
+  // The number of participant op RPCs in flight.
+  std::atomic<int> ops_in_flight_;
+
+  // The timestamps used to replicate each participant's BEGIN_COMMIT ops.
+  std::vector<Timestamp> begin_commit_timestamps_;
+
+  // The commit timestamp for this transaction.
+  Timestamp commit_timestamp_;
+
+  // Whether the task should stop executing, e.g. since an IllegalState error
+  // was observed on a participant, or because the TxnStatusManager changed
+  // leadership.
+  std::atomic<bool> stop_task_;
+};
 
 // Maps the transaction ID to the corresponding TransactionEntry.
 typedef std::unordered_map<int64_t, scoped_refptr<TransactionEntry>> 
TransactionsMap;
@@ -81,8 +190,10 @@ class TxnStatusManagerBuildingVisitor : public 
TransactionsVisitor {
 // underlying tablet.
 class TxnStatusManager final : public tablet::TxnCoordinator {
  public:
-  explicit TxnStatusManager(tablet::TabletReplica* tablet_replica);
-  ~TxnStatusManager() = default;
+  TxnStatusManager(tablet::TabletReplica* tablet_replica,
+                   TxnSystemClientInitializer* client_initializer,
+                   ThreadPool* commit_pool);
+  ~TxnStatusManager();
 
   // Scoped "shared lock" to serialize replica leader elections.
   //
@@ -133,6 +244,7 @@ class TxnStatusManager final : public 
tablet::TxnCoordinator {
     DISALLOW_COPY_AND_ASSIGN(ScopedLeaderSharedLock);
   };
 
+  void Shutdown() override;
   void PrepareLeadershipTask() override;
 
   // Writes an entry to the status tablet and creates a transaction in memory.
@@ -162,9 +274,7 @@ class TxnStatusManager final : public 
tablet::TxnCoordinator {
   //
   // Unlike the other transaction life-cycle calls, this isn't user-initiated,
   // so it doesn't take a user.
-  //
-  // TODO(awong): add a commit timestamp.
-  Status FinalizeCommitTransaction(int64_t txn_id,
+  Status FinalizeCommitTransaction(int64_t txn_id, Timestamp commit_timestamp,
                                    tserver::TabletServerErrorPB* ts_error) 
override;
 
   // Aborts the given transaction, returning an error if the transaction
@@ -207,14 +317,29 @@ class TxnStatusManager final : public 
tablet::TxnCoordinator {
   // associated with that transaction ID.
   tablet::ParticipantIdsByTxnId GetParticipantsByTxnIdForTests() const 
override;
 
+  // Drops the entry for 'txn_id' from 'commits_in_flight_', but only if that
+  // entry still refers to 'tasks'. An entry holding a different task object,
+  // or no entry at all, leaves the map untouched.
+  void RemoveCommitTask(int64_t txn_id, const CommitTasks* tasks) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    const auto it = commits_in_flight_.find(txn_id);
+    if (it == commits_in_flight_.end() || it->second.get() != tasks) {
+      return;
+    }
+    commits_in_flight_.erase(it);
+  }
+
+  // Returns whether Shutdown() has been called on this manager.
+  bool shutting_down() const {
+    return shutting_down_;
+  }
+
  private:
   // This test class calls LoadFromTablet() directly.
   FRIEND_TEST(TxnStatusManagerTest, TestStartTransactions);
   FRIEND_TEST(TxnStatusManagerTest, GetTransactionStatus);
   friend class TxnStatusManagerTest;
 
-  // Loads the contents of the transaction status tablet into memory.
+  // Loads the contents of the transaction status tablet into memory, starting
+  // up any background tasks (e.g. commits) that need to be driven by the new
+  // leader.
   Status LoadFromTabletUnlocked();
+
   // This is called by tests only.
   Status LoadFromTablet();
 
@@ -248,7 +373,13 @@ class TxnStatusManager final : public 
tablet::TxnCoordinator {
                         scoped_refptr<TransactionEntry>* txn,
                         tserver::TabletServerErrorPB* ts_error) const;
 
-  // Protects 'highest_txn_id_' and 'txns_by_id_'.
+  TxnSystemClientInitializer* client_initializer_;
+  ThreadPool* commit_pool_;
+
+  std::atomic<bool> shutting_down_;
+
+  // Protects 'highest_txn_id_', 'txns_by_id_', and insertions into or removals
+  // from 'commits_in_flight_'.
   mutable simple_spinlock lock_;
 
   // The highest transaction ID seen by this status manager so far. Requests to
@@ -258,6 +389,10 @@ class TxnStatusManager final : public 
tablet::TxnCoordinator {
   // Tracks the currently on-going transactions.
   TransactionsMap txns_by_id_;
 
+  // Commit tasks currently in flight, keyed by transaction ID. Entries can
+  // only be inserted while the appropriate transaction lock is being held.
+  std::unordered_map<int64_t, scoped_refptr<CommitTasks>> commits_in_flight_;
+
   // The access to underlying storage.
   TxnStatusTablet status_tablet_;
 
@@ -285,11 +420,19 @@ class TxnStatusManager final : public 
tablet::TxnCoordinator {
 
 class TxnStatusManagerFactory : public tablet::TxnCoordinatorFactory {
  public:
-  TxnStatusManagerFactory() {}
+  TxnStatusManagerFactory(TxnSystemClientInitializer* client_initializer,
+                          ThreadPool* commit_pool)
+      : txn_client_initializer_(client_initializer),
+        commit_pool_(commit_pool) {}
 
   std::unique_ptr<tablet::TxnCoordinator> Create(tablet::TabletReplica* 
replica) override {
-    return std::unique_ptr<tablet::TxnCoordinator>(new 
TxnStatusManager(replica));
+    return std::unique_ptr<tablet::TxnCoordinator>(
+        new TxnStatusManager(replica, txn_client_initializer_, commit_pool_));
   }
+
+ private:
+  TxnSystemClientInitializer* txn_client_initializer_;
+  ThreadPool* commit_pool_;
 };
 
 } // namespace transactions
diff --git a/src/kudu/transactions/txn_system_client.cc 
b/src/kudu/transactions/txn_system_client.cc
index 9d9d1d8..ded9dfc 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -443,6 +443,25 @@ Status 
TxnSystemClientInitializer::GetClient(TxnSystemClient** client) const {
   return Status::ServiceUnavailable("could not get TxnSystemClient, still 
initializing");
 }
 
+// Polls GetClient() until it succeeds, the initializer is shutting down, or
+// 'timeout' has elapsed, sleeping 100ms between attempts. At least one attempt
+// is always made, even with a zero timeout.
+// NOTE(review): the deadline is only checked after a full 100ms sleep, so the
+// call can return later than 'timeout' -- confirm this is acceptable.
+Status TxnSystemClientInitializer::WaitForClient(const MonoDelta& timeout,
+                                                 TxnSystemClient** client) 
const {
+  const auto deadline = MonoTime::Now() + timeout;
+  Status s;
+  do {
+    if (shutting_down_) {
+      return Status::ServiceUnavailable("could not get TxnSystemClient, 
shutting down");
+    }
+    s = GetClient(client);
+    if (PREDICT_TRUE(s.ok())) {
+      DCHECK(*client);
+      return Status::OK();
+    }
+    // Back off before the next attempt.
+    SleepFor(MonoDelta::FromMilliseconds(100));
+  } while (MonoTime::Now() < deadline);
+  // Report the last GetClient() error alongside the timeout.
+  return Status::TimedOut(Substitute("Unable to get client in $0: $1",
+                                     timeout.ToString(), s.ToString()));
+}
+
 void TxnSystemClientInitializer::Shutdown() {
   shutting_down_ = true;
   txn_client_init_pool_->Wait();
diff --git a/src/kudu/transactions/txn_system_client.h 
b/src/kudu/transactions/txn_system_client.h
index 54e70fb..de2983d 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -146,6 +146,11 @@ class TxnSystemClient {
                                   const tserver::ParticipantOpPB& 
participant_op,
                                   const MonoDelta& timeout,
                                   Timestamp* begin_commit_timestamp = nullptr);
+  void ParticipateInTransactionAsync(const std::string& tablet_id,
+                                     tserver::ParticipantOpPB participant_op,
+                                     const MonoDelta& timeout,
+                                     StatusCallback cb,
+                                     Timestamp* begin_commit_timestamp = 
nullptr);
  private:
 
   friend class itest::TxnStatusTableITest;
@@ -164,12 +169,6 @@ class TxnSystemClient {
                                     const StatusCallback& cb,
                                     tserver::CoordinatorOpResultPB* result = 
nullptr);
 
-  void ParticipateInTransactionAsync(const std::string& tablet_id,
-                                     tserver::ParticipantOpPB participant_op,
-                                     const MonoDelta& timeout,
-                                     StatusCallback cb,
-                                     Timestamp* begin_commit_timestamp = 
nullptr);
-
   client::sp::shared_ptr<client::KuduTable> txn_status_table() {
     std::lock_guard<simple_spinlock> l(table_lock_);
     return txn_status_table_;
@@ -200,6 +199,11 @@ class TxnSystemClientInitializer {
   // TxnSystemClientInitializer is still in scope.
   Status GetClient(TxnSystemClient** client) const;
 
+  // Like the above, but periodically retries until the client is initialized,
+  // for up to 'timeout', returning a TimedOut error if it is not ready by then.
+  // Returns a ServiceUnavailable error if the initializer is being shut down.
+  Status WaitForClient(const MonoDelta& timeout, TxnSystemClient** client) 
const;
+
   // Stops the initialization, preventing success of further calls to
   // GetClient().
   void Shutdown();
diff --git a/src/kudu/tserver/ts_tablet_manager.cc 
b/src/kudu/tserver/ts_tablet_manager.cc
index 2ebf1cf..e37df45 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -99,6 +99,10 @@ 
DEFINE_int32(num_txn_status_tablets_to_reload_simultaneously, 0,
              "sense to manually tune this.");
 TAG_FLAG(num_txn_status_tablets_to_reload_simultaneously, advanced);
 
+DEFINE_int32(txn_commit_pool_num_threads, 10,
+             "Number of threads available for transaction commit tasks.");
+TAG_FLAG(txn_commit_pool_num_threads, advanced);
+
 DEFINE_int32(tablet_start_warn_threshold_ms, 500,
              "If a tablet takes more than this number of millis to start, 
issue "
              "a warning with a trace.");
@@ -371,6 +375,11 @@ Status TSTabletManager::Init() {
                 .set_max_threads(FLAGS_num_tablets_to_copy_simultaneously)
                 .Build(&tablet_copy_pool_));
 
+  RETURN_NOT_OK(ThreadPoolBuilder("txn-commit")
+                .set_max_queue_size(0)
+                .set_max_threads(FLAGS_txn_commit_pool_num_threads)
+                .Build(&txn_commit_pool_));
+
   // Start the threadpools we'll use to open and delete tablets.
   // This has to be done in Init() instead of the constructor, since the
   // FsManager isn't initialized until this point.
@@ -870,10 +879,8 @@ Status TSTabletManager::CreateAndRegisterTabletReplica(
     scoped_refptr<TabletMetadata> meta,
     RegisterTabletReplicaMode mode,
     scoped_refptr<TabletReplica>* replica_out) {
-  // TODO(awong): this factory will at some point contain some tserver-wide
-  // state like a system client that can make calls to leader tablets. For now,
-  // just use a simple local factory.
-  TxnStatusManagerFactory tsm_factory;
+  TxnStatusManagerFactory tsm_factory(server_->txn_client_initializer(),
+                                      txn_commit_pool_.get());
   const auto& tablet_id = meta->tablet_id();
   scoped_refptr<TabletReplica> replica(
       new TabletReplica(std::move(meta),
@@ -1287,6 +1294,10 @@ void TSTabletManager::Shutdown() {
   // properly executed to unblock the shutdown process of replicas.
   reload_txn_status_tablet_pool_->Shutdown();
 
+  // Now that our TxnStatusManagers have shut down, clean up the threadpool
+  // used for commit tasks.
+  txn_commit_pool_->Shutdown();
+
   {
     std::lock_guard<RWMutex> l(lock_);
     // We don't expect anyone else to be modifying the map after we start the
diff --git a/src/kudu/tserver/ts_tablet_manager.h 
b/src/kudu/tserver/ts_tablet_manager.h
index a04458d..fbbbf14 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -415,6 +415,9 @@ class TSTabletManager : public 
tserver::TabletReplicaLookupIf {
   // Thread pool used to reload transaction status tablets asynchronously.
   std::unique_ptr<ThreadPool> reload_txn_status_tablet_pool_;
 
+  // Thread pool used to perform background tasks on transactions, e.g. to 
commit.
+  std::unique_ptr<ThreadPool> txn_commit_pool_;
+
   // Thread pool to run TxnStatusManager tasks. As of now, this pool is
   // to run a long-running single periodic task to abort stale transactions
   // registered with corresponding transaction status tablets.

Reply via email to