This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 9306e41 KUDU-2612: initial implementation of TxnManager
9306e41 is described below
commit 9306e41c32c8a03e1aedd682b9e158066baba7f4
Author: Alexey Serbin <[email protected]>
AuthorDate: Thu Aug 27 17:08:20 2020 -0700
KUDU-2612: initial implementation of TxnManager
This is a first implementation of the TxnManager. The TxnManager class
encapsulates the logic used by the TxnManagerService while serving RPC
requests (see txn_manager.proto for the protobuf interface). The most
essential piece of the logic to be implemented by this class is the
assignment of an identifier for a new transaction and the initialization
of the transaction status table, along with the creation of its new
partitions when needed. All other methods simply proxy the
corresponding requests to the underlying instance of TxnSystemClient.
This changelist also contains test scenarios to cover the newly
introduced functionality.
The implementation of TxnManager::BeginTransaction() is moved into
a follow-up changelist, as requested, to simplify the process of reviewing
these changes.
TxnManager::KeepTransactionAlive() will be implemented as soon
as the corresponding functionality is ready in the TxnStatusManager.
Change-Id: Ie952977a3ae5f625d1283389f0be8afb79df7d8c
Reviewed-on: http://gerrit.cloudera.org:8080/16527
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Andrew Wong <[email protected]>
Reviewed-by: Hao Hao <[email protected]>
---
src/kudu/master/CMakeLists.txt | 22 +-
src/kudu/master/master.cc | 94 ++++++++-
src/kudu/master/master.h | 28 ++-
src/kudu/master/txn_manager-test.cc | 316 +++++++++++++++++++++++++++++
src/kudu/master/txn_manager.cc | 225 ++++++++++++++++++++
src/kudu/master/txn_manager.h | 131 ++++++++++++
src/kudu/master/txn_manager.proto | 146 +++++++++++++
src/kudu/master/txn_manager_service.cc | 168 +++++++++++++++
src/kudu/master/txn_manager_service.h | 82 ++++++++
src/kudu/transactions/txn_status_manager.h | 4 +-
src/kudu/transactions/txn_system_client.cc | 2 +
11 files changed, 1203 insertions(+), 15 deletions(-)
diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index 1bcc6de..2706e47 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -32,6 +32,22 @@ ADD_EXPORTABLE_LIBRARY(master_proto
DEPS ${MASTER_KRPC_LIBS}
NONLINK_DEPS ${MASTER_KRPC_TGTS})
+# Generate C++ sources/headers for txn_manager.proto with KRPC_GENERATE; the
+# outputs are collected in the TXN_MANAGER_KRPC_{SRCS,HDRS,TGTS} variables.
+KRPC_GENERATE(
+ TXN_MANAGER_KRPC_SRCS TXN_MANAGER_KRPC_HDRS TXN_MANAGER_KRPC_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES txn_manager.proto)
+# Libraries the generated txn_manager_proto code depends on.
+set(TXN_MANAGER_KRPC_LIBS
+ consensus_metadata_proto
+ krpc
+ protobuf
+ transactions_proto
+ wire_protocol_proto)
+# Library built from the generated sources; the 'master' library links
+# against it (see target_link_libraries(master ...)).
+ADD_EXPORTABLE_LIBRARY(txn_manager_proto
+ SRCS ${TXN_MANAGER_KRPC_SRCS}
+ DEPS ${TXN_MANAGER_KRPC_LIBS}
+ NONLINK_DEPS ${TXN_MANAGER_KRPC_TGTS})
+
set(MASTER_SRCS
authz_provider.cc
auto_rebalancer.cc
@@ -52,7 +68,9 @@ set(MASTER_SRCS
table_locations_cache_metrics.cc
table_metrics.cc
ts_descriptor.cc
- ts_manager.cc)
+ ts_manager.cc
+ txn_manager.cc
+ txn_manager_service.cc)
add_library(master ${MASTER_SRCS})
target_link_libraries(master
@@ -73,6 +91,7 @@ target_link_libraries(master
server_process
tablet
token_proto
+ txn_manager_proto
transactions
tserver)
@@ -100,6 +119,7 @@ ADD_KUDU_TEST(placement_policy-test)
ADD_KUDU_TEST(sys_catalog-test RESOURCE_LOCK "master-web-port")
ADD_KUDU_TEST(ts_descriptor-test DATA_FILES ../scripts/first_argument.sh)
ADD_KUDU_TEST(ts_state-test)
+ADD_KUDU_TEST(txn_manager-test RESOURCE_LOCK "master-web-port")
#########################################
# kudu-master
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index 66ed824..f44fbd0 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -44,6 +44,8 @@
#include "kudu/master/master_path_handlers.h"
#include "kudu/master/master_service.h"
#include "kudu/master/ts_manager.h"
+#include "kudu/master/txn_manager.h"
+#include "kudu/master/txn_manager_service.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/service_if.h"
@@ -53,6 +55,7 @@
#include "kudu/tserver/tablet_copy_service.h"
#include "kudu/tserver/tablet_service.h"
#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
#include "kudu/util/maintenance_manager.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
@@ -91,11 +94,15 @@ DEFINE_string(location_mapping_cmd, "",
"characters from the set [a-zA-Z0-9_-.]. If the cluster is not "
"using location awareness features this flag should not be
set.");
+DECLARE_bool(txn_manager_lazily_initialized);
+DECLARE_bool(txn_manager_enabled);
using kudu::consensus::RaftPeerPB;
using kudu::fs::ErrorHandlerType;
using kudu::rpc::ServiceIf;
using kudu::security::TokenSigner;
+using kudu::transactions::TxnManager;
+using kudu::transactions::TxnManagerServiceImpl;
using kudu::tserver::ConsensusServiceImpl;
using kudu::tserver::TabletCopyServiceImpl;
using std::min;
@@ -161,12 +168,13 @@ Status GetMasterEntryForHost(const
shared_ptr<rpc::Messenger>& messenger,
} // anonymous namespace
Master::Master(const MasterOptions& opts)
- : KuduServer("Master", opts, "kudu.master"),
- state_(kStopped),
- catalog_manager_(new CatalogManager(this)),
- path_handlers_(new MasterPathHandlers(this)),
- opts_(opts),
- registration_initialized_(false) {
+ : KuduServer("Master", opts, "kudu.master"),
+ state_(kStopped),
+ catalog_manager_(new CatalogManager(this)),
+ txn_manager_(FLAGS_txn_manager_enabled ? new TxnManager(this) : nullptr),
+ path_handlers_(new MasterPathHandlers(this)),
+ opts_(opts),
+ registration_initialized_(false) {
const auto& location_cmd = FLAGS_location_mapping_cmd;
if (!location_cmd.empty()) {
location_cache_.reset(new LocationCache(location_cmd,
metric_entity_.get()));
@@ -232,17 +240,30 @@ Status Master::StartAsync() {
new ConsensusServiceImpl(this, catalog_manager_.get()));
unique_ptr<ServiceIf> tablet_copy_service(
new TabletCopyServiceImpl(this, catalog_manager_.get()));
+ unique_ptr<ServiceIf> txn_manager_service(
+ txn_manager_ ? new TxnManagerServiceImpl(this) : nullptr);
RETURN_NOT_OK(RegisterService(std::move(impl)));
RETURN_NOT_OK(RegisterService(std::move(consensus_service)));
RETURN_NOT_OK(RegisterService(std::move(tablet_copy_service)));
+ if (txn_manager_service) {
+ RETURN_NOT_OK(RegisterService(std::move(txn_manager_service)));
+ }
RETURN_NOT_OK(KuduServer::Start());
// Now that we've bound, construct our ServerRegistrationPB.
RETURN_NOT_OK(InitMasterRegistration());
// Start initializing the catalog manager.
- RETURN_NOT_OK(init_pool_->Submit([this]() { this->InitCatalogManagerTask();
}));
+ RETURN_NOT_OK(init_pool_->Submit([this]() {
+ this->InitCatalogManagerTask();
+ }));
+
+ if (txn_manager_ && !FLAGS_txn_manager_lazily_initialized) {
+ // Start initializing the TxnManager.
+ RETURN_NOT_OK(ScheduleTxnManagerInit());
+ }
+
state_ = kRunning;
return Status::OK();
@@ -253,7 +274,7 @@ void Master::InitCatalogManagerTask() {
if (!s.ok()) {
LOG(ERROR) << "Unable to init master catalog manager: " << s.ToString();
}
- init_status_.Set(s);
+ catalog_manager_init_status_.Set(s);
}
Status Master::InitCatalogManager() {
@@ -267,8 +288,62 @@ Status Master::InitCatalogManager() {
+// Returns the catalog manager initialization status published by
+// InitCatalogManagerTask(). CHECK-fails unless the master is running.
Status Master::WaitForCatalogManagerInit() const {
CHECK_EQ(state_, kRunning);
+ return catalog_manager_init_status_.Get();
+}
+
+// Submits InitTxnManagerTask() to the initialization thread pool. The outcome
+// of the initialization itself is reported via txn_manager_init_status_.
+Status Master::ScheduleTxnManagerInit() {
+  DCHECK(txn_manager_);
+  auto init_task = [this]() {
+    InitTxnManagerTask();
+  };
+  return init_pool_->Submit(init_task);
+}
+
+void Master::InitTxnManagerTask() {
+  DCHECK(txn_manager_);
+  // TxnManager can be initialized successfully only when enough tablet
+  // servers are running in the cluster. A Kudu master may start long before
+  // its tablet servers do, so keep retrying until the initialization succeeds
+  // or the master starts shutting down. This task is scheduled either at the
+  // master's startup (non-lazy mode) or upon the first call to TxnManager
+  // (lazy mode); see the --txn_manager_lazily_initialized flag.
+  static const MonoDelta kRetryInterval = MonoDelta::FromSeconds(1);
+  Status init_status;
+  for (;;) {
+    const bool shutting_down = state_ == kStopping || state_ == kStopped;
+    if (shutting_down) {
+      init_status = Status::Incomplete(
+          "shut down while trying to initialize TxnManager");
+      break;
+    }
+    init_status = InitTxnManager();
+    if (init_status.ok()) {
+      break;
+    }
+    // TODO(aserbin): if retrying every second looks too often, consider adding
+    //                exponential back-off and adding condition variables to
+    //                wake up a long-awaiting task and retry initialization
+    //                right away when TxnManager receives a call.
+    KLOG_EVERY_N_SECS(WARNING, 60) << Substitute(
+        "$0: unable to init TxnManager, will retry in $1",
+        init_status.ToString(), kRetryInterval.ToString());
+    SleepFor(kRetryInterval);
+  }
+  txn_manager_init_status_.Set(init_status);
+}
+
+// Synchronously initializes the TxnManager. Returns IllegalState if the
+// TxnManager was not created (i.e. it is not enabled).
+Status Master::InitTxnManager() {
+  if (txn_manager_ == nullptr) {
+    return Status::IllegalState("TxnManager is not enabled");
+  }
+  const Status init_status = txn_manager_->Init();
+  if (!init_status.ok()) {
+    return init_status.CloneAndPrepend("unable to initialize TxnManager");
+  }
+  return Status::OK();
+}
- return init_status_.Get();
+// Returns the TxnManager initialization status. With an initialized 'timeout',
+// returns TimedOut if the status hasn't been published within 'timeout'.
+// With an uninitialized 'timeout', goes straight to Get() (presumably blocking
+// until the status is published).
+Status Master::WaitForTxnManagerInit(const MonoDelta& timeout) const {
+  CHECK_EQ(state_, kRunning);
+  if (timeout.Initialized() &&
+      txn_manager_init_status_.WaitFor(timeout) == nullptr) {
+    return Status::TimedOut("timed out waiting for TxnManager to initialize");
+  }
+  return txn_manager_init_status_.Get();
}
Status Master::WaitUntilCatalogManagerIsLeaderAndReadyForTests(const
MonoDelta& timeout) {
@@ -326,6 +401,7 @@ void Master::ShutdownImpl() {
if (kInitialized == state_ || kRunning == state_) {
const string name = rpc_server_->ToString();
LOG(INFO) << "Master@" << name << " shutting down...";
+ state_ = kStopping;
// 1. Stop accepting new RPCs.
UnregisterAllServices();
diff --git a/src/kudu/master/master.h b/src/kudu/master/master.h
index 6522126..86ad225 100644
--- a/src/kudu/master/master.h
+++ b/src/kudu/master/master.h
@@ -48,6 +48,10 @@ namespace security {
class TokenSigner;
} // namespace security
+namespace transactions {
+class TxnManager;
+} // namespace transactions
+
namespace master {
class CatalogManager;
@@ -71,6 +75,7 @@ class Master : public kserver::KuduServer {
Status StartAsync();
Status WaitForCatalogManagerInit() const;
+ Status WaitForTxnManagerInit(const MonoDelta& timeout = {}) const;
// Wait until this Master's catalog manager instance is the leader and is
ready.
// This method is intended for use by unit tests.
@@ -86,6 +91,8 @@ class Master : public kserver::KuduServer {
CatalogManager* catalog_manager() { return catalog_manager_.get(); }
+ transactions::TxnManager* txn_manager() { return txn_manager_.get(); }
+
const MasterOptions& opts() { return opts_; }
LocationCache* location_cache() { return location_cache_.get(); }
@@ -126,10 +133,15 @@ class Master : public kserver::KuduServer {
private:
friend class MasterTest;
friend class CatalogManager;
+ friend class transactions::TxnManager;
void InitCatalogManagerTask();
Status InitCatalogManager();
+ void InitTxnManagerTask();
+ Status InitTxnManager();
+ Status ScheduleTxnManagerInit();
+
// Initialize registration_.
// Requires that the web server and RPC server have been started.
Status InitMasterRegistration();
@@ -142,7 +154,8 @@ class Master : public kserver::KuduServer {
enum MasterState {
kStopped,
kInitialized,
- kRunning
+ kRunning,
+ kStopping,
};
MasterState state_;
@@ -150,13 +163,20 @@ class Master : public kserver::KuduServer {
std::unique_ptr<MasterCertAuthority> cert_authority_;
std::unique_ptr<security::TokenSigner> token_signer_;
std::unique_ptr<CatalogManager> catalog_manager_;
+ std::unique_ptr<transactions::TxnManager> txn_manager_;
std::unique_ptr<MasterPathHandlers> path_handlers_;
- // The status of the master initialization. This is set
+ // The status of the catalog manager initialization. This is set
// by the async initialization task.
- Promise<Status> init_status_;
+ Promise<Status> catalog_manager_init_status_;
+
+ // The status of the TxnManager initialization. This is set by an
asynchronous
+ // initialization task. The task to initialize TxnManager can be scheduled
+ // either by master upon its start or from within TxnManager itself while
+ // processing the very first RPC after start.
+ Promise<Status> txn_manager_init_status_;
- // For initializing the catalog manager.
+ // For initializing the catalog manager and TxnManager.
std::unique_ptr<ThreadPool> init_pool_;
MasterOptions opts_;
diff --git a/src/kudu/master/txn_manager-test.cc
b/src/kudu/master/txn_manager-test.cc
new file mode 100644
index 0000000..913c03f
--- /dev/null
+++ b/src/kudu/master/txn_manager-test.cc
@@ -0,0 +1,316 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/master/txn_manager.h"
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/master/master.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/master/txn_manager.pb.h"
+#include "kudu/master/txn_manager.proxy.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/util/barrier.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using kudu::cluster::InternalMiniCluster;
+using kudu::cluster::InternalMiniClusterOptions;
+using kudu::rpc::RpcController;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+
+DECLARE_bool(txn_manager_enabled);
+DECLARE_bool(txn_manager_lazily_initialized);
+DECLARE_int32(rpc_service_queue_length);
+
+namespace kudu {
+namespace transactions {
+
+class TxnManagerTest : public KuduTest {
+ protected:
+  TxnManagerTest()
+      : master_(nullptr) {
+    // A single master is enough: it hosts the TxnManager RPC service.
+    opts_.num_masters = 1;
+    // Transaction status tablets need at least one tablet server to be hosted.
+    opts_.num_tablet_servers = 1;
+  }
+
+  void SetUp() override {
+    // TxnManager is disabled by default, so enable it. The lazy initialization
+    // mode is the default, but it's spelled out here for readability.
+    FLAGS_txn_manager_enabled = true;
+    FLAGS_txn_manager_lazily_initialized = true;
+
+    // Some scenarios (e.g. LazyInitializationConcurrentCalls) send bursts of
+    // requests whose size depends on the number of available CPU cores, so
+    // the service RPC queue needs a very generous capacity.
+    FLAGS_rpc_service_queue_length = 10000;
+
+    KuduTest::SetUp();
+    cluster_.reset(new InternalMiniCluster(env_, opts_));
+    ASSERT_OK(Start());
+  }
+
+  void TearDown() override {
+    cluster_->Shutdown();
+    KuduTest::TearDown();
+  }
+
+  // Starts the mini-cluster, waits for TxnManager to initialize if running in
+  // the non-lazy mode, and (re)creates the TxnManager proxy.
+  Status Start() {
+    RETURN_NOT_OK(cluster_->Start());
+    // InternalMiniCluster::Start() recreates the mini-masters, so the cached
+    // Master pointer has to be refreshed.
+    master_ = cluster_->mini_master()->master();
+    if (!FLAGS_txn_manager_lazily_initialized) {
+      RETURN_NOT_OK(master_->WaitForTxnManagerInit());
+    }
+    const auto& master_addr = cluster_->mini_master()->bound_rpc_addr();
+    proxy_.reset(new TxnManagerServiceProxy(
+        cluster_->messenger(), master_addr, master_addr.host()));
+    return Status::OK();
+  }
+
+  // Sets the RPC timeout used by the test scenarios.
+  static void PrepareRpcController(RpcController* ctx) {
+    static const MonoDelta kRpcTimeout = MonoDelta::FromSeconds(30);
+    ASSERT_NE(nullptr, ctx);
+    ctx->set_timeout(kRpcTimeout);
+  }
+
+  InternalMiniClusterOptions opts_;
+  unique_ptr<InternalMiniCluster> cluster_;
+  unique_ptr<TxnManagerServiceProxy> proxy_;
+  // Cached pointer to the Master object of the (single) mini-master.
+  master::Master* master_;
+};
+
+// Verify the basic functionality when TxnManager is lazily initialized.
+TEST_F(TxnManagerTest, LazyInitialization) {
+  // Lazy initialization is the default mode.
+  ASSERT_TRUE(FLAGS_txn_manager_lazily_initialized);
+  ASSERT_TRUE(master_->txn_manager()->is_lazily_initialized_);
+  ASSERT_FALSE(master_->txn_manager()->initialized_);
+
+  // No calls have been made to TxnManager yet, so waiting for its
+  // initialization must time out. The timeout value only bounds how long
+  // this check takes.
+  {
+    const auto wait_status =
+        master_->WaitForTxnManagerInit(MonoDelta::FromSeconds(1));
+    ASSERT_TRUE(wait_status.IsTimedOut()) << wait_status.ToString();
+    ASSERT_STR_CONTAINS(wait_status.ToString(),
+                        "timed out waiting for TxnManager to initialize");
+  }
+  ASSERT_FALSE(master_->txn_manager()->initialized_);
+
+  // The very first call (using a not-yet-seen txn_id) fails, but it triggers
+  // the initialization of TxnManager.
+  {
+    GetTransactionStateRequestPB req;
+    req.set_txn_id(0);
+    GetTransactionStateResponsePB resp;
+    RpcController ctx;
+    PrepareRpcController(&ctx);
+    ASSERT_OK(proxy_->GetTransactionState(req, &resp, &ctx));
+    ASSERT_TRUE(resp.has_error());
+    const auto status = StatusFromPB(resp.error().status());
+    ASSERT_TRUE(status.IsNotFound()) << status.ToString();
+    ASSERT_STR_CONTAINS(status.ToString(), "transaction ID 0 not found");
+  }
+  ASSERT_OK(master_->WaitForTxnManagerInit());
+  ASSERT_TRUE(master_->txn_manager()->initialized_);
+
+  // Transaction identifiers are assigned starting from 0, and the very first
+  // range partition created upon initialization covers a range starting at 0.
+  // So there is no tablet backing a negative txn_id.
+  {
+    AbortTransactionRequestPB req;
+    req.set_txn_id(-1);
+    AbortTransactionResponsePB resp;
+    RpcController ctx;
+    PrepareRpcController(&ctx);
+    ASSERT_OK(proxy_->AbortTransaction(req, &resp, &ctx));
+    ASSERT_TRUE(resp.has_error());
+    const auto status = StatusFromPB(resp.error().status());
+    ASSERT_TRUE(status.IsNotFound()) << status.ToString();
+    ASSERT_STR_CONTAINS(status.ToString(),
+                        "No tablet covering the requested range partition");
+  }
+
+  // Restart the cluster to verify that initialization works as expected when
+  // the transaction status table already exists.
+  cluster_->Shutdown();
+  ASSERT_OK(Start());
+  ASSERT_FALSE(master_->txn_manager()->initialized_);
+
+  {
+    CommitTransactionRequestPB req;
+    req.set_txn_id(0);
+    CommitTransactionResponsePB resp;
+    RpcController ctx;
+    PrepareRpcController(&ctx);
+    ASSERT_OK(proxy_->CommitTransaction(req, &resp, &ctx));
+    ASSERT_TRUE(resp.has_error());
+    const auto status = StatusFromPB(resp.error().status());
+    ASSERT_TRUE(status.IsNotFound()) << status.ToString();
+    ASSERT_STR_CONTAINS(status.ToString(), "transaction ID 0 not found");
+  }
+  ASSERT_OK(master_->WaitForTxnManagerInit());
+  ASSERT_TRUE(master_->txn_manager()->initialized_);
+}
+
+// Scenario to verify that the lazy initialization of the TxnManager works as
+// expected in the presence of multiple concurrent calls.
+TEST_F(TxnManagerTest, LazyInitializationConcurrentCalls) {
+  // CHECK_ macros are used instead of ASSERT_ in this functor because it runs
+  // in multiple threads.
+  const auto txn_initiator = [this](size_t txn_num, Barrier* b) {
+    // Each thread creates its own proxy and sends all of its requests through
+    // it: a proxy serializes RPC calls, so sharing a single proxy among the
+    // threads would defeat the purpose of creating more concurrency.
+    TxnManagerServiceProxy p(
+        cluster_->messenger(),
+        cluster_->mini_master()->bound_rpc_addr(),
+        cluster_->mini_master()->bound_rpc_addr().host());
+    for (size_t id = 0; id < txn_num; ++id) {
+      RpcController ctx;
+      PrepareRpcController(&ctx);
+      GetTransactionStateRequestPB req;
+      GetTransactionStateResponsePB resp;
+      req.set_txn_id(id);
+      // Line up all the threads to send their requests at once.
+      b->Wait();
+      CHECK_OK(p.GetTransactionState(req, &resp, &ctx));
+      CHECK(resp.has_error());
+      auto s = StatusFromPB(resp.error().status());
+      CHECK(s.IsNotFound()) << s.ToString();
+    }
+  };
+
+  static constexpr int64_t kNumCallsPerThread = 8;
+  const int kNumCPUs = base::NumCPUs();
+  const size_t kNumThreads = 2 * kNumCPUs;
+  vector<thread> threads;
+  threads.reserve(kNumThreads);
+  Barrier barrier(kNumThreads);
+  for (size_t idx = 0; idx < kNumThreads; ++idx) {
+    threads.emplace_back(txn_initiator, kNumCallsPerThread, &barrier);
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  // TxnManager should be initialized, of course.
+  ASSERT_OK(master_->WaitForTxnManagerInit());
+  ASSERT_TRUE(master_->txn_manager()->initialized_);
+}
+
+// Verify the basic functionality when TxnManager is initialized eagerly,
+// i.e. in the non-lazy mode.
+TEST_F(TxnManagerTest, NonlazyInitialization) {
+  FLAGS_txn_manager_lazily_initialized = false;
+  cluster_.reset(new InternalMiniCluster(env_, opts_));
+  ASSERT_OK(Start());
+  ASSERT_FALSE(master_->txn_manager()->is_lazily_initialized_);
+  // In the non-lazy mode the master initializes TxnManager at startup, so it
+  // should eventually come up initialized without any incoming calls.
+  ASSERT_OK(master_->WaitForTxnManagerInit());
+  ASSERT_TRUE(master_->txn_manager()->initialized_);
+
+  // Restart the cluster to verify that initialization works as expected when
+  // the transaction status table already exists.
+  cluster_->Shutdown();
+  ASSERT_OK(Start());
+  ASSERT_FALSE(master_->txn_manager()->is_lazily_initialized_);
+  ASSERT_OK(master_->WaitForTxnManagerInit());
+  ASSERT_TRUE(master_->txn_manager()->initialized_);
+
+  {
+    CommitTransactionRequestPB req;
+    req.set_txn_id(0);
+    CommitTransactionResponsePB resp;
+    RpcController ctx;
+    PrepareRpcController(&ctx);
+    ASSERT_OK(proxy_->CommitTransaction(req, &resp, &ctx));
+    ASSERT_TRUE(resp.has_error());
+    const auto status = StatusFromPB(resp.error().status());
+    ASSERT_TRUE(status.IsNotFound()) << status.ToString();
+    ASSERT_STR_CONTAINS(status.ToString(), "transaction ID 0 not found");
+  }
+}
+
+// The BeginTransaction() implementation is moved into a follow-up changelist;
+// for now TxnManager responds with a NotSupported error.
+// TODO(aserbin): update this scenario once BeginTransaction is here
+TEST_F(TxnManagerTest, BeginTransactionRpc) {
+  BeginTransactionRequestPB req;
+  BeginTransactionResponsePB resp;
+  RpcController ctx;
+  PrepareRpcController(&ctx);
+  ASSERT_OK(proxy_->BeginTransaction(req, &resp, &ctx));
+
+  ASSERT_TRUE(resp.has_error());
+  const auto status = StatusFromPB(resp.error().status());
+  ASSERT_TRUE(status.IsNotSupported()) << status.ToString();
+  ASSERT_STR_CONTAINS(
+      status.ToString(),
+      "Not implemented: BeginTransaction is not supported yet");
+}
+
+// TxnManager doesn't support KeepTransactionAlive yet: it responds with a
+// NotSupported error.
+// TODO(aserbin): update this scenario once KeepTransactionAlive is implemented
+TEST_F(TxnManagerTest, KeepTransactionAliveRpc) {
+  KeepTransactionAliveRequestPB req;
+  req.set_txn_id(0);
+  KeepTransactionAliveResponsePB resp;
+  RpcController ctx;
+  PrepareRpcController(&ctx);
+  ASSERT_OK(proxy_->KeepTransactionAlive(req, &resp, &ctx));
+
+  ASSERT_TRUE(resp.has_error());
+  const auto status = StatusFromPB(resp.error().status());
+  ASSERT_TRUE(status.IsNotSupported()) << status.ToString();
+  ASSERT_STR_CONTAINS(
+      status.ToString(),
+      "Not implemented: KeepTransactionAlive is not supported yet");
+}
+
+} // namespace transactions
+} // namespace kudu
diff --git a/src/kudu/master/txn_manager.cc b/src/kudu/master/txn_manager.cc
new file mode 100644
index 0000000..b0a7e45
--- /dev/null
+++ b/src/kudu/master/txn_manager.cc
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/master/txn_manager.h"
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <ostream>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/port.h"
+#include "kudu/master/master.h"
+#include "kudu/master/txn_manager.pb.h"
+#include "kudu/transactions/txn_system_client.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+
+using kudu::master::Master;
+using kudu::transactions::TxnStatusEntryPB;
+using std::string;
+using std::vector;
+
+// TODO(aserbin): remove the flag once the txn-related work is complete.
+// When this flag is false, Master doesn't create a TxnManager at all and the
+// TxnManager RPC service isn't registered.
+DEFINE_bool(txn_manager_enabled, false,
+ "Whether to enable TxnManager (this enables a new feature)");
+TAG_FLAG(txn_manager_enabled, hidden);
+TAG_FLAG(txn_manager_enabled, experimental);
+
+// For Kudu clusters which use transactions we should prefer non-lazy
+// initialization of the TxnManager. That's to reduce the latency of the very
+// first call processed by the TxnManager after it starts.
+//
+// In the non-lazy mode Master::StartAsync() schedules the initialization;
+// in the lazy mode TxnManager::CheckInitialized() schedules it upon the first
+// call to TxnManager.
+//
+// TODO(aserbin): maybe, consider changing the default value of this flag to
+//                'false' once proper tablet type filtering is implemented or
+//                --txn_manager_lazily_initialized=true is added for existing
+//                test scenarios which assume there isn't a single table
+//                in the cluster at the time they start.
+DEFINE_bool(txn_manager_lazily_initialized, true,
+ "Whether to initialize TxnManager upon arrival of first request. "
+ "Otherwise, TxnManager is initialized upon master's startup.");
+TAG_FLAG(txn_manager_lazily_initialized, advanced);
+TAG_FLAG(txn_manager_lazily_initialized, experimental);
+
+// TODO(aserbin): clarify on the proper value for the span of the transaction
+//                status table's range partition. At this point it's not yet
+//                crystal clear what criteria are essential when defining one.
+// The value is passed to TxnSystemClient::CreateTxnStatusTable() by
+// TxnManager::Init().
+DEFINE_int64(txn_manager_status_table_range_partition_span, 1000000,
+ "A span for a status table's range partition. Once TxnManager "
+ "detects there isn't a backing tablet for a transaction "
+ "identifier, it adds a new range partition in the transaction "
+ "status table with the lower bound equal to the upper bound of "
+ "the previous range and (lower bound + span) as the upper
bound.");
+TAG_FLAG(txn_manager_status_table_range_partition_span, advanced);
+TAG_FLAG(txn_manager_status_table_range_partition_span, experimental);
+
+namespace kudu {
+namespace transactions {
+
+namespace {
+
+// Converts the deadline of an RPC into a timeout, i.e. the delta between the
+// current time and the specified point in time. MonoTime::Max() (meaning
+// "no deadline") maps to the largest representable delta.
+//
+// NOTE: the former CheckRespErrorOrSetUnknown() helper was removed from this
+// namespace: it had internal linkage and no uses in this translation unit.
+MonoDelta ToDelta(const MonoTime& deadline) {
+  if (deadline == MonoTime::Max()) {
+    return MonoDelta::FromNanoseconds(std::numeric_limits<int64_t>::max());
+  }
+  return deadline - MonoTime::Now();
+}
+
+} // anonymous namespace
+
+TxnManager::TxnManager(Master* server)
+ : is_lazily_initialized_(FLAGS_txn_manager_lazily_initialized),
+ server_(server),
+ need_init_(true),
+ initialized_(false),
+ next_txn_id_(0) {
+}
+
+// Defined out of line, presumably so that members holding types that are only
+// forward-declared in the header (e.g. TxnSystemClient) are destroyed here.
+TxnManager::~TxnManager() = default;
+
+// Not implemented yet: after the initialization check passes, always returns
+// NotSupported.
+Status TxnManager::BeginTransaction(const string& /* username */,
+                                    const MonoTime& deadline,
+                                    int64_t* txn_id,
+                                    int32_t* keep_alive_interval_ms) {
+  DCHECK(txn_id);
+  DCHECK(keep_alive_interval_ms);
+  const Status init_status = CheckInitialized(deadline);
+  if (!init_status.ok()) {
+    return init_status;
+  }
+  // TODO(aserbin): this is implemented in a follow-up changelist
+  return Status::NotSupported("BeginTransaction is not supported yet");
+}
+
+// Starts committing the transaction by proxying the request to
+// TxnSystemClient::BeginCommitTransaction().
+Status TxnManager::CommitTransaction(int64_t txn_id,
+                                     const string& username,
+                                     const MonoTime& deadline) {
+  RETURN_NOT_OK(CheckInitialized(deadline));
+  const MonoDelta timeout = ToDelta(deadline);
+  return txn_sys_client_->BeginCommitTransaction(txn_id, username, timeout);
+}
+
+// Fetches the transaction's status entry into 'txn_status' by proxying the
+// request to TxnSystemClient::GetTransactionStatus().
+Status TxnManager::GetTransactionState(int64_t txn_id,
+                                       const string& username,
+                                       const MonoTime& deadline,
+                                       TxnStatusEntryPB* txn_status) {
+  DCHECK(txn_status);
+  RETURN_NOT_OK(CheckInitialized(deadline));
+  const MonoDelta timeout = ToDelta(deadline);
+  return txn_sys_client_->GetTransactionStatus(
+      txn_id, username, txn_status, timeout);
+}
+
+// Aborts the transaction by proxying the request to
+// TxnSystemClient::AbortTransaction().
+Status TxnManager::AbortTransaction(int64_t txn_id,
+                                    const string& username,
+                                    const MonoTime& deadline) {
+  RETURN_NOT_OK(CheckInitialized(deadline));
+  const MonoDelta timeout = ToDelta(deadline);
+  return txn_sys_client_->AbortTransaction(txn_id, username, timeout);
+}
+
+// Not implemented yet: after the initialization check passes, always returns
+// NotSupported.
+Status TxnManager::KeepTransactionAlive(int64_t /* txn_id */,
+                                        const string& /* username */,
+                                        const MonoTime& deadline) {
+  const Status init_status = CheckInitialized(deadline);
+  if (!init_status.ok()) {
+    return init_status;
+  }
+  // TODO(aserbin): call txn_sys_client_ once the functionality is there
+  return Status::NotSupported("KeepTransactionAlive is not supported yet");
+}
+
+// Creates the TxnSystemClient pointed at this cluster's masters, makes sure
+// the transaction status table exists, and opens it. This method isn't
+// supposed to be called concurrently, so there is no protection against
+// concurrent calls.
+Status TxnManager::Init() {
+  DCHECK(!initialized_);
+  if (initialized_) {
+    return Status::IllegalState("already initialized");
+  }
+
+  vector<HostPort> master_hostports;
+  RETURN_NOT_OK(server_->GetMasterHostPorts(&master_hostports));
+  vector<string> master_addrs;
+  master_addrs.reserve(master_hostports.size());
+  for (const auto& hostport : master_hostports) {
+    master_addrs.emplace_back(hostport.ToString());
+  }
+  RETURN_NOT_OK(TxnSystemClient::Create(master_addrs, &txn_sys_client_));
+  DCHECK(txn_sys_client_);
+
+  // Status::OK() is expected only on the very first call, when the
+  // transaction status table doesn't exist yet. AlreadyPresent (the table was
+  // created earlier) is tolerated as well.
+  const Status create_status = txn_sys_client_->CreateTxnStatusTable(
+      FLAGS_txn_manager_status_table_range_partition_span);
+  if (!(create_status.ok() || create_status.IsAlreadyPresent())) {
+    return create_status;
+  }
+  RETURN_NOT_OK(txn_sys_client_->OpenTxnStatusTable());
+  initialized_ = true;
+  return Status::OK();
+}
+
+// Returns OK if TxnManager is initialized. Otherwise:
+//   * in the non-lazy mode, returns ServiceUnavailable right away;
+//   * in the lazy mode, schedules the initialization (at most once, unless
+//     scheduling fails) and waits for it until 'deadline'. If the wait times
+//     out, returns ServiceUnavailable, assuming the caller retries later;
+//     otherwise returns the initialization status.
+Status TxnManager::CheckInitialized(const MonoTime& deadline) {
+  static const auto kTransientErrStatus = Status::ServiceUnavailable(
+      "TxnManager is not yet initialized");
+
+  if (initialized_) {
+    return Status::OK();
+  }
+  if (!is_lazily_initialized_) {
+    return kTransientErrStatus;
+  }
+
+  // In case of lazy initialization, calls to TxnManager trigger the
+  // initialization of the object: only the first caller schedules it.
+  bool need_init = true;
+  if (need_init_.compare_exchange_strong(need_init, false)) {
+    const auto schedule_status = server_->ScheduleTxnManagerInit();
+    // In a rare case of failure, let next call schedule the init.
+    if (PREDICT_FALSE(!schedule_status.ok())) {
+      LOG(WARNING) << schedule_status.ToString();
+      need_init_ = true;
+      return kTransientErrStatus;
+    }
+  }
+
+  // An uninitialized MonoDelta makes WaitForTxnManagerInit() skip the timed
+  // wait. Use it both for an unset deadline and for MonoTime::Max() ("no
+  // deadline"). ToDelta() maps the latter to INT64_MAX nanoseconds.
+  // NOTE(review): the timed wait presumably computes 'now + timeout', which
+  // would overflow for such a delta -- confirm against Promise::WaitFor().
+  //
+  // TODO(aserbin): subtract a small portion of the timeout to respond a bit
+  //                earlier the deadline, otherwise client might consider
+  //                the call timing out, but we want to deliver
+  //                ServiceUnavailable() status instead.
+  const bool has_finite_deadline =
+      deadline.Initialized() && deadline != MonoTime::Max();
+  const auto wait_status = server_->WaitForTxnManagerInit(
+      has_finite_deadline ? ToDelta(deadline) : MonoDelta());
+  if (wait_status.IsTimedOut()) {
+    // The state of not-yet-initialized TxnManager is a transitional one,
+    // so callers are assumed to retry and succeed eventually.
+    return kTransientErrStatus;
+  }
+  return wait_status;
+}
+
+} // namespace transactions
+} // namespace kudu
diff --git a/src/kudu/master/txn_manager.h b/src/kudu/master/txn_manager.h
new file mode 100644
index 0000000..6fca78a
--- /dev/null
+++ b/src/kudu/master/txn_manager.h
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class MonoTime;
+
+namespace master {
+class Master;
+}
+
+namespace transactions {
+
+class TxnStatusEntryPB;
+class TxnSystemClient;
+
+// This class encapsulates the logic used by the TxnManagerService while serving
+// RPC requests (see txn_manager.proto for the protobuf interface). The most
+// essential piece of the logic implemented by this class is the assignment of
+// an identifier for a new transaction. All other methods simply do proxying
+// of corresponding requests to the underlying instance of TxnSystemClient
+// aggregated by this class.
+class TxnManager final {
+ public:
+  // 'server' is the hosting Master: it must outlive this object (see the
+  // comment on the 'server_' member).
+  explicit TxnManager(master::Master* server);
+  ~TxnManager();
+
+  // Use next transaction identifier and call BeginTransaction() via
+  // txn_sys_client_, adjusting the highest seen txn_id. On success, the
+  // identifier of the new transaction and the keep-alive interval
+  // (in milliseconds) are output via 'txn_id' and 'keep_alive_interval_ms'.
+  Status BeginTransaction(const std::string& username,
+                          const MonoTime& deadline,
+                          int64_t* txn_id,
+                          int32_t* keep_alive_interval_ms);
+
+  // Initiate the commit phase for the transaction. The control is returned
+  // right after initiating the commit phase: the caller can check for the
+  // completion of the commit phase using the GetTransactionState() RPC.
+  // So, in some sense this is an asynchronous method.
+  Status CommitTransaction(int64_t txn_id,
+                           const std::string& username,
+                           const MonoTime& deadline);
+
+  // The three methods below proxy calls to the underlying txn_sys_client_.
+  Status AbortTransaction(int64_t txn_id,
+                          const std::string& username,
+                          const MonoTime& deadline);
+
+  Status GetTransactionState(int64_t txn_id,
+                             const std::string& username,
+                             const MonoTime& deadline,
+                             TxnStatusEntryPB* txn_status);
+
+  Status KeepTransactionAlive(int64_t txn_id,
+                              const std::string& username,
+                              const MonoTime& deadline);
+
+ private:
+  friend class master::Master;
+  FRIEND_TEST(TxnManagerTest, LazyInitialization);
+  FRIEND_TEST(TxnManagerTest, NonlazyInitialization);
+  FRIEND_TEST(TxnManagerTest, LazyInitializationConcurrentCalls);
+
+  // Initialize the internals: create and initialize txn_sys_client_, create
+  // the transaction status table (if it doesn't exist yet), etc. This method
+  // should not be called concurrently or multiple times.
+  Status Init();
+
+  // Return Status::OK() if TxnManager is initialized, otherwise return
+  // Status::ServiceUnavailable() unless TxnManager is configured to initialize
+  // lazily. If the latter, schedule the initialization via master's thread
+  // pool and wait for that to be completed no later than prescribed
+  // by the 'deadline' parameter; if the wait times out,
+  // Status::ServiceUnavailable() is returned as well. A non-initialized
+  // instance of MonoTime passed as the 'deadline' parameter has a special
+  // meaning: wait indefinitely for the initialization of the TxnManager.
+  Status CheckInitialized(const MonoTime& deadline);
+
+  // Whether or not this instance is lazily initialized.
+  const bool is_lazily_initialized_;
+
+  // The pointer to the top-level Master object. From the lifecycle perspective,
+  // Master is assumed to be alive during the whole lifecycle of TxnManager
+  // since the Master is the component that hosts TxnManager (this is similar
+  // to the relations between the CatalogManager and the Master).
+  master::Master* server_;
+
+  // TxnSystemClient instance to communicate with TxnStatusManager.
+  std::unique_ptr<TxnSystemClient> txn_sys_client_;
+
+  // Whether it's necessary to schedule the initialization of this TxnManager
+  // instance (this is relevant only in case of lazily initialized instance).
+  std::atomic<bool> need_init_;
+
+  // Whether this object is initialized. In case of lazily initialized instance,
+  // the approach with atomics performs better compared with the approach using
+  // the 'standard' synchronization primitives.
+  std::atomic<bool> initialized_;
+
+  // The next_txn_id_ is used as a hint for next transaction identifier to try
+  // when assigning an identifier to a new transaction.
+  std::atomic<int64_t> next_txn_id_;
+
+  DISALLOW_COPY_AND_ASSIGN(TxnManager);
+};
+
+} // namespace transactions
+} // namespace kudu
diff --git a/src/kudu/master/txn_manager.proto b/src/kudu/master/txn_manager.proto
new file mode 100644
index 0000000..cec41a7
--- /dev/null
+++ b/src/kudu/master/txn_manager.proto
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu.transactions;
+
+option java_package = "org.apache.kudu.transactions";
+
+import "kudu/common/wire_protocol.proto";
+import "kudu/rpc/rpc_header.proto";
+import "kudu/transactions/transactions.proto";
+
+// TxnManager specific errors. TxnManagerService reports them in the 'error'
+// field of a response while still responding to the RPC successfully.
+message TxnManagerErrorPB {
+ enum Code {
+ // An error which has no more specific error code.
+ // The code and message in 'status' may reveal more details.
+ //
+ // RPCs should avoid returning this, since callers will not be
+ // able to easily parse the error.
+ //
+ // NOTE(review): for now, every error reported by TxnManagerService
+ // (including a missing 'txn_id' in a request) carries this code.
+ UNKNOWN_ERROR = 1;
+
+ // TODO(aserbin): introduce specific error codes, if necessary
+ }
+
+ // The error code.
+ required Code code = 1;
+
+ // The Status object for the error. This includes a text message that may be
+ // more useful to present in log messages, though its error code is less
+ // specific.
+ required AppStatusPB status = 2;
+}
+
+////////////////////////////////////////////////////////////
+// RPCs
+////////////////////////////////////////////////////////////
+
+// No fields: TxnManager assigns the identifier for the new transaction.
+message BeginTransactionRequestPB {
+}
+
+message BeginTransactionResponsePB {
+ // Information on error, if any occurred.
+ optional TxnManagerErrorPB error = 1;
+
+ // Assigned transaction identifier. Set only if 'error' is not set.
+ optional int64 txn_id = 2;
+
+ // The keep-alive interval (in milliseconds) to keep the transaction alive.
+ // TxnManager expects the client to send keep-alive heartbeats spaced by
+ // keepalive_millis interval. Set only if 'error' is not set.
+ optional uint32 keepalive_millis = 3;
+}
+
+message CommitTransactionRequestPB {
+ // Identifier of the transaction to commit. Though declared optional, it is
+ // mandatory: if it's missing, an InvalidArgument status is reported
+ // via the 'error' field of the response.
+ optional int64 txn_id = 1;
+}
+
+// A successful response means the commit phase has been initiated, not
+// completed: use the GetTransactionState RPC to check for its completion.
+message CommitTransactionResponsePB {
+ // Information on error, if any occurred.
+ optional TxnManagerErrorPB error = 1;
+}
+
+message AbortTransactionRequestPB {
+ // Identifier of the transaction to abort. Though declared optional, it is
+ // mandatory: if it's missing, an InvalidArgument status is reported
+ // via the 'error' field of the response.
+ optional int64 txn_id = 1;
+}
+
+message AbortTransactionResponsePB {
+ // Information on error, if any occurred.
+ optional TxnManagerErrorPB error = 1;
+}
+
+message GetTransactionStateRequestPB {
+ // Identifier of the transaction of interest. Though declared optional, it is
+ // mandatory: if it's missing, an InvalidArgument status is reported
+ // via the 'error' field of the response.
+ optional int64 txn_id = 1;
+}
+
+message GetTransactionStateResponsePB {
+ // Information on error, if any occurred.
+ optional TxnManagerErrorPB error = 1;
+
+ // The transaction state at the time of processing the request.
+ // Set only if 'error' is not set.
+ optional TxnStatePB state = 2;
+}
+
+
+message KeepTransactionAliveRequestPB {
+ // Identifier of the transaction to keep alive. Though declared optional,
+ // it is mandatory: if it's missing, an InvalidArgument status is reported
+ // via the 'error' field of the response.
+ optional int64 txn_id = 1;
+}
+
+message KeepTransactionAliveResponsePB {
+ // Information on error, if any occurred.
+ optional TxnManagerErrorPB error = 1;
+}
+
+// Feature flags to detect incompatibilities between newer and older versions.
+// NOTE(review): only the zero placeholder value is defined so far; real
+// feature flags are to be added as TxnManager functionality evolves.
+enum TxnManagerFeatures {
+ UNKNOWN_FEATURE = 0;
+}
+
+// By design, TxnManager is a stateless proxy for TxnStatusTablet to serve
+// as a convenient facade for Kudu clients to work with. TxnManagerService
+// provides RPC interface to access TxnManager's functionality.
+//
+// Every RPC below is authorized by the 'AuthorizeClient' method of the
+// service implementation. Application-level errors are reported via the
+// 'error' field of the corresponding response.
+service TxnManagerService {
+ // Set the default authz method to something invalid, so that if
+ // we forget to set the option on a new RPC call, we'll get a build
+ // failure.
+ option (kudu.rpc.default_authz_method) = "MUST_SET_AUTHZ_PER_RPC";
+
+ // Start a new transaction, assigning an identifier for it.
+ rpc BeginTransaction(BeginTransactionRequestPB)
+ returns (BeginTransactionResponsePB) {
+ option (kudu.rpc.authz_method) = "AuthorizeClient";
+ }
+
+ // Initiate the commit phase of a transaction (doesn't wait for completion).
+ rpc CommitTransaction(CommitTransactionRequestPB)
+ returns (CommitTransactionResponsePB) {
+ option (kudu.rpc.authz_method) = "AuthorizeClient";
+ }
+
+ // Abort a transaction.
+ rpc AbortTransaction(AbortTransactionRequestPB)
+ returns (AbortTransactionResponsePB) {
+ option (kudu.rpc.authz_method) = "AuthorizeClient";
+ }
+
+ // Fetch the current state of a transaction.
+ rpc GetTransactionState(GetTransactionStateRequestPB)
+ returns (GetTransactionStateResponsePB) {
+ option (kudu.rpc.authz_method) = "AuthorizeClient";
+ }
+
+ // Send a keep-alive heartbeat for a transaction.
+ rpc KeepTransactionAlive(KeepTransactionAliveRequestPB)
+ returns (KeepTransactionAliveResponsePB) {
+ option (kudu.rpc.authz_method) = "AuthorizeClient";
+ }
+}
diff --git a/src/kudu/master/txn_manager_service.cc b/src/kudu/master/txn_manager_service.cc
new file mode 100644
index 0000000..a86b965
--- /dev/null
+++ b/src/kudu/master/txn_manager_service.cc
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/master/txn_manager_service.h"
+
+#include <cstdint>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/port.h"
+#include "kudu/master/master.h"
+#include "kudu/master/txn_manager.h"
+#include "kudu/master/txn_manager.pb.h"
+#include "kudu/rpc/remote_user.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/server/server_base.h"
+#include "kudu/transactions/transactions.pb.h"
+#include "kudu/util/status.h"
+
+using kudu::master::Master;
+using kudu::rpc::RpcContext;
+using kudu::server::ServerBase;
+using kudu::transactions::TxnStatusEntryPB;
+
+using std::string;
+
+namespace kudu {
+namespace transactions {
+
+namespace {
+
+// Translate a failed 's' into the 'error' field of 'resp', using the generic
+// UNKNOWN_ERROR code. Nothing is done if 's' is OK or if 'resp' already
+// carries an application-specific error.
+template<class RespClass>
+void CheckRespErrorOrSetUnknown(const Status& s, RespClass* resp) {
+  if (s.ok() || resp->has_error()) {
+    return;
+  }
+  auto* error = resp->mutable_error();
+  StatusToPB(s, error->mutable_status());
+  error->set_code(TxnManagerErrorPB::UNKNOWN_ERROR);
+}
+
+// Reported for requests which lack the mandatory transaction identifier.
+const Status kMissingTxnId =
+    Status::InvalidArgument("missing transaction identifier");
+
+} // anonymous namespace
+
+// The generated service base class is given the hosting master's metric entity
+// and result tracker; 'server' is kept to reach the master's TxnManager.
+TxnManagerServiceImpl::TxnManagerServiceImpl(Master* server)
+    : TxnManagerServiceIf(server->metric_entity(),
+                          server->result_tracker()),
+      server_(server) {}
+
+void TxnManagerServiceImpl::BeginTransaction(
+    const BeginTransactionRequestPB* /* req */,
+    BeginTransactionResponsePB* resp,
+    RpcContext* ctx) {
+  // The request has no fields: TxnManager assigns the transaction identifier
+  // and reports the keep-alive interval to use.
+  int64_t assigned_txn_id;
+  int32_t keepalive_ms;
+  const Status s = server_->txn_manager()->BeginTransaction(
+      ctx->remote_user().username(),
+      ctx->GetClientDeadline(),
+      &assigned_txn_id,
+      &keepalive_ms);
+  if (PREDICT_FALSE(!s.ok())) {
+    // Errors are delivered in the response body, not as an RPC failure.
+    CheckRespErrorOrSetUnknown(s, resp);
+    return ctx->RespondSuccess();
+  }
+  resp->set_txn_id(assigned_txn_id);
+  resp->set_keepalive_millis(keepalive_ms);
+  return ctx->RespondSuccess();
+}
+
+void TxnManagerServiceImpl::CommitTransaction(
+    const CommitTransactionRequestPB* req,
+    CommitTransactionResponsePB* resp,
+    RpcContext* ctx) {
+  // The transaction identifier is mandatory: its absence is reported via
+  // the 'error' field of the response, while the RPC itself succeeds.
+  if (!req->has_txn_id()) {
+    CheckRespErrorOrSetUnknown(kMissingTxnId, resp);
+    return ctx->RespondSuccess();
+  }
+  // Initiate the commit phase for the transaction. The caller can check for the
+  // completion of the commit phase using the GetTransactionState() RPC.
+  const auto s = server_->txn_manager()->CommitTransaction(
+      req->txn_id(),
+      ctx->remote_user().username(),
+      ctx->GetClientDeadline());
+  CheckRespErrorOrSetUnknown(s, resp);
+  return ctx->RespondSuccess();
+}
+
+void TxnManagerServiceImpl::GetTransactionState(
+    const GetTransactionStateRequestPB* req,
+    GetTransactionStateResponsePB* resp,
+    rpc::RpcContext* ctx) {
+  // Requests without a transaction identifier are rejected via 'error'.
+  if (!req->has_txn_id()) {
+    CheckRespErrorOrSetUnknown(kMissingTxnId, resp);
+    return ctx->RespondSuccess();
+  }
+  TxnStatusEntryPB entry;
+  const Status s = server_->txn_manager()->GetTransactionState(
+      req->txn_id(), ctx->remote_user().username(),
+      ctx->GetClientDeadline(), &entry);
+  if (!s.ok()) {
+    CheckRespErrorOrSetUnknown(s, resp);
+    return ctx->RespondSuccess();
+  }
+  // On success, the status entry is expected to always carry the state.
+  DCHECK(entry.has_state());
+  resp->set_state(entry.state());
+  return ctx->RespondSuccess();
+}
+
+void TxnManagerServiceImpl::AbortTransaction(
+    const AbortTransactionRequestPB* req,
+    AbortTransactionResponsePB* resp,
+    RpcContext* ctx) {
+  // Only a request naming a transaction is forwarded to TxnManager; otherwise
+  // an InvalidArgument status is reported via the response's 'error' field.
+  const Status s = req->has_txn_id()
+      ? server_->txn_manager()->AbortTransaction(
+            req->txn_id(),
+            ctx->remote_user().username(),
+            ctx->GetClientDeadline())
+      : kMissingTxnId;
+  CheckRespErrorOrSetUnknown(s, resp);
+  return ctx->RespondSuccess();
+}
+
+void TxnManagerServiceImpl::KeepTransactionAlive(
+    const KeepTransactionAliveRequestPB* req,
+    KeepTransactionAliveResponsePB* resp,
+    rpc::RpcContext* ctx) {
+  // Forward the heartbeat to TxnManager if the request names a transaction;
+  // otherwise report an InvalidArgument status via the 'error' field.
+  Status s;
+  if (req->has_txn_id()) {
+    s = server_->txn_manager()->KeepTransactionAlive(
+        req->txn_id(),
+        ctx->remote_user().username(),
+        ctx->GetClientDeadline());
+  } else {
+    s = kMissingTxnId;
+  }
+  CheckRespErrorOrSetUnknown(s, resp);
+  return ctx->RespondSuccess();
+}
+
+// Delegate the authorization to the hosting master, requiring the caller
+// to have the ServerBase::USER role.
+bool TxnManagerServiceImpl::AuthorizeClient(
+    const google::protobuf::Message* /* req */,
+    google::protobuf::Message* /* resp */,
+    RpcContext* ctx) {
+  const bool authorized = server_->Authorize(ctx, ServerBase::USER);
+  return authorized;
+}
+
+} // namespace transactions
+} // namespace kudu
diff --git a/src/kudu/master/txn_manager_service.h b/src/kudu/master/txn_manager_service.h
new file mode 100644
index 0000000..fe8ea31
--- /dev/null
+++ b/src/kudu/master/txn_manager_service.h
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "kudu/gutil/macros.h"
+#include "kudu/master/txn_manager.service.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
+
+namespace kudu {
+
+namespace master {
+class Master;
+}
+
+namespace rpc {
+class RpcContext;
+}
+
+namespace transactions {
+class AbortTransactionRequestPB;
+class AbortTransactionResponsePB;
+class BeginTransactionRequestPB;
+class BeginTransactionResponsePB;
+class CommitTransactionRequestPB;
+class CommitTransactionResponsePB;
+class GetTransactionStateRequestPB;
+class GetTransactionStateResponsePB;
+class KeepTransactionAliveRequestPB;
+class KeepTransactionAliveResponsePB;
+
+// Implementation of the TxnManager service. See txn_manager.proto for docs
+// on each RPC. Application-level errors are reported via the 'error' field
+// of the corresponding response.
+class TxnManagerServiceImpl : public TxnManagerServiceIf {
+ public:
+  // 'server' is the hosting Master: it's dereferenced by every RPC handler,
+  // so it must stay alive while this service is in use.
+  explicit TxnManagerServiceImpl(master::Master* server);
+
+  void BeginTransaction(const BeginTransactionRequestPB* req,
+                        BeginTransactionResponsePB* resp,
+                        rpc::RpcContext* ctx) override;
+
+  void CommitTransaction(const CommitTransactionRequestPB* req,
+                         CommitTransactionResponsePB* resp,
+                         rpc::RpcContext* ctx) override;
+
+  void AbortTransaction(const AbortTransactionRequestPB* req,
+                        AbortTransactionResponsePB* resp,
+                        rpc::RpcContext* ctx) override;
+
+  void GetTransactionState(const GetTransactionStateRequestPB* req,
+                           GetTransactionStateResponsePB* resp,
+                           rpc::RpcContext* ctx) override;
+
+  void KeepTransactionAlive(const KeepTransactionAliveRequestPB* req,
+                            KeepTransactionAliveResponsePB* resp,
+                            rpc::RpcContext* ctx) override;
+
+  // Authorize an RPC call which must be from a client.
+  bool AuthorizeClient(const google::protobuf::Message* req,
+                       google::protobuf::Message* resp,
+                       rpc::RpcContext* ctx) override;
+
+ private:
+  master::Master* server_;
+
+  DISALLOW_COPY_AND_ASSIGN(TxnManagerServiceImpl);
+};
+
+} // namespace transactions
+} // namespace kudu
diff --git a/src/kudu/transactions/txn_status_manager.h b/src/kudu/transactions/txn_status_manager.h
index 7621d9b..48028bc 100644
--- a/src/kudu/transactions/txn_status_manager.h
+++ b/src/kudu/transactions/txn_status_manager.h
@@ -84,7 +84,9 @@ class TxnStatusManager final : public tablet::TxnCoordinator {
// (even if that attempt failed), which helps ensure that at most one call to
// this method will succeed for a given transaction ID. The
// 'highest_seen_txn_id' output parameter, if not null, is populated in both
- // success and failure cases.
+ // success and failure cases, except for the case when returning
+ // Status::ServiceUnavailable() due to not-yet-loaded data from the
+ // backing transaction status tablet.
//
// TODO(awong): consider computing the next available transaction ID in this
// partition and using it in case this transaction is already used, or having
diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc
index cd23d44..2dc17c9 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -59,6 +59,7 @@ namespace transactions {
Status TxnSystemClient::Create(const vector<string>& master_addrs,
unique_ptr<TxnSystemClient>* sys_client) {
+ DCHECK(!master_addrs.empty());
KuduClientBuilder builder;
builder.master_server_addrs(master_addrs);
client::sp::shared_ptr<KuduClient> client;
@@ -220,6 +221,7 @@ Status TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB coordinate_tx
const MonoDelta& timeout,
const StatusCallback& cb,
CoordinatorOpResultPB*
result) {
+ DCHECK(txn_status_table_);
const MonoTime deadline = MonoTime::Now() + timeout;
unique_ptr<TxnStatusTabletContext> ctx(
new TxnStatusTabletContext({