This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 948f92e  KUDU-2612: add a TxnSystemClient to the tservers
948f92e is described below

commit 948f92e787136f12dbfb3e7195fef24db0be0088
Author: Andrew Wong <[email protected]>
AuthorDate: Sat Jan 23 16:22:37 2021 -0800

    KUDU-2612: add a TxnSystemClient to the tservers
    
    This patch adds a TxnSystemClient that gets initialized asynchronously,
    attempting to connect to the masters in the background in a similar
    fashion to the Heartbeater threads.
    
    There is some intricacy in the initialization of the client to note.
    Namely, if trying to connect to a set of masters while none of the
    masters can be reached, the KuduClientBuilder will attempt to retry
    connecting to each master repeatedly. This is problematic, as several
    tserver tests do not spin up masters. So, taking a page out of the
    Heartbeater book, the initialization will first ping each master. As
    long as at least one of them can connect, the TxnSystemClient will then
    proceed to attempt to connect to the cluster.
    
    This patch focuses only on adding the client initialization logic to the
    tservers, ensuring that even without connecting to a cluster, tservers
    can successfully bootstrap. This client will be used in later patches to
    communicate between the TxnStatusManager and transaction participants.
    
    Also note the similarities with TxnManager initialization, which
    happens on masters in a single-threaded threadpool. I considered reusing
    that implementation for tservers, but opted not to, given that TxnManager
    initialization is fairly well embedded in master code and has some
    initialization pieces not needed by TxnStatusManagers and participants
    (e.g. tservers aren't in charge of creating the transaction status
    table). I left a TODO to refactor for code reuse.
    
    Change-Id: I33b5a2bb5c56ae4bb4b42069af5813e2780fc4bc
    Reviewed-on: http://gerrit.cloudera.org:8080/16974
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/client/client-test.cc                     |   5 +
 .../integration-tests/location_assignment-itest.cc |   1 +
 .../integration-tests/txn_status_table-itest.cc    |  32 ++++++
 src/kudu/rpc/sasl_common.cc                        |   1 -
 src/kudu/transactions/CMakeLists.txt               |   1 +
 src/kudu/transactions/txn_system_client.cc         | 114 ++++++++++++++++++++-
 src/kudu/transactions/txn_system_client.h          |  44 ++++++++
 src/kudu/tserver/tablet_server.cc                  |   7 ++
 src/kudu/tserver/tablet_server.h                   |  11 ++
 9 files changed, 214 insertions(+), 2 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 4d7e0ca..ca09c40 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -137,6 +137,7 @@ DECLARE_bool(allow_unsafe_replication_factor);
 DECLARE_bool(catalog_manager_support_live_row_count);
 DECLARE_bool(catalog_manager_support_on_disk_size);
 DECLARE_bool(client_use_unix_domain_sockets);
+DECLARE_bool(disable_txn_system_client_init);
 DECLARE_bool(fail_dns_resolution);
 DECLARE_bool(location_mapping_by_uuid);
 DECLARE_bool(log_inject_latency);
@@ -7705,6 +7706,10 @@ class ClientWithLocationTest : public ClientTest {
     FLAGS_location_mapping_cmd = strings::Substitute("$0 $1",
                                                      location_cmd_path, 
location);
     FLAGS_location_mapping_by_uuid = true;
+
+    // Some of these tests assume no client activity, so disable the
+    // transaction system client.
+    FLAGS_disable_txn_system_client_init = true;
   }
 };
 
diff --git a/src/kudu/integration-tests/location_assignment-itest.cc 
b/src/kudu/integration-tests/location_assignment-itest.cc
index 5a3a95d..673b131 100644
--- a/src/kudu/integration-tests/location_assignment-itest.cc
+++ b/src/kudu/integration-tests/location_assignment-itest.cc
@@ -152,6 +152,7 @@ class TsLocationAssignmentITest :
     const auto& param = GetParam();
     opts_.num_masters = std::get<0>(param);
     opts_.num_tablet_servers = std::get<1>(param);
+    opts_.extra_tserver_flags.emplace_back("--disable_txn_system_client_init");
   }
 
   virtual ~TsLocationAssignmentITest() = default;
diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc 
b/src/kudu/integration-tests/txn_status_table-itest.cc
index 6cd5b25..271e643 100644
--- a/src/kudu/integration-tests/txn_status_table-itest.cc
+++ b/src/kudu/integration-tests/txn_status_table-itest.cc
@@ -532,6 +532,38 @@ TEST_F(TxnStatusTableITest, 
SystemClientCommitAndAbortTransaction) {
   }
 }
 
