This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 990976d38ded648b39c1db2e4241d6eda82aee63
Author: Andrew Wong <[email protected]>
AuthorDate: Thu Apr 29 16:49:54 2021 -0700

    KUDU-2612: re-open TxnSystemTable if transaction is in new range
    
    This patch makes the TxnSystemClient on tablet servers retry if attempts
    to register the participant yield a NotFound error. This happens when
    new ranges are added to the transaction status table.
    
    To disambiguate this from the case where the transaction itself doesn't
    exist on the TxnStatusManager, this patch also adjusts the return value
    for such cases to be InvalidArgument, which seems equally fitting for
    writing as a part of a bogus transaction ID.
    
    Change-Id: I3af58dcaa3a995dac9dc937c7dfcb652bf004873
    Reviewed-on: http://gerrit.cloudera.org:8080/17368
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
---
 .../apache/kudu/client/TestKuduTransaction.java    |  8 +++---
 .../integration-tests/txn_status_table-itest.cc    | 10 +++----
 src/kudu/integration-tests/txn_write_ops-itest.cc  | 31 ++++++++++++++++++++--
 src/kudu/master/txn_manager-test.cc                | 10 +++----
 src/kudu/transactions/txn_status_manager-test.cc   | 12 ++++-----
 src/kudu/transactions/txn_status_manager.cc        |  2 +-
 src/kudu/tserver/ts_tablet_manager.cc              |  8 ++++++
 7 files changed, 58 insertions(+), 23 deletions(-)

diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
index 440e82d..0a97e8d 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
@@ -205,7 +205,7 @@ public class TestKuduTransaction {
     } catch (NonRecoverableException e) {
       final String errmsg = e.getMessage();
       final Status status = e.getStatus();
-      assertTrue(status.toString(), status.isNotFound());
+      assertTrue(status.toString(), status.isInvalidArgument());
       assertTrue(errmsg, errmsg.matches(".*transaction ID .* not found.*"));
     } catch (Exception e) {
       fail("unexpected exception: " + e.toString());
@@ -219,7 +219,7 @@ public class TestKuduTransaction {
     } catch (NonRecoverableException e) {
       final String errmsg = e.getMessage();
       final Status status = e.getStatus();
-      assertTrue(status.toString(), status.isNotFound());
+      assertTrue(status.toString(), status.isInvalidArgument());
       assertTrue(errmsg, errmsg.matches(".*transaction ID .* not found.*"));
     } catch (Exception e) {
       fail("unexpected exception: " + e.toString());
@@ -298,7 +298,7 @@ public class TestKuduTransaction {
             }
           });
       final Status status = ex.getStatus();
-      assertTrue(status.toString(), status.isNotFound());
+      assertTrue(status.toString(), status.isInvalidArgument());
       final String errmsg = ex.getMessage();
       assertTrue(errmsg, errmsg.matches(".*transaction ID .* not found.*"));
     }
