This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f9ff5f  KUDU-2612 p6 (b): add {Abort,BeginCommit}Transaction() to 
TxnSystemClient
0f9ff5f is described below

commit 0f9ff5ff043125be4a1150be0306373619b4ca89
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Sep 11 19:08:00 2020 -0700

    KUDU-2612 p6 (b): add {Abort,BeginCommit}Transaction() to TxnSystemClient
    
    This patch adds to AbortTransaction() and BeginCommitTransaction()
    methods into the TxnSystemClient class.  The newly added code follows
    suit of the logic implemented in TxnSystemClient::BeginTransaction()
    method.  Added corresponding tests as well.
    
    This is a follow-up to cb1c2efb59373453e734074a02021f14c403257d.
    
    Change-Id: I84558b13664d89c7f1769df2483f2bae5a49260b
    Reviewed-on: http://gerrit.cloudera.org:8080/16443
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 .../integration-tests/txn_status_table-itest.cc    | 75 +++++++++++++++++++++-
 src/kudu/transactions/txn_system_client.cc         | 28 ++++++++
 src/kudu/transactions/txn_system_client.h          | 18 ++++--
 3 files changed, 115 insertions(+), 6 deletions(-)

diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc 
b/src/kudu/integration-tests/txn_status_table-itest.cc
index 8c5c2a3..d26a1a7 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -351,14 +351,27 @@ TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) {
   ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
   ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
   ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
+  ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser));
 
   // If we write out of range, we should see an error.
-  Status s = txn_sys_client_->BeginTransaction(100, kUser);
-  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  {
+    auto s = txn_sys_client_->BeginTransaction(100, kUser);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  }
+  {
+    auto s = txn_sys_client_->BeginCommitTransaction(100, kUser);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  }
+  {
+    auto s = txn_sys_client_->AbortTransaction(100, kUser);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  }
 
   // Once we add a new range, we should be able to leverage it.
   ASSERT_OK(txn_sys_client_->AddTxnStatusTableRange(100, 200));
   ASSERT_OK(txn_sys_client_->BeginTransaction(100, kUser));
+  ASSERT_OK(txn_sys_client_->BeginCommitTransaction(100, kUser));
+  ASSERT_OK(txn_sys_client_->AbortTransaction(100, kUser));
 }
 
 TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) {
@@ -417,6 +430,64 @@ TEST_F(TxnStatusTableITest, 
TestSystemClientRegisterParticipantErrors) {
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
 }
 
+TEST_F(TxnStatusTableITest, SystemClientCommitAndAbortTransaction) {
+  ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
+  ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
+
+  ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
+  ASSERT_OK(txn_sys_client_->BeginCommitTransaction(1, kUser));
+  // Calling BeginCommitTransaction() on already committing transaction is OK.
+  ASSERT_OK(txn_sys_client_->BeginCommitTransaction(1, kUser));
+  // It's completely legal to abort a transaction that has entered the commit
+  // phase but hasn't finalized yet.
+  ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser));
+  // Aborting already aborted transaction is fine (aborting is idempotent).
+  ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser));
+
+  // Even if the transaction is aborted, an attempt to start another 
transaction
+  // with already used ID should yield an error.
+  {
+    auto s = txn_sys_client_->BeginTransaction(1, kUser);
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
+  }
+
+  // An attempt to start committing a non-existent transaction should report
+  // an error.
+  {
+    auto s = txn_sys_client_->BeginCommitTransaction(2, kUser);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not found");
+  }
+
+  // An attempt to abort a non-existent transaction should report an error.
+  {
+    auto s = txn_sys_client_->AbortTransaction(2, kUser);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not found");
+  }
+
+  // An attempt to commit already aborted transaction should fail with an 
error.
+  {
+    ASSERT_OK(txn_sys_client_->BeginTransaction(2, kUser));
+    ASSERT_OK(txn_sys_client_->AbortTransaction(2, kUser));
+    auto s = txn_sys_client_->BeginCommitTransaction(2, kUser);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 is not open");
+  }
+
+  // Commit/Abort transaction should be called as of the transaction's owner.
+  {
+    ASSERT_OK(txn_sys_client_->BeginTransaction(3, kUser));
+    auto s = txn_sys_client_->BeginCommitTransaction(3, "stranger");
+    ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 3 not owned by 
stranger");
+    s = txn_sys_client_->AbortTransaction(3, "stranger");
+    ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 3 not owned by 
stranger");
+  }
+}
+
 // Test that a transaction system client can make concurrent calls to multiple
 // transaction status tablets.
 TEST_F(TxnStatusTableITest, TestSystemClientConcurrentCalls) {
diff --git a/src/kudu/transactions/txn_system_client.cc 
b/src/kudu/transactions/txn_system_client.cc
index fa6bbc6..3cb1d4e 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -146,6 +146,34 @@ Status TxnSystemClient::RegisterParticipant(int64_t 
txn_id, const string& partic
   return s.Wait();
 }
 
+Status TxnSystemClient::BeginCommitTransaction(int64_t txn_id,
+                                               const string& user,
+                                               MonoDelta timeout) {
+  CoordinatorOpPB coordinate_txn_op;
+  coordinate_txn_op.set_type(CoordinatorOpPB::BEGIN_COMMIT_TXN);
+  coordinate_txn_op.set_txn_id(txn_id);
+  coordinate_txn_op.set_user(user);
+  Synchronizer s;
+  RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op),
+                                           timeout,
+                                           s.AsStatusCallback()));
+  return s.Wait();
+}
+
+Status TxnSystemClient::AbortTransaction(int64_t txn_id,
+                                         const string& user,
+                                         MonoDelta timeout) {
+  CoordinatorOpPB coordinate_txn_op;
+  coordinate_txn_op.set_type(CoordinatorOpPB::ABORT_TXN);
+  coordinate_txn_op.set_txn_id(txn_id);
+  coordinate_txn_op.set_user(user);
+  Synchronizer s;
+  RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op),
+                                           timeout,
+                                           s.AsStatusCallback()));
+  return s.Wait();
+}
+
 Status TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB 
