This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new c400dae  KUDU-2612 p5 (b): add highest_seen_txn_id into 
CoordinatorOpResultPB
c400dae is described below

commit c400daebdbcb7da336d9be8fa1c872369ab1e65d
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Sep 30 21:32:47 2020 -0700

    KUDU-2612 p5 (b): add highest_seen_txn_id into CoordinatorOpResultPB
    
    This patch adds a new field into CoordinatorOpResultPB: the highest
    transaction identifier seen by the TxnStatusManager (TxnStatusManager
    coordinates the lifecycle of transactional operations for a particular
    tablet of the transaction status table).  The new field is populated
    when responding to a CoordinateTransaction() RPC of the BEGIN_TXN type.
    
    The signature and the implementation of the related methods in the
    TxnStatusManager and TxnSystemClient classes have been updated as well,
    along with corresponding tests to cover the newly introduced
    functionality.
    
    The newly introduced field will be used in a follow-up patch containing
    the initial implementation of the TxnManager component.  The rationale
    behind having this new field is to allow the TxnManager to 'calibrate'
    its last seen txn_id, so it makes fewer trial-and-error iterations
    when reserving an identifier for a transaction.
    
    The CoordinatorOpResultPB::highest_seen_txn_id field would not be
    needed if TxnStatusManager instances (or their federation) were able
    to reserve an identifier for a new transaction on their own.  Of
    course, that would require some sort of interaction among
    TxnStatusManager instances (e.g., request forwarding or the like).  But
    without such a provision, the TxnManager needs the newly introduced
    field to succeed with its trial-and-error approach while finding the
    next available identifier for a newly initiated transaction.
    
    This is a follow-up to 95f4109518e7b81b9115a4e8bacbd157dcecad0c.
    
    Change-Id: Ifcb4d90bc10a5695c3f54229688ccdcaf56011d0
    Reviewed-on: http://gerrit.cloudera.org:8080/16526
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
---
 .../integration-tests/ts_tablet_manager-itest.cc   |  9 ++-
 .../integration-tests/txn_status_table-itest.cc    | 56 +++++++++++++-----
 src/kudu/tablet/txn_coordinator.h                  |  7 ++-
 src/kudu/transactions/txn_status_manager-test.cc   | 68 +++++++++++++++-------
 src/kudu/transactions/txn_status_manager.cc        | 35 ++++++++++-
 src/kudu/transactions/txn_status_manager.h         |  8 ++-
 src/kudu/transactions/txn_system_client.cc         | 18 +++++-
 src/kudu/transactions/txn_system_client.h          | 11 +++-
 src/kudu/tserver/tablet_service.cc                 |  7 ++-
 src/kudu/tserver/tserver_admin.proto               |  6 ++
 10 files changed, 177 insertions(+), 48 deletions(-)

diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc 
b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index 75bb41f..2134716 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -1113,7 +1113,7 @@ class TxnStatusTabletManagementTest : public 
TsTabletManagerITest {
     TabletServerErrorPB ts_error;
     for (const auto& txn_id_and_prt_ids : txns) {
       const auto& txn_id = txn_id_and_prt_ids.first;
-      RETURN_NOT_OK(coordinator->BeginTransaction(txn_id, kOwner, &ts_error));
+      RETURN_NOT_OK(coordinator->BeginTransaction(txn_id, kOwner, nullptr, 
&ts_error));
       for (const auto& prt_id : txn_id_and_prt_ids.second) {
         RETURN_NOT_OK(coordinator->RegisterParticipant(txn_id, prt_id, kOwner, 
&ts_error));
       }
@@ -1261,7 +1261,8 @@ TEST_F(TxnStatusTabletManagementTest, 
TestTabletServerProxyCalls) {
       SCOPED_TRACE(SecureDebugString(resp));
       if (expect_success) {
         ASSERT_FALSE(resp.has_error());
-        ASSERT_EQ(op_type == CoordinatorOpPB::GET_TXN_STATUS,
+        ASSERT_EQ(op_type == CoordinatorOpPB::BEGIN_TXN ||
+                  op_type == CoordinatorOpPB::GET_TXN_STATUS,
                   resp.has_op_result());
       } else {
         ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
@@ -1353,7 +1354,8 @@ TEST_F(TxnStatusTabletManagementTest, 
TestTabletServerProxyCallErrors) {
       ASSERT_OK(admin_proxy->CoordinateTransaction(req, &resp, &rpc));
       SCOPED_TRACE(SecureDebugString(resp));
       ASSERT_FALSE(resp.has_error());
-      ASSERT_FALSE(resp.has_op_result());
+      ASSERT_TRUE(resp.has_op_result());
+      ASSERT_EQ(1, resp.op_result().highest_seen_txn_id());
     }
     {
       CoordinateTransactionResponsePB resp;
@@ -1362,6 +1364,7 @@ TEST_F(TxnStatusTabletManagementTest, 
TestTabletServerProxyCallErrors) {
       SCOPED_TRACE(SecureDebugString(resp));
       ASSERT_FALSE(resp.has_error());
       ASSERT_TRUE(resp.has_op_result());
+      ASSERT_EQ(1, resp.op_result().highest_seen_txn_id());
       Status s = StatusFromPB(resp.op_result().op_error());
       ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
       ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID so 
far");
diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc 
b/src/kudu/integration-tests/txn_status_table-itest.cc
index 7674d55..37ad5b8 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -358,8 +358,11 @@ TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) {
 
   // If we write out of range, we should see an error.
   {
-    auto s = txn_sys_client_->BeginTransaction(100, kUser);
+    int64_t highest_seen_txn_id = -1;
+    auto s = txn_sys_client_->BeginTransaction(100, kUser, 
&highest_seen_txn_id);
     ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    // The 'highest_seen_txn_id' should be left untouched.
+    ASSERT_EQ(-1, highest_seen_txn_id);
   }
   {
     auto s = txn_sys_client_->BeginCommitTransaction(100, kUser);
@@ -372,7 +375,9 @@ TEST_F(TxnStatusTableITest, TestSystemClientFindTablets) {
 
   // Once we add a new range, we should be able to leverage it.
   ASSERT_OK(txn_sys_client_->AddTxnStatusTableRange(100, 200));
-  ASSERT_OK(txn_sys_client_->BeginTransaction(100, kUser));
+  int64_t highest_seen_txn_id = -1;
+  ASSERT_OK(txn_sys_client_->BeginTransaction(100, kUser, 
&highest_seen_txn_id));
+  ASSERT_EQ(100, highest_seen_txn_id);
   ASSERT_OK(txn_sys_client_->BeginCommitTransaction(100, kUser));
   ASSERT_OK(txn_sys_client_->AbortTransaction(100, kUser));
 }
@@ -381,11 +386,18 @@ TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) {
   ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
   ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
 
+  cluster_->mini_tablet_server(0)->Shutdown();
+
   // When the only server is down, the system client should keep trying until
   // it times out.
-  cluster_->mini_tablet_server(0)->Shutdown();
-  Status s = txn_sys_client_->BeginTransaction(1, kUser, 
MonoDelta::FromMilliseconds(100));
-  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+  {
+    int64_t highest_seen_txn_id = -1;
+    auto s = txn_sys_client_->BeginTransaction(
+        1, kUser, &highest_seen_txn_id, MonoDelta::FromMilliseconds(100));
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+    // The 'highest_seen_txn_id' should be left untouched.
+    ASSERT_EQ(-1, highest_seen_txn_id);
+  }
 
   // Now try with a longer timeout and ensure that if the server comes back up,
   // the system client will succeed.
@@ -398,23 +410,37 @@ TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) {
   SCOPED_CLEANUP({
     t.join();
   });
-  ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser, 
MonoDelta::FromSeconds(3)));
+
+  int64_t highest_seen_txn_id = -1;
+  ASSERT_OK(txn_sys_client_->BeginTransaction(
+      1, kUser, &highest_seen_txn_id, MonoDelta::FromSeconds(3)));
+  ASSERT_EQ(highest_seen_txn_id, 1);
 }
 
 TEST_F(TxnStatusTableITest, TestSystemClientBeginTransactionErrors) {
   ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
   ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
-  ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
+  int64_t highest_seen_txn_id = -1;
+  ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser, &highest_seen_txn_id));
+  ASSERT_EQ(1, highest_seen_txn_id);
 
   // Trying to start another transaction with a used ID should yield an error.
-  Status s = txn_sys_client_->BeginTransaction(1, kUser);
-  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
-  ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
+  {
+    int64_t highest_seen_txn_id = -1;
+    auto s = txn_sys_client_->BeginTransaction(1, kUser, &highest_seen_txn_id);
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_EQ(1, highest_seen_txn_id);
+    ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
+  }
 
   // The same should be true with a different user.
-  s = txn_sys_client_->BeginTransaction(1, "stranger");
-  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
-  ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
+  {
+    int64_t highest_seen_txn_id = -1;
+    auto s = txn_sys_client_->BeginTransaction(1, "stranger", 
&highest_seen_txn_id);
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_EQ(1, highest_seen_txn_id);
+    ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
+  }
 }
 
 TEST_F(TxnStatusTableITest, TestSystemClientRegisterParticipantErrors) {
@@ -451,8 +477,10 @@ TEST_F(TxnStatusTableITest, 
SystemClientCommitAndAbortTransaction) {
   // Even if the transaction is aborted, an attempt to start another 
transaction
   // with already used ID should yield an error.
   {
-    auto s = txn_sys_client_->BeginTransaction(1, kUser);
+    int64_t highest_seen_txn_id = -1;
+    auto s = txn_sys_client_->BeginTransaction(1, kUser, &highest_seen_txn_id);
     ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_EQ(1, highest_seen_txn_id);
     ASSERT_STR_CONTAINS(s.ToString(), "not higher than the highest ID");
   }
 
diff --git a/src/kudu/tablet/txn_coordinator.h 
b/src/kudu/tablet/txn_coordinator.h
index 33bcea2..44e1065 100644
--- a/src/kudu/tablet/txn_coordinator.h
+++ b/src/kudu/tablet/txn_coordinator.h
@@ -53,8 +53,11 @@ class TxnCoordinator {
   // Returns any replication-layer errors (e.g. not-the-leader errors) in
   // 'ts_error'. If there was otherwise a logical error with the request (e.g.
   // transaction already exists), returns an error without populating
-  // 'ts_error'.
-  virtual Status BeginTransaction(int64_t txn_id, const std::string& user,
+  // 'ts_error'. If not null, 'highest_seen_txn_id' is set to the highest txn_id
+  // seen so far, on success and on error (unless that's not yet known).
+  virtual Status BeginTransaction(int64_t txn_id,
+                                  const std::string& user,
+                                  int64_t* highest_seen_txn_id,
                                   tserver::TabletServerErrorPB* ts_error) = 0;
 
   // Begins committing the given transaction as the given user.
diff --git a/src/kudu/transactions/txn_status_manager-test.cc 
b/src/kudu/transactions/txn_status_manager-test.cc
index 7ceb0fe..feba083 100644
--- a/src/kudu/transactions/txn_status_manager-test.cc
+++ b/src/kudu/transactions/txn_status_manager-test.cc
@@ -110,7 +110,11 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
   TabletServerErrorPB ts_error;
   for (const auto& txn_id_and_prts : expected_prts_by_txn_id) {
     const auto& txn_id = txn_id_and_prts.first;
-    ASSERT_OK(txn_manager_->BeginTransaction(txn_id, kOwner, &ts_error));
+    int64_t highest_seen_txn_id = -1;
+    ASSERT_OK(txn_manager_->BeginTransaction(
+        txn_id, kOwner, &highest_seen_txn_id, &ts_error));
+    ASSERT_GE(highest_seen_txn_id, 0);
+    ASSERT_EQ(highest_seen_txn_id, txn_id);
     for (const auto& prt : txn_id_and_prts.second) {
       ASSERT_OK(txn_manager_->RegisterParticipant(txn_id, prt, kOwner, 
&ts_error));
     }
@@ -121,14 +125,24 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
 
   // Starting a transaction that's already been started should result in an
   // error, even if it's not currently in flight.
-  Status s = txn_manager_->BeginTransaction(1, kOwner, &ts_error);
-  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
-  s = txn_manager_->BeginTransaction(2, kOwner, &ts_error);
-  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  {
+    int64_t highest_seen_txn_id = -1;
+    auto s = txn_manager_->BeginTransaction(
+        1, kOwner, &highest_seen_txn_id, &ts_error);
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_EQ(3, highest_seen_txn_id);
+  }
+  {
+    int64_t highest_seen_txn_id = -1;
+    auto s = txn_manager_->BeginTransaction(
+        2, kOwner, &highest_seen_txn_id, &ts_error);
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_EQ(3, highest_seen_txn_id);
+  }
 
   // Registering participants to transactions that don't exist should also
   // result in errors.
-  s = txn_manager_->RegisterParticipant(2, kParticipant1, kOwner, &ts_error);
+  auto s = txn_manager_->RegisterParticipant(2, kParticipant1, kOwner, 
&ts_error);
   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
 
   // The underlying participants map should only reflect the successful
@@ -170,7 +184,7 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
     const string kErrMsg = "transaction status data is not loaded";
     TabletServerErrorPB ts_error;
     for (int64_t txn_id : { 0, 1, 3, 4 }) {
-      auto s = tsm.BeginTransaction(txn_id, kOwner, &ts_error);
+      auto s = tsm.BeginTransaction(txn_id, kOwner, nullptr, &ts_error);
       ASSERT_TRUE(s.IsServiceUnavailable());
       ASSERT_STR_CONTAINS(s.ToString(), kErrMsg);
 
@@ -233,10 +247,18 @@ TEST_F(TxnStatusManagerTest, 
TestStartTransactionsConcurrently) {
         barriers[b]->Wait();
         auto txn_id = txns_to_insert[b][i];
         TabletServerErrorPB ts_error;
-        Status s = txn_manager_->BeginTransaction(txn_id, kOwner, &ts_error);
+        int64_t highest_seen_txn_id = -1;
+        auto s = txn_manager_->BeginTransaction(
+            txn_id, kOwner, &highest_seen_txn_id, &ts_error);
         if (s.ok()) {
           std::lock_guard<simple_spinlock> l(lock);
           successful_txn_ids.emplace_back(txn_id);
+          CHECK_GE(highest_seen_txn_id, txn_id);
+        } else {
+          // In case of a failure to start a transaction, the only expected
+          // failure case here is a conflict in transaction identifier. If so,
+          // the assertion on the highest_seen_txn_id can be made even stronger.
+          CHECK_GT(highest_seen_txn_id, txn_id);
         }
       }
     });
@@ -271,7 +293,7 @@ TEST_F(TxnStatusManagerTest, 
TestRegisterParticipantsConcurrently) {
   threads.reserve(1 + kParticipantsInParallel);
   threads.emplace_back([&] {
     TabletServerErrorPB ts_error;
-    CHECK_OK(txn_manager_->BeginTransaction(kTxnId, kOwner, &ts_error));
+    CHECK_OK(txn_manager_->BeginTransaction(kTxnId, kOwner, nullptr, 
&ts_error));
     begun_txn.CountDown();
   });
 
@@ -320,7 +342,7 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
   const int kNumUpdatesInParallel = 20;
   for (int i = 0; i < kNumTransactions; i++) {
     TabletServerErrorPB ts_error;
-    ASSERT_OK(txn_manager_->BeginTransaction(i, kOwner, &ts_error));
+    ASSERT_OK(txn_manager_->BeginTransaction(i, kOwner, nullptr, &ts_error));
   }
   typedef std::pair<int64_t, TxnStatePB> IdAndUpdate;
   vector<IdAndUpdate> all_updates;
@@ -401,7 +423,7 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) {
 TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
   {
     TabletServerErrorPB ts_error;
-    ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, &ts_error));
+    ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, nullptr, &ts_error));
 
     TxnStatusEntryPB txn_status;
     ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
@@ -427,7 +449,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
 
   {
     TabletServerErrorPB ts_error;
-    ASSERT_OK(txn_manager_->BeginTransaction(2, kOwner, &ts_error));
+    ASSERT_OK(txn_manager_->BeginTransaction(2, kOwner, nullptr, &ts_error));
     ASSERT_OK(txn_manager_->AbortTransaction(2, kOwner, &ts_error));
 
     TxnStatusEntryPB txn_status;
@@ -440,11 +462,11 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
 
   // Start another transaction and start its commit phase.
   TabletServerErrorPB ts_error;
-  ASSERT_OK(txn_manager_->BeginTransaction(3, kOwner, &ts_error));
+  ASSERT_OK(txn_manager_->BeginTransaction(3, kOwner, nullptr, &ts_error));
   ASSERT_OK(txn_manager_->BeginCommitTransaction(3, kOwner, &ts_error));
 
   // Start just another transaction.
-  ASSERT_OK(txn_manager_->BeginTransaction(4, kOwner, &ts_error));
+  ASSERT_OK(txn_manager_->BeginTransaction(4, kOwner, nullptr, &ts_error));
 
   // Make the TxnStatusManager start from scratch.
   ASSERT_OK(RestartReplica());
@@ -504,14 +526,20 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
 // Test that performing actions as the wrong user will return errors.
 TEST_F(TxnStatusManagerTest, TestWrongUser) {
   const string kWrongUser = "stranger";
+  int64_t highest_seen_txn_id = -1;
   TabletServerErrorPB ts_error;
-  ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, &ts_error));
+  ASSERT_OK(txn_manager_->BeginTransaction(
+      1, kOwner, &highest_seen_txn_id, &ts_error));
+  ASSERT_EQ(1, highest_seen_txn_id);
   ASSERT_OK(txn_manager_->RegisterParticipant(1, ParticipantId(1), kOwner, 
&ts_error));
 
   // First, any other call to begin the transaction should be rejected,
   // regardless of user.
-  Status s = txn_manager_->BeginTransaction(1, kWrongUser, &ts_error);
+  highest_seen_txn_id = -1;
+  Status s = txn_manager_->BeginTransaction(
+      1, kWrongUser, &highest_seen_txn_id, &ts_error);
   ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_EQ(1, highest_seen_txn_id);
 
   // All actions should be rejected if performed by the wrong user.
   s = txn_manager_->RegisterParticipant(1, ParticipantId(1), kWrongUser, 
&ts_error);
@@ -532,7 +560,7 @@ TEST_F(TxnStatusManagerTest, TestWrongUser) {
 TEST_F(TxnStatusManagerTest, TestUpdateTransactionState) {
   const int64_t kTxnId1 = 1;
   TabletServerErrorPB ts_error;
-  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId1, kOwner, &ts_error));
+  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId1, kOwner, nullptr, 
&ts_error));
 
   // Redundant calls are benign.
   ASSERT_OK(txn_manager_->BeginCommitTransaction(kTxnId1, kOwner, &ts_error));
@@ -548,7 +576,7 @@ TEST_F(TxnStatusManagerTest, TestUpdateTransactionState) {
 
   // We can't finalize a commit that hasn't begun committing.
   const int64_t kTxnId2 = 2;
-  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner, &ts_error));
+  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner, nullptr, 
&ts_error));
   s = txn_manager_->FinalizeCommitTransaction(kTxnId2);
   ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
 