+// Verifies that a tablet server builds a usable TxnSystemClient in the
+// background, and that after a tablet server restart with the master down the
+// client is reported as unavailable until the master comes back.
+TEST_F(TxnStatusTableITest, TServerInitializesTxnSystemClient) {
+  auto* mts = cluster_->mini_tablet_server(0);
+  auto* client_initializer = mts->server()->txn_client_initializer();
+  TxnSystemClient* txn_client = nullptr;
+  // Initialization is asynchronous, so poll until the client is available.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(client_initializer->GetClient(&txn_client));
+  });
+  // We should be able to get the client, and use it to make a call.
+  ASSERT_NE(txn_client, nullptr);
+  ASSERT_OK(txn_client->CreateTxnStatusTable(100));
+  // Drop the pointer before restarting the tablet server below, so the checks
+  // that follow start from a null client.
+  txn_client = nullptr;
+
+  // Try shutting down the master, and then restarting the tablet server to
+  // attempt to rebuild a TxnSystemClient. This should fail until the master
+  // comes back up.
+  cluster_->mini_master()->Shutdown();
+  mts->Shutdown();
+  ASSERT_OK(mts->Restart());
+  // Re-fetch the initializer from the server after the restart.
+  client_initializer = mts->server()->txn_client_initializer();
+  Status s = client_initializer->GetClient(&txn_client);
+  ASSERT_TRUE(s.IsServiceUnavailable()) << s.ToString();
+  ASSERT_EQ(txn_client, nullptr);
+
+  ASSERT_OK(cluster_->mini_master()->Restart());
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(client_initializer->GetClient(&txn_client));
+  });
+  // We should be able to get the client, and use it to make a call.
+  ASSERT_NE(txn_client, nullptr);
+  ASSERT_OK(txn_client->OpenTxnStatusTable());
+}
+
 TEST_F(TxnStatusTableITest, GetTransactionStatus) {
   const auto verify_state = [&](TxnStatePB state) {
     TxnStatusEntryPB txn_status;
diff --git a/src/kudu/rpc/sasl_common.cc b/src/kudu/rpc/sasl_common.cc
index 4b60e85..ab0c3b7 100644
--- a/src/kudu/rpc/sasl_common.cc
+++ b/src/kudu/rpc/sasl_common.cc
@@ -277,7 +277,6 @@ Status SaslInit(bool kerberos_keytab_provided) {
   // Only execute SASL initialization once
   static std::once_flag once;
   std::call_once(once, DoSaslInit, kerberos_keytab_provided);
-  DCHECK_EQ(kerberos_keytab_provided, has_kerberos_keytab);
 
   return sasl_init_status;
 }
diff --git a/src/kudu/transactions/CMakeLists.txt 
b/src/kudu/transactions/CMakeLists.txt
index 8e944fc..d7d26e8 100644
--- a/src/kudu/transactions/CMakeLists.txt
+++ b/src/kudu/transactions/CMakeLists.txt
@@ -41,6 +41,7 @@ add_library(transactions ${TRANSACTIONS_SRCS})
 target_link_libraries(transactions
   kudu_client
   kudu_common
+  server_process
   tablet
   transactions_proto
   ${KUDU_BASE_LIBS}
diff --git a/src/kudu/transactions/txn_system_client.cc 
b/src/kudu/transactions/txn_system_client.cc
index 48aafd2..9d9d1d8 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -20,9 +20,11 @@
 #include <functional>
 #include <memory>
 #include <mutex>
+#include <ostream>
 #include <string>
 
 #include <boost/optional/optional.hpp>
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 
 #include "kudu/client/client-internal.h"
@@ -33,15 +35,33 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/rpc/rpc_controller.h"
 #include "kudu/transactions/coordinator_rpc.h"
 #include "kudu/transactions/participant_rpc.h"
-#include "kudu/transactions/txn_status_tablet.h"
 #include "kudu/transactions/transactions.pb.h"
+#include "kudu/transactions/txn_status_tablet.h"
 #include "kudu/tserver/tserver_admin.pb.h"
 #include "kudu/util/async_util.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/net/dns_resolver.h"
 #include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/threadpool.h"
+
+// Intended for tests. While this flag is set, the background initialization
+// task does not contact the masters; it re-checks the flag once per retry
+// interval, so clearing the flag at runtime lets initialization proceed.
+DEFINE_bool(disable_txn_system_client_init, false,
+            "Whether or not background TxnSystemClient initialization should "
+            "be disabled. This is useful for tests that do not expect any "
+            "client connections.");
+TAG_FLAG(disable_txn_system_client_init, unsafe);
+
+DECLARE_int64(rpc_negotiation_timeout_ms);
 
 using kudu::client::KuduClient;
 using kudu::client::KuduSchema;
@@ -50,12 +70,19 @@ using kudu::client::KuduTable;
 using kudu::client::KuduTableAlterer;
 using kudu::client::KuduTableCreator;
 using kudu::client::internal::MetaCache;
+using kudu::master::MasterServiceProxy;
+using kudu::master::PingRequestPB;
+using kudu::master::PingResponsePB;
+using kudu::rpc::Messenger;
+using kudu::rpc::RpcController;
 using kudu::tserver::CoordinatorOpPB;
 using kudu::tserver::CoordinatorOpResultPB;
 using kudu::tserver::ParticipantOpPB;
+using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 namespace transactions {
@@ -337,5 +364,90 @@ void TxnSystemClient::ParticipateInTransactionAsync(const 
string& tablet_id,
       });
 }
 
+// Both flags start out false: no client has been built yet and no shutdown has
+// been requested. The threadpool and the client are created later, by Init()
+// and the task it schedules.
+TxnSystemClientInitializer::TxnSystemClientInitializer()
+    : init_complete_(false),
+      shutting_down_(false) {}
+
+// Stops the background initialization task before the members it captures via
+// 'this' are destroyed.
+TxnSystemClientInitializer::~TxnSystemClientInitializer() {
+  Shutdown();
+}
+
+// Builds a single-threaded pool and submits one task to it that repeatedly
+// tries to create a TxnSystemClient until it succeeds or 'shutting_down_' is
+// set. Returns an error if the pool cannot be built or the task cannot be
+// submitted; failures inside the task itself are only logged and retried.
+//
+// 'messenger' is used to ping the masters before attempting to create the
+// client; 'master_addrs' is the set of masters to ping and connect to.
+Status TxnSystemClientInitializer::Init(const shared_ptr<Messenger>& messenger,
+                                        vector<HostPort> master_addrs) {
+  RETURN_NOT_OK(ThreadPoolBuilder("txn-client-init")
+      .set_max_threads(1)
+      .Build(&txn_client_init_pool_));
+
+  // The task captures 'this', a copy of the 'messenger' shared_ptr, and takes
+  // ownership of 'master_addrs' by move.
+  return txn_client_init_pool_->Submit([this, messenger, master_addrs = 
std::move(master_addrs)] {
+      unique_ptr<TxnSystemClient> txn_client;
+      while (!shutting_down_) {
+        static const MonoDelta kRetryInterval = MonoDelta::FromSeconds(1);
+        // While disabled via flag, don't touch the network at all; just poll
+        // the flag again after the retry interval.
+        if (PREDICT_FALSE(FLAGS_disable_txn_system_client_init)) {
+          KLOG_EVERY_N_SECS(WARNING, 60) <<
+              Substitute("initialization of TxnSystemClient disabled, will 
retry in $0",
+                         kRetryInterval.ToString());
+          SleepFor(kRetryInterval);
+          continue;
+        }
+        // HACK: if the master addresses are all totally unreachable,
+        // KuduClientBuilder::Build() will hang, attempting fruitlessly to
+        // retry, in the below call to Create(). So first, make sure we can at
+        // least reach the masters; if not, try again.
+        // TODO(awong): there's still a small window between these pings and
+        // client creation. If this ends up being a problem, we may need to
+        // come to a more robust solution, e.g. adding a timeout to Create().
+        DnsResolver dns_resolver;
+        // NOTE(review): 's' is default-constructed, so if 'master_addrs' is
+        // empty the loop below is skipped and Create() is still attempted.
+        Status s;
+        for (const auto& hp : master_addrs) {
+          vector<Sockaddr> addrs;
+          // Resolve the master's address and, only if that succeeds, ping the
+          // first resolved address.
+          s = dns_resolver.ResolveAddresses(hp, &addrs).AndThen([&] {
+            unique_ptr<MasterServiceProxy> proxy(
+                new MasterServiceProxy(messenger, addrs[0], hp.host()));
+            PingRequestPB req;
+            PingResponsePB resp;
+            RpcController rpc;
+            
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_rpc_negotiation_timeout_ms));
+            return proxy->Ping(req, &resp, &rpc);
+          });
+          // A single reachable master is enough; stop pinging the rest.
+          if (s.ok()) {
+            break;
+          }
+        }
+        // Only if we can reach at least one of the masters should we try
+        // connecting.
+        if (PREDICT_TRUE(s.ok())) {
+          s = TxnSystemClient::Create(master_addrs, &txn_client);
+        }
+        if (PREDICT_TRUE(s.ok())) {
+          // Store the client before setting the flag that GetClient() checks.
+          txn_client_ = std::move(txn_client);
+          init_complete_ = true;
+          return;
+        }
+        KLOG_EVERY_N_SECS(WARNING, 60) <<
+            Substitute("unable to initialize TxnSystemClient, will retry in 
$0: $1",
+                       kRetryInterval.ToString(), s.ToString());
+        SleepFor(kRetryInterval);
+      }
+  });
+}
+
+// Non-blocking accessor for the asynchronously created client.
+//
+// Returns ServiceUnavailable if shutdown has been requested or if the
+// background task has not yet created the client; in both cases '*client' is
+// left untouched. Otherwise sets '*client' to the client owned by this object
+// and returns OK.
+Status TxnSystemClientInitializer::GetClient(TxnSystemClient** client) const {
+  // NOTE: the shutdown check is best effort. There's still room for a TOCTOU.
+  if (PREDICT_FALSE(shutting_down_)) {
+    return Status::ServiceUnavailable("could not get TxnSystemClient, shutting 
down");
+  }
+  // 'init_complete_' is only set after 'txn_client_' has been assigned, so the
+  // pointer is expected to be non-null here.
+  if (PREDICT_TRUE(init_complete_)) {
+    *client = DCHECK_NOTNULL(txn_client_.get());
+    return Status::OK();
+  }
+  return Status::ServiceUnavailable("could not get TxnSystemClient, still 
initializing");
+}
+
+// Signals the background initialization task to stop, then waits on and shuts
+// down the threadpool running it. After this call GetClient() returns
+// ServiceUnavailable.
+//
+// This is also invoked from the destructor, so it must tolerate an object on
+// which Init() was never called or failed before the threadpool was built.
+void TxnSystemClientInitializer::Shutdown() {
+  shutting_down_ = true;
+  // 'txn_client_init_pool_' is only created by Init(); without this guard,
+  // shutting down (or destroying) an uninitialized object would dereference a
+  // null pointer.
+  if (txn_client_init_pool_) {
+    txn_client_init_pool_->Wait();
+    txn_client_init_pool_->Shutdown();
+  }
+}
+
 } // namespace transactions
 } // namespace kudu
