This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 97b9827  KUDU-2612: add TxnManager::BeginTransaction()
97b9827 is described below

commit 97b9827199e8190656ddf356695f06a2e54e39b5
Author: Alexey Serbin <[email protected]>
AuthorDate: Mon Oct 12 01:16:22 2020 -0700

    KUDU-2612: add TxnManager::BeginTransaction()
    
    This patch adds the implementation of the BeginTransaction() method
    to the TxnManager along with tests to cover the newly introduced
    functionality.
    
    Change-Id: I51c476d92bb5b147ffd03fd9f3163ab86d581496
    Reviewed-on: http://gerrit.cloudera.org:8080/16586
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <[email protected]>
---
 src/kudu/master/txn_manager-test.cc        | 222 +++++++++++++++++++++++++++--
 src/kudu/master/txn_manager.cc             |  86 ++++++++++-
 src/kudu/master/txn_manager.h              |   4 +
 src/kudu/transactions/txn_system_client.cc |   7 +-
 src/kudu/tserver/tablet_service.cc         |   5 +-
 5 files changed, 301 insertions(+), 23 deletions(-)

diff --git a/src/kudu/master/txn_manager-test.cc 
b/src/kudu/master/txn_manager-test.cc
index 913c03f..47d5fce 100644
--- a/src/kudu/master/txn_manager-test.cc
+++ b/src/kudu/master/txn_manager-test.cc
@@ -23,6 +23,7 @@
 #include <ostream>
 #include <string>
 #include <thread>
+#include <unordered_set>
 #include <vector>
 
 #include <gflags/gflags_declare.h>
@@ -37,6 +38,8 @@
 #include "kudu/master/txn_manager.proxy.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/rpc/rpc_controller.h"
+#include "kudu/transactions/transactions.pb.h"
+#include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/util/barrier.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"
@@ -47,14 +50,18 @@
 using kudu::cluster::InternalMiniCluster;
 using kudu::cluster::InternalMiniClusterOptions;
 using kudu::rpc::RpcController;
+using kudu::transactions::TxnStatePB;
 using std::string;
 using std::thread;
 using std::unique_ptr;
+using std::unordered_set;
 using std::vector;
 
 DECLARE_bool(txn_manager_enabled);
 DECLARE_bool(txn_manager_lazily_initialized);
 DECLARE_int32(rpc_service_queue_length);
