This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 451a4c6  KUDU-2612: add background task to abort transaction participants
451a4c6 is described below

commit 451a4c61eed132e00b6dc50dcaf4cfeca348be2a
Author: Andrew Wong <[email protected]>
AuthorDate: Sun Jan 31 02:30:59 2021 -0800

    KUDU-2612: add background task to abort transaction participants
    
    This patch implements background tasks that abort a given transaction
    when TxnStatusManager::AbortTransaction() is called. Similar to the
    commit tasks, aborts have the following life cycle:
    1. AbortTransaction() is called. A new state, ABORT_IN_PROGRESS, is
       written to the TxnStatusManager.
    2. ABORT_TXN ops are sent to all participants in the transaction.
    3. Once all participants have responded, the ABORTED state is written
       to the TxnStatusManager.
    
    This patch doesn't test races between commits and aborts. Some reworking
    of the commit tasks will be required to account for such races, and will
    be done in a follow-up.
    
    Change-Id: I484c315c6f7331c5ec12cb06370fbaae9c7c343e
    Reviewed-on: http://gerrit.cloudera.org:8080/17017
    Tested-by: Andrew Wong <[email protected]>
    Reviewed-by: Hao Hao <[email protected]>
---
 .../org/apache/kudu/client/KuduTransaction.java    |   2 +
 .../apache/kudu/client/TestKuduTransaction.java    |   9 +-
 src/kudu/client/client-test.cc                     |  18 +-
 src/kudu/client/transaction-internal.cc            |   4 +
 src/kudu/integration-tests/txn_commit-itest.cc     | 229 +++++++++++++++++++--
 .../integration-tests/txn_status_manager-itest.cc  |   6 +-
 .../integration-tests/txn_status_table-itest.cc    |   4 +-
 src/kudu/master/txn_manager-test.cc                |   4 +-
 src/kudu/transactions/transactions.proto           |   1 +
 src/kudu/transactions/txn_status_manager-test.cc   |  10 +-
 src/kudu/transactions/txn_status_manager.cc        | 169 +++++++++++++--
 src/kudu/transactions/txn_status_manager.h         |  23 ++-
 12 files changed, 420 insertions(+), 59 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
index 292ccdb..19286f2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
@@ -309,6 +309,8 @@ public class KuduTransaction implements AutoCloseable {
     GetTransactionStateResponse resp = KuduClient.joinAndHandleException(d);
     final Transactions.TxnStatePB txnState = resp.txnState();
     switch (txnState) {
+      case ABORT_IN_PROGRESS:
+        throw new NonRecoverableException(Status.Aborted("transaction is being 
aborted"));
       case ABORTED:
         throw new NonRecoverableException(Status.Aborted("transaction was 
aborted"));
       case OPEN:
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
index 37c7772..5b8b035 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
@@ -249,6 +249,9 @@ public class TestKuduTransaction {
   @MasterServerConfig(flags = {
       "--txn_manager_enabled",
   })
+  @TabletServerConfig(flags = {
+      "--txn_schedule_background_tasks=false"
+  })
   public void testIsCommitCompleteSpecialCases() throws Exception {
     KuduTransaction txn = client.newTransaction();
 
@@ -276,7 +279,7 @@ public class TestKuduTransaction {
             }
           });
       assertTrue(ex.getStatus().isAborted());
-      assertEquals("transaction was aborted", ex.getMessage());
+      assertEquals("transaction is being aborted", ex.getMessage());
     }
 
     // Try to call isCommitComplete() on a handle that isn't backed by any
@@ -606,7 +609,7 @@ public class TestKuduTransaction {
           });
       final String errmsg = ex.getMessage();
       assertTrue(errmsg, errmsg.matches(
-          ".* transaction ID .* is not open: state: ABORTED .*"));
+          ".* transaction ID .* is not open: state: ABORT.*"));
 
       // Verify that KuduTransaction.rollback() successfully runs on a 
transaction
       // handle if the underlying transaction is already aborted automatically
@@ -666,7 +669,7 @@ public class TestKuduTransaction {
           });
       final String errmsg = ex.getMessage();
       assertTrue(errmsg, errmsg.matches(
-          ".* transaction ID .* is not open: state: ABORTED .*"));
+          ".* transaction ID .* is not open: state: ABORT.*"));
 
       // Verify that KuduTransaction.rollback() successfully runs on both
       // transaction handles if the underlying transaction is already aborted
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index eb8a150..a7c8f24 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -7141,7 +7141,7 @@ TEST_F(ClientTest, TxnBasicOperations) {
     ASSERT_OK(txn->Rollback());
     auto s = txn->Commit(false /* wait */);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
-    ASSERT_STR_CONTAINS(s.ToString(), "is not open: state: ABORTED");
+    ASSERT_STR_CONTAINS(s.ToString(), "is not open: state: ABORT");
   }
 
   // TODO(aserbin): uncomment this when other parts of transaction lifecycle