diff --git a/src/kudu/transactions/txn_system_client.h 
b/src/kudu/transactions/txn_system_client.h
index 31f13aa..54e70fb 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -16,6 +16,7 @@
 // under the License.
 #pragma once
 
+#include <atomic>
 #include <cstdint>
 #include <memory>
 #include <mutex>
@@ -34,6 +35,7 @@
 namespace kudu {
 class HostPort;
 class Timestamp;
+class ThreadPool;
 
 namespace client {
 class KuduClient;
@@ -45,6 +47,10 @@ class TxnStatusTableITest;
 class TxnStatusTableITest_TestProtectCreateAndAlter_Test;
 } // namespace itest
 
+namespace rpc {
+class Messenger;
+} // namespace rpc
+
 namespace tserver {
 class CoordinatorOpPB;
 class CoordinatorOpResultPB;
@@ -175,5 +181,43 @@ class TxnSystemClient {
   client::sp::shared_ptr<client::KuduTable> txn_status_table_;
 };
 
+// Wrapper around a TxnSystemClient that allows callers to asynchronously
+// create a client.
+//
+// Init() schedules a background task that keeps retrying until a client has
+// been created or Shutdown() is called; GetClient() reports whether the client
+// is available yet without blocking.
+// TODO(awong): the problem at hand is similar to TxnManager initialization,
+// minus table creation. Refactor for code reuse?
+class TxnSystemClientInitializer {
+ public:
+  TxnSystemClientInitializer();
+  // Calls Shutdown().
+  ~TxnSystemClientInitializer();
+
+  // Starts attempts to initialize the transaction system client.
+  //
+  // 'messenger' is used to ping the masters at 'master_addrs' before trying to
+  // connect to them. Returns an error only if the background threadpool could
+  // not be built or the initialization task could not be submitted; failures
+  // to reach the masters are retried in the background.
+  Status Init(const std::shared_ptr<rpc::Messenger>& messenger,
+              std::vector<HostPort> master_addrs);
+
+  // Returns a ServiceUnavailable error if the client has not yet been
+  // initialized or if Shutdown() has been called. Otherwise, returns OK and
+  // sets 'client'. Callers should ensure that 'client' is only used while the
+  // TxnSystemClientInitializer is still in scope.
+  //
+  // On error, 'client' is left unmodified.
+  Status GetClient(TxnSystemClient** client) const;
+
+  // Stops the initialization, preventing success of further calls to
+  // GetClient().
+  void Shutdown();
+
+ private:
+  // Whether or not 'txn_client_' has been initialized.
+  // Set by the background task after it has assigned 'txn_client_'.
+  std::atomic<bool> init_complete_;
+
+  // Whether or not the client initializer is shutting down, in which case
+  // attempts to access 'txn_client_' should fail.
+  // Also checked by the background task to stop retrying.
+  std::atomic<bool> shutting_down_;
+
+  // Threadpool on which to schedule attempts to initialize 'txn_client_'.
+  // Created by Init() with a single thread.
+  std::unique_ptr<ThreadPool> txn_client_init_pool_;
+
+  // The TxnSystemClient, initialized asynchronously via calls to Init().
+  std::unique_ptr<TxnSystemClient> txn_client_;
+};
+
 } // namespace transactions
 } // namespace kudu
