This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new cece57b KUDU-2612: add RPC to send participant ops
cece57b is described below
commit cece57bcdf5d8d8ce596a3c1da4c2c26472784bc
Author: Andrew Wong <[email protected]>
AuthorDate: Mon Nov 30 22:03:40 2020 -0800
KUDU-2612: add RPC to send participant ops
This adds methods to the TxnSystemClient to send participant ops to
participants by their tablet ID. This will be used in steps 13 and
18 of the transactions write path[1].
The new ParticipantRpc abstraction borrows heavily from CoordinatorRpc
with regard to lookups and error handling, with the following
differences:
- Rather than doing the lookup by table and partition key, it performs a
lookup by tablet ID, using the functionality recently added to the
MetaCache.
- Since TxnParticipants don't return success on repeated participant op
requests, some additional handling is done for the
TXN_OP_ALREADY_APPLIED error code.
[1]
https://docs.google.com/document/d/1qv7Zejpfzg-HvF5azRL49g5lRLQ4437EmJ53GiupcWQ/edit#heading=h.4lm41o75ev1x
Change-Id: Ibb9ba09104761772f9aaffe582776ad34d8dbf57
Reviewed-on: http://gerrit.cloudera.org:8080/16879
Tested-by: Andrew Wong <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/client/client.h | 2 +
src/kudu/client/meta_cache.cc | 33 +-
.../integration-tests/txn_participant-itest.cc | 384 ++++++++++++++++++++-
src/kudu/tablet/txn_participant-test-util.h | 29 +-
src/kudu/transactions/CMakeLists.txt | 1 +
src/kudu/transactions/participant_rpc.cc | 239 +++++++++++++
src/kudu/transactions/participant_rpc.h | 91 +++++
src/kudu/transactions/txn_system_client.cc | 46 +++
src/kudu/transactions/txn_system_client.h | 17 +
9 files changed, 806 insertions(+), 36 deletions(-)
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index b1ab428..504c5c3 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -73,6 +73,7 @@ class FuzzTest;
namespace transactions {
class CoordinatorRpc;
+class ParticipantRpc;
class TxnSystemClient;
} // namespace transactions
@@ -892,6 +893,7 @@ class KUDU_EXPORT KuduClient : public
sp::enable_shared_from_this<KuduClient> {
friend class kudu::AuthzTokenTest;
friend class kudu::SecurityUnknownTskTest;
friend class transactions::CoordinatorRpc;
+ friend class transactions::ParticipantRpc;
friend class transactions::TxnSystemClient;
friend class tools::LeaderMasterProxy;
friend class tools::RemoteKsckCluster;
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index 1ab1a11..f296b4c 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -502,18 +502,29 @@ void MetaCacheServerPicker::PickLeader(const
ServerPickedCallback& callback,
// Put another way, we don't care about the lookup results at all; we're
// just using it to fetch the latest consensus configuration information.
//
- // TODO: When we support tablet splits, we should let the lookup shift
- // the write to another tablet (i.e. if it's since been split).
+ // TODO(dralves): When we support tablet splits, we should let the lookup
+ // shift the write to another tablet (i.e. if it's since been split).
if (!leader) {
- meta_cache_->LookupTabletByKey(
- table_,
- tablet_->partition().partition_key_start(),
- deadline,
- MetaCache::LookupType::kPoint,
- nullptr,
- [this, callback, deadline](const Status& s) {
- this->LookUpTabletCb(callback, deadline, s);
- });
+ if (table_) {
+ meta_cache_->LookupTabletByKey(
+ table_,
+ tablet_->partition().partition_key_start(),
+ deadline,
+ MetaCache::LookupType::kPoint,
+ /*remote_tablet*/nullptr,
+ [this, callback, deadline](const Status& s) {
+ this->LookUpTabletCb(callback, deadline, s);
+ });
+ } else {
+ meta_cache_->LookupTabletById(
+ client_,
+ tablet_->tablet_id(),
+ deadline,
+ /*remote_tablet*/nullptr,
+ [this, callback, deadline](const Status& s) {
+ this->LookUpTabletCb(callback, deadline, s);
+ });
+ }
return;
}
diff --git a/src/kudu/integration-tests/txn_participant-itest.cc
b/src/kudu/integration-tests/txn_participant-itest.cc
index 63f884a..fcbd083 100644
--- a/src/kudu/integration-tests/txn_participant-itest.cc
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -57,6 +57,7 @@
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tablet/txn_participant-test-util.h"
#include "kudu/tablet/txn_participant.h"
+#include "kudu/transactions/txn_system_client.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/ts_tablet_manager.h"
@@ -66,6 +67,7 @@
#include "kudu/util/countdown_latch.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
@@ -85,16 +87,20 @@ METRIC_DECLARE_histogram(log_gc_duration);
using kudu::cluster::InternalMiniCluster;
using kudu::cluster::InternalMiniClusterOptions;
+using kudu::tablet::kAborted;
+using kudu::tablet::kAbortSequence;
using kudu::tablet::kCommitSequence;
using kudu::tablet::kDummyCommitTimestamp;
using kudu::tablet::kCommitted;
using kudu::tablet::kCommitInProgress;
using kudu::tablet::kInitializing;
using kudu::tablet::kOpen;
+using kudu::tablet::MakeParticipantOp;
using kudu::tablet::TabletReplica;
-using kudu::tablet::Txn;
using kudu::tablet::TxnState;
using kudu::tablet::TxnParticipant;
+using kudu::tablet::TxnState;
+using kudu::transactions::TxnSystemClient;
using kudu::tserver::ParticipantOpPB;
using kudu::tserver::ParticipantRequestPB;
using kudu::tserver::ParticipantResponsePB;
@@ -112,6 +118,8 @@ namespace kudu {
namespace itest {
namespace {
+// Default timeout for leadership/consensus waits and participant ops.
+const MonoDelta kDefaultTimeout = MonoDelta::FromSeconds(10);
+// Timeout for participant ops sent while servers or replicas are disrupted.
+const MonoDelta kLongTimeout = MonoDelta::FromSeconds(30);
ParticipantRequestPB ParticipantRequest(const string& tablet_id, int64_t
txn_id,
ParticipantOpPB::ParticipantOpType
type) {
ParticipantRequestPB req;
@@ -227,7 +235,7 @@ class TxnParticipantITest : public KuduTest {
w.Setup();
w.Start();
while (w.rows_inserted() < 1) {
SleepFor(MonoDelta::FromMilliseconds(10));
}
if (table_name) {
*table_name = w.table_name();
@@ -254,6 +262,27 @@ class TxnParticipantITest : public KuduTest {
return replicas;
}
+ // Stops quiescing all tablet servers, allowing a new leader to be elected if
+ // necessary.
+ void StopQuiescingServers() {
+ for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+ *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = false;
+ }
+ }
+
+ // Ensures every given replica (all of one tablet) eventually has exactly 'txns'.
+ void CheckReplicasMatchTxns(const vector<TabletReplica*>& replicas,
+ const vector<TxnParticipant::TxnEntry>& txns) {
+ DCHECK(!replicas.empty());
+ const auto& tablet_id = replicas[0]->tablet_id();
+ for (auto* replica : replicas) {
+ DCHECK_EQ(tablet_id, replica->tablet_id());
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_EQ(txns, replica->tablet()->txn_participant()->GetTxnsForTests());
+ });
+ }
+ }
+
protected:
unique_ptr<InternalMiniCluster> cluster_;
unordered_map<string, TServerDetails*> ts_map_;
@@ -267,7 +296,7 @@ TEST_F(TxnParticipantITest, TestReplicateParticipantOps) {
// tserver so we can ensure a specific leader.
const int kLeaderIdx = 0;
vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
-
ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+
ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
// Try submitting the ops on all replicas. They should succeed on the leaders
// and fail on followers.
const int64_t kTxnId = 1;
@@ -336,10 +365,9 @@ TEST_P(ParticipantCopyITest, TestCopyParticipantOps) {
constexpr const int kNumTxns = 10;
constexpr const int kLeaderIdx = 0;
constexpr const int kDeadServerIdx = kLeaderIdx + 1;
- const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
auto* leader_replica = replicas[kLeaderIdx];
- ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kTimeout));
+
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
// Apply some operations.
vector<TxnParticipant::TxnEntry> expected_txns;
@@ -393,7 +421,7 @@ TEST_P(ParticipantCopyITest, TestCopyParticipantOps) {
ASSERT_EQ(1, tablets.size());
scoped_refptr<TabletReplica> r;
ASSERT_TRUE(new_ts->server()->tablet_manager()->LookupTablet(tablets[0],
&r));
- ASSERT_OK(r->WaitUntilConsensusRunning(kTimeout));
+ ASSERT_OK(r->WaitUntilConsensusRunning(kDefaultTimeout));
ASSERT_EQ(expected_txns,
r->tablet()->txn_participant()->GetTxnsForTests());
for (int i = 0; i < kNumTxns; i++) {
ASSERT_TRUE(r->tablet_metadata()->HasTxnMetadata(i));
@@ -412,9 +440,9 @@ TEST_F(TxnParticipantITest, TestWaitOnFinalizeCommit) {
auto* follower_replica = replicas[kLeaderIdx + 1];
auto* clock = leader_replica->clock();
const int64_t kTxnId = 1;
-
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
ASSERT_OK(RunOnReplica(leader_replica, kTxnId, ParticipantOpPB::BEGIN_TXN));
- const MonoDelta kAgreeTimeout = MonoDelta::FromSeconds(10);
+ const MonoDelta kAgreeTimeout = kDefaultTimeout;
const auto& tablet_id = leader_replica->tablet()->tablet_id();
ASSERT_OK(WaitForServersToAgree(kAgreeTimeout, ts_map_, tablet_id,
/*minimum_index*/1));
@@ -473,9 +501,9 @@ TEST_F(TxnParticipantITest, TestWaitOnAbortCommit) {
auto* follower_replica = replicas[kLeaderIdx + 1];
auto* clock = leader_replica->clock();
const int64_t kTxnId = 1;
-
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
ASSERT_OK(RunOnReplica(leader_replica, kTxnId, ParticipantOpPB::BEGIN_TXN));
- const MonoDelta kAgreeTimeout = MonoDelta::FromSeconds(10);
+ const MonoDelta kAgreeTimeout = kDefaultTimeout;
const auto& tablet_id = leader_replica->tablet()->tablet_id();
ASSERT_OK(WaitForServersToAgree(kAgreeTimeout, ts_map_, tablet_id,
/*minimum_index*/1));
@@ -511,7 +539,7 @@ TEST_F(TxnParticipantITest, TestProxyBasicCalls) {
constexpr const int kLeaderIdx = 0;
constexpr const int kTxnId = 0;
vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
-
ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+
ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
for (const auto& op : kCommitSequence) {
const auto req = ParticipantRequest(replicas[kLeaderIdx]->tablet_id(),
kTxnId, op);
@@ -525,7 +553,7 @@ TEST_F(TxnParticipantITest,
TestProxyIllegalStatesInCommitSequence) {
constexpr const int kLeaderIdx = 0;
constexpr const int kTxnId = 0;
vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
-
ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+
ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
// Begin after already beginning.
@@ -610,7 +638,7 @@ TEST_F(TxnParticipantITest,
TestProxyIllegalStatesInAbortSequence) {
constexpr const int kLeaderIdx = 0;
constexpr const int kTxnId = 0;
vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
-
ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+
ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
// Try our illegal ops when our transaction is open.
@@ -662,7 +690,7 @@ TEST_F(TxnParticipantITest, TestProxyNonLeader) {
constexpr const int kNonLeaderIdx = kLeaderIdx + 1;
constexpr const int kTxnId = 0;
vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
-
ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+
ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
auto admin_proxy = cluster_->tserver_admin_proxy(kNonLeaderIdx);
for (const auto& op : kCommitSequence) {
const auto req = ParticipantRequest(replicas[kLeaderIdx]->tablet_id(),
kTxnId, op);
@@ -681,7 +709,7 @@ TEST_F(TxnParticipantITest, TestProxyTabletBootstrapping) {
constexpr const int kTxnId = 0;
vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
auto* leader_replica = replicas[kLeaderIdx];
-
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
FLAGS_tablet_bootstrap_inject_latency_ms = 1000;
cluster_->mini_tablet_server(kLeaderIdx)->Shutdown();
@@ -706,7 +734,7 @@ TEST_F(TxnParticipantITest, TestProxyTabletNotRunning) {
constexpr const int kTxnId = 0;
vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
auto* leader_replica = replicas[kLeaderIdx];
-
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
auto* tablet_manager =
cluster_->mini_tablet_server(kLeaderIdx)->server()->tablet_manager();
ASSERT_OK(tablet_manager->DeleteTablet(leader_replica->tablet_id(),
tablet::TABLET_DATA_TOMBSTONED, boost::none));
@@ -739,6 +767,284 @@ TEST_F(TxnParticipantITest, TestProxyTabletNotFound) {
}
}
+// Drives a transaction through the commit sequence via the TxnSystemClient,
+// checking that illegal ops fail with IllegalState, that repeated BEGIN_TXN,
+// BEGIN_COMMIT, and FINALIZE_COMMIT calls succeed, and that all replicas
+// converge on the expected transaction state.
+TEST_F(TxnParticipantITest, TestTxnSystemClientCommitSequence) {
+ constexpr const int kLeaderIdx = 0;
+ constexpr const int kTxnId = 0;
+ vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+ auto* leader_replica = replicas[kLeaderIdx];
+ const auto tablet_id = leader_replica->tablet_id();
+
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+
+ // Start a transaction and make sure it results in the expected state
+ // server-side.
+ unique_ptr<TxnSystemClient> txn_client;
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_client));
+ ASSERT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN),
kDefaultTimeout));
+ NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kOpen, -1 } }));
+
+ // Try some illegal ops and ensure we get an error.
+ Status s = txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT),
kDefaultTimeout);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kOpen, -1 } }));
+
+ // A repeated BEGIN_TXN on an open transaction succeeds.
+ ASSERT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN),
kDefaultTimeout));
+ NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kOpen, -1 } }));
+
+ // Progress the transaction forward, and perform similar checks that we get
+ // errors when we attempt illegal ops.
+ Timestamp begin_commit_ts;
+ ASSERT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_COMMIT),
+ kDefaultTimeout, &begin_commit_ts));
+ ASSERT_NE(Timestamp::kInvalidTimestamp, begin_commit_ts);
+ NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kCommitInProgress, -1
} }));
+
+ s = txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN),
kDefaultTimeout);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kCommitInProgress, -1
} }));
+
+ // But we should be able to BEGIN_COMMIT again and get back the same
+ // timestamp.
+ Timestamp refetched_begin_commit_ts;
+ ASSERT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_COMMIT),
+ kDefaultTimeout, &refetched_begin_commit_ts));
+ ASSERT_EQ(refetched_begin_commit_ts, begin_commit_ts);
+ NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kCommitInProgress, -1
} }));
+
+ // Once we finish committing, only a repeated FINALIZE_COMMIT should succeed.
+ ASSERT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT,
kDummyCommitTimestamp),
+ kDefaultTimeout));
+ NO_FATALS(CheckReplicasMatchTxns(replicas, {{kTxnId, kCommitted,
kDummyCommitTimestamp}}));
+ for (const auto type : { ParticipantOpPB::BEGIN_TXN,
ParticipantOpPB::BEGIN_COMMIT,
+ ParticipantOpPB::ABORT_TXN }) {
+ Status s = txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnId, type), kDefaultTimeout);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ }
+ NO_FATALS(CheckReplicasMatchTxns(replicas, {{kTxnId, kCommitted,
kDummyCommitTimestamp}}));
+ // Repeating FINALIZE_COMMIT on the committed transaction succeeds.
+ ASSERT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT,
kDummyCommitTimestamp),
+ kDefaultTimeout));
+ NO_FATALS(CheckReplicasMatchTxns(replicas, {{kTxnId, kCommitted,
kDummyCommitTimestamp}}));
+}
+
+// Aborts one open transaction and one transaction whose commit has begun via
+// the TxnSystemClient, checking that afterwards only ABORT_TXN is accepted.
+TEST_F(TxnParticipantITest, TestTxnSystemClientAbortSequence) {
+ constexpr const int kLeaderIdx = 0;
+ constexpr const int kTxnOne = 0;
+ constexpr const int kTxnTwo = 1;
+ vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+ auto* leader_replica = replicas[kLeaderIdx];
+ const auto tablet_id = leader_replica->tablet_id();
+
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+ unique_ptr<TxnSystemClient> txn_client;
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_client));
+ ASSERT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnOne, ParticipantOpPB::BEGIN_TXN),
+ kDefaultTimeout));
+ ASSERT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnTwo, ParticipantOpPB::BEGIN_TXN),
+ kDefaultTimeout));
+ NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnOne, kOpen, -1 }, {
kTxnTwo, kOpen, -1 } }));
+
+ // Abort the first transaction while it is open, and begin committing the
+ // second so that it can be aborted from the commit-in-progress state.
+ ASSERT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnOne, ParticipantOpPB::ABORT_TXN),
+ kDefaultTimeout));
+ ASSERT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnTwo, ParticipantOpPB::BEGIN_COMMIT),
+ kDefaultTimeout));
+ NO_FATALS(CheckReplicasMatchTxns(replicas,
+ { { kTxnOne, kAborted, -1 }, { kTxnTwo, kCommitInProgress, -1 } }));
+
+ // A transaction whose commit has begun can still be aborted.
+ ASSERT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnTwo, ParticipantOpPB::ABORT_TXN),
+ kDefaultTimeout));
+ NO_FATALS(CheckReplicasMatchTxns(replicas,
+ { { kTxnOne, kAborted, -1 }, { kTxnTwo, kAborted, -1 } }));
+ // Once aborted, every op other than ABORT_TXN should be rejected.
+ for (const auto type : { ParticipantOpPB::BEGIN_TXN,
ParticipantOpPB::BEGIN_COMMIT,
+ ParticipantOpPB::FINALIZE_COMMIT }) {
+ Status s = txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnOne, type), kDefaultTimeout);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ s = txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnTwo, type), kDefaultTimeout);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ }
+ NO_FATALS(CheckReplicasMatchTxns(replicas,
+ { { kTxnOne, kAborted, -1 }, { kTxnTwo, kAborted, -1 } }));
+ // Repeated abort calls are idempotent.
+ ASSERT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnOne, ParticipantOpPB::ABORT_TXN),
+ kDefaultTimeout));
+ ASSERT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnTwo, ParticipantOpPB::ABORT_TXN),
+ kDefaultTimeout));
+ NO_FATALS(CheckReplicasMatchTxns(replicas,
+ { { kTxnOne, kAborted, -1 }, { kTxnTwo, kAborted, -1 } }));
+}
+
+// Checks that a participant op times out while the leader cannot reach a
+// majority, and that the op completes on every replica once the other servers
+// are restarted.
+TEST_F(TxnParticipantITest, TestTxnSystemClientTimeoutWhenNoMajority) {
+ constexpr const int kLeaderIdx = 0;
+ constexpr const int kTxnId = 0;
+ vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+ auto* leader_replica = replicas[kLeaderIdx];
+ const auto tablet_id = leader_replica->tablet_id();
+ ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+ // Bring down the other servers so we can't get a majority.
+ for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+ if (i == kLeaderIdx) continue;
+ cluster_->mini_tablet_server(i)->Shutdown();
+ }
+ unique_ptr<TxnSystemClient> txn_client;
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+ Status s = txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN),
+ MonoDelta::FromSeconds(1));
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+ // We should have an initializing transaction until a majority is achieved,
+ // at which point the BEGIN_TXN should complete and we end up with an open
+ // transaction.
+ ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ { kTxnId, kInitializing, -1 } }),
+ leader_replica->tablet()->txn_participant()->GetTxnsForTests());
+ replicas.clear();
+ // Restart the down servers and check that we get to the right state.
+ for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+ auto* ts = cluster_->mini_tablet_server(i);
+ if (i != kLeaderIdx) {
+ ASSERT_OK(ts->Restart());
+ }
+ scoped_refptr<TabletReplica> r;
+ auto* ts_manager = ts->server()->tablet_manager();
+ ASSERT_OK(ts_manager->WaitForAllBootstrapsToFinish());
+ ASSERT_TRUE(ts_manager->LookupTablet(tablet_id, &r));
+ replicas.emplace_back(r.get());
+ }
+ // CheckReplicasMatchTxns() already retries each replica with
+ // ASSERT_EVENTUALLY, so it is not wrapped in another retry loop.
+ NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kOpen, -1 } }));
+}
+
+namespace {
+// Repeatedly runs a new transaction through the full commit sequence on
+// 'tablet_id', returning the first error encountered, or OK once 'stop_latch'
+// has counted down. The latch is only checked between transactions.
+// Transaction IDs are taken from '*next_txn_id', which is advanced per txn.
+Status SendParticipantOps(TxnSystemClient* txn_client, const string& tablet_id,
+ CountDownLatch* stop_latch, int* next_txn_id) {
+ while (stop_latch->count() > 0) {
+ // Claim the ID before sending any ops, so '*next_txn_id' counts every
+ // transaction that was started.
+ int txn_id = (*next_txn_id)++;
+ for (const auto& op : kCommitSequence) {
+ RETURN_NOT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(txn_id, op), kLongTimeout));
+ }
+ }
+ return Status::OK();
+}
+} // anonymous namespace
+
+// Test that the transaction system client retries when the tablet server
+// hosting the targeted tablet is shutting down and starting up.
+TEST_F(TxnParticipantITest, TestTxnSystemClientSucceedsOnBootstrap) {
+ constexpr const int kLeaderIdx = 0;
+ vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+ auto* leader_replica = replicas[kLeaderIdx];
+ const auto tablet_id = leader_replica->tablet_id();
+
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+ // Start a thread that sends participant ops to the tablet.
+ int next_txn_id = 0;
+ unique_ptr<TxnSystemClient> txn_client;
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_client));
+ CountDownLatch stop(1);
+ Status client_error;
+ thread t([&] {
+ client_error = SendParticipantOps(txn_client.get(), tablet_id, &stop,
&next_txn_id);
+ });
+ // Ensures the sender thread is stopped and joined if an assertion below
+ // returns early.
+ auto thread_joiner = MakeScopedCleanup([&] {
+ stop.CountDown();
+ t.join();
+ });
+
+ // Stop quiescing so proper failover can happen, as we kill each server.
+ StopQuiescingServers();
+ for (int i = kLeaderIdx; i < cluster_->num_tablet_servers(); i++) {
+ auto* ts = cluster_->mini_tablet_server(i);
+ ts->Shutdown();
+ ASSERT_OK(ts->Restart());
+
+ // Wait for proper recovery to happen, and throw in some mandatory sleep to
+ // ensure we have time to replicate some ops.
+ ASSERT_OK(ts->server()->tablet_manager()->WaitForAllBootstrapsToFinish());
+ SleepFor(MonoDelta::FromMilliseconds(500));
+ }
+ // Stop and join the sender here; the cleanup is cancelled so it doesn't
+ // join the thread a second time.
+ stop.CountDown();
+ thread_joiner.cancel();
+ t.join();
+
+ // None of our transactions should have failed.
+ ASSERT_OK(client_error);
+ // Every transaction the sender started should be committed on all replicas.
+ vector<TxnParticipant::TxnEntry> expected_txns;
+ for (int i = 0; i < next_txn_id; i++) {
+ expected_txns.emplace_back(TxnParticipant::TxnEntry({ i, kCommitted,
kDummyCommitTimestamp }));
+ }
+ // Re-fetch the replicas, since every server was restarted above.
+ replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+ NO_FATALS(CheckReplicasMatchTxns(replicas, expected_txns));
+}
+
+// Test that the transaction system client retries when a replica is deleted
+// and recovered.
+TEST_F(TxnParticipantITest, TestTxnSystemClientRetriesWhenReplicaNotFound) {
+ // Speed up the time it takes to determine that we need to copy.
+ FLAGS_follower_unavailable_considered_failed_sec = 1;
+
+ // We're going to delete the leader and stop quiescing.
+ constexpr const int kLeaderIdx = 0;
+ vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+ auto* leader_replica = replicas[kLeaderIdx];
+ const auto tablet_id = leader_replica->tablet_id();
+
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+ // Start a thread that sends participant ops to the tablet.
+ int next_txn_id = 0;
+ unique_ptr<TxnSystemClient> txn_client;
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(),
&txn_client));
+ CountDownLatch stop(1);
+ Status client_error;
+ thread t([&] {
+ client_error = SendParticipantOps(txn_client.get(), tablet_id, &stop,
&next_txn_id);
+ });
+ // Ensures the sender thread is stopped and joined if an assertion below
+ // returns early.
+ auto thread_joiner = MakeScopedCleanup([&] {
+ stop.CountDown();
+ t.join();
+ });
+
+ // Stop quiescing so that a new leader can be elected once the leader's
+ // replica is deleted.
+ StopQuiescingServers();
+ auto* tablet_manager =
cluster_->mini_tablet_server(kLeaderIdx)->server()->tablet_manager();
+ ASSERT_OK(tablet_manager->DeleteTablet(tablet_id,
tablet::TABLET_DATA_TOMBSTONED, boost::none));
+ // Wait for the tombstoned replica on the old leader's server to come back
+ // with consensus running (presumably via tablet copy -- see the flag above).
+ ASSERT_EVENTUALLY([&] {
+ scoped_refptr<TabletReplica> r;
+ ASSERT_TRUE(tablet_manager->LookupTablet(tablet_id, &r));
+ ASSERT_OK(r->WaitUntilConsensusRunning(kDefaultTimeout));
+ });
+
+ // Stop and join the sender here; the cleanup is cancelled so it doesn't
+ // join the thread a second time.
+ stop.CountDown();
+ thread_joiner.cancel();
+ t.join();
+
+ // None of our transactions should have failed.
+ ASSERT_OK(client_error);
+ // Every transaction the sender started should be committed on all replicas.
+ vector<TxnParticipant::TxnEntry> expected_txns;
+ for (int i = 0; i < next_txn_id; i++) {
+ expected_txns.emplace_back(TxnParticipant::TxnEntry({ i, kCommitted,
kDummyCommitTimestamp }));
+ }
+ replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+ NO_FATALS(CheckReplicasMatchTxns(replicas, expected_txns));
+}
+
class TxnParticipantElectionStormITest : public TxnParticipantITest {
public:
void SetUp() override {
@@ -884,7 +1190,7 @@ TEST_F(TxnParticipantElectionStormITest,
TestFrequentElections) {
const auto& tablets = ts->ListTablets();
scoped_refptr<TabletReplica> r;
ASSERT_TRUE(ts->server()->tablet_manager()->LookupTablet(tablets[0], &r));
- ASSERT_OK(r->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
+ ASSERT_OK(r->WaitUntilConsensusRunning(kDefaultTimeout));
auto actual_txns = r->tablet()->txn_participant()->GetTxnsForTests();
// Upon bootstrapping, we may end up replaying a REPLICATE message with no
// COMMIT message, starting it as a follower op. If it's a BEGIN_TXN op,
@@ -908,5 +1214,49 @@ TEST_F(TxnParticipantElectionStormITest,
TestFrequentElections) {
}
}
+// Checks that participant ops sent through the TxnSystemClient succeed while
+// injected Raft notification latency spurs elections, and that every replica
+// eventually reflects the committed and the aborted transaction.
+TEST_F(TxnParticipantElectionStormITest, TestTxnSystemClientRetriesThroughStorm) {
+ vector<TabletReplica*> replicas;
+ const string tablet_id = cluster_->mini_tablet_server(0)->ListTablets()[0];
+ for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+ scoped_refptr<TabletReplica> r;
+ ASSERT_TRUE(cluster_->mini_tablet_server(i)->server()->tablet_manager()->LookupTablet(
+ tablet_id, &r));
+ replicas.emplace_back(r.get());
+ }
+ unique_ptr<TxnSystemClient> txn_client;
+ ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client));
+
+ // Start injecting latency to Raft-related traffic to spur elections.
+ FLAGS_raft_enable_pre_election = false;
+ FLAGS_consensus_inject_latency_ms_in_notifications = 1.5 * FLAGS_raft_heartbeat_interval_ms;
+ SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 2));
+ constexpr const int64_t kCommittedTxnId = 0;
+ constexpr const int64_t kAbortedTxnId = 1;
+ for (const auto& op : kCommitSequence) {
+ ASSERT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kCommittedTxnId, op), kDefaultTimeout));
+ }
+ for (const auto& op : kAbortSequence) {
+ ASSERT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kAbortedTxnId, op), kDefaultTimeout));
+ }
+ const vector<TxnParticipant::TxnEntry> expected_txns = {
+ { kCommittedTxnId, kCommitted, kDummyCommitTimestamp },
+ { kAbortedTxnId, kAborted, -1 },
+ };
+ for (auto* replica : replicas) {
+ // NOTE: We ASSERT_EVENTUALLY here because having completed the participant
+ // op only guarantees successful replication on a majority. We need to wait
+ // a bit for the state to fully quiesce.
+ ASSERT_EVENTUALLY([&] {
+ const auto actual_txns = replica->tablet()->txn_participant()->GetTxnsForTests();
+ ASSERT_EQ(expected_txns, actual_txns)
+ << Substitute("Expected: $0,\nActual: $1",
+ TxnsAsString(expected_txns), TxnsAsString(actual_txns));
+ });
+ }
+}
+
} // namespace itest
} // namespace kudu
diff --git a/src/kudu/tablet/txn_participant-test-util.h
b/src/kudu/tablet/txn_participant-test-util.h
index fcce4ee..7d85fee 100644
--- a/src/kudu/tablet/txn_participant-test-util.h
+++ b/src/kudu/tablet/txn_participant-test-util.h
@@ -29,25 +29,38 @@ namespace kudu {
namespace tablet {
constexpr const int64_t kDummyCommitTimestamp = 1337;
-const std::vector<tserver::ParticipantOpPB::ParticipantOpType> kCommitSequence
= {
+// Ops that take a transaction from BEGIN_TXN through FINALIZE_COMMIT.
+constexpr const tserver::ParticipantOpPB::ParticipantOpType kCommitSequence[]
= {
tserver::ParticipantOpPB::BEGIN_TXN,
tserver::ParticipantOpPB::BEGIN_COMMIT,
tserver::ParticipantOpPB::FINALIZE_COMMIT,
};
+// Ops that begin a transaction, begin committing it, and then abort it.
+constexpr const tserver::ParticipantOpPB::ParticipantOpType kAbortSequence[] =
{
+ tserver::ParticipantOpPB::BEGIN_TXN,
+ tserver::ParticipantOpPB::BEGIN_COMMIT,
+ tserver::ParticipantOpPB::ABORT_TXN,
+};
-std::unique_ptr<ParticipantOpState> NewParticipantOp(
+// Returns a ParticipantOpPB of the given 'type' for 'txn_id'.
+// 'finalized_commit_timestamp' is only set on FINALIZE_COMMIT ops and is
+// ignored for all other op types.
+inline tserver::ParticipantOpPB MakeParticipantOp(
+ int64_t txn_id,
+ tserver::ParticipantOpPB::ParticipantOpType type,
+ int64_t finalized_commit_timestamp = kDummyCommitTimestamp) {
+ tserver::ParticipantOpPB op_pb;
+ op_pb.set_txn_id(txn_id);
+ op_pb.set_type(type);
+ if (type == tserver::ParticipantOpPB::FINALIZE_COMMIT) {
+ op_pb.set_finalized_commit_timestamp(finalized_commit_timestamp);
+ }
+ return op_pb;
+}
+
+inline std::unique_ptr<ParticipantOpState> NewParticipantOp(
TabletReplica* replica,
int64_t txn_id,
tserver::ParticipantOpPB::ParticipantOpType type,
int64_t finalized_commit_timestamp,
tserver::ParticipantRequestPB* req,
tserver::ParticipantResponsePB* resp) {
- auto* op = req->mutable_op();
- op->set_txn_id(txn_id);
- op->set_type(type);
- if (type == tserver::ParticipantOpPB::FINALIZE_COMMIT) {
- op->set_finalized_commit_timestamp(finalized_commit_timestamp);
- }
+ *req->mutable_op() = MakeParticipantOp(txn_id, type,
finalized_commit_timestamp);
std::unique_ptr<ParticipantOpState> op_state(new ParticipantOpState(
replica,
replica->tablet()->txn_participant(),
diff --git a/src/kudu/transactions/CMakeLists.txt
b/src/kudu/transactions/CMakeLists.txt
index f533499..8e944fc 100644
--- a/src/kudu/transactions/CMakeLists.txt
+++ b/src/kudu/transactions/CMakeLists.txt
@@ -31,6 +31,7 @@ ADD_EXPORTABLE_LIBRARY(transactions_proto
set(TRANSACTIONS_SRCS
coordinator_rpc.cc
+ participant_rpc.cc
txn_status_entry.cc
txn_status_manager.cc
txn_status_tablet.cc
diff --git a/src/kudu/transactions/participant_rpc.cc
b/src/kudu/transactions/participant_rpc.cc
new file mode 100644
index 0000000..36ece24
--- /dev/null
+++ b/src/kudu/transactions/participant_rpc.cc
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/transactions/participant_rpc.h"
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/client/client-internal.h"
+#include "kudu/client/client.h"
+#include "kudu/client/meta_cache.h"
+#include "kudu/common/timestamp.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/request_tracker.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/retriable_rpc.h"
+#include "kudu/rpc/rpc.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/tserver/tserver_admin.proxy.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/status_callback.h"
+
+using kudu::client::internal::MetaCacheServerPicker;
+using kudu::client::internal::RemoteTabletServer;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::rpc::CredentialsPolicy;
+using kudu::rpc::ErrorStatusPB;
+using kudu::rpc::ResponseCallback;
+using kudu::rpc::RetriableRpc;
+using kudu::rpc::RetriableRpcStatus;
+using kudu::tserver::TabletServerErrorPB;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+namespace kudu {
+class MonoTime;
+
+namespace transactions {
+
+ParticipantRpc* ParticipantRpc::NewRpc(
+    unique_ptr<TxnParticipantContext> ctx,
+    const MonoTime& deadline,
+    StatusCallback user_cb,
+    Timestamp* begin_commit_timestamp) {
+  // The tablet must already be resolved: replicas are picked by tablet ID
+  // rather than by table and partition key, hence the null table below.
+  auto* const tablet = DCHECK_NOTNULL(ctx->tablet.get());
+  auto* const kudu_client = ctx->client;
+  scoped_refptr<MetaCacheServerPicker> picker(new MetaCacheServerPicker(
+      kudu_client, kudu_client->data_->meta_cache_, /*table=*/nullptr, tablet));
+  return new ParticipantRpc(std::move(ctx), std::move(picker), deadline,
+                            std::move(user_cb), begin_commit_timestamp);
+}
+
+ParticipantRpc::ParticipantRpc(unique_ptr<TxnParticipantContext> ctx,
+                               scoped_refptr<MetaCacheServerPicker> replica_picker,
+                               const MonoTime& deadline,
+                               StatusCallback user_cb,
+                               Timestamp* begin_commit_timestamp)
+    : RetriableRpc(std::move(replica_picker), ctx->client->data_->request_tracker_,
+                   deadline, ctx->client->data_->messenger_),
+      client_(ctx->client),
+      tablet_(std::move(ctx->tablet)),
+      user_cb_(std::move(user_cb)),
+      begin_commit_timestamp_(begin_commit_timestamp) {
+  // Attach the op to send, then address the request to the resolved tablet.
+  *req_.mutable_op() = std::move(ctx->participant_op);
+  req_.set_tablet_id(tablet_->tablet_id());
+}
+
+string ParticipantRpc::ToString() const {
+  // Render the request with sensitive fields redacted.
+  const string req_str = SecureShortDebugString(req_);
+  return Substitute("ParticipateInTransaction($0)", req_str);
+}
+
+void ParticipantRpc::Try(RemoteTabletServer* replica,
+                         const ResponseCallback& callback) {
+  // NOTE: 'callback' is typically SendRpcCb(), which in turn calls
+  // AnalyzeResponse().
+  auto proxy = replica->admin_proxy();
+  auto* controller = mutable_retrier()->mutable_controller();
+  proxy->ParticipateInTransactionAsync(req_, &resp_, controller, callback);
+}
+
+// Classifies the outcome of an attempt into the retry action RetriableRpc
+// should take next. RPC-layer errors are checked first; after that, the
+// tablet server's response error (if any) decides the result.
+RetriableRpcStatus ParticipantRpc::AnalyzeResponse(const Status& rpc_cb_status) {
+  // We only analyze OK statuses if we succeeded to do the tablet lookup. In
+  // either case, let's examine whatever errors exist.
+  RetriableRpcStatus result;
+  result.status = rpc_cb_status.ok() ? retrier().controller().status() : rpc_cb_status;
+
+  // Check for specific RPC errors.
+  if (result.status.IsRemoteError()) {
+    const ErrorStatusPB* err = mutable_retrier()->controller().error_response();
+    if (err && err->has_code()) {
+      switch (err->code()) {
+        case ErrorStatusPB::ERROR_SERVER_TOO_BUSY:
+        case ErrorStatusPB::ERROR_UNAVAILABLE:
+        // NOTE: We can get an ERROR_NO_SUCH_SERVICE if we send a request as
+        // a tablet server is being destructed. Just retry. Even if this were a
+        // legitimate "no such service" error, we'll eventually time out.
+        case ErrorStatusPB::ERROR_NO_SUCH_SERVICE:
+          result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
+          return result;
+        default:
+          break;
+      }
+    }
+  }
+
+  // TODO(awong): it might be easier to understand if the resulting expected
+  // action were encoded in these status enums, e.g. RETRY_SAME_SERVER.
+  if (result.status.IsServiceUnavailable()) {
+    result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
+    return result;
+  }
+
+  // Check whether we need to get a new authentication token.
+  if (result.status.IsNotAuthorized()) {
+    const ErrorStatusPB* err = mutable_retrier()->controller().error_response();
+    if (err && err->has_code() &&
+        err->code() == ErrorStatusPB::FATAL_INVALID_AUTHENTICATION_TOKEN) {
+      result.result = RetriableRpcStatus::INVALID_AUTHENTICATION_TOKEN;
+      return result;
+    }
+  }
+
+  // If we couldn't connect to the server, e.g. it was down, failover to a
+  // different replica.
+  if (result.status.IsNetworkError()) {
+    result.result = RetriableRpcStatus::SERVER_NOT_ACCESSIBLE;
+    return result;
+  }
+
+  // We're done parsing the RPC controller errors. Unwrap the tserver response
+  // errors -- from here on out, the result status will be the response error.
+  if (result.status.ok() && resp_.has_error()) {
+    result.status = StatusFromPB(resp_.error().status());
+  }
+
+  if (resp_.has_error()) {
+    const auto code = resp_.error().code();
+    switch (code) {
+      // If we get TABLET_NOT_FOUND, the replica we thought was leader
+      // has been deleted.
+      case TabletServerErrorPB::TABLET_NOT_FOUND:
+      case TabletServerErrorPB::TABLET_FAILED:
+        result.result = RetriableRpcStatus::RESOURCE_NOT_FOUND;
+        return result;
+      case TabletServerErrorPB::TABLET_NOT_RUNNING:
+      case TabletServerErrorPB::THROTTLED:
+        result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
+        return result;
+      case TabletServerErrorPB::NOT_THE_LEADER:
+        result.result = RetriableRpcStatus::REPLICA_NOT_LEADER;
+        return result;
+      case TabletServerErrorPB::TXN_ILLEGAL_STATE:
+        result.result = RetriableRpcStatus::NON_RETRIABLE_ERROR;
+        return result;
+      case TabletServerErrorPB::TXN_OP_ALREADY_APPLIED:
+        // TXN_OP_ALREADY_APPLIED means the op succeeded, so stop retrying and
+        // return success.
+        result.result = RetriableRpcStatus::OK;
+        result.status = Status::OK();
+        return result;
+      case TabletServerErrorPB::UNKNOWN_ERROR:
+      default:
+        // The rest is handled in the code below.
+        break;
+    }
+  }
+
+  if (result.status.IsIllegalState() || result.status.IsAborted()) {
+    result.result = RetriableRpcStatus::REPLICA_NOT_LEADER;
+    return result;
+  }
+
+  // Handle the connection negotiation failure case if overall RPC's timeout
+  // hasn't expired yet: if the connection negotiation returned non-OK status,
+  // mark the server as not accessible and rely on the RetriableRpc's logic
+  // to switch to an alternative tablet replica.
+  //
+  // NOTE: Connection negotiation errors related to security are handled in the
+  // code above: see the handlers for IsNotAuthorized(), IsRemoteError().
+  if (!rpc_cb_status.IsTimedOut() && !result.status.ok() &&
+      mutable_retrier()->controller().negotiation_failed()) {
+    result.result = RetriableRpcStatus::SERVER_NOT_ACCESSIBLE;
+    return result;
+  }
+
+  if (result.status.ok()) {
+    result.result = RetriableRpcStatus::OK;
+  } else {
+    result.result = RetriableRpcStatus::NON_RETRIABLE_ERROR;
+  }
+  return result;
+}
+
+void ParticipantRpc::Finish(const Status& status) {
+  // This RPC owns itself; reclaim it so it is destroyed when this function
+  // returns, i.e. only after 'user_cb_' (a member) has been invoked.
+  unique_ptr<ParticipantRpc> self_owner(this);
+  const bool wants_timestamp = status.ok() && begin_commit_timestamp_ != nullptr;
+  if (wants_timestamp && resp_.has_timestamp()) {
+    *begin_commit_timestamp_ = Timestamp(resp_.timestamp());
+  }
+  user_cb_(status);
+}
+
+bool ParticipantRpc::GetNewAuthnTokenAndRetry() {
+  // Discard the previous attempt's response before reconnecting.
+  resp_.Clear();
+  auto on_connected = [this](const Status& s) { GotNewAuthnTokenRetryCb(s); };
+  client_->data_->ConnectToClusterAsync(client_, retrier().deadline(),
+                                        std::move(on_connected),
+                                        CredentialsPolicy::PRIMARY_CREDENTIALS);
+  return true;
+}
+
+} // namespace transactions
+} // namespace kudu
diff --git a/src/kudu/transactions/participant_rpc.h
b/src/kudu/transactions/participant_rpc.h
new file mode 100644
index 0000000..f35c45f
--- /dev/null
+++ b/src/kudu/transactions/participant_rpc.h
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "kudu/client/meta_cache.h"
+#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/retriable_rpc.h"
+#include "kudu/rpc/rpc.h"
+#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/util/status_callback.h"
+
+namespace kudu {
+class MonoTime;
+class Status;
+class Timestamp;
+
+namespace client {
+class KuduClient;
+} // namespace client
+
+namespace transactions {
+
+// Context to be used when sending RPCs to specific tablets.
+struct TxnParticipantContext {
+  // Client used to look up the tablet and send the RPC; not owned by this
+  // context. Defaults to null so a default-initialized context never carries
+  // an indeterminate pointer.
+  client::KuduClient* client = nullptr;
+
+  // The participant op to send to the tablet.
+  tserver::ParticipantOpPB participant_op;
+
+  // NOTE: this gets set after the tablet lookup completes.
+  scoped_refptr<client::internal::RemoteTablet> tablet;
+};
+
+// A retriable RPC that sends a single participant op to the leader replica of
+// a tablet that has already been resolved through the meta cache.
+//
+// Instances are created with NewRpc() and started with SendRpc(); they delete
+// themselves in Finish() after invoking the user callback.
+class ParticipantRpc final
+    : public rpc::RetriableRpc<client::internal::RemoteTabletServer,
+                               tserver::ParticipantRequestPB,
+                               tserver::ParticipantResponsePB> {
+ public:
+  // Returns a new RPC that sends 'ctx->participant_op' to 'ctx->tablet', which
+  // must be non-null. 'user_cb' is invoked with the final status.
+  //
+  // NOTE: if 'begin_commit_timestamp' is non-null, the memory it points to
+  // should stay valid until the RPC completes (i.e. until callback 'user_cb'
+  // is invoked). On success it is set from the response's timestamp, if any.
+  static ParticipantRpc* NewRpc(std::unique_ptr<TxnParticipantContext> ctx,
+                                const MonoTime& deadline,
+                                StatusCallback user_cb,
+                                Timestamp* begin_commit_timestamp = nullptr);
+  ~ParticipantRpc() = default;
+
+  std::string ToString() const override;
+
+ protected:
+  // Sends the request to 'replica'; 'callback' handles the response.
+  void Try(client::internal::RemoteTabletServer* replica,
+           const rpc::ResponseCallback& callback) override;
+
+  // Maps RPC-layer and tablet server errors to the retry action to take.
+  rpc::RetriableRpcStatus AnalyzeResponse(const Status& rpc_cb_status) override;
+
+  // Deletes itself upon completion.
+  void Finish(const Status& status) override;
+
+  // Reconnects to the cluster for a new authn token, then retries.
+  bool GetNewAuthnTokenAndRetry() override;
+
+ private:
+  ParticipantRpc(std::unique_ptr<TxnParticipantContext> ctx,
+                 scoped_refptr<client::internal::MetaCacheServerPicker> replica_picker,
+                 const MonoTime& deadline,
+                 StatusCallback user_cb,
+                 Timestamp* begin_commit_timestamp);
+
+  // Not owned.
+  client::KuduClient* client_;
+  // The tablet the op is sent to.
+  scoped_refptr<client::internal::RemoteTablet> tablet_;
+  // Invoked with the final status, right before this RPC is deleted.
+  const StatusCallback user_cb_;
+  // Optional output for the timestamp returned in the response; may be null.
+  Timestamp* begin_commit_timestamp_;
+};
+
+} // namespace transactions
+} // namespace kudu
diff --git a/src/kudu/transactions/txn_system_client.cc
b/src/kudu/transactions/txn_system_client.cc
index 30d83d6..48aafd2 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -36,6 +36,7 @@
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/transactions/coordinator_rpc.h"
+#include "kudu/transactions/participant_rpc.h"
#include "kudu/transactions/txn_status_tablet.h"
#include "kudu/transactions/transactions.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
@@ -51,6 +52,7 @@ using kudu::client::KuduTableCreator;
using kudu::client::internal::MetaCache;
using kudu::tserver::CoordinatorOpPB;
using kudu::tserver::CoordinatorOpResultPB;
+using kudu::tserver::ParticipantOpPB;
using std::string;
using std::unique_ptr;
using std::vector;
@@ -291,5 +293,49 @@ Status
TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB coordinate_tx
return Status::OK();
}
+Status TxnSystemClient::ParticipateInTransaction(const string& tablet_id,
+                                                 const ParticipantOpPB& participant_op,
+                                                 const MonoDelta& timeout,
+                                                 Timestamp* begin_commit_timestamp) {
+  // Synchronous wrapper: kick off the async call and block on its callback.
+  Synchronizer done;
+  auto on_done = done.AsStatusCallback();
+  ParticipateInTransactionAsync(tablet_id, participant_op, timeout,
+                                std::move(on_done), begin_commit_timestamp);
+  return done.Wait();
+}
+
+// Looks up 'tablet_id' by ID and, once resolved, sends 'participant_op' to its
+// leader via a ParticipantRpc. 'cb' is invoked exactly once: with the lookup
+// error if the lookup fails, otherwise with the RPC's final status.
+void TxnSystemClient::ParticipateInTransactionAsync(const string& tablet_id,
+                                                    ParticipantOpPB participant_op,
+                                                    const MonoDelta& timeout,
+                                                    StatusCallback cb,
+                                                    Timestamp* begin_commit_timestamp) {
+  const MonoTime deadline = MonoTime::Now() + timeout;
+  // The context is shared between the lookup, which fills in 'tablet' through
+  // the pointer passed below, and the completion callback. Holding it in a
+  // shared_ptr keeps the callback copyable (as std::function requires), and
+  // frees the context even if the callback is destroyed without running. The
+  // previous raw release()d pointer leaked in that case.
+  auto ctx = std::make_shared<TxnParticipantContext>(TxnParticipantContext{
+      client_.get(),
+      std::move(participant_op),
+      /*tablet*/nullptr,
+  });
+  // NOTE: 'ctx' is captured by copy (not moved) so that '&ctx->tablet' stays
+  // valid regardless of argument evaluation order.
+  client_->data_->meta_cache_->LookupTabletById(
+      client_.get(), tablet_id, deadline, &ctx->tablet,
+      [cb = std::move(cb), deadline, ctx, begin_commit_timestamp](const Status& s) mutable {
+        if (PREDICT_FALSE(!s.ok())) {
+          cb(s);
+          return;
+        }
+        // ParticipantRpc takes sole ownership of its context, so hand it a
+        // uniquely-owned one built from the now-resolved shared state.
+        unique_ptr<TxnParticipantContext> rpc_ctx(
+            new TxnParticipantContext(std::move(*ctx)));
+        ParticipantRpc* rpc = ParticipantRpc::NewRpc(
+            std::move(rpc_ctx),
+            deadline,
+            std::move(cb),
+            begin_commit_timestamp);
+        rpc->SendRpc();
+      });
+}
+
} // namespace transactions
} // namespace kudu
diff --git a/src/kudu/transactions/txn_system_client.h
b/src/kudu/transactions/txn_system_client.h
index 6cb4407..31f13aa 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -33,6 +33,7 @@
namespace kudu {
class HostPort;
+class Timestamp;
namespace client {
class KuduClient;
@@ -47,6 +48,7 @@ class TxnStatusTableITest_TestProtectCreateAndAlter_Test;
namespace tserver {
class CoordinatorOpPB;
class CoordinatorOpResultPB;
+class ParticipantOpPB;
} // namespace tserver
namespace transactions {
@@ -129,6 +131,15 @@ class TxnSystemClient {
// masters.
Status OpenTxnStatusTable();
+  // Sends an RPC to the leader of the given tablet to participate in a
+  // transaction, blocking until the operation finishes. The tablet lookup and
+  // the RPC share a deadline of now + 'timeout'.
+  //
+  // An op that the participant reports as already applied is treated as a
+  // success.
+  //
+  // If this is a BEGIN_COMMIT op, 'begin_commit_timestamp' is populated on
+  // success with the timestamp used to replicate the op on the participant.
+  // Pass nullptr (the default) if the timestamp isn't needed.
+  Status ParticipateInTransaction(const std::string& tablet_id,
+                                  const tserver::ParticipantOpPB& participant_op,
+                                  const MonoDelta& timeout,
+                                  Timestamp* begin_commit_timestamp = nullptr);
private:
friend class itest::TxnStatusTableITest;
@@ -147,6 +158,12 @@ class TxnSystemClient {
const StatusCallback& cb,
tserver::CoordinatorOpResultPB* result =
nullptr);
+  // Asynchronous version of ParticipateInTransaction(): looks up the tablet by
+  // ID, then sends 'participant_op' to its leader, invoking 'cb' with the
+  // result. If non-null, 'begin_commit_timestamp' must remain valid until 'cb'
+  // has been invoked.
+  void ParticipateInTransactionAsync(const std::string& tablet_id,
+                                     tserver::ParticipantOpPB participant_op,
+                                     const MonoDelta& timeout,
+                                     StatusCallback cb,
+                                     Timestamp* begin_commit_timestamp = nullptr);
+
client::sp::shared_ptr<client::KuduTable> txn_status_table() {
std::lock_guard<simple_spinlock> l(table_lock_);
return txn_status_table_;