This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e6d061  [txn] update not-a-leader retry logic in TxnSystemClient
3e6d061 is described below

commit 3e6d061c0eff0cdc15a72682527ea4b04895f30c
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Dec 2 20:02:18 2020 -0800

    [txn] update not-a-leader retry logic in TxnSystemClient
    
    This patch updates the retry logic in TxnSystemClient to handle
    the case when TxnStatusManager returns Status::ServiceUnavailable().
    
    In addition, this patch adds a check that the tablet replica is the Raft
    leader in TxnStatusManager::CheckTxnStatusDataLoadedUnlocked().
    The signatures of the involved methods have been updated as well.
    
    This patch also enhances FinalizeCommitTransaction() to report errors
    from the underlying tablet, if any.  Also, the TXN_ILLEGAL_STATE error
    code is now set in CoordinateTransactionResponsePB responses when
    TxnStatusManager finds the requested transaction in the wrong state.
    
    I didn't add new test scenarios in this patch: they are present in
    follow-up patches which introduce the tracking of stalled transactions.
    
    Change-Id: I1b8abb27c7678c5c616a325343620902f6cbfd59
    Reviewed-on: http://gerrit.cloudera.org:8080/16815
    Reviewed-by: Andrew Wong <[email protected]>
    Reviewed-by: Hao Hao <[email protected]>
    Tested-by: Alexey Serbin <[email protected]>