diff --git a/src/kudu/tserver/tablet_server.cc 
b/src/kudu/tserver/tablet_server.cc
index 6aa59ff..92f727d 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -31,6 +31,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/service_if.h"
 #include "kudu/server/rpc_server.h"
+#include "kudu/transactions/txn_system_client.h"
 #include "kudu/tserver/heartbeater.h"
 #include "kudu/tserver/scanners.h"
 #include "kudu/tserver/tablet_copy_service.h"
@@ -44,6 +45,7 @@
 
 using kudu::fs::ErrorHandlerType;
 using kudu::rpc::ServiceIf;
+using kudu::transactions::TxnSystemClientInitializer;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -102,6 +104,9 @@ Status TabletServer::Init() {
   maintenance_manager_ = std::make_shared<MaintenanceManager>(
       MaintenanceManager::kDefaultOptions, fs_manager_->uuid(), 
metric_entity());
 
+  client_initializer_.reset(new TxnSystemClientInitializer);
+  RETURN_NOT_OK(client_initializer_->Init(messenger_, opts_.master_addresses));
+
   heartbeater_.reset(new Heartbeater(std::move(master_addrs), this));
 
   RETURN_NOT_OK_PREPEND(tablet_manager_->Init(),
@@ -170,6 +175,8 @@ void TabletServer::ShutdownImpl() {
     fs_manager_->UnsetErrorNotificationCb(ErrorHandlerType::CFILE_CORRUPTION);
     tablet_manager_->Shutdown();
 
+    client_initializer_->Shutdown();
+
     // 3. Shut down generic subsystems.
     KuduServer::Shutdown();
     LOG(INFO) << "TabletServer@" << name << " shutdown complete.";
diff --git a/src/kudu/tserver/tablet_server.h b/src/kudu/tserver/tablet_server.h
index ad0e2e4..2df5cc3 100644
--- a/src/kudu/tserver/tablet_server.h
+++ b/src/kudu/tserver/tablet_server.h
@@ -31,6 +31,10 @@ namespace kudu {
 
 class MaintenanceManager;
 
+namespace transactions {
+class TxnSystemClientInitializer;
+} // namespace transactions
+
 namespace tserver {
 
 class Heartbeater;
@@ -83,6 +87,10 @@ class TabletServer : public kserver::KuduServer {
     return maintenance_manager_.get();
   }
 
+  // Returns the initializer that asynchronously builds this server's
+  // TxnSystemClient. The returned pointer is owned by the TabletServer.
+  // NOTE(review): appears to be null until Init() has created the
+  // initializer -- confirm callers only use this after Init().
+  transactions::TxnSystemClientInitializer* txn_client_initializer() {
+    return client_initializer_.get();
+  }
+
   bool quiescing() const {
     return quiescing_;
   }
@@ -123,6 +131,9 @@ class TabletServer : public kserver::KuduServer {
   // dependencies.
   std::unique_ptr<ScannerManager> scanner_manager_;
 
+  // Asynchronously initializes a TxnSystemClient on a background threadpool.
+  std::unique_ptr<transactions::TxnSystemClientInitializer> 
client_initializer_;
+
   // Thread responsible for heartbeating to the master.
   std::unique_ptr<Heartbeater> heartbeater_;
 

Reply via email to