+DECLARE_int64(txn_manager_status_table_range_partition_span);
+DECLARE_uint32(transaction_keep_alive_interval_ms);
 
 namespace kudu {
 namespace transactions {
@@ -75,6 +82,11 @@ class TxnManagerTest : public KuduTest {
     FLAGS_txn_manager_enabled = true;
     FLAGS_txn_manager_lazily_initialized = true;
 
+    // Make TxnManager create new ranges in the transaction status table more
+    // often, so it's not necessary to start too many transactions to see it
+    // switching to a new tablet of the transaction status table.
+    FLAGS_txn_manager_status_table_range_partition_span = 1024;
+
     // A few scenarios (e.g. LazyInitializationConcurrentCalls) might require
     // an insanely high capacity of the service RPC queue since their workload
     // depends on the number of CPU cores available. They can send many 
requests
@@ -280,22 +292,206 @@ TEST_F(TxnManagerTest, NonlazyInitialization) {
   }
 }
 
-// BeginTransaction implementation is moved into a follow-up changelist.
-// TODO(aserbin): update this scenario once BeginTransction is here
-TEST_F(TxnManagerTest, BeginTransactionRpc) {
-  RpcController ctx;
-  PrepareRpcController(&ctx);
-  BeginTransactionRequestPB req;
-  BeginTransactionResponsePB resp;
-  ASSERT_OK(proxy_->BeginTransaction(req, &resp, &ctx));
+// This scenario calls almost all methods of the TxnManager.
+TEST_F(TxnManagerTest, AbortedTransactionLifecycle) {
+  const auto fetch_txn_status = [this] (int64_t txn_id, TxnStatePB* state) {
+    RpcController ctx;
+    PrepareRpcController(&ctx);
+    GetTransactionStateRequestPB req;
+    GetTransactionStateResponsePB resp;
+    req.set_txn_id(txn_id);
+    ASSERT_OK(proxy_->GetTransactionState(req, &resp, &ctx));
+    ASSERT_FALSE(resp.has_error())
+        << StatusFromPB(resp.error().status()).ToString();
+    ASSERT_TRUE(resp.has_state());
+    *state = resp.state();
+  };
 
-  ASSERT_TRUE(resp.has_error());
-  auto s = StatusFromPB(resp.error().status());
-  ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
-  ASSERT_STR_CONTAINS(
-      s.ToString(), "Not implemented: BeginTransaction is not supported yet");
+  int64_t txn_id = -1;
+  {
+    RpcController ctx;
+    PrepareRpcController(&ctx);
+    BeginTransactionRequestPB req;
+    BeginTransactionResponsePB resp;
+    ASSERT_OK(proxy_->BeginTransaction(req, &resp, &ctx));
+    ASSERT_FALSE(resp.has_error())
+        << StatusFromPB(resp.error().status()).ToString();
+    ASSERT_TRUE(resp.has_txn_id());
+    txn_id = resp.txn_id();
+    ASSERT_LE(0, txn_id);
+    ASSERT_TRUE(resp.has_keepalive_millis());
+    ASSERT_EQ(FLAGS_transaction_keep_alive_interval_ms, 
resp.keepalive_millis());
+    TxnStatePB txn_state;
+    NO_FATALS(fetch_txn_status(txn_id, &txn_state));
+    ASSERT_EQ(TxnStatePB::OPEN, txn_state);
+  }
+
+  {
+    RpcController ctx;
+    PrepareRpcController(&ctx);
+    CommitTransactionRequestPB req;
+    CommitTransactionResponsePB resp;
+    req.set_txn_id(txn_id);
+    ASSERT_OK(proxy_->CommitTransaction(req, &resp, &ctx));
+    ASSERT_FALSE(resp.has_error())
+        << StatusFromPB(resp.error().status()).ToString();
+    TxnStatePB txn_state;
+    NO_FATALS(fetch_txn_status(txn_id, &txn_state));
+    ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_state);
+  }
+
+  // TODO(aserbin): add call to KeepTransactionAlive() when TxnStatusManager
+  //                has the functionality implemented.
+
+  {
+    RpcController ctx;
+    PrepareRpcController(&ctx);
+    AbortTransactionRequestPB req;
+    AbortTransactionResponsePB resp;
+    req.set_txn_id(txn_id);
+    ASSERT_OK(proxy_->AbortTransaction(req, &resp, &ctx));
+    ASSERT_FALSE(resp.has_error())
+        << StatusFromPB(resp.error().status()).ToString();
+    TxnStatePB txn_state;
+    NO_FATALS(fetch_txn_status(txn_id, &txn_state));
+    ASSERT_EQ(TxnStatePB::ABORTED, txn_state);
+  }
+}
+
+TEST_F(TxnManagerTest, BeginManyTransactions) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // In this functor CHECK_ is used instead of ASSERT_ because it's targeted
+  // for multi-thread use, and ASSERT_ macros do not seem to work as expected
+  // in such a case.
+  const auto txn_initiator = [this](
+      size_t txn_num,
+      vector<int64_t>* txn_ids) {
+    // Create its own proxy: this is important if trying to create more
+    // concurrency since a proxy serializes RPC calls.
+    TxnManagerServiceProxy p(
+        cluster_->messenger(),
+        cluster_->mini_master()->bound_rpc_addr(),
+        cluster_->mini_master()->bound_rpc_addr().host());
+    int64_t max_txn_id = -1;
+    for (auto id = 0; id < txn_num; ++id) {
+      BeginTransactionResponsePB resp;
+      while (true) {
+        RpcController ctx;
+        PrepareRpcController(&ctx);
+        BeginTransactionRequestPB req;
+        resp.Clear();
+        auto s = p.BeginTransaction(req, &resp, &ctx);
+        // The only acceptable non-OK status here is 
Status::ServiceUnavailable.
+        if (s.IsServiceUnavailable() ||
+            (resp.has_error() &&
+             StatusFromPB(resp.error().status()).IsServiceUnavailable())) {
+          SleepFor(MonoDelta::FromMilliseconds(10));
+          continue;
+        }
+        break;
+      }
+      CHECK(!resp.has_error()) << 
StatusFromPB(resp.error().status()).ToString();
+      CHECK(resp.has_txn_id());
+      int64_t txn_id = resp.txn_id();
+      CHECK_GT(txn_id, max_txn_id);
+      max_txn_id = txn_id;
+      if (txn_ids) {
+        txn_ids->emplace_back(txn_id);
+      }
+      CHECK(resp.has_keepalive_millis());
+      CHECK_EQ(FLAGS_transaction_keep_alive_interval_ms, 
resp.keepalive_millis());
+    }
+  };
+
+  // First, a simple sequential case: start many transactions one after 
another.
+  // The point here is to make sure the TxnManager:
+  //   * takes care of adding new range partitions to the transaction status table
+  //   * transaction identifiers assigned to the newly started transactions are
+  //       ** unique
+  //       ** increase monotonically
+  {
+    const int64_t kNumTransactions =
+        FLAGS_txn_manager_status_table_range_partition_span * 3;
+
+    // TxnManager is lazily initialized, so no tablets of the transaction
+    // status table should be created yet.
+    const auto txn_tablets_before =
+        cluster_->mini_tablet_server(0)->ListTablets();
+    ASSERT_EQ(0, txn_tablets_before.size());
+
+    vector<int64_t> txn_ids;
+    txn_initiator(kNumTransactions, &txn_ids);
+    ASSERT_EQ(kNumTransactions, txn_ids.size());
+    int64_t prev_txn_id = -1;
+    for (const auto& txn_id : txn_ids) {
+      ASSERT_GT(txn_id, prev_txn_id);
+      prev_txn_id = txn_id;
+    }
+
+    // Check that corresponding tablets have been created for the transaction
+    // status table.
+    const auto txn_tablets_after =
+        cluster_->mini_tablet_server(0)->ListTablets();
+    auto expected_tablets_num = 1 +
+        prev_txn_id / FLAGS_txn_manager_status_table_range_partition_span;
+    ASSERT_EQ(expected_tablets_num, txn_tablets_after.size());
+  }
+
+  // A more complex case: run multiple threads, each starting many 
transactions.
+  // Make sure the generated transaction identifiers are unique.
+  {
+    const int64_t kNumTransactionsPerThread =
+        FLAGS_txn_manager_status_table_range_partition_span * 2;
+    const int kNumCPUs = base::NumCPUs();
+    const size_t kNumThreads = 2 * kNumCPUs;
+    vector<thread> threads;
+    threads.reserve(kNumThreads);
+    vector<vector<int64_t>> txn_ids_per_thread;
+    txn_ids_per_thread.resize(kNumThreads);
+    for (auto& slice : txn_ids_per_thread) {
+      slice.reserve(kNumTransactionsPerThread);
+    }
+    for (auto idx = 0; idx < kNumThreads; ++idx) {
+      threads.emplace_back(txn_initiator,
+                           kNumTransactionsPerThread,
+                           &txn_ids_per_thread[idx]);
+    }
+    for (auto& t : threads) {
+      t.join();
+    }
+
+    // Verify the uniqueness of the identifiers across all the threads. Instead
+    // of sort/unique, use std::unordered_set.
+    unordered_set<int64_t> txn_ids;
+    size_t total_size = 0;
+    for (const auto& slice: txn_ids_per_thread) {
+      EXPECT_EQ(kNumTransactionsPerThread, slice.size());
+      txn_ids.insert(slice.begin(), slice.end());
+      total_size += slice.size();
+    }
+    ASSERT_EQ(kNumTransactionsPerThread * kNumThreads, total_size);
+    ASSERT_EQ(total_size, txn_ids.size());
+  }
+
+  // Now start a single transaction to get the highest assigned txn_id so far.
+  // This is to check for the number of tablets in the transaction status table
+  // after all this activity.
+  {
+    vector<int64_t> txn_ids;
+    txn_initiator(1, &txn_ids);
+    ASSERT_EQ(1, txn_ids.size());
+    const auto txn_tablets = cluster_->mini_tablet_server(0)->ListTablets();
+    auto expected_tablets_num = 1 +
+        txn_ids[0] / FLAGS_txn_manager_status_table_range_partition_span;
+    ASSERT_EQ(expected_tablets_num, txn_tablets.size());
+  }
 }
 
+// TODO(aserbin): add test scenarios involving a multi-master Kudu cluster
+//                (hence there will be multiple TxnManager instances) and 
verify
+//                how all this works in case of frequent master re-elections.
+
 // KeepTransactionAlive is not yet supported.
 // TODO(aserbin): update this scenario once KeepTransactionAlive is implemented
 TEST_F(TxnManagerTest, KeepTransactionAliveRpc) {
diff --git a/src/kudu/master/txn_manager.cc b/src/kudu/master/txn_manager.cc
index b0a7e45..aee7ce6 100644
--- a/src/kudu/master/txn_manager.cc
+++ b/src/kudu/master/txn_manager.cc
@@ -73,6 +73,13 @@ DEFINE_int64(txn_manager_status_table_range_partition_span, 
1000000,
 TAG_FLAG(txn_manager_status_table_range_partition_span, advanced);
 TAG_FLAG(txn_manager_status_table_range_partition_span, experimental);
 
+DEFINE_uint32(transaction_keep_alive_interval_ms, 3000,
+              "Maximum interval (in milliseconds) between subsequent "
+              "keep-alive heartbeats from client to TxnManager to let it know "
+              "that a transaction is alive");
+TAG_FLAG(transaction_keep_alive_interval_ms, runtime);
+TAG_FLAG(transaction_keep_alive_interval_ms, experimental);
+
 namespace kudu {
 namespace transactions {
 
@@ -103,6 +110,8 @@ MonoDelta ToDelta(const MonoTime& deadline) {
 
 TxnManager::TxnManager(Master* server)
     : is_lazily_initialized_(FLAGS_txn_manager_lazily_initialized),
+      txn_status_table_range_span_(
+          FLAGS_txn_manager_status_table_range_partition_span),
       server_(server),
       need_init_(true),
       initialized_(false),
@@ -112,15 +121,86 @@ TxnManager::TxnManager(Master* server)
 TxnManager::~TxnManager() {
 }
 
-Status TxnManager::BeginTransaction(const string& /* username */,
+Status TxnManager::BeginTransaction(const string& username,
                                     const MonoTime& deadline,
                                     int64_t* txn_id,
                                     int32_t* keep_alive_interval_ms) {
   DCHECK(txn_id);
   DCHECK(keep_alive_interval_ms);
   RETURN_NOT_OK(CheckInitialized(deadline));
-  // TODO(aserbin): this is implemented in a follow-up changelist
-  return Status::NotSupported("BeginTransaction is not supported yet");
+
+  // TxnManager uses next_txn_id_ as a hint for next transaction identifier.
+  //
+  // TODO(aserbin): a better approach is to change TxnStatusManager's
+  //                BeginTransaction() to reserve next available transaction
+  //                identifier by forwarding initial hint supplied from here
+  //                through the chain, where each TxnStatusManager increments
+  //                its last seen txn ID and either sends it further
+  //                to corresponding TxnManager or writes a record into its own
+  //                txn status tablet. The latter ends the forwarding chain 
with
+  //                the reserved txn ID that is sent to TxnSystemClient with
+  //                the response and passed back to be used here as a hint for
+  //                txn ID on the next request.
+  int64_t try_txn_id = next_txn_id_++;
+  auto s = Status::TimedOut("timed out while trying to find txn_id");
+  while (MonoTime::Now() < deadline) {
+    int64_t highest_seen_txn_id = -1;
+    s = txn_sys_client_->BeginTransaction(
+        try_txn_id, username, &highest_seen_txn_id, ToDelta(deadline));
+    if (s.ok()) {
+      DCHECK_GE(highest_seen_txn_id, 0);
+      // The idea is to have the thread that has reserved a transaction with
+      // the highest 'highest_seen_txn_id' so far keep updating 'next_txn_id_'
+      // until it succeeds, or bail if another thread has received a greater
+      // 'highest_seen_txn_id' back from TxnStatusManager.
+      int64_t stored_txn_id = try_txn_id + 1;
+      while (true) {
+        if (next_txn_id_.compare_exchange_strong(stored_txn_id,
+                                                 highest_seen_txn_id + 1) ||
+            stored_txn_id > highest_seen_txn_id) {
+          break;
+        }
+      }
+      break;
+    }
+    if (s.IsInvalidArgument()) {
+      // TxnStatusManager reports that try_txn_id is too low to be used as
+      // an identifier for a new transaction.
+      DCHECK_GE(highest_seen_txn_id, 0);
+      try_txn_id = highest_seen_txn_id + 1;
+      continue;
+    }
+    if (s.IsNotFound()) {
+      // No tablet exists to store the record for a new transaction identified
+      // by 'try_txn_id', so it's time to add a new range into the transaction
+      // status table. By current design, TxnManager is the entity to take care
+      // of this: the TxnStatusManager cannot do that on its own because it is
+      // a logical entity just on top of a tablet corresponding to a single
+      // range of transaction identifiers.
+      //
+      // TODO(aserbin): make the range step controllable via a gflag and
+      //                retrieve the information on the high bound of the last
+      //                used transaction range from the tablet's metadata.
+      const auto lb = (try_txn_id / txn_status_table_range_span_) *
+          txn_status_table_range_span_;
+      const auto hb = lb + txn_status_table_range_span_;
+      auto new_range_status = txn_sys_client_->AddTxnStatusTableRange(lb, hb);
+      if (new_range_status.ok() || new_range_status.IsAlreadyPresent()) {
+        // OK, the tablet corresponding to the new range now exists.
+        // Status::AlreadyPresent() might be returned if there were concurrent
+        // requests to add the same range to the transaction status table.
+        continue;
+      }
+      return new_range_status.CloneAndPrepend(
+          "cannot add a new range to the transaction status table");
+    }
+    break;
+  }
+  if (s.ok()) {
+    *txn_id = try_txn_id;
+    *keep_alive_interval_ms = FLAGS_transaction_keep_alive_interval_ms;
+  }
+  return s;
 }
 
 Status TxnManager::CommitTransaction(int64_t txn_id,
diff --git a/src/kudu/master/txn_manager.h b/src/kudu/master/txn_manager.h
index 6fca78a..f852799 100644
--- a/src/kudu/master/txn_manager.h
+++ b/src/kudu/master/txn_manager.h
@@ -102,6 +102,10 @@ class TxnManager final {
   // Whether or not this instance is lazily initialized.
   const bool is_lazily_initialized_;
 
+  // The span of a range partition to use when adding new ranges to the
+  // transaction status table.
+  const int64_t txn_status_table_range_span_;
+
   // The pointer to the top-level Master object. From the lifecycle 
perspective,
   // Master is assumed to be alive during the whole lifecycle of TxnManager
   // since the Master is the component that hosts TxnManager (this is similar
diff --git a/src/kudu/transactions/txn_system_client.cc 
b/src/kudu/transactions/txn_system_client.cc
index 2dc17c9..092eb5d 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -138,11 +138,8 @@ Status TxnSystemClient::BeginTransaction(int64_t txn_id,
                                            s.AsStatusCallback(),
                                            &result));
   const auto ret = s.Wait();
-  if (ret.ok() || ret.IsInvalidArgument()) {
-    DCHECK(result.has_highest_seen_txn_id());
-    if (highest_seen_txn_id) {
-      *highest_seen_txn_id = result.highest_seen_txn_id();
-    }
+  if (result.has_highest_seen_txn_id() && highest_seen_txn_id) {
+    *highest_seen_txn_id = result.highest_seen_txn_id();
   }
   return ret;
 }
diff --git a/src/kudu/tserver/tablet_service.cc 
b/src/kudu/tserver/tablet_service.cc
index 1c66aa0..38f5386 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1252,7 +1252,8 @@ void TabletServiceAdminImpl::CoordinateTransaction(const 
CoordinateTransactionRe
   int64_t highest_seen_txn_id = -1;
   switch (op.type()) {
     case CoordinatorOpPB::BEGIN_TXN:
-      s = txn_coordinator->BeginTransaction(txn_id, user, 
&highest_seen_txn_id, &ts_error);
+      s = txn_coordinator->BeginTransaction(
+          txn_id, user, &highest_seen_txn_id, &ts_error);
       break;
     case CoordinatorOpPB::REGISTER_PARTICIPANT:
       s = txn_coordinator->RegisterParticipant(txn_id, 
op.txn_participant_id(), user, &ts_error);
@@ -1281,7 +1282,7 @@ void TabletServiceAdminImpl::CoordinateTransaction(const 
CoordinateTransactionRe
     // Populate corresponding field in the response.
     *(resp->mutable_op_result()->mutable_txn_status()) = std::move(txn_status);
   }
-  if (op.type() == CoordinatorOpPB::BEGIN_TXN) {
+  if (op.type() == CoordinatorOpPB::BEGIN_TXN && !s.IsServiceUnavailable()) {
     DCHECK_GE(highest_seen_txn_id, 0);
     resp->mutable_op_result()->set_highest_seen_txn_id(highest_seen_txn_id);
   }

Reply via email to