@@ -336,7 +336,7 @@ public class TestKuduTransaction {
     } catch (NonRecoverableException e) {
       final String errmsg = e.getMessage();
       final Status status = e.getStatus();
-      assertTrue(status.toString(), status.isNotFound());
+      assertTrue(status.toString(), status.isInvalidArgument());
       assertTrue(errmsg, errmsg.matches(".*transaction ID .* not found.*"));
     } catch (Exception e) {
       fail("unexpected exception: " + e.toString());
diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc 
b/src/kudu/integration-tests/txn_status_table-itest.cc
index 8195e81..ce99d4c 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -561,7 +561,7 @@ TEST_F(TxnStatusTableITest, 
TestSystemClientRegisterParticipantErrors) {
   ASSERT_OK(txn_sys_client_->CreateTxnStatusTable(100));
   ASSERT_OK(txn_sys_client_->OpenTxnStatusTable());
   Status s = txn_sys_client_->RegisterParticipant(1, "participant", kUser);
-  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
   ASSERT_STR_MATCHES(s.ToString(), "transaction ID.*not found, current highest 
txn ID:.*");
 
   ASSERT_OK(txn_sys_client_->BeginTransaction(1, kUser));
@@ -603,14 +603,14 @@ TEST_F(TxnStatusTableITest, 
SystemClientCommitAndAbortTransaction) {
   // an error.
   {
     auto s = txn_sys_client_->BeginCommitTransaction(2, kUser);
-    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not found");
   }
 
   // An attempt to abort a non-existent transaction should report an error.
   {
     auto s = txn_sys_client_->AbortTransaction(2, kUser);
-    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 2 not found");
   }
 
@@ -708,7 +708,7 @@ TEST_F(TxnStatusTableITest, GetTransactionStatus) {
   {
     TxnStatusEntryPB empty;
     auto s = txn_sys_client_->GetTransactionStatus(2, kUser, &empty);
-    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_FALSE(empty.has_user());
     ASSERT_FALSE(empty.has_state());
   }
@@ -716,7 +716,7 @@ TEST_F(TxnStatusTableITest, GetTransactionStatus) {
   {
     TxnStatusEntryPB empty;
     auto s = txn_sys_client_->GetTransactionStatus(2, "", &empty);
-    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_FALSE(empty.has_user());
     ASSERT_FALSE(empty.has_state());
   }
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc 
b/src/kudu/integration-tests/txn_write_ops-itest.cc
index 5c238f9..6acfbaf 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -486,6 +486,33 @@ TEST_F(TxnWriteOpsITest, DeadlockPrevention) {
   ASSERT_EQ(kNumTxns * 2, count);
 }
 
+// Send transactions that span more than a single range of the transaction
+// status table, ensuring we can write to newly-added ranges.
+TEST_F(TxnWriteOpsITest, TestWriteToNewRangeOfTxnIds) {
+  constexpr const auto kNumTxns = 10;
+  const vector<string> kMasterFlags = {
+    // Enable TxnManager in Kudu masters.
+    "--txn_manager_enabled=true",
+    // Set a small range so we can write to a new range of transactions IDs.
+    Substitute("--txn_manager_status_table_range_partition_span=$0", kNumTxns 
/ 3),
+  };
+  NO_FATALS(StartCluster({}, kMasterFlags, kNumTabletServers));
+  NO_FATALS(Prepare());
+  for (int i = 0; i < kNumTxns; i++) {
+    shared_ptr<KuduTransaction> txn;
+    ASSERT_OK(client_->NewTransaction(&txn));
+    shared_ptr<KuduSession> session;
+    ASSERT_OK(txn->CreateSession(&session));
+    ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+    NO_FATALS(InsertRows(table_.get(), session.get(), 1, i));
+    ASSERT_OK(txn->Commit());
+    ASSERT_EQ(0, session->CountPendingErrors());
+  }
+  size_t count;
+  ASSERT_OK(CountRows(table_.get(), &count));
+  ASSERT_EQ(kNumTxns, count);
+}
+
 // Send multiple one-row write operations to a tablet server in the context of 
a
 // multi-row transaction, and commit the transaction. This scenario verifies
 // that tablet servers are able to accept high number of write requests
@@ -849,7 +876,7 @@ TEST_F(TxnWriteOpsITest, WriteOpForNonExistentTxn) {
     ASSERT_TRUE(s.IsIOError()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
     const auto err_status = GetSingleRowError(session.get());
-    ASSERT_TRUE(err_status.IsNotFound()) << err_status.ToString();
+    ASSERT_TRUE(err_status.IsInvalidArgument()) << err_status.ToString();
     ASSERT_STR_CONTAINS(err_status.ToString(),
                         "Failed to write batch of 1 ops to tablet");
     ASSERT_STR_CONTAINS(err_status.ToString(),
@@ -1424,7 +1451,7 @@ TEST_F(TxnOpDispatcherITest, 
ErrorInParticipantRegistration) {
     auto s = InsertRows(fake_txn.get(), 1, &key, &session);
     ASSERT_TRUE(s.IsIOError()) << s.ToString();
     auto row_status = GetSingleRowError(session.get());
-    ASSERT_TRUE(row_status.IsNotFound()) << row_status.ToString();
+    ASSERT_TRUE(row_status.IsInvalidArgument()) << row_status.ToString();
     ASSERT_STR_CONTAINS(row_status.ToString(),
                         "transaction ID 10 not found, current highest txn ID");
 
diff --git a/src/kudu/master/txn_manager-test.cc 
b/src/kudu/master/txn_manager-test.cc
index 1409697..642aef7 100644
--- a/src/kudu/master/txn_manager-test.cc
+++ b/src/kudu/master/txn_manager-test.cc
@@ -178,7 +178,7 @@ TEST_F(TxnManagerTest, LazyInitialization) {
     ASSERT_OK(proxy_->GetTransactionState(req, &resp, &ctx));
     ASSERT_TRUE(resp.has_error());
     auto s = StatusFromPB(resp.error().status());
-    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 0 not found");
   }
 
@@ -218,7 +218,7 @@ TEST_F(TxnManagerTest, LazyInitialization) {
     ASSERT_OK(proxy_->CommitTransaction(req, &resp, &ctx));
     ASSERT_TRUE(resp.has_error());
     auto s = StatusFromPB(resp.error().status());
-    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 0 not found");
   }
   ASSERT_OK(master_->WaitForTxnManagerInit());
@@ -247,7 +247,7 @@ TEST_F(TxnManagerTest, LazyInitializationConcurrentCalls) {
       CHECK_OK(proxy_->GetTransactionState(req, &resp, &ctx));
       CHECK(resp.has_error());
       auto s = StatusFromPB(resp.error().status());
-      CHECK(s.IsNotFound()) << s.ToString();
+      CHECK(s.IsInvalidArgument()) << s.ToString();
     }
   };
 
@@ -298,7 +298,7 @@ TEST_F(TxnManagerTest, NonlazyInitialization) {
     ASSERT_OK(proxy_->CommitTransaction(req, &resp, &ctx));
     ASSERT_TRUE(resp.has_error());
     auto s = StatusFromPB(resp.error().status());
-    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 0 not found");
   }
 }
@@ -543,7 +543,7 @@ TEST_F(TxnManagerTest, 
KeepTransactionAliveNonExistingTxnId) {
   ASSERT_OK(proxy_->KeepTransactionAlive(req, &resp, &ctx));
   ASSERT_TRUE(resp.has_error());
   auto s = StatusFromPB(resp.error().status());
-  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 123 not found");
 }
 
diff --git a/src/kudu/transactions/txn_status_manager-test.cc 
b/src/kudu/transactions/txn_status_manager-test.cc
index aaa7e4a..df3d10e 100644
--- a/src/kudu/transactions/txn_status_manager-test.cc
+++ b/src/kudu/transactions/txn_status_manager-test.cc
@@ -162,7 +162,7 @@ TEST_F(TxnStatusManagerTest, TestStartTransactions) {
     // Registering participants to transactions that don't exist should also
     // result in errors.
     auto s = txn_manager_->RegisterParticipant(2, kParticipant1, kOwner, 
&ts_error);
-    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
 
     // The underlying participants map should only reflect the successful
     // operations.
@@ -456,7 +456,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
     TabletServerErrorPB ts_error;
     auto s = txn_manager_->GetTransactionStatus(
         1, kOwner, &txn_status, &ts_error);
-    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 1 not found");
     ASSERT_FALSE(txn_status.has_state());
     ASSERT_FALSE(txn_status.has_user());
@@ -590,7 +590,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
     TabletServerErrorPB ts_error;
     auto s = txn_manager_->GetTransactionStatus(
         kNoTxnId, kOwner, &txn_status, &ts_error);
-    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
   }
 
   // Supplying wrong user and not-yet-used transaction ID.
@@ -600,7 +600,7 @@ TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
     TabletServerErrorPB ts_error;
     auto s = txn_manager_->GetTransactionStatus(
         kNoTxnId, "stranger", &txn_status, &ts_error);
-    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
   }
 }
 
@@ -613,7 +613,7 @@ TEST_F(TxnStatusManagerTest, KeepTransactionAlive) {
     TxnStatusManager::ScopedLeaderSharedLock l(txn_manager_.get());
     TabletServerErrorPB ts_error;
     auto s = txn_manager_->KeepTransactionAlive(1, kOwner, &ts_error);
-    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(), "transaction ID 1 not found");
   }
 
@@ -836,7 +836,7 @@ TEST_F(TxnStatusManagerTest, 
TestRegisterParticipantsWithStates) {
 
   // We can't register a participant to a transaction that hasn't started.
   Status s = txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(1), 
kOwner, &ts_error);
-  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
 
   ASSERT_OK(txn_manager_->BeginTransaction(kTxnId1, kOwner, nullptr, 
&ts_error));
   ASSERT_OK(txn_manager_->RegisterParticipant(kTxnId1, ParticipantId(1), 
kOwner, &ts_error));
diff --git a/src/kudu/transactions/txn_status_manager.cc 
b/src/kudu/transactions/txn_status_manager.cc
index e130d85..26245dc 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -834,7 +834,7 @@ Status TxnStatusManager::GetTransaction(int64_t txn_id,
 
   scoped_refptr<TransactionEntry> ret = FindPtrOrNull(txns_by_id_, txn_id);
   if (PREDICT_FALSE(!ret)) {
-    return Status::NotFound(
+    return Status::InvalidArgument(
         Substitute("transaction ID $0 not found, current highest txn ID: $1",
                   txn_id, highest_txn_id_));
   }
diff --git a/src/kudu/tserver/ts_tablet_manager.cc 
b/src/kudu/tserver/ts_tablet_manager.cc
index e6bd634..5554365 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -1159,6 +1159,14 @@ void TSTabletManager::RegisterAndBeginParticipantTxnTask(
         txn_id, replica->tablet_id(), user, deadline - now);
     VLOG(2) << Substitute("RegisterParticipant() $0 for txn ID $1 returned $2",
                           replica->tablet_id(), txn_id, s.ToString());
+    // If the transaction falls in a range that doesn't exist, re-open the
+    // transaction status table and try again.
+    if (s.IsNotFound()) {
+      s = txn_system_client->OpenTxnStatusTable().AndThen([&] {
+        return txn_system_client->RegisterParticipant(
+            txn_id, replica->tablet_id(), user, deadline - MonoTime::Now());
+      });
+    }
     if (PREDICT_FALSE(!s.ok())) {
       return began_txn_cb(s, TabletServerErrorPB::TXN_ILLEGAL_STATE);
     }

Reply via email to