This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 3e6d061 [txn] update not-a-leader retry logic in TxnSystemClient
3e6d061 is described below
commit 3e6d061c0eff0cdc15a72682527ea4b04895f30c
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Dec 2 20:02:18 2020 -0800
[txn] update not-a-leader retry logic in TxnSystemClient
This patch updates the retry logic in TxnSystemClient to handle
the case when TxnStatusManager returns Status::ServiceUnavailable().
In addition, this patch adds an extra provision to check the tablet replica
status in TxnStatusManager::CheckTxnStatusDataLoadedUnlocked().
The signatures of the involved methods have been updated as well.
This patch also enhances FinalizeCommitTransaction() to report
errors from the underlying tablet, if any. Also, the TXN_ILLEGAL_STATE error
code is added to CoordinateTransactionResponsePB responses when
TxnStatusManager finds the requested transaction in a wrong state.
I didn't add new test scenarios in this patch: they are present in
follow-up patches which introduce the tracking of stalled transactions.
Change-Id: I1b8abb27c7678c5c616a325343620902f6cbfd59
Reviewed-on: http://gerrit.cloudera.org:8080/16815
Reviewed-by: Andrew Wong <[email protected]>
Reviewed-by: Hao Hao <[email protected]>
Tested-by: Alexey Serbin <[email protected]>
---
src/kudu/client/client-test.cc | 3 +-
src/kudu/tablet/txn_coordinator.h | 20 ++++--
src/kudu/transactions/coordinator_rpc.cc | 57 +++++++++++----
src/kudu/transactions/txn_status_manager-test.cc | 55 +++++++++------
src/kudu/transactions/txn_status_manager.cc | 88 +++++++++++++++++-------
src/kudu/transactions/txn_status_manager.h | 24 +++++--
src/kudu/tserver/tablet_service.cc | 5 +-
7 files changed, 178 insertions(+), 74 deletions(-)
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 57da237..e375c92 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -411,7 +411,8 @@ class ClientTest : public KuduTest {
if (txn_id > highest_txn_id) {
continue;
}
- auto s = c->FinalizeCommitTransaction(txn_id);
+ tserver::TabletServerErrorPB ts_error;
+ auto s = c->FinalizeCommitTransaction(txn_id, &ts_error);
if (s.IsNotFound()) {
continue;
}
diff --git a/src/kudu/tablet/txn_coordinator.h
b/src/kudu/tablet/txn_coordinator.h
index 44e1065..8b718ce 100644
--- a/src/kudu/tablet/txn_coordinator.h
+++ b/src/kudu/tablet/txn_coordinator.h
@@ -75,7 +75,8 @@ class TxnCoordinator {
// no such transaction), returns an error without populating 'ts_error'.
//
// TODO(awong): add a commit timestamp.
- virtual Status FinalizeCommitTransaction(int64_t txn_id) = 0;
+ virtual Status FinalizeCommitTransaction(
+ int64_t txn_id, tserver::TabletServerErrorPB* ts_error) = 0;
// Aborts the given transaction as the given user.
//
@@ -85,10 +86,19 @@ class TxnCoordinator {
virtual Status AbortTransaction(int64_t txn_id, const std::string& user,
tserver::TabletServerErrorPB* ts_error) = 0;
- // Retrieves the status entry for the specified transaction.
- virtual Status GetTransactionStatus(int64_t txn_id,
- const std::string& user,
- transactions::TxnStatusEntryPB*
txn_status) = 0;
+
+ // Retrieves the status entry for the specified transaction, returning
+ // Status::OK() in case of success with 'txn_status' populated. In case of
+ // error, returns non-OK status. 'ts_error' is used to report a not-the-leader
+ // error, letting the caller know that the call reached a non-leader replica
+ // that cannot serve the request, so the client needs to retry the call against
+ // a leader replica. In the latter case, the method returns
+ // Status::ServiceUnavailable().
+ virtual Status GetTransactionStatus(
+ int64_t txn_id,
+ const std::string& user,
+ transactions::TxnStatusEntryPB* txn_status,
+ tserver::TabletServerErrorPB* ts_error) = 0;
// Registers a participant tablet ID to the given transaction ID as the given
// user.
diff --git a/src/kudu/transactions/coordinator_rpc.cc
b/src/kudu/transactions/coordinator_rpc.cc
index d8fa3d6..538c137 100644
--- a/src/kudu/transactions/coordinator_rpc.cc
+++ b/src/kudu/transactions/coordinator_rpc.cc
@@ -50,12 +50,14 @@ using kudu::rpc::ResponseCallback;
using kudu::rpc::RetriableRpc;
using kudu::rpc::RetriableRpcStatus;
using kudu::tserver::CoordinatorOpResultPB;
+using kudu::tserver::TabletServerErrorPB;
using std::string;
using std::unique_ptr;
using strings::Substitute;
namespace kudu {
class MonoTime;
+
namespace transactions {
CoordinatorRpc* CoordinatorRpc::NewRpc(
@@ -84,15 +86,10 @@ string CoordinatorRpc::ToString() const {
void CoordinatorRpc::Finish(const Status& status) {
// Free memory upon completion.
unique_ptr<CoordinatorRpc> this_instance(this);
- Status final_status = status;
- if (final_status.ok() &&
- resp_.has_op_result() && resp_.op_result().has_op_error()) {
- final_status = StatusFromPB(resp_.op_result().op_error());
- }
if (resp_.has_op_result() && op_result_) {
*op_result_ = resp_.op_result();
}
- cb_(final_status);
+ cb_(status);
}
bool CoordinatorRpc::GetNewAuthnTokenAndRetry() {
@@ -133,7 +130,14 @@ RetriableRpcStatus CoordinatorRpc::AnalyzeResponse(const
Status& rpc_cb_status)
// We only analyze OK statuses if we succeeded to do the tablet lookup. In
// either case, let's examine whatever errors exist.
RetriableRpcStatus result;
- result.status = rpc_cb_status.ok() ? retrier().controller().status() :
rpc_cb_status;
+ result.status = rpc_cb_status.ok() ? retrier().controller().status()
+ : rpc_cb_status;
+ if (result.status.ok() &&
+ resp_.has_op_result() && resp_.op_result().has_op_error()) {
+ // Extract the application-level error (AppStatusPB), if any, and convert
it
+ // into Status to allow the retry logic to work as expected.
+ result.status = StatusFromPB(resp_.op_result().op_error());
+ }
// Check for specific RPC errors.
if (result.status.IsRemoteError()) {
@@ -178,17 +182,42 @@ RetriableRpcStatus CoordinatorRpc::AnalyzeResponse(const
Status& rpc_cb_status)
// errors -- from here on out, the result status will be the response error.
if (result.status.ok() && resp_.has_error()) {
result.status = StatusFromPB(resp_.error().status());
+ DCHECK(!result.status.ok());
}
- // If we get TABLET_NOT_FOUND, the replica we thought was leader has been
- // deleted.
- if (resp_.has_error() &&
- resp_.error().code() == tserver::TabletServerErrorPB::TABLET_NOT_FOUND) {
- result.result = RetriableRpcStatus::RESOURCE_NOT_FOUND;
- return result;
+ if (resp_.has_error()) {
+ const auto code = resp_.error().code();
+ switch (code) {
+ // If we get TABLET_NOT_FOUND, the replica we thought was leader
+ // has been deleted.
+ case TabletServerErrorPB::TABLET_NOT_FOUND:
+ case TabletServerErrorPB::TABLET_FAILED:
+ result.result = RetriableRpcStatus::RESOURCE_NOT_FOUND;
+ return result;
+
+ case TabletServerErrorPB::TABLET_NOT_RUNNING:
+ case TabletServerErrorPB::THROTTLED:
+ result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
+ return result;
+
+ case TabletServerErrorPB::NOT_THE_LEADER:
+ result.result = RetriableRpcStatus::REPLICA_NOT_LEADER;
+ return result;
+
+ case TabletServerErrorPB::TXN_ILLEGAL_STATE:
+ result.result = RetriableRpcStatus::NON_RETRIABLE_ERROR;
+ return result;
+
+ case TabletServerErrorPB::UNKNOWN_ERROR:
+ default:
+ // The rest is handled in the code below.
+ break;
+ }
}
- if (result.status.IsIllegalState() || result.status.IsAborted()) {
+ if (result.status.IsAborted() || result.status.IsIllegalState()) {
+ // This is to handle "Op aborted by new leader" Raft replication errors or
+ // not-a-Raft-leader errors.
result.result = RetriableRpcStatus::REPLICA_NOT_LEADER;
return result;
}
diff --git a/src/kudu/transactions/txn_status_manager-test.cc
b/src/kudu/transactions/txn_status_manager-test.cc
index eb81571..146024f 100644
--- a/src/kudu/transactions/txn_status_manager-test.cc
+++ b/src/kudu/transactions/txn_status_manager-test.cc
@@ -193,7 +193,7 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
ASSERT_TRUE(s.IsServiceUnavailable());
ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
- s = tsm.FinalizeCommitTransaction(txn_id);
+ s = tsm.FinalizeCommitTransaction(txn_id, &ts_error);
ASSERT_TRUE(s.IsServiceUnavailable());
ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
@@ -202,7 +202,7 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
transactions::TxnStatusEntryPB txn_status;
- s = tsm.GetTransactionStatus(txn_id, kOwner, &txn_status);
+ s = tsm.GetTransactionStatus(txn_id, kOwner, &txn_status, &ts_error);
ASSERT_TRUE(s.IsServiceUnavailable());
ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
@@ -372,7 +372,7 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
statuses[i] = txn_manager_->BeginCommitTransaction(txn_id, kOwner,
&ts_error);
break;
case TxnStatePB::COMMITTED:
- statuses[i] = txn_manager_->FinalizeCommitTransaction(txn_id);
+ statuses[i] = txn_manager_->FinalizeCommitTransaction(txn_id,
&ts_error);
break;
default:
FAIL() << "bad update";
@@ -428,21 +428,24 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, nullptr, &ts_error));
TxnStatusEntryPB txn_status;
- ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
+ ASSERT_OK(txn_manager_->GetTransactionStatus(
+ 1, kOwner, &txn_status, &ts_error));
ASSERT_TRUE(txn_status.has_state());
ASSERT_EQ(TxnStatePB::OPEN, txn_status.state());
ASSERT_TRUE(txn_status.has_user());
ASSERT_EQ(kOwner, txn_status.user());
ASSERT_OK(txn_manager_->BeginCommitTransaction(1, kOwner, &ts_error));
- ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
+ ASSERT_OK(txn_manager_->GetTransactionStatus(
+ 1, kOwner, &txn_status, &ts_error));
ASSERT_TRUE(txn_status.has_state());
ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_status.state());
ASSERT_TRUE(txn_status.has_user());
ASSERT_EQ(kOwner, txn_status.user());
- ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1));
- ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
+ ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1, &ts_error));
+ ASSERT_OK(txn_manager_->GetTransactionStatus(
+ 1, kOwner, &txn_status, &ts_error));
ASSERT_TRUE(txn_status.has_state());
ASSERT_EQ(TxnStatePB::COMMITTED, txn_status.state());
ASSERT_TRUE(txn_status.has_user());
@@ -455,7 +458,8 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
ASSERT_OK(txn_manager_->AbortTransaction(2, kOwner, &ts_error));
TxnStatusEntryPB txn_status;
- ASSERT_OK(txn_manager_->GetTransactionStatus(2, kOwner, &txn_status));
+ ASSERT_OK(txn_manager_->GetTransactionStatus(
+ 2, kOwner, &txn_status, &ts_error));
ASSERT_TRUE(txn_status.has_state());
ASSERT_EQ(TxnStatePB::ABORTED, txn_status.state());
ASSERT_TRUE(txn_status.has_user());
@@ -484,25 +488,30 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
// rebuilding the TxnStatusManager from scratch.
{
TxnStatusEntryPB txn_status;
- ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
+ TabletServerErrorPB ts_error;
+ ASSERT_OK(txn_manager_->GetTransactionStatus(
+ 1, kOwner, &txn_status, &ts_error));
ASSERT_TRUE(txn_status.has_state());
ASSERT_EQ(TxnStatePB::COMMITTED, txn_status.state());
ASSERT_TRUE(txn_status.has_user());
ASSERT_EQ(kOwner, txn_status.user());
- ASSERT_OK(txn_manager_->GetTransactionStatus(2, kOwner, &txn_status));
+ ASSERT_OK(txn_manager_->GetTransactionStatus(
+ 2, kOwner, &txn_status, &ts_error));
ASSERT_TRUE(txn_status.has_state());
ASSERT_EQ(TxnStatePB::ABORTED, txn_status.state());
ASSERT_TRUE(txn_status.has_user());
ASSERT_EQ(kOwner, txn_status.user());
- ASSERT_OK(txn_manager_->GetTransactionStatus(3, kOwner, &txn_status));
+ ASSERT_OK(txn_manager_->GetTransactionStatus(
+ 3, kOwner, &txn_status, &ts_error));
ASSERT_TRUE(txn_status.has_state());
ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_status.state());
ASSERT_TRUE(txn_status.has_user());
ASSERT_EQ(kOwner, txn_status.user());
- ASSERT_OK(txn_manager_->GetTransactionStatus(4, kOwner, &txn_status));
+ ASSERT_OK(txn_manager_->GetTransactionStatus(
+ 4, kOwner, &txn_status, &ts_error));
ASSERT_TRUE(txn_status.has_state());
ASSERT_EQ(TxnStatePB::OPEN, txn_status.state());
ASSERT_TRUE(txn_status.has_user());
@@ -512,21 +521,27 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
// Supplying wrong user.
{
TxnStatusEntryPB txn_status;
- auto s = txn_manager_->GetTransactionStatus(1, "stranger", &txn_status);
+ TabletServerErrorPB ts_error;
+ auto s = txn_manager_->GetTransactionStatus(
+ 1, "stranger", &txn_status, &ts_error);
ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
}
// Supplying not-yet-used transaction ID.
{
TxnStatusEntryPB txn_status;
- auto s = txn_manager_->GetTransactionStatus(0, kOwner, &txn_status);
+ TabletServerErrorPB ts_error;
+ auto s = txn_manager_->GetTransactionStatus(
+ 0, kOwner, &txn_status, &ts_error);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
}
// Supplying wrong user and not-yet-used transaction ID.
{
TxnStatusEntryPB txn_status;
- auto s = txn_manager_->GetTransactionStatus(0, "stranger", &txn_status);
+ TabletServerErrorPB ts_error;
+ auto s = txn_manager_->GetTransactionStatus(
+ 0, "stranger", &txn_status, &ts_error);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
}
}
@@ -579,23 +594,23 @@ TEST_F(TxnStatusManagerTest, TestUpdateTransactionState) {
// We can't begin or finalize a commit if we've aborted.
Status s = txn_manager_->BeginCommitTransaction(kTxnId1, kOwner, &ts_error);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
- s = txn_manager_->FinalizeCommitTransaction(kTxnId1);
+ s = txn_manager_->FinalizeCommitTransaction(kTxnId1, &ts_error);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
// We can't finalize a commit that hasn't begun committing.
const int64_t kTxnId2 = 2;
ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner, nullptr,
&ts_error));
- s = txn_manager_->FinalizeCommitTransaction(kTxnId2);
+ s = txn_manager_->FinalizeCommitTransaction(kTxnId2, &ts_error);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
// We can't abort a transaction that has finished committing.
ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId2, kOwner, &ts_error));
- ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId2));
+ ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId2, &ts_error));
s = txn_manager_->AbortTransaction(kTxnId2, kOwner, &ts_error);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
// Redundant finalize calls are also benign.
- ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId2));
+ ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId2, &ts_error));
// Calls to begin committing should return an error if we've already
// finalized the commit.
@@ -625,7 +640,7 @@ TEST_F(TxnStatusManagerTest,
TestRegisterParticipantsWithStates) {
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
// We can't register participants when we've finished committnig.
- ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId1));
+ ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId1, &ts_error));
s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner,
&ts_error);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
diff --git a/src/kudu/transactions/txn_status_manager.cc
b/src/kudu/transactions/txn_status_manager.cc
index 66a3c14..2030d88 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -27,10 +27,14 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
+#include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/tablet_replica.h"
#include "kudu/transactions/transactions.pb.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/cow_object.h"
@@ -57,6 +61,7 @@
TAG_FLAG(txn_status_manager_inject_latency_load_from_tablet_ms, unsafe);
using kudu::pb_util::SecureShortDebugString;
using kudu::tablet::ParticipantIdsByTxnId;
using kudu::tserver::TabletServerErrorPB;
+using kudu::consensus::RaftPeerPB;
using std::string;
using std::vector;
using strings::Substitute;
@@ -76,6 +81,17 @@ namespace {
constexpr int64_t kIdStatusDataNotLoaded = -2;
constexpr int64_t kIdStatusDataReady = -1;
+Status ReportIllegalTxnState(const string& errmsg,
+ TabletServerErrorPB* ts_error) {
+ DCHECK(ts_error);
+ auto s = Status::IllegalState(errmsg);
+ TabletServerErrorPB error;
+ StatusToPB(s, error.mutable_status());
+ error.set_code(TabletServerErrorPB::TXN_ILLEGAL_STATE);
+ *ts_error = std::move(error);
+ return s;
+}
+
} // anonymous namespace
TxnStatusManagerBuildingVisitor::TxnStatusManagerBuildingVisitor()
@@ -138,7 +154,9 @@ Status TxnStatusManager::LoadFromTablet() {
return Status::OK();
}
-Status TxnStatusManager::CheckTxnStatusDataLoadedUnlocked() const {
+Status TxnStatusManager::CheckTxnStatusDataLoadedUnlocked(
+ TabletServerErrorPB* ts_error) const {
+ DCHECK(ts_error);
DCHECK(lock_.is_locked());
// TODO(aserbin): this is just to handle requests which come in a short time
// interval when the leader replica of the transaction status
@@ -151,18 +169,30 @@ Status
TxnStatusManager::CheckTxnStatusDataLoadedUnlocked() const {
if (PREDICT_FALSE(highest_txn_id_ <= kIdStatusDataNotLoaded)) {
return Status::ServiceUnavailable("transaction status data is not loaded");
}
+ auto* consensus = status_tablet_.tablet_replica_->consensus();
+ DCHECK(consensus);
+ if (consensus->role() != RaftPeerPB::LEADER) {
+ static const Status kErrStatus = Status::ServiceUnavailable(
+ "txn status tablet replica is not a leader");
+ TabletServerErrorPB error;
+ StatusToPB(kErrStatus, error.mutable_status());
+ error.set_code(TabletServerErrorPB::NOT_THE_LEADER);
+ *ts_error = std::move(error);
+ return kErrStatus;
+ }
return Status::OK();
}
Status TxnStatusManager::GetTransaction(int64_t txn_id,
const boost::optional<string>& user,
- scoped_refptr<TransactionEntry>* txn)
const {
+ scoped_refptr<TransactionEntry>* txn,
+ TabletServerErrorPB* ts_error) const {
std::lock_guard<simple_spinlock> l(lock_);
// First, make sure the transaction status data has been loaded. If not, then
// the caller might get an unexpected error response and bail instead of
// retrying a bit later and getting proper response.
- RETURN_NOT_OK(CheckTxnStatusDataLoadedUnlocked());
+ RETURN_NOT_OK(CheckTxnStatusDataLoadedUnlocked(ts_error));
scoped_refptr<TransactionEntry> ret = FindPtrOrNull(txns_by_id_, txn_id);
if (PREDICT_FALSE(!ret)) {
@@ -201,7 +231,7 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id,
//
// If this check fails, don not set the 'highest_seen_txn_id' because
// 'highest_txn_id_' doesn't contain any meaningful value yet.
- RETURN_NOT_OK(CheckTxnStatusDataLoadedUnlocked());
+ RETURN_NOT_OK(CheckTxnStatusDataLoadedUnlocked(ts_error));
// Second, make sure the requested ID is viable.
if (PREDICT_FALSE(txn_id <= highest_txn_id_)) {
@@ -256,7 +286,7 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id,
Status TxnStatusManager::BeginCommitTransaction(int64_t txn_id, const string&
user,
TabletServerErrorPB* ts_error)
{
scoped_refptr<TransactionEntry> txn;
- RETURN_NOT_OK(GetTransaction(txn_id, user, &txn));
+ RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
const auto& pb = txn_lock.data().pb;
@@ -265,9 +295,9 @@ Status TxnStatusManager::BeginCommitTransaction(int64_t
txn_id, const string& us
return Status::OK();
}
if (PREDICT_FALSE(state != TxnStatePB::OPEN)) {
- return Status::IllegalState(
- Substitute("transaction ID $0 is not open: $1",
- txn_id, SecureShortDebugString(pb)));
+ return ReportIllegalTxnState(Substitute("transaction ID $0 is not open:
$1",
+ txn_id,
SecureShortDebugString(pb)),
+ ts_error);
}
auto* mutable_data = txn_lock.mutable_data();
mutable_data->pb.set_state(TxnStatePB::COMMIT_IN_PROGRESS);
@@ -276,9 +306,11 @@ Status TxnStatusManager::BeginCommitTransaction(int64_t
txn_id, const string& us
return Status::OK();
}
-Status TxnStatusManager::FinalizeCommitTransaction(int64_t txn_id) {
+Status TxnStatusManager::FinalizeCommitTransaction(
+ int64_t txn_id,
+ TabletServerErrorPB* ts_error) {
scoped_refptr<TransactionEntry> txn;
- RETURN_NOT_OK(GetTransaction(txn_id, boost::none, &txn));
+ RETURN_NOT_OK(GetTransaction(txn_id, boost::none, &txn, ts_error));
TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
const auto& pb = txn_lock.data().pb;
@@ -287,14 +319,15 @@ Status
TxnStatusManager::FinalizeCommitTransaction(int64_t txn_id) {
return Status::OK();
}
if (PREDICT_FALSE(state != TxnStatePB::COMMIT_IN_PROGRESS)) {
- return Status::IllegalState(
+ return ReportIllegalTxnState(
Substitute("transaction ID $0 is not committing: $1",
- txn_id, SecureShortDebugString(pb)));
+ txn_id, SecureShortDebugString(pb)),
+ ts_error);
}
auto* mutable_data = txn_lock.mutable_data();
mutable_data->pb.set_state(TxnStatePB::COMMITTED);
- TabletServerErrorPB ts_error;
- RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb,
&ts_error));
+ RETURN_NOT_OK(status_tablet_.UpdateTransaction(
+ txn_id, mutable_data->pb, ts_error));
txn_lock.Commit();
return Status::OK();
}
@@ -302,7 +335,7 @@ Status TxnStatusManager::FinalizeCommitTransaction(int64_t
txn_id) {
Status TxnStatusManager::AbortTransaction(int64_t txn_id, const std::string&
user,
TabletServerErrorPB* ts_error) {
scoped_refptr<TransactionEntry> txn;
- RETURN_NOT_OK(GetTransaction(txn_id, user, &txn));
+ RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
const auto& pb = txn_lock.data().pb;
@@ -312,9 +345,10 @@ Status TxnStatusManager::AbortTransaction(int64_t txn_id,
const std::string& use
}
if (PREDICT_FALSE(state != TxnStatePB::OPEN &&
state != TxnStatePB::COMMIT_IN_PROGRESS)) {
- return Status::IllegalState(
+ return ReportIllegalTxnState(
Substitute("transaction ID $0 cannot be aborted: $1",
- txn_id, SecureShortDebugString(pb)));
+ txn_id, SecureShortDebugString(pb)),
+ ts_error);
}
auto* mutable_data = txn_lock.mutable_data();
mutable_data->pb.set_state(TxnStatePB::ABORTED);
@@ -326,10 +360,11 @@ Status TxnStatusManager::AbortTransaction(int64_t txn_id,
const std::string& use
Status TxnStatusManager::GetTransactionStatus(
int64_t txn_id,
const std::string& user,
- transactions::TxnStatusEntryPB* txn_status) {
+ transactions::TxnStatusEntryPB* txn_status,
+ TabletServerErrorPB* ts_error) {
DCHECK(txn_status);
scoped_refptr<TransactionEntry> txn;
- RETURN_NOT_OK(GetTransaction(txn_id, user, &txn));
+ RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
TransactionEntryLock txn_lock(txn.get(), LockMode::READ);
const auto& pb = txn_lock.data().pb;
@@ -341,10 +376,13 @@ Status TxnStatusManager::GetTransactionStatus(
return Status::OK();
}
-Status TxnStatusManager::RegisterParticipant(int64_t txn_id, const string&
tablet_id,
- const string& user,
TabletServerErrorPB* ts_error) {
+Status TxnStatusManager::RegisterParticipant(
+ int64_t txn_id,
+ const string& tablet_id,
+ const string& user,
+ TabletServerErrorPB* ts_error) {
scoped_refptr<TransactionEntry> txn;
- RETURN_NOT_OK(GetTransaction(txn_id, user, &txn));
+ RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
// Lock the transaction in read mode and check that it's open. If the
// transaction isn't open, e.g. because a commit is already in progress,
@@ -352,9 +390,9 @@ Status TxnStatusManager::RegisterParticipant(int64_t
txn_id, const string& table
TransactionEntryLock txn_lock(txn.get(), LockMode::READ);
const auto& txn_state = txn_lock.data().pb.state();
if (PREDICT_FALSE(txn_state != TxnStatePB::OPEN)) {
- return Status::IllegalState(
- Substitute("transaction ID $0 not open: $1",
- txn_id, TxnStatePB_Name(txn_state)));
+ return ReportIllegalTxnState(Substitute("transaction ID $0 not open: $1",
+ txn_id,
TxnStatePB_Name(txn_state)),
+ ts_error);
}
auto participant = txn->GetOrCreateParticipant(tablet_id);
diff --git a/src/kudu/transactions/txn_status_manager.h
b/src/kudu/transactions/txn_status_manager.h
index 48028bc..25ae220 100644
--- a/src/kudu/transactions/txn_status_manager.h
+++ b/src/kudu/transactions/txn_status_manager.h
@@ -108,7 +108,8 @@ class TxnStatusManager final : public
tablet::TxnCoordinator {
// so it doesn't take a user.
//
// TODO(awong): add a commit timestamp.
- Status FinalizeCommitTransaction(int64_t txn_id) override;
+ Status FinalizeCommitTransaction(int64_t txn_id,
+ tserver::TabletServerErrorPB* ts_error)
override;
// Aborts the given transaction, returning an error if the transaction
// doesn't exist, is committed or not yet opened, or isn't owned by the given
@@ -120,7 +121,8 @@ class TxnStatusManager final : public
tablet::TxnCoordinator {
// the transaction doesn't exist or isn't owned by the specified user.
Status GetTransactionStatus(int64_t txn_id,
const std::string& user,
- transactions::TxnStatusEntryPB* txn_status)
override;
+ transactions::TxnStatusEntryPB* txn_status,
+ tserver::TabletServerErrorPB* ts_error) override;
// Creates an in-memory participant, writes an entry to the status table, and
// attaches the in-memory participant to the transaction.
@@ -142,15 +144,23 @@ class TxnStatusManager final : public
tablet::TxnCoordinator {
private:
// Verifies that the transaction status data has already been loaded from the
- // underlying tablet. Returns Status::OK() if the data is loaded, otherwise
- // returns Status::ServiceUnavailable().
- Status CheckTxnStatusDataLoadedUnlocked() const;
+ // underlying tablet and the replica is a leader. Returns Status::OK() if the
+ // data is loaded and the replica is a leader. Otherwise, if the data hasn't
+ // been loaded yet, returns Status::ServiceUnavailable(). If the data has
+ // been loaded, but the replica isn't a leader, returns
+ // Status::ServiceUnavailable() and sets the code in 'ts_error'
+ // to TabletServerErrorPB::NOT_THE_LEADER.
+ Status CheckTxnStatusDataLoadedUnlocked(
+ tserver::TabletServerErrorPB* ts_error) const;
// Returns the transaction entry, returning an error if the transaction ID
// doesn't exist or if 'user' is specified but isn't the owner of the
- // transaction.
+ // transaction. In addition, if the underlying replica isn't a leader,
+ // sets the code in 'ts_error' to TabletServerErrorPB::NOT_THE_LEADER
+ // correspondingly.
Status GetTransaction(int64_t txn_id, const boost::optional<std::string>&
user,
- scoped_refptr<TransactionEntry>* txn) const;
+ scoped_refptr<TransactionEntry>* txn,
+ tserver::TabletServerErrorPB* ts_error) const;
// Protects 'highest_txn_id_' and 'txns_by_id_'.
mutable simple_spinlock lock_;
diff --git a/src/kudu/tserver/tablet_service.cc
b/src/kudu/tserver/tablet_service.cc
index f6118b9..ec6c608 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -30,9 +30,9 @@
#include <vector>
#include <boost/optional/optional.hpp>
-#include <boost/type_traits/decay.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
+#include <google/protobuf/stubs/port.h>
#include "kudu/clock/clock.h"
#include "kudu/common/column_predicate.h"
@@ -1268,7 +1268,8 @@ void TabletServiceAdminImpl::CoordinateTransaction(const
CoordinateTransactionRe
s = txn_coordinator->AbortTransaction(txn_id, user, &ts_error);
break;
case CoordinatorOpPB::GET_TXN_STATUS:
- s = txn_coordinator->GetTransactionStatus(txn_id, user, &txn_status);
+ s = txn_coordinator->GetTransactionStatus(
+ txn_id, user, &txn_status, &ts_error);
break;
default:
s = Status::InvalidArgument(Substitute("Unknown op type: $0",
op.type()));