@@ -7190,10 +7190,16 @@ TEST_F(ClientTest, TxnCommit) {
     ASSERT_STR_CONTAINS(cs.ToString(), "transaction is still open");
 
     ASSERT_OK(txn->Rollback());
+    // We need to ASSERT_EVENTUALLY here to allow the abort tasks to complete.
+    // Until then, we may not consider the transaction as complete.
     ASSERT_OK(txn->IsCommitComplete(&is_complete, &cs));
-    ASSERT_TRUE(is_complete);
     ASSERT_TRUE(cs.IsAborted()) << cs.ToString();
-    ASSERT_STR_CONTAINS(cs.ToString(), "transaction has been aborted");
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_OK(txn->IsCommitComplete(&is_complete, &cs));
+      ASSERT_TRUE(is_complete);
+      ASSERT_TRUE(cs.IsAborted()) << cs.ToString();
+      ASSERT_STR_CONTAINS(cs.ToString(), "transaction has been aborted");
+    });
   }
 
   {
@@ -7508,7 +7514,7 @@ TEST_F(ClientTest, TxnKeepAlive) {
     ASSERT_OK(KuduTransaction::Deserialize(client_, txn_token, &serdes_txn));
     auto s = serdes_txn->Commit(false /* wait */);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
-    ASSERT_STR_CONTAINS(s.ToString(), "not open: state: ABORTED");
+    ASSERT_STR_CONTAINS(s.ToString(), "not open: state: ABORT");
   }
 
   // Begin a new transaction and move the KuduTransaction object out of the
@@ -7531,7 +7537,7 @@ TEST_F(ClientTest, TxnKeepAlive) {
 
     auto s = serdes_txn->Commit(false /* wait */);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
-    ASSERT_STR_CONTAINS(s.ToString(), "not open: state: ABORTED");
+    ASSERT_STR_CONTAINS(s.ToString(), "not open: state: ABORT");
   }
 
   // Begin a new transaction and move the KuduTransaction object out of the
@@ -7650,7 +7656,7 @@ TEST_F(ClientTest, TxnKeepAliveAndUnavailableTxnManagerLongTime) {
   {
     auto s = txn->Commit(false /* wait */);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
-    ASSERT_STR_CONTAINS(s.ToString(), "not open: state: ABORTED");
+    ASSERT_STR_CONTAINS(s.ToString(), "not open: state: ABORT");
   }
 }
 
diff --git a/src/kudu/client/transaction-internal.cc b/src/kudu/client/transaction-internal.cc
index 7d88734..ada717a 100644
--- a/src/kudu/client/transaction-internal.cc
+++ b/src/kudu/client/transaction-internal.cc
@@ -306,6 +306,10 @@ Status KuduTransaction::Data::IsCommitCompleteImpl(
       *is_complete = false;
       *completion_status = Status::IllegalState("transaction is still open");
       break;
+    case TxnStatePB::ABORT_IN_PROGRESS:
+      *is_complete = false;
+      *completion_status = Status::Aborted("transaction is being aborted");
+      break;
     case TxnStatePB::ABORTED:
       *is_complete = true;
       *completion_status = Status::Aborted("transaction has been aborted");
diff --git a/src/kudu/integration-tests/txn_commit-itest.cc b/src/kudu/integration-tests/txn_commit-itest.cc
index 722c4b0..2eb87e5 100644
--- a/src/kudu/integration-tests/txn_commit-itest.cc
+++ b/src/kudu/integration-tests/txn_commit-itest.cc
@@ -15,14 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <algorithm>
 #include <atomic>
+#include <cstdint>
 #include <functional>
 #include <map>
 #include <memory>
 #include <string>
 #include <thread>
+#include <unordered_map>
 #include <unordered_set>
+#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -38,8 +40,8 @@
 #include "kudu/common/partial_row.h"
 #include "kudu/common/txn_id.h"
 #include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/test_workload.h"
 #include "kudu/master/mini_master.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
@@ -90,9 +92,9 @@ using kudu::tserver::ParticipantOpPB;
 using std::string;
 using std::thread;
 using std::unique_ptr;
+using std::unordered_map;
 using std::unordered_set;
 using std::vector;
-using strings::Substitute;
 
 namespace kudu {
 namespace itest {
@@ -240,6 +242,34 @@ class TxnCommitITest : public KuduTest {
     return Status::OK();
   }
 
+  // Returns the transaction IDs participated in by 'tablet_id' that have been
+  // aborted on 'num_replicas' replicas.
+  Status GetAbortedTxnsForParticipant(const string& tablet_id, int 
num_replicas,
+                                      vector<TxnId>* aborted_txns) {
+    vector<TxnId> txn_ids;
+    unordered_map<int64_t, int> aborts_per_txn;
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      scoped_refptr<TabletReplica> r;
+      Status s = 
cluster_->mini_tablet_server(i)->server()->tablet_manager()->GetTabletReplica(
+          tablet_id, &r);
+      if (!s.ok()) {
+        continue;
+      }
+      for (const auto& e : r->tablet()->txn_participant()->GetTxnsForTests()) {
+        if (e.state == tablet::TxnState::kAborted) {
+          LookupOrEmplace(&aborts_per_txn, e.txn_id, 0)++;
+        }
+      }
+    }
+    for (const auto& txn_id_and_count : aborts_per_txn) {
+      if (txn_id_and_count.second == num_replicas) {
+        txn_ids.emplace_back(txn_id_and_count.first);
+      }
+    }
+    *aborted_txns = std::move(txn_ids);
+    return Status::OK();
+  }
+
  protected:
   unique_ptr<InternalMiniCluster> cluster_;
   unique_ptr<TxnSystemClient> txn_client_;
@@ -282,6 +312,108 @@ TEST_F(TxnCommitITest, TestBasicCommits) {
   ASSERT_TRUE(is_complete);
 }
 
+TEST_F(TxnCommitITest, TestBasicAborts) {
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> txn_session;
+  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
+
+  int num_rows = 0;
+  ASSERT_OK(CountRows(&num_rows));
+  ASSERT_EQ(initial_row_count_, num_rows);
+
+  ASSERT_OK(txn->Rollback());
+  ASSERT_OK(CountRows(&num_rows));
+  ASSERT_EQ(initial_row_count_, num_rows);
+
+  // IsCommitComplete() should verify that the transaction is aborted. We need
+  // to wait for this to happen, since 'is_complete' is contingent on the abort
+  // tasks finishing.
+  ASSERT_EVENTUALLY([&] {
+    Status completion_status;
+    bool is_complete = false;
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
+    ASSERT_TRUE(is_complete);
+  });
+
+  // On the participants, we should see the transaction as aborted.
+  ASSERT_EVENTUALLY([&] {
+    for (const auto& tablet_id : participant_ids_) {
+      vector<TxnId> aborted_txns;
+      ASSERT_OK(GetAbortedTxnsForParticipant(tablet_id, 1, &aborted_txns));
+      ASSERT_EQ(1, aborted_txns.size());
+    }
+  });
+}
+
+TEST_F(TxnCommitITest, TestAbortInProgress) {
+  FLAGS_txn_schedule_background_tasks = false;
+  shared_ptr<KuduTransaction> txn;
+  shared_ptr<KuduSession> txn_session;
+  ASSERT_OK(BeginTransaction({}, &txn, &txn_session));
+  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
+  ASSERT_OK(txn->Rollback());
+
+  // When background tasks are disabled, we'll be left in ABORT_IN_PROGRESS,
+  // and we should be able to determine it isn't complete.
+  Status completion_status;
+  bool is_complete = true;
+  ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+  ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
+  ASSERT_STR_CONTAINS(completion_status.ToString(), "transaction is being 
aborted");
+  ASSERT_FALSE(is_complete);
+
+  // Once enabled, background tasks should take hold and the abort should
+  // complete.
+  FLAGS_txn_schedule_background_tasks = true;
+  auto* mts = cluster_->mini_tablet_server(0);
+  mts->Shutdown();
+  ASSERT_OK(mts->Restart());
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
+    ASSERT_STR_CONTAINS(completion_status.ToString(), "transaction has been 
aborted");
+    ASSERT_TRUE(is_complete);
+  });
+}
+
+TEST_F(TxnCommitITest, TestBackgroundAborts) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  string serialized_txn;
+  {
+    shared_ptr<KuduTransaction> txn;
+    shared_ptr<KuduSession> txn_session;
+    ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
+    ASSERT_OK(InsertToSession(txn_session, initial_row_count_, 
kNumRowsPerTxn));
+
+    int num_rows = 0;
+    ASSERT_OK(CountRows(&num_rows));
+    ASSERT_EQ(initial_row_count_, num_rows);
+    ASSERT_OK(txn->Serialize(&serialized_txn));
+  }
+  // Wait a bit for a background abort to happen.
+  SleepFor(MonoDelta::FromMilliseconds(5 * FLAGS_txn_keepalive_interval_ms));
+
+  // IsCommitComplete() should verify that the transaction is aborted.
+  Status completion_status;
+  bool is_complete = false;
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(KuduTransaction::Deserialize(client_, serialized_txn, &txn));
+  ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+  ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
+  ASSERT_TRUE(is_complete);
+
+  // On the participants, we should see the transaction as aborted.
+  ASSERT_EVENTUALLY([&] {
+    for (const auto& tablet_id : participant_ids_) {
+      vector<TxnId> aborted_txns;
+      ASSERT_OK(GetAbortedTxnsForParticipant(tablet_id, 1, &aborted_txns));
+      ASSERT_EQ(1, aborted_txns.size());
+    }
+  });
+}
+
 // Test that if we delete the TxnStatusManager while tasks are on-going,
 // nothing goes catastrophically wrong (i.e. no crashes).
 TEST_F(TxnCommitITest, TestCommitWhileDeletingTxnStatusManager) {
@@ -686,36 +818,60 @@ TEST_F(TwoNodeTxnCommitITest, TestCommitWhenParticipantsAreDown) {
 }
 
 // Test that when we start up, pending commits will start background tasks to
-// commit.
+// finalize the commit or abort.
 TEST_F(TwoNodeTxnCommitITest, TestStartTasksDuringStartup) {
-  shared_ptr<KuduTransaction> txn;
-  shared_ptr<KuduSession> txn_session;
-  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
-  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
+  shared_ptr<KuduTransaction> committed_txn;
+  {
+    shared_ptr<KuduSession> txn_session;
+    ASSERT_OK(BeginTransaction(participant_ids_, &committed_txn, 
&txn_session));
+    ASSERT_OK(InsertToSession(txn_session, initial_row_count_, 
kNumRowsPerTxn));
+  }
+  shared_ptr<KuduTransaction> aborted_txn;
+  {
+    shared_ptr<KuduSession> txn_session;
+    ASSERT_OK(BeginTransaction(participant_ids_, &aborted_txn, &txn_session));
+    ASSERT_OK(InsertToSession(txn_session, initial_row_count_ + 
kNumRowsPerTxn, kNumRowsPerTxn));
+  }
 
   // Shut down our participant's tserver so our commit task keeps retrying.
   prt_ts_->Shutdown();
-  ASSERT_OK(txn->Commit(/*wait*/false));
+  ASSERT_OK(committed_txn->Commit(/*wait*/false));
+  ASSERT_OK(aborted_txn->Rollback());
 
   Status completion_status;
   bool is_complete;
-  ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+  ASSERT_OK(committed_txn->IsCommitComplete(&is_complete, &completion_status));
   ASSERT_FALSE(is_complete);
   ASSERT_TRUE(completion_status.IsIncomplete()) << 
completion_status.ToString();
 
+  ASSERT_OK(aborted_txn->IsCommitComplete(&is_complete, &completion_status));
+  ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
+  ASSERT_FALSE(is_complete);
+
   // Shut down the TxnStatusManager to stop our tasks.
   tsm_ts_->Shutdown();
 
-  // Restart both tservers. The commit task should be restarted and eventually
-  // succeed.
+  // Restart both tservers. The background tasks should be restarted and
+  // eventually succeed.
   ASSERT_OK(prt_ts_->Restart());
   ASSERT_OK(tsm_ts_->Restart());
   ASSERT_EVENTUALLY([&] {
     Status completion_status;
     bool is_complete;
-    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_OK(committed_txn->IsCommitComplete(&is_complete, 
&completion_status));
     ASSERT_OK(completion_status);
     ASSERT_TRUE(is_complete);
+
+    ASSERT_OK(aborted_txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
+    ASSERT_TRUE(is_complete);
+  });
+  ASSERT_EVENTUALLY([&] {
+    for (const auto& tablet_id : participant_ids_) {
+      vector<TxnId> aborted_txns;
+      ASSERT_OK(GetAbortedTxnsForParticipant(tablet_id, 1, &aborted_txns));
+      ASSERT_EQ(1, aborted_txns.size());
+    }
   });
 }
 
@@ -763,18 +919,35 @@ class ThreeNodeTxnCommitITest : public TxnCommitITest {
 
 TEST_F(ThreeNodeTxnCommitITest, TestCommitTasksReloadOnLeadershipChange) {
   FLAGS_txn_schedule_background_tasks = false;
-  shared_ptr<KuduTransaction> txn;
-  shared_ptr<KuduSession> txn_session;
-  ASSERT_OK(BeginTransaction(participant_ids_, &txn, &txn_session));
-  ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
-
-  ASSERT_OK(txn->Commit(/*wait*/ false));
+  shared_ptr<KuduTransaction> committed_txn;
+  shared_ptr<KuduTransaction> aborted_txn;
+  {
+    shared_ptr<KuduSession> txn_session;
+    ASSERT_OK(BeginTransaction(participant_ids_, &committed_txn, 
&txn_session));
+    ASSERT_OK(InsertToSession(txn_session, initial_row_count_, 
kNumRowsPerTxn));
+  }
+  {
+    shared_ptr<KuduSession> txn_session;
+    ASSERT_OK(BeginTransaction(participant_ids_, &aborted_txn, &txn_session));
+    ASSERT_OK(InsertToSession(txn_session, initial_row_count_ + 
kNumRowsPerTxn, kNumRowsPerTxn));
+  }
+  ASSERT_OK(committed_txn->Commit(/*wait*/ false));
   Status completion_status;
   bool is_complete = false;
-  ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+  ASSERT_OK(committed_txn->IsCommitComplete(&is_complete, &completion_status));
   ASSERT_TRUE(completion_status.IsIncomplete()) << 
completion_status.ToString();
   ASSERT_FALSE(is_complete);
 
+  ASSERT_OK(aborted_txn->Rollback());
+  ASSERT_OK(aborted_txn->IsCommitComplete(&is_complete, &completion_status));
+  ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
+  ASSERT_FALSE(is_complete);
+  for (const auto& tablet_id : participant_ids_) {
+    vector<TxnId> aborted_txns;
+    ASSERT_OK(GetAbortedTxnsForParticipant(tablet_id, 3, &aborted_txns));
+    ASSERT_EQ(0, aborted_txns.size());
+  }
+
   FLAGS_txn_schedule_background_tasks = true;
   // Change our quiescing states so a new leader can be elected.
   *leader_ts_->server()->mutable_quiescing() = true;
@@ -790,9 +963,23 @@ TEST_F(ThreeNodeTxnCommitITest, TestCommitTasksReloadOnLeadershipChange) {
   // Upon becoming leader, we should have started our commit task and completed
   // the commit.
   ASSERT_EVENTUALLY([&] {
-    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_OK(committed_txn->IsCommitComplete(&is_complete, 
&completion_status));
+    ASSERT_TRUE(is_complete);
+  });
+  // The aborted transaction should still be aborted, and we should be able to
+  // validate its abort status in its participants' metadata.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(aborted_txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
     ASSERT_TRUE(is_complete);
   });
+  ASSERT_EVENTUALLY([&] {
+    for (const auto& tablet_id : participant_ids_) {
+      vector<TxnId> aborted_txns;
+      ASSERT_OK(GetAbortedTxnsForParticipant(tablet_id, 3, &aborted_txns));
+      ASSERT_EQ(1, aborted_txns.size());
+    }
+  });
 }
 
 } // namespace itest
diff --git a/src/kudu/integration-tests/txn_status_manager-itest.cc b/src/kudu/integration-tests/txn_status_manager-itest.cc
index b2b4df3..baef6be 100644
--- a/src/kudu/integration-tests/txn_status_manager-itest.cc
+++ b/src/kudu/integration-tests/txn_status_manager-itest.cc
@@ -266,7 +266,7 @@ TEST_F(TxnStatusManagerITest, StaleTransactionsCleanup) {
     // and abort it. An extra margin here is to avoid flakiness due to
     // scheduling anomalies.
     SleepFor(MonoDelta::FromMilliseconds(3 * keepalive_interval_ms));
-    NO_FATALS(CheckTxnState(txn_id, TxnStatePB::ABORTED));
+    NO_FATALS(CheckTxnState(txn_id, TxnStatePB::ABORT_IN_PROGRESS));
   }
 
   // Check that the transaction staleness is detected and the stale transaction
@@ -321,7 +321,7 @@ TEST_F(TxnStatusManagerITest, ToggleStaleTxnTrackerInRuntime) {
   // Check that the transaction staleness is detected and the stale transaction
   // is aborted once stale transaction tracking is re-enabled.
   SleepFor(MonoDelta::FromMilliseconds(3 * keepalive_interval_ms));
-  NO_FATALS(CheckTxnState(txn_id, TxnStatePB::ABORTED));
+  NO_FATALS(CheckTxnState(txn_id, TxnStatePB::ABORT_IN_PROGRESS));
 }
 
 // Verify the functionality of the stale transaction tracker in 
TxnStatusManager
@@ -450,7 +450,7 @@ TEST_F(TxnStatusManagerITest, TxnKeepAliveMultiTxnStatusManagerInstances) {
   // should be automatically aborted by TxnStatusManager running with the
   // leader replica of the txn status tablet.
   ASSERT_EVENTUALLY([&]{
-    NO_FATALS(CheckTxnState(txn_id, TxnStatePB::ABORTED));
+    NO_FATALS(CheckTxnState(txn_id, TxnStatePB::ABORT_IN_PROGRESS));
   });
 
   NO_FATALS(cluster_->AssertNoCrashes());
diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc b/src/kudu/integration-tests/txn_status_table-itest.cc
index 8ceb450..9dd1cf7 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -594,12 +594,12 @@ TEST_F(TxnStatusTableITest, GetTransactionStatus) {
   NO_FATALS(verify_state(TxnStatePB::COMMIT_IN_PROGRESS));
 
   ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser));
-  NO_FATALS(verify_state(TxnStatePB::ABORTED));
+  NO_FATALS(verify_state(TxnStatePB::ABORT_IN_PROGRESS));
 
   {
     auto s = txn_sys_client_->BeginCommitTransaction(1, kUser);
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
-    NO_FATALS(verify_state(TxnStatePB::ABORTED));
+    NO_FATALS(verify_state(TxnStatePB::ABORT_IN_PROGRESS));
   }
 
   // In the negative scenarios below, check for the expected status code
diff --git a/src/kudu/master/txn_manager-test.cc b/src/kudu/master/txn_manager-test.cc
index 51e81e4..d58f1f9 100644
--- a/src/kudu/master/txn_manager-test.cc
+++ b/src/kudu/master/txn_manager-test.cc
@@ -376,7 +376,7 @@ TEST_F(TxnManagerTest, AbortedTransactionLifecycle) {
         << StatusFromPB(resp.error().status()).ToString();
     TxnStatePB txn_state;
     NO_FATALS(fetch_txn_status(txn_id, &txn_state));
-    ASSERT_EQ(TxnStatePB::ABORTED, txn_state);
+    ASSERT_EQ(TxnStatePB::ABORT_IN_PROGRESS, txn_state);
   }
 
   // Try to send keep-alive for already aborted transaction.
@@ -396,7 +396,7 @@ TEST_F(TxnManagerTest, AbortedTransactionLifecycle) {
     // The transaction should stay in ABORTED state, of course.
     TxnStatePB txn_state;
     NO_FATALS(fetch_txn_status(txn_id, &txn_state));
-    ASSERT_EQ(TxnStatePB::ABORTED, txn_state);
+    ASSERT_EQ(TxnStatePB::ABORT_IN_PROGRESS, txn_state);
   }
 }
 
diff --git a/src/kudu/transactions/transactions.proto b/src/kudu/transactions/transactions.proto
index 7eb13c0..4d7b8f9 100644
--- a/src/kudu/transactions/transactions.proto
+++ b/src/kudu/transactions/transactions.proto
@@ -22,6 +22,7 @@ option java_package = "org.apache.kudu.transactions";
 enum TxnStatePB {
   UNKNOWN = 0;
   OPEN = 1;
+  ABORT_IN_PROGRESS = 5;
   ABORTED = 2;
   COMMIT_IN_PROGRESS = 3;
   COMMITTED = 4;
diff --git a/src/kudu/transactions/txn_status_manager-test.cc b/src/kudu/transactions/txn_status_manager-test.cc
index 7dead36..35fa2da 100644
--- a/src/kudu/transactions/txn_status_manager-test.cc
+++ b/src/kudu/transactions/txn_status_manager-test.cc
@@ -373,7 +373,7 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
   typedef std::pair<int64_t, TxnStatePB> IdAndUpdate;
   vector<IdAndUpdate> all_updates;
   for (int i = 0; i < kNumTransactions; i++) {
-    all_updates.emplace_back(std::make_pair(i, TxnStatePB::ABORTED));
+    all_updates.emplace_back(std::make_pair(i, TxnStatePB::ABORT_IN_PROGRESS));
     all_updates.emplace_back(std::make_pair(i, 
TxnStatePB::COMMIT_IN_PROGRESS));
     all_updates.emplace_back(std::make_pair(i, TxnStatePB::COMMITTED));
   }
@@ -390,7 +390,7 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
       const auto& txn_id = updates[i].first;
       TabletServerErrorPB ts_error;
       switch (updates[i].second) {
-        case TxnStatePB::ABORTED:
+        case TxnStatePB::ABORT_IN_PROGRESS:
           statuses[i] = txn_manager_->AbortTransaction(txn_id, kOwner, 
&ts_error);
           break;
         case TxnStatePB::COMMIT_IN_PROGRESS:
@@ -420,7 +420,7 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
       continue;
     }
     switch (updates[i].second) {
-      case TxnStatePB::ABORTED:
+      case TxnStatePB::ABORT_IN_PROGRESS:
         EmplaceIfNotPresent(&txns_with_abort, txn_id);
         break;
       case TxnStatePB::COMMIT_IN_PROGRESS:
@@ -501,7 +501,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
     ASSERT_OK(txn_manager_->GetTransactionStatus(
         2, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
-    ASSERT_EQ(TxnStatePB::ABORTED, txn_status.state());
+    ASSERT_EQ(TxnStatePB::ABORT_IN_PROGRESS, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
     ASSERT_EQ(kOwner, txn_status.user());
   }
@@ -538,7 +538,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
     ASSERT_OK(txn_manager_->GetTransactionStatus(
         2, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
-    ASSERT_EQ(TxnStatePB::ABORTED, txn_status.state());
+    ASSERT_EQ(TxnStatePB::ABORT_IN_PROGRESS, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
     ASSERT_EQ(kOwner, txn_status.user());
 
diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc
index 9111451..21807ce 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <functional>
+#include <iterator>
 #include <mutex>
 #include <ostream>
 #include <string>
@@ -49,6 +50,7 @@
 #include "kudu/util/cow_object.h"
 #include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/scoped_cleanup.h"
@@ -306,6 +308,86 @@ void CommitTasks::FinalizeCommitAsyncTask(int participant_idx, const Timestamp&
       std::move(participated_cb));
 }
 
+void CommitTasks::AbortTxnAsyncTask(int participant_idx) {
+  // Status callback called with the result from the participant op.
+  auto participated_cb = [this, participant_idx] (const Status& s) {
+    if (IsShuttingDownCleanupIfLastOp()) {
+      return;
+    }
+    if (PREDICT_FALSE(s.IsTimedOut())) {
+      // Retry timeout errors. Other transient errors should be retried by the
+      // client until timeout.
+      AbortTxnAsyncTask(participant_idx);
+      return;
+    }
+    if (PREDICT_FALSE(s.IsNotFound())) {
+      // If the participant has been deleted, treat it as though it's already
+      // been aborted. The participant's data can't be read anyway.
+      LOG(INFO) << Substitute("Participant $0 was not found: $1",
+                              participant_ids_[participant_idx], s.ToString());
+    } else if (PREDICT_FALSE(!s.ok())) {
+      LOG(WARNING) << Substitute("Participant $0 ABORT_TXN op returned $1",
+                                 participant_ids_[participant_idx], 
s.ToString());
+      stop_task_ = true;
+    }
+    // If this was the last participant op for this task, write the abort
+    // record to the tablet.
+    if (--ops_in_flight_ == 0) {
+      if (IsShuttingDownCleanup()) {
+        return;
+      }
+      ScheduleAbortTxnWrite();
+    }
+  };
+  ParticipantOpPB op_pb;
+  op_pb.set_txn_id(txn_id_.value());
+  op_pb.set_type(ParticipantOpPB::ABORT_TXN);
+  txn_client_->ParticipateInTransactionAsync(
+      participant_ids_[participant_idx],
+      std::move(op_pb),
+      MonoDelta::FromMilliseconds(FLAGS_txn_background_rpc_timeout_ms),
+      std::move(participated_cb));
+}
+
+void CommitTasks::AbortTxnAsync() {
+  // Reset the in-flight counter to indicate we're waiting for this new set of
+  // tasks to complete.
+  if (participant_ids_.empty()) {
+    ScheduleAbortTxnWrite();
+  } else {
+    ops_in_flight_ = participant_ids_.size();
+    for (int i = 0; i < participant_ids_.size(); i++) {
+      AbortTxnAsyncTask(i);
+    }
+  }
+}
+
+void CommitTasks::ScheduleAbortTxnWrite() {
+  // Submit the task to a threadpool.
+  // NOTE: This is called by the reactor thread that catches the BeginCommit
+  // reseponse, so we can't do IO in this thread.
+  DCHECK_EQ(0, ops_in_flight_);
+  scoped_refptr<CommitTasks> scoped_this(this);
+  CHECK_OK(commit_pool_->Submit([this, scoped_this = std::move(scoped_this),
+                                 tsm = txn_status_manager_,
+                                 txn_id = txn_id_] {
+    if (IsShuttingDownCleanup()) {
+      return;
+    }
+    TxnStatusManager::ScopedLeaderSharedLock l(txn_status_manager_);
+    if (PREDICT_TRUE(l.first_failed_status().ok())) {
+      WARN_NOT_OK(tsm->FinalizeAbortTransaction(txn_id.value()),
+                  "Error writing to transaction status table");
+    }
+
+    // Regardless of whether we succeed or fail, remove the commit task.
+    // Presumably we failed either because the replica is being shut down, or
+    // because we're no longer leader. In either case, the task will be retried
+    // once a new leader is elected.
+    tsm->RemoveCommitTask(txn_id, this);
+  }));
+}
+
 void CommitTasks::FinalizeCommitAsync(Timestamp commit_timestamp) {
   // Reset the in-flight counter to indicate we're waiting for this new set of
   // tasks to complete.
@@ -489,12 +571,17 @@ Status TxnStatusManager::LoadFromTabletUnlocked() {
   }
 
   unordered_map<int64_t, scoped_refptr<CommitTasks>> commits_in_flight;
-  unordered_map<int64_t, scoped_refptr<CommitTasks>> new_tasks;
+  unordered_map<int64_t, scoped_refptr<CommitTasks>> new_commits;
+  unordered_map<int64_t, scoped_refptr<CommitTasks>> new_aborts;
   if (txn_client) {
     for (const auto& [txn_id, txn_entry] : txns_by_id) {
       const auto& state = txn_entry->state();
       if (state == TxnStatePB::COMMIT_IN_PROGRESS) {
-        new_tasks.emplace(txn_id,
+        new_commits.emplace(txn_id,
+            new CommitTasks(txn_id, txn_entry->GetParticipantIds(),
+                            txn_client, commit_pool_, this));
+      } else if (state == TxnStatePB::ABORT_IN_PROGRESS) {
+        new_aborts.emplace(txn_id,
             new CommitTasks(txn_id, txn_entry->GetParticipantIds(),
                             txn_client, commit_pool_, this));
       }
@@ -509,13 +596,21 @@ Status TxnStatusManager::LoadFromTabletUnlocked() {
     for (const auto& [_, tasks] : commits_in_flight) {
       tasks->stop();
     }
-    commits_in_flight_ = std::move(new_tasks);
-    if (!commits_in_flight_.empty()) {
-      LOG(INFO) << Substitute("Starting $0 commit tasks", 
commits_in_flight_.size());
-      for (const auto& [_, tasks] : commits_in_flight_) {
+    if (!new_commits.empty()) {
+      LOG(INFO) << Substitute("Starting $0 commit tasks", new_commits.size());
+      for (const auto& [_, tasks] : new_commits) {
         tasks->BeginCommitAsync();
       }
     }
+    if (!new_aborts.empty()) {
+      LOG(INFO) << Substitute("Starting $0 aborts task", new_aborts.size());
+      for (const auto& [_, tasks] : new_aborts) {
+        tasks->AbortTxnAsync();
+      }
+    }
+    commits_in_flight_ = std::move(new_commits);
+    commits_in_flight_.insert(std::make_move_iterator(new_aborts.begin()),
+                              std::make_move_iterator(new_aborts.end()));
   }
   return Status::OK();
 }
@@ -536,13 +631,16 @@ void TxnStatusManager::Shutdown() {
   shutting_down_ = true;
   // Wait for all tasks to complete.
   while (true) {
+    int num_tasks;
     {
       std::lock_guard<simple_spinlock> l(lock_);
-      if (commits_in_flight_.empty()) {
+      num_tasks = commits_in_flight_.size();
+      if (num_tasks == 0) {
         return;
       }
     }
     SleepFor(MonoDelta::FromMilliseconds(50));
+    KLOG_EVERY_N_SECS(INFO, 10) << Substitute("Waiting for $0 task(s) to 
stop", num_tasks);
   }
 }
 
@@ -887,37 +985,82 @@ Status TxnStatusManager::FinalizeCommitTransaction(
   return Status::OK();
 }
 
+Status TxnStatusManager::FinalizeAbortTransaction(int64_t txn_id) {
+  // Callers must hold the leader lock (at least for reading).
+  leader_lock_.AssertAcquiredForReading();
+  scoped_refptr<TransactionEntry> txn;
+  TabletServerErrorPB ts_error;
+  RETURN_NOT_OK(GetTransaction(txn_id, /*user*/boost::none, &txn, &ts_error));
+
+  TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
+  const auto& cur_pb = txn_lock.data().pb;
+  switch (cur_pb.state()) {
+    case TxnStatePB::ABORTED:
+      // Already finalized: a repeated call is a successful no-op.
+      return Status::OK();
+    case TxnStatePB::ABORT_IN_PROGRESS:
+      // The only state from which an abort may be finalized.
+      break;
+    default:
+      return ReportIllegalTxnState(
+          Substitute("transaction ID $0 cannot be aborted: $1",
+                     txn_id, SecureShortDebugString(cur_pb)),
+          &ts_error);
+  }
+  // Persist the ABORTED state before making it visible in memory.
+  auto* mutable_data = txn_lock.mutable_data();
+  mutable_data->pb.set_state(TxnStatePB::ABORTED);
+  RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb, &ts_error));
+  txn_lock.Commit();
+  return Status::OK();
+}
+
 Status TxnStatusManager::AbortTransaction(int64_t txn_id,
-                                          const std::string& user,
+                                          const string& user,
                                           TabletServerErrorPB* ts_error) {
   leader_lock_.AssertAcquiredForReading();
+  // Read the flag once so that the client lookup and the task scheduling
+  // below agree even if the flag is flipped at runtime; otherwise
+  // 'txn_client' could be used without having been set.
+  const bool schedule_background_tasks = FLAGS_txn_schedule_background_tasks;
+  TxnSystemClient* txn_client = nullptr;
+  if (PREDICT_TRUE(schedule_background_tasks)) {
+    RETURN_NOT_OK(client_initializer_->GetClient(&txn_client));
+  }
   scoped_refptr<TransactionEntry> txn;
   RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
 
   TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
   const auto& pb = txn_lock.data().pb;
   const auto& state = pb.state();
-  if (state == TxnStatePB::ABORTED) {
+  // Aborting a transaction that is already aborted, or already being
+  // aborted, is reported as success.
+  if (state == TxnStatePB::ABORTED ||
+      state == TxnStatePB::ABORT_IN_PROGRESS) {
     return Status::OK();
   }
+  // TODO(awong): if we're in COMMIT_IN_PROGRESS, we should attempt to abort
+  // any in-flight commit tasks.
   if (PREDICT_FALSE(state != TxnStatePB::OPEN &&
-      state != TxnStatePB::COMMIT_IN_PROGRESS)) {
+                    state != TxnStatePB::COMMIT_IN_PROGRESS)) {
     return ReportIllegalTxnState(
         Substitute("transaction ID $0 cannot be aborted: $1",
                    txn_id, SecureShortDebugString(pb)),
         ts_error);
   }
   auto* mutable_data = txn_lock.mutable_data();
-  mutable_data->pb.set_state(TxnStatePB::ABORTED);
+  mutable_data->pb.set_state(TxnStatePB::ABORT_IN_PROGRESS);
   RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb, ts_error));
+
+  if (PREDICT_TRUE(schedule_background_tasks)) {
+    auto participant_ids = txn->GetParticipantIds();
+    // Register the abort task under 'lock_', but start it outside of it. Take
+    // our own reference to the task while still holding the lock: once the
+    // lock is released, concurrent inserts may rehash the map and invalidate
+    // any iterator into it.
+    scoped_refptr<CommitTasks> abort_tasks;
+    {
+      std::lock_guard<simple_spinlock> l(lock_);
+      auto [map_iter, emplaced] = commits_in_flight_.emplace(txn_id,
+          new CommitTasks(txn_id, std::move(participant_ids),
+                          txn_client, commit_pool_, this));
+      // If a task is already registered for this transaction (e.g. an
+      // in-flight commit, per the TODO above), no abort task is started here.
+      if (emplaced) {
+        abort_tasks = map_iter->second;
+      }
+    }
+    if (abort_tasks) {
+      abort_tasks->AbortTxnAsync();
+    }
+  }
   txn_lock.Commit();
   return Status::OK();
 }
 
 Status TxnStatusManager::GetTransactionStatus(
     int64_t txn_id,
-    const std::string& user,
-    transactions::TxnStatusEntryPB* txn_status,
+    const string& user,
+    TxnStatusEntryPB* txn_status,
     TabletServerErrorPB* ts_error) {
   DCHECK(txn_status);
   leader_lock_.AssertAcquiredForReading();
diff --git a/src/kudu/transactions/txn_status_manager.h b/src/kudu/transactions/txn_status_manager.h
index b9d94c7..48c4d87 100644
--- a/src/kudu/transactions/txn_status_manager.h
+++ b/src/kudu/transactions/txn_status_manager.h
@@ -101,10 +101,21 @@ class CommitTasks : public RefCountedThreadSafe<CommitTasks> {
   // commit record with the given timestamp to be written to the tablet.
   void FinalizeCommitAsync(Timestamp commit_timestamp);
 
+  // Asynchronously sends an ABORT_TXN participant op to each participant in
+  // the transaction, and upon completion of all of them (or immediately, if
+  // there are no participants), schedules an abort record to be written to
+  // the tablet.
+  void AbortTxnAsync();
+
+  // Asynchronously sends an ABORT_TXN participant op to the participant at the
+  // given index into the transaction's participant list. If this was the last
+  // one to complete, schedules an abort record to be written to the tablet.
+  void AbortTxnAsyncTask(int participant_idx);
+
   // Schedule calls to the TxnStatusManager to be made on the commit pool.
-  // NOTE: this may be called on reactor threads and thus must not
+  // NOTE: these may be called on reactor threads and thus must not
   // synchronously do any IO.
   void ScheduleFinalizeCommitWrite(Timestamp commit_timestamp);
+  void ScheduleAbortTxnWrite();
 
   // Stops further tasks from being run. Once called calls to the above methods
   // should effectively no-op.
@@ -277,12 +288,16 @@ class TxnStatusManager final : public tablet::TxnCoordinator {
   Status FinalizeCommitTransaction(int64_t txn_id, Timestamp commit_timestamp,
                                    tserver::TabletServerErrorPB* ts_error) 
override;
 
-  // Aborts the given transaction, returning an error if the transaction
-  // doesn't exist, is committed or not yet opened, or isn't owned by the given
-  // user.
+  // Begins aborting the given transaction: its state is persisted as
+  // ABORT_IN_PROGRESS and, if background tasks are enabled, ABORT_TXN ops are
+  // sent to its participants asynchronously. Returns OK if the transaction is
+  // already aborted or being aborted. Returns an error if the transaction
+  // doesn't exist, is committed or not yet opened, or isn't owned by the
+  // given user.
   Status AbortTransaction(int64_t txn_id, const std::string& user,
                           tserver::TabletServerErrorPB* ts_error) override;
 
+  // Writes a record to the TxnStatusManager indicating the given transaction
+  // has been successfully aborted, i.e. moves it from ABORT_IN_PROGRESS to
+  // ABORTED. Returns OK if it is already ABORTED and an error for any other
+  // state. The leader lock must be held (at least for reading).
+  Status FinalizeAbortTransaction(int64_t txn_id);
+
   // Retrieves the status of the specified transaction, returning an error if
   // the transaction doesn't exist or isn't owned by the specified user.
   Status GetTransactionStatus(int64_t txn_id,

Reply via email to