---
 src/kudu/client/client-test.cc                   |  3 +-
 src/kudu/tablet/txn_coordinator.h                | 20 ++++--
 src/kudu/transactions/coordinator_rpc.cc         | 57 +++++++++++----
 src/kudu/transactions/txn_status_manager-test.cc | 55 +++++++++------
 src/kudu/transactions/txn_status_manager.cc      | 88 +++++++++++++++++-------
 src/kudu/transactions/txn_status_manager.h       | 24 +++++--
 src/kudu/tserver/tablet_service.cc               |  5 +-
 7 files changed, 178 insertions(+), 74 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 57da237..e375c92 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -411,7 +411,8 @@ class ClientTest : public KuduTest {
         if (txn_id > highest_txn_id) {
           continue;
         }
-        auto s = c->FinalizeCommitTransaction(txn_id);
+        tserver::TabletServerErrorPB ts_error;
+        auto s = c->FinalizeCommitTransaction(txn_id, &ts_error);
         if (s.IsNotFound()) {
           continue;
         }
diff --git a/src/kudu/tablet/txn_coordinator.h 
b/src/kudu/tablet/txn_coordinator.h
index 44e1065..8b718ce 100644
--- a/src/kudu/tablet/txn_coordinator.h
+++ b/src/kudu/tablet/txn_coordinator.h
@@ -75,7 +75,8 @@ class TxnCoordinator {
   // no such transaction), returns an error without populating 'ts_error'.
   //
   // TODO(awong): add a commit timestamp.
-  virtual Status FinalizeCommitTransaction(int64_t txn_id) = 0;
+  virtual Status FinalizeCommitTransaction(
+      int64_t txn_id, tserver::TabletServerErrorPB* ts_error) = 0;
 
   // Aborts the given transaction as the given user.
   //
@@ -85,10 +86,19 @@ class TxnCoordinator {
   virtual Status AbortTransaction(int64_t txn_id, const std::string& user,
                                   tserver::TabletServerErrorPB* ts_error) = 0;
 
-  // Retrieves the status entry for the specified transaction.
-  virtual Status GetTransactionStatus(int64_t txn_id,
-                                      const std::string& user,
-                                      transactions::TxnStatusEntryPB* 
txn_status) = 0;
+
+  // Retrieves the status entry for the specified transaction, returning
+  // Status::OK() in case of success with 'txn_status' populated. In case of
+  // error, returns non-OK status. 'ts_error' is used to return not-the-leader
+  // error to let the caller know that the call reached a non-leader replica 
and
+  // it's not up to its purpose, so the client needs to retry the call against
+  // a leader replica. In the latter case, the method returns
+  // Status::ServiceUnavailable().
+  virtual Status GetTransactionStatus(
+      int64_t txn_id,
+      const std::string& user,
+      transactions::TxnStatusEntryPB* txn_status,
+      tserver::TabletServerErrorPB* ts_error) = 0;
 
   // Registers a participant tablet ID to the given transaction ID as the given
   // user.
diff --git a/src/kudu/transactions/coordinator_rpc.cc 
b/src/kudu/transactions/coordinator_rpc.cc
index d8fa3d6..538c137 100644
--- a/src/kudu/transactions/coordinator_rpc.cc
+++ b/src/kudu/transactions/coordinator_rpc.cc
@@ -50,12 +50,14 @@ using kudu::rpc::ResponseCallback;
 using kudu::rpc::RetriableRpc;
 using kudu::rpc::RetriableRpcStatus;
 using kudu::tserver::CoordinatorOpResultPB;
+using kudu::tserver::TabletServerErrorPB;
 using std::string;
 using std::unique_ptr;
 using strings::Substitute;
 
 namespace kudu {
 class MonoTime;
+
 namespace transactions {
 
 CoordinatorRpc* CoordinatorRpc::NewRpc(
@@ -84,15 +86,10 @@ string CoordinatorRpc::ToString() const {
 void CoordinatorRpc::Finish(const Status& status) {
   // Free memory upon completion.
   unique_ptr<CoordinatorRpc> this_instance(this);
-  Status final_status = status;
-  if (final_status.ok() &&
-      resp_.has_op_result() && resp_.op_result().has_op_error()) {
-    final_status = StatusFromPB(resp_.op_result().op_error());
-  }
   if (resp_.has_op_result() && op_result_) {
     *op_result_ = resp_.op_result();
   }
-  cb_(final_status);
+  cb_(status);
 }
 
 bool CoordinatorRpc::GetNewAuthnTokenAndRetry() {
@@ -133,7 +130,14 @@ RetriableRpcStatus CoordinatorRpc::AnalyzeResponse(const 
Status& rpc_cb_status)
   // We only analyze OK statuses if we succeeded to do the tablet lookup. In
   // either case, let's examine whatever errors exist.
   RetriableRpcStatus result;
-  result.status = rpc_cb_status.ok() ? retrier().controller().status() : 
rpc_cb_status;
+  result.status = rpc_cb_status.ok() ? retrier().controller().status()
+                                     : rpc_cb_status;
+  if (result.status.ok() &&
+      resp_.has_op_result() && resp_.op_result().has_op_error()) {
+    // Extract the application-level error (AppStatusPB), if any, and convert 
it
+    // into Status to allow the retry logic to work as expected.
+    result.status = StatusFromPB(resp_.op_result().op_error());
+  }
 
   // Check for specific RPC errors.
   if (result.status.IsRemoteError()) {
@@ -178,17 +182,42 @@ RetriableRpcStatus CoordinatorRpc::AnalyzeResponse(const 
Status& rpc_cb_status)
   // errors -- from here on out, the result status will be the response error.
   if (result.status.ok() && resp_.has_error()) {
     result.status = StatusFromPB(resp_.error().status());
+    DCHECK(!result.status.ok());
   }
 
-  // If we get TABLET_NOT_FOUND, the replica we thought was leader has been
-  // deleted.
-  if (resp_.has_error() &&
-      resp_.error().code() == tserver::TabletServerErrorPB::TABLET_NOT_FOUND) {
-    result.result = RetriableRpcStatus::RESOURCE_NOT_FOUND;
-    return result;
+  if (resp_.has_error()) {
+    const auto code = resp_.error().code();
+    switch (code) {
+      // If we get TABLET_NOT_FOUND, the replica we thought was leader
+      // has been deleted.
+      case TabletServerErrorPB::TABLET_NOT_FOUND:
+      case TabletServerErrorPB::TABLET_FAILED:
+        result.result = RetriableRpcStatus::RESOURCE_NOT_FOUND;
+        return result;
+
+      case TabletServerErrorPB::TABLET_NOT_RUNNING:
+      case TabletServerErrorPB::THROTTLED:
+        result.result = RetriableRpcStatus::SERVICE_UNAVAILABLE;
+        return result;
+
+      case TabletServerErrorPB::NOT_THE_LEADER:
+        result.result = RetriableRpcStatus::REPLICA_NOT_LEADER;
+        return result;
+
+      case TabletServerErrorPB::TXN_ILLEGAL_STATE:
+        result.result = RetriableRpcStatus::NON_RETRIABLE_ERROR;
+        return result;
+
+      case TabletServerErrorPB::UNKNOWN_ERROR:
+      default:
+        // The rest is handled in the code below.
+        break;
+    }
   }
 
-  if (result.status.IsIllegalState() || result.status.IsAborted()) {
+  if (result.status.IsAborted() || result.status.IsIllegalState()) {
+    // This is to handle "Op aborted by new leader" Raft replication errors or
+    // non-a-Raft-leader errors.
     result.result = RetriableRpcStatus::REPLICA_NOT_LEADER;
     return result;
   }
diff --git a/src/kudu/transactions/txn_status_manager-test.cc 
b/src/kudu/transactions/txn_status_manager-test.cc
index eb81571..146024f 100644
--- a/src/kudu/transactions/txn_status_manager-test.cc
+++ b/src/kudu/transactions/txn_status_manager-test.cc
@@ -193,7 +193,7 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
       ASSERT_TRUE(s.IsServiceUnavailable());
       ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
 
-      s = tsm.FinalizeCommitTransaction(txn_id);
+      s = tsm.FinalizeCommitTransaction(txn_id, &ts_error);
       ASSERT_TRUE(s.IsServiceUnavailable());
       ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
 
@@ -202,7 +202,7 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
       ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
 
       transactions::TxnStatusEntryPB txn_status;
-      s = tsm.GetTransactionStatus(txn_id, kOwner, &txn_status);
+      s = tsm.GetTransactionStatus(txn_id, kOwner, &txn_status, &ts_error);
       ASSERT_TRUE(s.IsServiceUnavailable());
       ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
 
@@ -372,7 +372,7 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
           statuses[i] = txn_manager_->BeginCommitTransaction(txn_id, kOwner, 
&ts_error);
           break;
         case TxnStatePB::COMMITTED:
-          statuses[i] = txn_manager_->FinalizeCommitTransaction(txn_id);
+          statuses[i] = txn_manager_->FinalizeCommitTransaction(txn_id, 
&ts_error);
           break;
         default:
           FAIL() << "bad update";
@@ -428,21 +428,24 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
     ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, nullptr, &ts_error));
 
     TxnStatusEntryPB txn_status;
-    ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
+    ASSERT_OK(txn_manager_->GetTransactionStatus(
+        1, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
     ASSERT_EQ(TxnStatePB::OPEN, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
     ASSERT_EQ(kOwner, txn_status.user());
 
     ASSERT_OK(txn_manager_->BeginCommitTransaction(1, kOwner, &ts_error));
-    ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
+    ASSERT_OK(txn_manager_->GetTransactionStatus(
+        1, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
     ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
     ASSERT_EQ(kOwner, txn_status.user());
 
-    ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1));
-    ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
+    ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1, &ts_error));
+    ASSERT_OK(txn_manager_->GetTransactionStatus(
+        1, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
     ASSERT_EQ(TxnStatePB::COMMITTED, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
@@ -455,7 +458,8 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
     ASSERT_OK(txn_manager_->AbortTransaction(2, kOwner, &ts_error));
 
     TxnStatusEntryPB txn_status;
-    ASSERT_OK(txn_manager_->GetTransactionStatus(2, kOwner, &txn_status));
+    ASSERT_OK(txn_manager_->GetTransactionStatus(
+        2, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
     ASSERT_EQ(TxnStatePB::ABORTED, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
@@ -484,25 +488,30 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
   // rebuilding the TxnStatusManager from scratch.
   {
     TxnStatusEntryPB txn_status;
-    ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
+    TabletServerErrorPB ts_error;
+    ASSERT_OK(txn_manager_->GetTransactionStatus(
+        1, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
     ASSERT_EQ(TxnStatePB::COMMITTED, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
     ASSERT_EQ(kOwner, txn_status.user());
 
-    ASSERT_OK(txn_manager_->GetTransactionStatus(2, kOwner, &txn_status));
+    ASSERT_OK(txn_manager_->GetTransactionStatus(
+        2, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
     ASSERT_EQ(TxnStatePB::ABORTED, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
     ASSERT_EQ(kOwner, txn_status.user());
 
-    ASSERT_OK(txn_manager_->GetTransactionStatus(3, kOwner, &txn_status));
+    ASSERT_OK(txn_manager_->GetTransactionStatus(
+        3, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
     ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
     ASSERT_EQ(kOwner, txn_status.user());
 
-    ASSERT_OK(txn_manager_->GetTransactionStatus(4, kOwner, &txn_status));
+    ASSERT_OK(txn_manager_->GetTransactionStatus(
+        4, kOwner, &txn_status, &ts_error));
     ASSERT_TRUE(txn_status.has_state());
     ASSERT_EQ(TxnStatePB::OPEN, txn_status.state());
     ASSERT_TRUE(txn_status.has_user());
@@ -512,21 +521,27 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
   // Supplying wrong user.
   {
     TxnStatusEntryPB txn_status;
-    auto s = txn_manager_->GetTransactionStatus(1, "stranger", &txn_status);
+    TabletServerErrorPB ts_error;
+    auto s = txn_manager_->GetTransactionStatus(
+        1, "stranger", &txn_status, &ts_error);
     ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
   }
 
   // Supplying not-yet-used transaction ID.
   {
     TxnStatusEntryPB txn_status;
-    auto s = txn_manager_->GetTransactionStatus(0, kOwner, &txn_status);
+    TabletServerErrorPB ts_error;
+    auto s = txn_manager_->GetTransactionStatus(
+        0, kOwner, &txn_status, &ts_error);
     ASSERT_TRUE(s.IsNotFound()) << s.ToString();
   }
 
   // Supplying wrong user and not-yet-used transaction ID.
   {
     TxnStatusEntryPB txn_status;
-    auto s = txn_manager_->GetTransactionStatus(0, "stranger", &txn_status);
+    TabletServerErrorPB ts_error;
+    auto s = txn_manager_->GetTransactionStatus(
+        0, "stranger", &txn_status, &ts_error);
     ASSERT_TRUE(s.IsNotFound()) << s.ToString();
   }
 }
@@ -579,23 +594,23 @@ TEST_F(TxnStatusManagerTest, TestUpdateTransactionState) {
   // We can't begin or finalize a commit if we've aborted.
   Status s = txn_manager_->BeginCommitTransaction(kTxnId1, kOwner, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
-  s = txn_manager_->FinalizeCommitTransaction(kTxnId1);
+  s = txn_manager_->FinalizeCommitTransaction(kTxnId1, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
   // We can't finalize a commit that hasn't begun committing.
   const int64_t kTxnId2 = 2;
   ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner, nullptr, 
&ts_error));
-  s = txn_manager_->FinalizeCommitTransaction(kTxnId2);
+  s = txn_manager_->FinalizeCommitTransaction(kTxnId2, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
   // We can't abort a transaction that has finished committing.
   ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId2, kOwner, &ts_error));
-  ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId2));
+  ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId2, &ts_error));
   s = txn_manager_->AbortTransaction(kTxnId2, kOwner, &ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
   // Redundant finalize calls are also benign.
-  ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId2));
+  ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId2, &ts_error));
 
   // Calls to begin committing should return an error if we've already
   // finalized the commit.
@@ -625,7 +640,7 @@ TEST_F(TxnStatusManagerTest, 
TestRegisterParticipantsWithStates) {
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
   // We can't register participants when we've finished committnig.
-  ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId1));
+  ASSERT_OK(txn_manager_->FinalizeCommitTransaction(kTxnId1, &ts_error));
   s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(2), kOwner, 
&ts_error);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
diff --git a/src/kudu/transactions/txn_status_manager.cc 
b/src/kudu/transactions/txn_status_manager.cc
index 66a3c14..2030d88 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -27,10 +27,14 @@
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
+#include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/raft_consensus.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/tablet_replica.h"
 #include "kudu/transactions/transactions.pb.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/cow_object.h"
@@ -57,6 +61,7 @@ 
TAG_FLAG(txn_status_manager_inject_latency_load_from_tablet_ms, unsafe);
 using kudu::pb_util::SecureShortDebugString;
 using kudu::tablet::ParticipantIdsByTxnId;
 using kudu::tserver::TabletServerErrorPB;
+using kudu::consensus::RaftPeerPB;
 using std::string;
 using std::vector;
 using strings::Substitute;
@@ -76,6 +81,17 @@ namespace {
 constexpr int64_t kIdStatusDataNotLoaded = -2;
 constexpr int64_t kIdStatusDataReady = -1;
 
+Status ReportIllegalTxnState(const string& errmsg,
+                             TabletServerErrorPB* ts_error) {
+  DCHECK(ts_error);
+  auto s = Status::IllegalState(errmsg);
+  TabletServerErrorPB error;
+  StatusToPB(s, error.mutable_status());
+  error.set_code(TabletServerErrorPB::TXN_ILLEGAL_STATE);
+  *ts_error = std::move(error);
+  return s;
+}
+
 } // anonymous namespace
 
 TxnStatusManagerBuildingVisitor::TxnStatusManagerBuildingVisitor()
@@ -138,7 +154,9 @@ Status TxnStatusManager::LoadFromTablet() {
   return Status::OK();
 }
 
-Status TxnStatusManager::CheckTxnStatusDataLoadedUnlocked() const {
+Status TxnStatusManager::CheckTxnStatusDataLoadedUnlocked(
+    TabletServerErrorPB* ts_error) const {
+  DCHECK(ts_error);
   DCHECK(lock_.is_locked());
   // TODO(aserbin): this is just to handle requests which come in a short time
   //                interval when the leader replica of the transaction status
@@ -151,18 +169,30 @@ Status 
TxnStatusManager::CheckTxnStatusDataLoadedUnlocked() const {
   if (PREDICT_FALSE(highest_txn_id_ <= kIdStatusDataNotLoaded)) {
     return Status::ServiceUnavailable("transaction status data is not loaded");
   }
+  auto* consensus = status_tablet_.tablet_replica_->consensus();
+  DCHECK(consensus);
+  if (consensus->role() != RaftPeerPB::LEADER) {
+    static const Status kErrStatus = Status::ServiceUnavailable(
+        "txn status tablet replica is not a leader");
+    TabletServerErrorPB error;
+    StatusToPB(kErrStatus, error.mutable_status());
+    error.set_code(TabletServerErrorPB::NOT_THE_LEADER);
+    *ts_error = std::move(error);
+    return kErrStatus;
+  }
   return Status::OK();
 }
 
 Status TxnStatusManager::GetTransaction(int64_t txn_id,
                                         const boost::optional<string>& user,
-                                        scoped_refptr<TransactionEntry>* txn) 
const {
+                                        scoped_refptr<TransactionEntry>* txn,
+                                        TabletServerErrorPB* ts_error) const {
   std::lock_guard<simple_spinlock> l(lock_);
 
   // First, make sure the transaction status data has been loaded. If not, then
   // the caller might get an unexpected error response and bail instead of
   // retrying a bit later and getting proper response.
-  RETURN_NOT_OK(CheckTxnStatusDataLoadedUnlocked());
+  RETURN_NOT_OK(CheckTxnStatusDataLoadedUnlocked(ts_error));
 
   scoped_refptr<TransactionEntry> ret = FindPtrOrNull(txns_by_id_, txn_id);
   if (PREDICT_FALSE(!ret)) {
@@ -201,7 +231,7 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id,
     //
     // If this check fails, don not set the 'highest_seen_txn_id' because
     // 'highest_txn_id_' doesn't contain any meaningful value yet.
-    RETURN_NOT_OK(CheckTxnStatusDataLoadedUnlocked());
+    RETURN_NOT_OK(CheckTxnStatusDataLoadedUnlocked(ts_error));
 
     // Second, make sure the requested ID is viable.
     if (PREDICT_FALSE(txn_id <= highest_txn_id_)) {
@@ -256,7 +286,7 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id,
 Status TxnStatusManager::BeginCommitTransaction(int64_t txn_id, const string& 
user,
                                                 TabletServerErrorPB* ts_error) 
{
   scoped_refptr<TransactionEntry> txn;
-  RETURN_NOT_OK(GetTransaction(txn_id, user, &txn));
+  RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
 
   TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
   const auto& pb = txn_lock.data().pb;
@@ -265,9 +295,9 @@ Status TxnStatusManager::BeginCommitTransaction(int64_t 
txn_id, const string& us
     return Status::OK();
   }
   if (PREDICT_FALSE(state != TxnStatePB::OPEN)) {
-    return Status::IllegalState(
-        Substitute("transaction ID $0 is not open: $1",
-                   txn_id, SecureShortDebugString(pb)));
+    return ReportIllegalTxnState(Substitute("transaction ID $0 is not open: 
$1",
+                                            txn_id, 
SecureShortDebugString(pb)),
+                                 ts_error);
   }
   auto* mutable_data = txn_lock.mutable_data();
   mutable_data->pb.set_state(TxnStatePB::COMMIT_IN_PROGRESS);
@@ -276,9 +306,11 @@ Status TxnStatusManager::BeginCommitTransaction(int64_t 
txn_id, const string& us
   return Status::OK();
 }
 
-Status TxnStatusManager::FinalizeCommitTransaction(int64_t txn_id) {
+Status TxnStatusManager::FinalizeCommitTransaction(
+    int64_t txn_id,
+    TabletServerErrorPB* ts_error) {
   scoped_refptr<TransactionEntry> txn;
-  RETURN_NOT_OK(GetTransaction(txn_id, boost::none, &txn));
+  RETURN_NOT_OK(GetTransaction(txn_id, boost::none, &txn, ts_error));
 
   TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
   const auto& pb = txn_lock.data().pb;
@@ -287,14 +319,15 @@ Status 
TxnStatusManager::FinalizeCommitTransaction(int64_t txn_id) {
     return Status::OK();
   }
   if (PREDICT_FALSE(state != TxnStatePB::COMMIT_IN_PROGRESS)) {
-    return Status::IllegalState(
+    return ReportIllegalTxnState(
         Substitute("transaction ID $0 is not committing: $1",
-                   txn_id, SecureShortDebugString(pb)));
+                   txn_id, SecureShortDebugString(pb)),
+        ts_error);
   }
   auto* mutable_data = txn_lock.mutable_data();
   mutable_data->pb.set_state(TxnStatePB::COMMITTED);
-  TabletServerErrorPB ts_error;
-  RETURN_NOT_OK(status_tablet_.UpdateTransaction(txn_id, mutable_data->pb, 
&ts_error));
+  RETURN_NOT_OK(status_tablet_.UpdateTransaction(
+      txn_id, mutable_data->pb, ts_error));
   txn_lock.Commit();
   return Status::OK();
 }
@@ -302,7 +335,7 @@ Status TxnStatusManager::FinalizeCommitTransaction(int64_t 
txn_id) {
 Status TxnStatusManager::AbortTransaction(int64_t txn_id, const std::string& 
user,
                                           TabletServerErrorPB* ts_error) {
   scoped_refptr<TransactionEntry> txn;
-  RETURN_NOT_OK(GetTransaction(txn_id, user, &txn));
+  RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
 
   TransactionEntryLock txn_lock(txn.get(), LockMode::WRITE);
   const auto& pb = txn_lock.data().pb;
@@ -312,9 +345,10 @@ Status TxnStatusManager::AbortTransaction(int64_t txn_id, 
const std::string& use
   }
   if (PREDICT_FALSE(state != TxnStatePB::OPEN &&
       state != TxnStatePB::COMMIT_IN_PROGRESS)) {
-    return Status::IllegalState(
+    return ReportIllegalTxnState(
         Substitute("transaction ID $0 cannot be aborted: $1",
-                   txn_id, SecureShortDebugString(pb)));
+                   txn_id, SecureShortDebugString(pb)),
+        ts_error);
   }
   auto* mutable_data = txn_lock.mutable_data();
   mutable_data->pb.set_state(TxnStatePB::ABORTED);
@@ -326,10 +360,11 @@ Status TxnStatusManager::AbortTransaction(int64_t txn_id, 
const std::string& use
 Status TxnStatusManager::GetTransactionStatus(
     int64_t txn_id,
     const std::string& user,
-    transactions::TxnStatusEntryPB* txn_status) {
+    transactions::TxnStatusEntryPB* txn_status,
+    TabletServerErrorPB* ts_error) {
   DCHECK(txn_status);
   scoped_refptr<TransactionEntry> txn;
-  RETURN_NOT_OK(GetTransaction(txn_id, user, &txn));
+  RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
 
   TransactionEntryLock txn_lock(txn.get(), LockMode::READ);
   const auto& pb = txn_lock.data().pb;
@@ -341,10 +376,13 @@ Status TxnStatusManager::GetTransactionStatus(
   return Status::OK();
 }
 
-Status TxnStatusManager::RegisterParticipant(int64_t txn_id, const string& 
tablet_id,
-                                             const string& user, 
TabletServerErrorPB* ts_error) {
+Status TxnStatusManager::RegisterParticipant(
+    int64_t txn_id,
+    const string& tablet_id,
+    const string& user,
+    TabletServerErrorPB* ts_error) {
   scoped_refptr<TransactionEntry> txn;
-  RETURN_NOT_OK(GetTransaction(txn_id, user, &txn));
+  RETURN_NOT_OK(GetTransaction(txn_id, user, &txn, ts_error));
 
   // Lock the transaction in read mode and check that it's open. If the
   // transaction isn't open, e.g. because a commit is already in progress,
@@ -352,9 +390,9 @@ Status TxnStatusManager::RegisterParticipant(int64_t 
txn_id, const string& table
   TransactionEntryLock txn_lock(txn.get(), LockMode::READ);
   const auto& txn_state = txn_lock.data().pb.state();
   if (PREDICT_FALSE(txn_state != TxnStatePB::OPEN)) {
-    return Status::IllegalState(
-        Substitute("transaction ID $0 not open: $1",
-                   txn_id, TxnStatePB_Name(txn_state)));
+    return ReportIllegalTxnState(Substitute("transaction ID $0 not open: $1",
+                                            txn_id, 
TxnStatePB_Name(txn_state)),
+                                 ts_error);
   }
 
   auto participant = txn->GetOrCreateParticipant(tablet_id);
diff --git a/src/kudu/transactions/txn_status_manager.h 
b/src/kudu/transactions/txn_status_manager.h
index 48028bc..25ae220 100644
--- a/src/kudu/transactions/txn_status_manager.h
+++ b/src/kudu/transactions/txn_status_manager.h
@@ -108,7 +108,8 @@ class TxnStatusManager final : public 
tablet::TxnCoordinator {
   // so it doesn't take a user.
   //
   // TODO(awong): add a commit timestamp.
-  Status FinalizeCommitTransaction(int64_t txn_id) override;
+  Status FinalizeCommitTransaction(int64_t txn_id,
+                                   tserver::TabletServerErrorPB* ts_error) 
override;
 
   // Aborts the given transaction, returning an error if the transaction
   // doesn't exist, is committed or not yet opened, or isn't owned by the given
@@ -120,7 +121,8 @@ class TxnStatusManager final : public 
tablet::TxnCoordinator {
   // the transaction doesn't exist or isn't owned by the specified user.
   Status GetTransactionStatus(int64_t txn_id,
                               const std::string& user,
-                              transactions::TxnStatusEntryPB* txn_status) 
override;
+                              transactions::TxnStatusEntryPB* txn_status,
+                              tserver::TabletServerErrorPB* ts_error) override;
 
   // Creates an in-memory participant, writes an entry to the status table, and
   // attaches the in-memory participant to the transaction.
@@ -142,15 +144,23 @@ class TxnStatusManager final : public 
tablet::TxnCoordinator {
 
  private:
   // Verifies that the transaction status data has already been loaded from the
-  // underlying tablet. Returns Status::OK() if the data is loaded, otherwise
-  // returns Status::ServiceUnavailable().
-  Status CheckTxnStatusDataLoadedUnlocked() const;
+  // underlying tablet and the replica is a leader. Returns Status::OK() if the
+  // data is loaded and the replica is a leader. Otherwise, if the data hasn't
+  // been loaded yet, return Status::ServiceUnavailable().  If the data has
+  // been loaded, but the replica isn't a leader, returns
+  // Status::ServiceUnavailable() and sets the code in 'ts_error'
+  // to TabletServerErrorPB::NOT_THE_LEADER.
+  Status CheckTxnStatusDataLoadedUnlocked(
+      tserver::TabletServerErrorPB* ts_error) const;
 
   // Returns the transaction entry, returning an error if the transaction ID
   // doesn't exist or if 'user' is specified but isn't the owner of the
-  // transaction.
+  // transaction. In addition, if the underlying replica isn't a leader,
+  // sets the code in 'ts_error' to TabletServerErrorPB::NOT_THE_LEADER
+  // correspondingly.
   Status GetTransaction(int64_t txn_id, const boost::optional<std::string>& 
user,
-                        scoped_refptr<TransactionEntry>* txn) const;
+                        scoped_refptr<TransactionEntry>* txn,
+                        tserver::TabletServerErrorPB* ts_error) const;
 
   // Protects 'highest_txn_id_' and 'txns_by_id_'.
   mutable simple_spinlock lock_;
diff --git a/src/kudu/tserver/tablet_service.cc 
b/src/kudu/tserver/tablet_service.cc
index f6118b9..ec6c608 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -30,9 +30,9 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
-#include <boost/type_traits/decay.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
+#include <google/protobuf/stubs/port.h>
 
 #include "kudu/clock/clock.h"
 #include "kudu/common/column_predicate.h"
@@ -1268,7 +1268,8 @@ void TabletServiceAdminImpl::CoordinateTransaction(const 
CoordinateTransactionRe
       s = txn_coordinator->AbortTransaction(txn_id, user, &ts_error);
       break;
     case CoordinatorOpPB::GET_TXN_STATUS:
-      s = txn_coordinator->GetTransactionStatus(txn_id, user, &txn_status);
+      s = txn_coordinator->GetTransactionStatus(
+          txn_id, user, &txn_status, &ts_error);
       break;
     default:
       s = Status::InvalidArgument(Substitute("Unknown op type: $0", 
op.type()));

Reply via email to