This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 0f9ff5f KUDU-2612 p6 (b): add {Abort,BeginCommit}Transaction() to
TxnSystemClient
0f9ff5f is described below
commit 0f9ff5ff043125be4a1150be0306373619b4ca89
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Sep 11 19:08:00 2020 -0700
KUDU-2612 p6 (b): add {Abort,BeginCommit}Transaction() to TxnSystemClient
This patch adds the AbortTransaction() and BeginCommitTransaction()
methods to the TxnSystemClient class. The newly added code mirrors
the logic implemented in the TxnSystemClient::BeginTransaction()
method. Corresponding tests are added as well.
This is a follow-up to cb1c2efb59373453e734074a02021f14c403257d.
Change-Id: I84558b13664d89c7f1769df2483f2bae5a49260b
Reviewed-on: http://gerrit.cloudera.org:8080/16443
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
.../integration-tests/txn_status_table-itest.cc | 75 +++++++++++++++++++++-
src/kudu/transactions/txn_system_client.cc | 28 ++++++++
src/kudu/transactions/txn_system_client.h | 18 ++++--
3 files changed, 115 insertions(+), 6 deletions(-)
diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc
b/src/kudu/integration-tests/txn_status_table-itest.cc
index 8c5c2a3..d26a1a7 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -351,14 +351,27 @@ TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) {
ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
+ ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser));
// If we write out of range, we should see an error.
- Status s = txn_sys_client_->BeginTransaction(100, kUser);
- ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ {
+ auto s = txn_sys_client_->BeginTransaction(100, kUser);
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ }
+ {
+ auto s = txn_sys_client_->BeginCommitTransaction(100, kUser);
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ }
+ {
+ auto s = txn_sys_client_->AbortTransaction(100, kUser);
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ }
// Once we add a new range, we should be able to leverage it.
ASSERT_OK(txn_sys_client_->AddTxnStatusTableRange(100, 200));
ASSERT_OK(txn_sys_client_->BeginTransaction(100, kUser));
+ ASSERT_OK(txn_sys_client_->BeginCommitTransaction(100, kUser));
+ ASSERT_OK(txn_sys_client_->AbortTransaction(100, kUser));
}
TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) {
@@ -417,6 +430,64 @@ TEST_F(TxnStatusTableITest,
TestSystemClientRegisterParticipantErrors) {
ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
}
+TEST_F(TxnStatusTableITest, SystemClientCommitAndAbortTransaction) {
+ ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
+ ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
+
+ ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
+ ASSERT_OK(txn_sys_client_->BeginCommitTransaction(1, kUser));
+ // Calling BeginCommitTransaction() on already committing transaction is OK.
+ ASSERT_OK(txn_sys_client_->BeginCommitTransaction(1, kUser));
+ // It's completely legal to abort a transaction that has entered the commit
+ // phase but hasn't finalized yet.
+ ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser));
+ // Aborting already aborted transaction is fine (aborting is idempotent).
+ ASSERT_OK(txn_sys_client_->AbortTransaction(1, kUser));
+
+ // Even if the transaction is aborted, an attempt to start another
transaction
+ // with already used ID should yield an error.
+ {
+ auto s = txn_sys_client_->BeginTransaction(1, kUser);
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
+ }
+
+ // An attempt to start committing a non-existent transaction should report
+ // an error.
+ {
+ auto s = txn_sys_client_->BeginCommitTransaction(2, kUser);
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not found");
+ }
+
+ // An attempt to abort a non-existent transaction should report an error.
+ {
+ auto s = txn_sys_client_->AbortTransaction(2, kUser);
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not found");
+ }
+
+ // An attempt to commit already aborted transaction should fail with an
error.
+ {
+ ASSERT_OK(txn_sys_client_->BeginTransaction(2, kUser));
+ ASSERT_OK(txn_sys_client_->AbortTransaction(2, kUser));
+ auto s = txn_sys_client_->BeginCommitTransaction(2, kUser);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 is not open");
+ }
+
+ // Commit/Abort transaction should be called as of the transaction's owner.
+ {
+ ASSERT_OK(txn_sys_client_->BeginTransaction(3, kUser));
+ auto s = txn_sys_client_->BeginCommitTransaction(3, "stranger");
+ ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 3 not owned by
stranger");
+ s = txn_sys_client_->AbortTransaction(3, "stranger");
+ ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 3 not owned by
stranger");
+ }
+}
+
// Test that a transaction system client can make concurrent calls to multiple
// transaction status tablets.
TEST_F(TxnStatusTableITest, TestSystemClientConcurrentCalls) {
diff --git a/src/kudu/transactions/txn_system_client.cc
b/src/kudu/transactions/txn_system_client.cc
index fa6bbc6..3cb1d4e 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -146,6 +146,34 @@ Status TxnSystemClient::RegisterParticipant(int64_t
txn_id, const string& partic
return s.Wait();
}
+Status TxnSystemClient::BeginCommitTransaction(int64_t txn_id,
+ const string& user,
+ MonoDelta timeout) {
+ CoordinatorOpPB coordinate_txn_op;
+ coordinate_txn_op.set_type(CoordinatorOpPB::BEGIN_COMMIT_TXN);
+ coordinate_txn_op.set_txn_id(txn_id);
+ coordinate_txn_op.set_user(user);
+ Synchronizer s;
+ RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op),
+ timeout,
+ s.AsStatusCallback()));
+ return s.Wait();
+}
+
+Status TxnSystemClient::AbortTransaction(int64_t txn_id,
+ const string& user,
+ MonoDelta timeout) {
+ CoordinatorOpPB coordinate_txn_op;
+ coordinate_txn_op.set_type(CoordinatorOpPB::ABORT_TXN);
+ coordinate_txn_op.set_txn_id(txn_id);
+ coordinate_txn_op.set_user(user);
+ Synchronizer s;
+ RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op),
+ timeout,
+ s.AsStatusCallback()));
+ return s.Wait();
+}
+
Status TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB
coordinate_txn_op,
const MonoDelta& timeout,
const StatusCallback& cb) {
diff --git a/src/kudu/transactions/txn_system_client.h
b/src/kudu/transactions/txn_system_client.h
index 2477feb..b2b74fd 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -70,23 +70,33 @@ class TxnSystemClient {
return AddTxnStatusTableRangeWithClient(lower_bound, upper_bound,
client_.get());
}
+ // TODO(awong): in the methods below with 'timeout' parameter,
+ // pass a deadline instead of a timeout so we can more easily
+ // associate it with potential user-specified deadlines.
+
// Attempts to create a transaction with the given 'txn_id'.
// Returns an error if the transaction ID has already been taken, or if there
// was an error writing to the transaction status table.
- // TODO(awong): pass a deadline instead of a timeout so we can more easily
- // associate it with potential user-specified deadlines.
Status BeginTransaction(int64_t txn_id, const std::string& user,
MonoDelta timeout = MonoDelta::FromSeconds(10));
// Attempts to register the given participant with the given transaction.
// Returns an error if the transaction hasn't yet been started, or if the
// 'user' isn't permitted to modify the transaction.
- // TODO(awong): pass a deadline instead of a timeout so we can more easily
- // associate it with potential user-specified deadlines.
Status RegisterParticipant(int64_t txn_id, const std::string& participant_id,
const std::string& user,
MonoDelta timeout = MonoDelta::FromSeconds(10));
+ // Initiates committing a transaction with the given identifier.
+ Status BeginCommitTransaction(int64_t txn_id,
+ const std::string& user,
+ MonoDelta timeout =
MonoDelta::FromSeconds(10));
+
+ // Aborts a transaction with the given identifier.
+ Status AbortTransaction(int64_t txn_id,
+ const std::string& user,
+ MonoDelta timeout = MonoDelta::FromSeconds(10));
+
// Opens the transaction status table, refreshing metadata with that from the
// masters.
Status OpenTxnStatusTable();