@@ -577,7 +605,7 @@ TEST_F(TxnStatusManagerTest, 
TestRegisterParticipantsWithStates) {
   Status s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(1), 
kOwner, &ts_error);
   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
 
-  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId1, kOwner, &ts_error));
+  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId1, kOwner, nullptr, 
&ts_error));
   ASSERT_OK(txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(1), 
kOwner, &ts_error));
 
   // Registering the same participant is idempotent and benign.
@@ -595,7 +623,7 @@ TEST_F(TxnStatusManagerTest, 
TestRegisterParticipantsWithStates) {
 
   // We can't register participants when we've aborted the transaction.
   const int64_t kTxnId2 = 2;
-  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner, &ts_error));
+  ASSERT_OK(txn_manager_->BeginTransaction(kTxnId2, kOwner, nullptr, 
&ts_error));
   ASSERT_OK(txn_manager_->RegisterParticipant(kTxnId2, ParticipantId(1), 
kOwner, &ts_error));
   ASSERT_OK(txn_manager_->AbortTransaction(kTxnId2, kOwner, &ts_error));
   s = txn_manager_->RegisterParticipant(kTxnId2, ParticipantId(2), kOwner, 
&ts_error);
diff --git a/src/kudu/transactions/txn_status_manager.cc 
b/src/kudu/transactions/txn_status_manager.cc
index c583b11..e6bacb3 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -33,6 +33,7 @@
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/cow_object.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 
 using kudu::pb_util::SecureShortDebugString;