coordinate_txn_op,
                                                    const MonoDelta& timeout,
                                                    const StatusCallback& cb) {
diff --git a/src/kudu/transactions/txn_system_client.h 
b/src/kudu/transactions/txn_system_client.h
index 2477feb..b2b74fd 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -70,23 +70,33 @@ class TxnSystemClient {
     return AddTxnStatusTableRangeWithClient(lower_bound, upper_bound, 
client_.get());
   }
 
+  // TODO(awong): in the methods below with 'timeout' parameter,
+  //              pass a deadline instead of a timeout so we can more easily
+  //              associate it with potential user-specified deadlines.
+
   // Attempts to create a transaction with the given 'txn_id'.
   // Returns an error if the transaction ID has already been taken, or if there
   // was an error writing to the transaction status table.
-  // TODO(awong): pass a deadline instead of a timeout so we can more easily
-  // associate it with potential user-specified deadlines.
   Status BeginTransaction(int64_t txn_id, const std::string& user,
                           MonoDelta timeout = MonoDelta::FromSeconds(10));
 
   // Attempts to register the given participant with the given transaction.
   // Returns an error if the transaction hasn't yet been started, or if the
   // 'user' isn't permitted to modify the transaction.
-  // TODO(awong): pass a deadline instead of a timeout so we can more easily
-  // associate it with potential user-specified deadlines.
   Status RegisterParticipant(int64_t txn_id, const std::string& participant_id,
                              const std::string& user,
                              MonoDelta timeout = MonoDelta::FromSeconds(10));
 
+  // Initiates committing a transaction with the given identifier.
+  Status BeginCommitTransaction(int64_t txn_id,
+                                const std::string& user,
+                                MonoDelta timeout = 
MonoDelta::FromSeconds(10));
+
+  // Aborts a transaction with the given identifier.
+  Status AbortTransaction(int64_t txn_id,
+                          const std::string& user,
+                          MonoDelta timeout = MonoDelta::FromSeconds(10));
+
   // Opens the transaction status table, refreshing metadata with that from the
   // masters.
   Status OpenTxnStatusTable();

Reply via email to