This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 97b9827 KUDU-2612: add TxnManager::BeginTransaction()
97b9827 is described below
commit 97b9827199e8190656ddf356695f06a2e54e39b5
Author: Alexey Serbin <[email protected]>
AuthorDate: Mon Oct 12 01:16:22 2020 -0700
KUDU-2612: add TxnManager::BeginTransaction()
This patch adds the implementation of the BeginTransaction() method
to the TxnManager along with tests to cover the newly introduced
functionality.
Change-Id: I51c476d92bb5b147ffd03fd9f3163ab86d581496
Reviewed-on: http://gerrit.cloudera.org:8080/16586
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <[email protected]>
---
src/kudu/master/txn_manager-test.cc | 222 +++++++++++++++++++++++++++--
src/kudu/master/txn_manager.cc | 86 ++++++++++-
src/kudu/master/txn_manager.h | 4 +
src/kudu/transactions/txn_system_client.cc | 7 +-
src/kudu/tserver/tablet_service.cc | 5 +-
5 files changed, 301 insertions(+), 23 deletions(-)
diff --git a/src/kudu/master/txn_manager-test.cc
b/src/kudu/master/txn_manager-test.cc
index 913c03f..47d5fce 100644
--- a/src/kudu/master/txn_manager-test.cc
+++ b/src/kudu/master/txn_manager-test.cc
@@ -23,6 +23,7 @@
#include <ostream>
#include <string>
#include <thread>
+#include <unordered_set>
#include <vector>
#include <gflags/gflags_declare.h>
@@ -37,6 +38,8 @@
#include "kudu/master/txn_manager.proxy.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/rpc/rpc_controller.h"
+#include "kudu/transactions/transactions.pb.h"
+#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/util/barrier.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/sockaddr.h"
@@ -47,14 +50,18 @@
using kudu::cluster::InternalMiniCluster;
using kudu::cluster::InternalMiniClusterOptions;
using kudu::rpc::RpcController;
+using kudu::transactions::TxnStatePB;
using std::string;
using std::thread;
using std::unique_ptr;
+using std::unordered_set;
using std::vector;
DECLARE_bool(txn_manager_enabled);
DECLARE_bool(txn_manager_lazily_initialized);
DECLARE_int32(rpc_service_queue_length);
+DECLARE_int64(txn_manager_status_table_range_partition_span);
+DECLARE_uint32(transaction_keep_alive_interval_ms);
namespace kudu {
namespace transactions {
@@ -75,6 +82,11 @@ class TxnManagerTest : public KuduTest {
FLAGS_txn_manager_enabled = true;
FLAGS_txn_manager_lazily_initialized = true;
+ // Make TxnManager create new ranges in the transaction status table more
+ // often, so it's not necessary to start too many transactions to see it
+ // switching to a new tablet of the transaction status table.
+ FLAGS_txn_manager_status_table_range_partition_span = 1024;
+
// A few scenarios (e.g. LazyInitializationConcurrentCalls) might require
// an insanely high capacity of the service RPC queue since their workload
// depends on the number of CPU cores available. They can send many
requests
@@ -280,22 +292,206 @@ TEST_F(TxnManagerTest, NonlazyInitialization) {
}
}
-// BeginTransaction implementation is moved into a follow-up changelist.
-// TODO(aserbin): update this scenario once BeginTransction is here
-TEST_F(TxnManagerTest, BeginTransactionRpc) {
- RpcController ctx;
- PrepareRpcController(&ctx);
- BeginTransactionRequestPB req;
- BeginTransactionResponsePB resp;
- ASSERT_OK(proxy_->BeginTransaction(req, &resp, &ctx));
+// This scenario calls almost all methods of the TxnManager.
+TEST_F(TxnManagerTest, AbortedTransactionLifecycle) {
+ const auto fetch_txn_status = [this] (int64_t txn_id, TxnStatePB* state) {
+ RpcController ctx;
+ PrepareRpcController(&ctx);
+ GetTransactionStateRequestPB req;
+ GetTransactionStateResponsePB resp;
+ req.set_txn_id(txn_id);
+ ASSERT_OK(proxy_->GetTransactionState(req, &resp, &ctx));
+ ASSERT_FALSE(resp.has_error())
+ << StatusFromPB(resp.error().status()).ToString();
+ ASSERT_TRUE(resp.has_state());
+ *state = resp.state();
+ };
- ASSERT_TRUE(resp.has_error());
- auto s = StatusFromPB(resp.error().status());
- ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
- ASSERT_STR_CONTAINS(
- s.ToString(), "Not implemented: BeginTransaction is not supported yet");
+ int64_t txn_id = -1;
+ {
+ RpcController ctx;
+ PrepareRpcController(&ctx);
+ BeginTransactionRequestPB req;
+ BeginTransactionResponsePB resp;
+ ASSERT_OK(proxy_->BeginTransaction(req, &resp, &ctx));
+ ASSERT_FALSE(resp.has_error())
+ << StatusFromPB(resp.error().status()).ToString();
+ ASSERT_TRUE(resp.has_txn_id());
+ txn_id = resp.txn_id();
+ ASSERT_LE(0, txn_id);
+ ASSERT_TRUE(resp.has_keepalive_millis());
+ ASSERT_EQ(FLAGS_transaction_keep_alive_interval_ms,
resp.keepalive_millis());
+ TxnStatePB txn_state;
+ NO_FATALS(fetch_txn_status(txn_id, &txn_state));
+ ASSERT_EQ(TxnStatePB::OPEN, txn_state);
+ }
+
+ {
+ RpcController ctx;
+ PrepareRpcController(&ctx);
+ CommitTransactionRequestPB req;
+ CommitTransactionResponsePB resp;
+ req.set_txn_id(txn_id);
+ ASSERT_OK(proxy_->CommitTransaction(req, &resp, &ctx));
+ ASSERT_FALSE(resp.has_error())
+ << StatusFromPB(resp.error().status()).ToString();
+ TxnStatePB txn_state;
+ NO_FATALS(fetch_txn_status(txn_id, &txn_state));
+ ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_state);
+ }
+
+ // TODO(aserbin): add call to KeepTransactionAlive() when TxnStatusManager
+ // has the functionality implemented.
+
+ {
+ RpcController ctx;
+ PrepareRpcController(&ctx);
+ AbortTransactionRequestPB req;
+ AbortTransactionResponsePB resp;
+ req.set_txn_id(txn_id);
+ ASSERT_OK(proxy_->AbortTransaction(req, &resp, &ctx));
+ ASSERT_FALSE(resp.has_error())
+ << StatusFromPB(resp.error().status()).ToString();
+ TxnStatePB txn_state;
+ NO_FATALS(fetch_txn_status(txn_id, &txn_state));
+ ASSERT_EQ(TxnStatePB::ABORTED, txn_state);
+ }
+}
+
+TEST_F(TxnManagerTest, BeginManyTransactions) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ // In this functor CHECK_ is used instead of ASSERT_ because it's targeted
+ // for multi-thread use, and ASSERT_ macros do not seem to work as expected
+ // in such a case.
+ const auto txn_initiator = [this](
+ size_t txn_num,
+ vector<int64_t>* txn_ids) {
+ // Create its own proxy: this is important if trying to create more
+ // concurrency since a proxy serializes RPC calls.
+ TxnManagerServiceProxy p(
+ cluster_->messenger(),
+ cluster_->mini_master()->bound_rpc_addr(),
+ cluster_->mini_master()->bound_rpc_addr().host());
+ int64_t max_txn_id = -1;
+ for (auto id = 0; id < txn_num; ++id) {
+ BeginTransactionResponsePB resp;
+ while (true) {
+ RpcController ctx;
+ PrepareRpcController(&ctx);
+ BeginTransactionRequestPB req;
+ resp.Clear();
+ auto s = p.BeginTransaction(req, &resp, &ctx);
+ // The only acceptable non-OK status here is
Status::ServiceUnavailable.
+ if (s.IsServiceUnavailable() ||
+ (resp.has_error() &&
+ StatusFromPB(resp.error().status()).IsServiceUnavailable())) {
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ continue;
+ }
+ break;
+ }
+ CHECK(!resp.has_error()) <<
StatusFromPB(resp.error().status()).ToString();
+ CHECK(resp.has_txn_id());
+ int64_t txn_id = resp.txn_id();
+ CHECK_GT(txn_id, max_txn_id);
+ max_txn_id = txn_id;
+ if (txn_ids) {
+ txn_ids->emplace_back(txn_id);
+ }
+ CHECK(resp.has_keepalive_millis());
+ CHECK_EQ(FLAGS_transaction_keep_alive_interval_ms,
resp.keepalive_millis());
+ }
+ };
+
+ // First, a simple sequential case: start many transactions one after
another.
+ // The point here is to make sure the TxnManager:
+ // * takes care of adding new range partitions to the transaction status table
+ // * transaction identifiers assigned to the newly started transactions are
+ // ** unique
+ // ** increase monotonically
+ {
+ const int64_t kNumTransactions =
+ FLAGS_txn_manager_status_table_range_partition_span * 3;
+
+ // TxnManager is lazily initialized, so no tablets of the transaction
+ // status table should be created yet.
+ const auto txn_tablets_before =
+ cluster_->mini_tablet_server(0)->ListTablets();
+ ASSERT_EQ(0, txn_tablets_before.size());
+
+ vector<int64_t> txn_ids;
+ txn_initiator(kNumTransactions, &txn_ids);
+ ASSERT_EQ(kNumTransactions, txn_ids.size());
+ int64_t prev_txn_id = -1;
+ for (const auto& txn_id : txn_ids) {
+ ASSERT_GT(txn_id, prev_txn_id);
+ prev_txn_id = txn_id;
+ }
+
+ // Check that corresponding tablets have been created for the transaction
+ // status table.
+ const auto txn_tablets_after =
+ cluster_->mini_tablet_server(0)->ListTablets();
+ auto expected_tablets_num = 1 +
+ prev_txn_id / FLAGS_txn_manager_status_table_range_partition_span;
+ ASSERT_EQ(expected_tablets_num, txn_tablets_after.size());
+ }
+
+ // A more complex case: run multiple threads, each starting many
transactions.
+ // Make sure the generated transaction identifiers are unique.
+ {
+ const int64_t kNumTransactionsPerThread =
+ FLAGS_txn_manager_status_table_range_partition_span * 2;
+ const int kNumCPUs = base::NumCPUs();
+ const size_t kNumThreads = 2 * kNumCPUs;
+ vector<thread> threads;
+ threads.reserve(kNumThreads);
+ vector<vector<int64_t>> txn_ids_per_thread;
+ txn_ids_per_thread.resize(kNumThreads);
+ for (auto& slice : txn_ids_per_thread) {
+ slice.reserve(kNumTransactionsPerThread);
+ }
+ for (auto idx = 0; idx < kNumThreads; ++idx) {
+ threads.emplace_back(txn_initiator,
+ kNumTransactionsPerThread,
+ &txn_ids_per_thread[idx]);
+ }
+ for (auto& t : threads) {
+ t.join();
+ }
+
+ // Verify the uniqueness of the identifiers across all the threads. Instead
+ // of sort/unique, use std::unordered_set.
+ unordered_set<int64_t> txn_ids;
+ size_t total_size = 0;
+ for (const auto& slice: txn_ids_per_thread) {
+ EXPECT_EQ(kNumTransactionsPerThread, slice.size());
+ txn_ids.insert(slice.begin(), slice.end());
+ total_size += slice.size();
+ }
+ ASSERT_EQ(kNumTransactionsPerThread * kNumThreads, total_size);
+ ASSERT_EQ(total_size, txn_ids.size());
+ }
+
+ // Now start a single transaction to get the highest assigned txn_id so far.
+ // This is to check for the number of tablets in the transaction status table
+ // after all this activity.
+ {
+ vector<int64_t> txn_ids;
+ txn_initiator(1, &txn_ids);
+ ASSERT_EQ(1, txn_ids.size());
+ const auto txn_tablets = cluster_->mini_tablet_server(0)->ListTablets();
+ auto expected_tablets_num = 1 +
+ txn_ids[0] / FLAGS_txn_manager_status_table_range_partition_span;
+ ASSERT_EQ(expected_tablets_num, txn_tablets.size());
+ }
}
+// TODO(aserbin): add test scenarios involving a multi-master Kudu cluster
+// (hence there will be multiple TxnManager instances) and
verify
+// how all this works in case of frequent master re-elections.
+
// KeepTransactionAlive is not yet supported.
// TODO(aserbin): update this scenario once KeepTransactionAlive is implemented
TEST_F(TxnManagerTest, KeepTransactionAliveRpc) {
diff --git a/src/kudu/master/txn_manager.cc b/src/kudu/master/txn_manager.cc
index b0a7e45..aee7ce6 100644
--- a/src/kudu/master/txn_manager.cc
+++ b/src/kudu/master/txn_manager.cc
@@ -73,6 +73,13 @@ DEFINE_int64(txn_manager_status_table_range_partition_span,
1000000,
TAG_FLAG(txn_manager_status_table_range_partition_span, advanced);
TAG_FLAG(txn_manager_status_table_range_partition_span, experimental);
+DEFINE_uint32(transaction_keep_alive_interval_ms, 3000,
+ "Maximum interval (in milliseconds) between subsequent "
+ "keep-alive heartbeats from client to TxnManager to let it know "
+ "that a transaction is alive");
+TAG_FLAG(transaction_keep_alive_interval_ms, runtime);
+TAG_FLAG(transaction_keep_alive_interval_ms, experimental);
+
namespace kudu {
namespace transactions {
@@ -103,6 +110,8 @@ MonoDelta ToDelta(const MonoTime& deadline) {
TxnManager::TxnManager(Master* server)
: is_lazily_initialized_(FLAGS_txn_manager_lazily_initialized),
+ txn_status_table_range_span_(
+ FLAGS_txn_manager_status_table_range_partition_span),
server_(server),
need_init_(true),
initialized_(false),
@@ -112,15 +121,86 @@ TxnManager::TxnManager(Master* server)
TxnManager::~TxnManager() {
}
-Status TxnManager::BeginTransaction(const string& /* username */,
+Status TxnManager::BeginTransaction(const string& username,
const MonoTime& deadline,
int64_t* txn_id,
int32_t* keep_alive_interval_ms) {
DCHECK(txn_id);
DCHECK(keep_alive_interval_ms);
RETURN_NOT_OK(CheckInitialized(deadline));
- // TODO(aserbin): this is implemented in a follow-up changelist
- return Status::NotSupported("BeginTransaction is not supported yet");
+
+ // TxnManager uses next_txn_id_ as a hint for next transaction identifier.
+ //
+ // TODO(aserbin): a better approach is to change TxnStatusManager's
+ // BeginTransaction() to reserve next available transaction
+ // identifier by forwarding initial hint supplied from here
+ // through the chain, where each TxnStatusManager increments
+ // its last seen txn ID and either sends it further
+ // to corresponding TxnManager or writes a record into its own
+ // txn status tablet. The latter ends the forwarding chain
with
+ // the reserved txn ID that is sent to TxnSystemClient with
+ // the response and passed back to be used here as a hint for
+ // txn ID on the next request.
+ int64_t try_txn_id = next_txn_id_++;
+ auto s = Status::TimedOut("timed out while trying to find txn_id");
+ while (MonoTime::Now() < deadline) {
+ int64_t highest_seen_txn_id = -1;
+ s = txn_sys_client_->BeginTransaction(
+ try_txn_id, username, &highest_seen_txn_id, ToDelta(deadline));
+ if (s.ok()) {
+ DCHECK_GE(highest_seen_txn_id, 0);
+ // The idea is to have the thread that got a transaction reserved with
+ // the highest 'highest_seen_txn_id' so far keep updating 'next_txn_id_'
+ // until it succeeds, or bail out if another thread received a greater
+ // 'highest_seen_txn_id' back from TxnStatusManager.
+ int64_t stored_txn_id = try_txn_id + 1;
+ while (true) {
+ if (next_txn_id_.compare_exchange_strong(stored_txn_id,
+ highest_seen_txn_id + 1) ||
+ stored_txn_id > highest_seen_txn_id) {
+ break;
+ }
+ }
+ break;
+ }
+ if (s.IsInvalidArgument()) {
+ // TxnStatusManager reports that try_txn_id is too low to be used as
+ // an identifier for a new transaction.
+ DCHECK_GE(highest_seen_txn_id, 0);
+ try_txn_id = highest_seen_txn_id + 1;
+ continue;
+ }
+ if (s.IsNotFound()) {
+ // No tablet exists to store the record for a new transaction identified
+ // by 'try_txn_id', so it's time to add a new range into the transaction
+ // status table. By current design, TxnManager is the entity to take care
+ // of this: the TxnStatusManager cannot do that on its own because it is
+ // a logical entity just on top of a tablet corresponding to a single
+ // range of transaction identifiers.
+ //
+ // TODO(aserbin): make the range step controllable via a gflag and
+ // retrieve the information on the high bound of the last
+ // used transaction range from the tablet's metadata.
+ const auto lb = (try_txn_id / txn_status_table_range_span_) *
+ txn_status_table_range_span_;
+ const auto hb = lb + txn_status_table_range_span_;
+ auto new_range_status = txn_sys_client_->AddTxnStatusTableRange(lb, hb);
+ if (new_range_status.ok() || new_range_status.IsAlreadyPresent()) {
+ // OK, the tablet corresponding to the new range now exists.
+ // Status::AlreadyPresent() might be returned if there were concurrent
+ // requests to add the same range to the transaction status table.
+ continue;
+ }
+ return new_range_status.CloneAndPrepend(
+ "cannot add a new range to the transaction status table");
+ }
+ break;
+ }
+ if (s.ok()) {
+ *txn_id = try_txn_id;
+ *keep_alive_interval_ms = FLAGS_transaction_keep_alive_interval_ms;
+ }
+ return s;
}
Status TxnManager::CommitTransaction(int64_t txn_id,
diff --git a/src/kudu/master/txn_manager.h b/src/kudu/master/txn_manager.h
index 6fca78a..f852799 100644
--- a/src/kudu/master/txn_manager.h
+++ b/src/kudu/master/txn_manager.h
@@ -102,6 +102,10 @@ class TxnManager final {
// Whether or not this instance is lazily initialized.
const bool is_lazily_initialized_;
+ // The span of a range partition to use when adding new ranges to the
+ // transaction status table.
+ const int64_t txn_status_table_range_span_;
+
// The pointer to the top-level Master object. From the lifecycle
perspective,
// Master is assumed to be alive during the whole lifecycle of TxnManager
// since the Master is the component that hosts TxnManager (this is similar
diff --git a/src/kudu/transactions/txn_system_client.cc
b/src/kudu/transactions/txn_system_client.cc
index 2dc17c9..092eb5d 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -138,11 +138,8 @@ Status TxnSystemClient::BeginTransaction(int64_t txn_id,
s.AsStatusCallback(),
&result));
const auto ret = s.Wait();
- if (ret.ok() || ret.IsInvalidArgument()) {
- DCHECK(result.has_highest_seen_txn_id());
- if (highest_seen_txn_id) {
- *highest_seen_txn_id = result.highest_seen_txn_id();
- }
+ if (result.has_highest_seen_txn_id() && highest_seen_txn_id) {
+ *highest_seen_txn_id = result.highest_seen_txn_id();
}
return ret;
}
diff --git a/src/kudu/tserver/tablet_service.cc
b/src/kudu/tserver/tablet_service.cc
index 1c66aa0..38f5386 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1252,7 +1252,8 @@ void TabletServiceAdminImpl::CoordinateTransaction(const
CoordinateTransactionRe
int64_t highest_seen_txn_id = -1;
switch (op.type()) {
case CoordinatorOpPB::BEGIN_TXN:
- s = txn_coordinator->BeginTransaction(txn_id, user,
&highest_seen_txn_id, &ts_error);
+ s = txn_coordinator->BeginTransaction(
+ txn_id, user, &highest_seen_txn_id, &ts_error);
break;
case CoordinatorOpPB::REGISTER_PARTICIPANT:
s = txn_coordinator->RegisterParticipant(txn_id,
op.txn_participant_id(), user, &ts_error);
@@ -1281,7 +1282,7 @@ void TabletServiceAdminImpl::CoordinateTransaction(const
CoordinateTransactionRe
// Populate corresponding field in the response.
*(resp->mutable_op_result()->mutable_txn_status()) = std::move(txn_status);
}
- if (op.type() == CoordinatorOpPB::BEGIN_TXN) {
+ if (op.type() == CoordinatorOpPB::BEGIN_TXN && !s.IsServiceUnavailable()) {
DCHECK_GE(highest_seen_txn_id, 0);
resp->mutable_op_result()->set_highest_seen_txn_id(highest_seen_txn_id);
}