@@ -156,18 +157,36 @@ Status TxnStatusManager::GetTransaction(int64_t txn_id,
   return Status::OK();
 }
 
-Status TxnStatusManager::BeginTransaction(int64_t txn_id, const string& user,
+// NOTE: In this method, the idea is to set 'highest_seen_txn_id' on return
+//       in most cases. Sending back the most recent highest transaction
+//       identifier helps to avoid extra RPC calls from TxnManager to
+//       TxnStatusManager in case of contention. Since a trial-and-error
+//       approach is used to assign transaction identifiers, an outdated or
+//       unset highest_seen_txn_id would cause at least one extra round-trip
+//       between TxnManager and TxnStatusManager under high contention
+//       before a valid identifier for a transaction is found.
+//       The value is not set if the txn status data hasn't been loaded yet.
+Status TxnStatusManager::BeginTransaction(int64_t txn_id,
+                                          const string& user,
+                                          int64_t* highest_seen_txn_id,
                                           TabletServerErrorPB* ts_error) {
   {
     std::lock_guard<simple_spinlock> l(lock_);
+
     // First, make sure the transaction status data has been loaded.
     // If not, then there is chance that, being a leader, this replica might
     // register a transaction with the identifier which is lower than the
     // identifiers of already registered transactions.
+    //
+    // If this check fails, do not set the 'highest_seen_txn_id' because
+    // 'highest_txn_id_' doesn't contain any meaningful value yet.
     RETURN_NOT_OK(CheckTxnStatusDataLoadedUnlocked());
 
     // Second, make sure the requested ID is viable.
     if (PREDICT_FALSE(txn_id <= highest_txn_id_)) {
+      if (highest_seen_txn_id) {
+        *highest_seen_txn_id = highest_txn_id_;
+      }
       return Status::InvalidArgument(
           Substitute("transaction ID $0 is not higher than the highest ID so 
far: $1",
                      txn_id, highest_txn_id_));
@@ -182,6 +201,14 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id, 
const string& user,
   // since we've serialized the transaction ID checking above, we're guaranteed
   // that at most one call to start a given transaction ID can succeed.
 
+  // This ScopedCleanup instance is to set 'highest_seen_txn_id' if writing
+  // the entry into the txn status tablet fails.
+  auto cleanup = MakeScopedCleanup([&]() {
+    if (highest_seen_txn_id) {
+      std::lock_guard<simple_spinlock> l(lock_);
+      *highest_seen_txn_id = highest_txn_id_;
+    }
+  });
   // Write an entry to the status tablet for this transaction.
   RETURN_NOT_OK(status_tablet_.AddNewTransaction(txn_id, user, ts_error));
 
@@ -196,6 +223,12 @@ Status TxnStatusManager::BeginTransaction(int64_t txn_id, 
const string& user,
   }
   std::lock_guard<simple_spinlock> l(lock_);
   EmplaceOrDie(&txns_by_id_, txn_id, std::move(txn));
+  if (highest_seen_txn_id) {
+    *highest_seen_txn_id = highest_txn_id_;
+  }
+  // Avoid acquiring the lock again: 'highest_seen_txn_id' has already been 
set.
+  cleanup.cancel();
+
   return Status::OK();
 }
 
diff --git a/src/kudu/transactions/txn_status_manager.h 
b/src/kudu/transactions/txn_status_manager.h
index ba22023..7621d9b 100644
--- a/src/kudu/transactions/txn_status_manager.h
+++ b/src/kudu/transactions/txn_status_manager.h
@@ -82,12 +82,16 @@ class TxnStatusManager final : public 
tablet::TxnCoordinator {
   // Writes an entry to the status tablet and creates a transaction in memory.
   // Returns an error if a higher transaction ID has already been attempted
   // (even if that attempt failed), which helps ensure that at most one call to
-  // this method will succeed for a given transaction ID.
+  // this method will succeed for a given transaction ID. The
+  // 'highest_seen_txn_id' output parameter, if not null, is populated in both
+  // success and failure cases, unless the txn status data isn't loaded yet.
   //
   // TODO(awong): consider computing the next available transaction ID in this
   // partition and using it in case this transaction is already used, or having
   // callers forward a request for the next-highest transaction ID.
-  Status BeginTransaction(int64_t txn_id, const std::string& user,
+  Status BeginTransaction(int64_t txn_id,
+                          const std::string& user,
+                          int64_t* highest_seen_txn_id,
                           tserver::TabletServerErrorPB* ts_error) override;
 
   // Begins committing the given transaction, returning an error if the
diff --git a/src/kudu/transactions/txn_system_client.cc 
b/src/kudu/transactions/txn_system_client.cc
index 27a909a..cd23d44 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -122,16 +122,28 @@ Status TxnSystemClient::OpenTxnStatusTable() {
   return Status::OK();
 }
 
-Status TxnSystemClient::BeginTransaction(int64_t txn_id, const string& user, 
MonoDelta timeout) {
+Status TxnSystemClient::BeginTransaction(int64_t txn_id,
+                                         const string& user,
+                                         int64_t* highest_seen_txn_id,
+                                         MonoDelta timeout) {
   CoordinatorOpPB coordinate_txn_op;
   coordinate_txn_op.set_type(CoordinatorOpPB::BEGIN_TXN);
   coordinate_txn_op.set_txn_id(txn_id);
   coordinate_txn_op.set_user(user);
   Synchronizer s;
+  CoordinatorOpResultPB result;
   RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op),
                                            timeout,
-                                           s.AsStatusCallback()));
-  return s.Wait();
+                                           s.AsStatusCallback(),
+                                           &result));
+  const auto ret = s.Wait();
+  if (ret.ok() || ret.IsInvalidArgument()) {
+    DCHECK(result.has_highest_seen_txn_id());
+    if (highest_seen_txn_id) {
+      *highest_seen_txn_id = result.highest_seen_txn_id();
+    }
+  }
+  return ret;
 }
 
 Status TxnSystemClient::RegisterParticipant(int64_t txn_id, const string& 
participant_id,
diff --git a/src/kudu/transactions/txn_system_client.h 
b/src/kudu/transactions/txn_system_client.h
index f3c8568..c5269b7 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -79,8 +79,15 @@ class TxnSystemClient {
 
   // Attempts to create a transaction with the given 'txn_id'.
   // Returns an error if the transaction ID has already been taken, or if there
-  // was an error writing to the transaction status table.
-  Status BeginTransaction(int64_t txn_id, const std::string& user,
+  // was an error writing to the transaction status table. In the success case
+  // or in case of a conflicting txn_id, the 'highest_seen_txn_id' output
+  // parameter (if not null) is set to the highest transaction identifier
+  // observed by the corresponding TxnStatusManager. Otherwise,
+  // 'highest_seen_txn_id' is left untouched (e.g., if the request to
+  // TxnStatusManager timed out).
+  Status BeginTransaction(int64_t txn_id, const
+                          std::string& user,
+                          int64_t* highest_seen_txn_id = nullptr,
                           MonoDelta timeout = MonoDelta::FromSeconds(10));
 
   // Attempts to register the given participant with the given transaction.
diff --git a/src/kudu/tserver/tablet_service.cc 
b/src/kudu/tserver/tablet_service.cc
index ea20845..1c66aa0 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1249,9 +1249,10 @@ void TabletServiceAdminImpl::CoordinateTransaction(const 
CoordinateTransactionRe
   transactions::TxnStatusEntryPB txn_status;
   const auto& user = op.user();
   const auto& txn_id = op.txn_id();
+  int64_t highest_seen_txn_id = -1;
   switch (op.type()) {
     case CoordinatorOpPB::BEGIN_TXN:
-      s = txn_coordinator->BeginTransaction(txn_id, user, &ts_error);
+      s = txn_coordinator->BeginTransaction(txn_id, user, 
&highest_seen_txn_id, &ts_error);
       break;
     case CoordinatorOpPB::REGISTER_PARTICIPANT:
       s = txn_coordinator->RegisterParticipant(txn_id, 
op.txn_participant_id(), user, &ts_error);
@@ -1280,6 +1281,10 @@ void TabletServiceAdminImpl::CoordinateTransaction(const 
CoordinateTransactionRe
     // Populate corresponding field in the response.
     *(resp->mutable_op_result()->mutable_txn_status()) = std::move(txn_status);
   }
+  if (op.type() == CoordinatorOpPB::BEGIN_TXN) {
+    DCHECK_GE(highest_seen_txn_id, 0);
+    resp->mutable_op_result()->set_highest_seen_txn_id(highest_seen_txn_id);
+  }
   context->RespondSuccess();
 }
 
diff --git a/src/kudu/tserver/tserver_admin.proto 
b/src/kudu/tserver/tserver_admin.proto
index a8ba20a..bd1f881 100644
--- a/src/kudu/tserver/tserver_admin.proto
+++ b/src/kudu/tserver/tserver_admin.proto
@@ -56,6 +56,12 @@ message CoordinatorOpResultPB {
   //                of the transaction for other operation types?
   optional transactions.TxnStatusEntryPB txn_status = 2;
 
+  // The highest transaction identifier (txn_id) seen by the TxnStatusManager
+  // that coordinates the lifecycle of operations for a particular tablet
+  // of the transaction status table. This field is populated when responding
+  // to a request of the BEGIN_TXN type (in success and error cases).
+  optional int64 highest_seen_txn_id = 3;
+
   // TODO(awong): populate this with some application-level results, like the
   // actual transaction ID assigned, the next highest transaction ID available,
   // etc.

Reply via email to