This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 1efa619a22cb48b69de180a067008644dca878f8
Author: Alexey Serbin <[email protected]>
AuthorDate: Thu Sep 12 16:22:14 2019 -0700

    [mini_cluster] introduce 'builtin' clock source
    
    This patch introduces the built-in NTP client into the external
    minicluster test scaffolding.  With this patch, it's possible
    to synchronize the built-in NTP client of the minicluster's masters
    and tablet servers with a chronyd NTP server, which is run as a part
    of the minicluster as well.  The latter runs in server-only mode
    (i.e. not driving the system clock), using the machine's clock
    as a reference (for details, see 'man chronyd', '-x' option).
    
    Change-Id: I5c334ae6fa1fb12b033de7f8e8584b8dd3aa2d32
    Reviewed-on: http://gerrit.cloudera.org:8080/14227
    Reviewed-by: Adar Dembo <[email protected]>
    Tested-by: Alexey Serbin <[email protected]>
---
 .../integration-tests/master_migration-itest.cc    |  6 +-
 .../mini-cluster/external_mini_cluster-test.cc     | 83 +++++++++++++++-------
 src/kudu/mini-cluster/external_mini_cluster.cc     | 62 ++++++++++++----
 src/kudu/mini-cluster/external_mini_cluster.h      | 13 ++--
 4 files changed, 117 insertions(+), 47 deletions(-)

diff --git a/src/kudu/integration-tests/master_migration-itest.cc 
b/src/kudu/integration-tests/master_migration-itest.cc
index bce4745..e6aa38e 100644
--- a/src/kudu/integration-tests/master_migration-itest.cc
+++ b/src/kudu/integration-tests/master_migration-itest.cc
@@ -197,7 +197,11 @@ TEST_F(MasterMigrationTest, TestEndToEndMigration) {
   }
 
   // Bring down the old cluster configuration and bring up the new one.
-  cluster->Shutdown();
+  // In addition to masters and tablet servers, shut down other helper 
processes
+  // as well by destroying the ExternalMiniCluster object held in the
+  // unique_ptr wrapper by calling 'std::unique_ptr::reset()'.
+  cluster.reset();
+
   opts.num_masters = 3;
   opts.master_rpc_addresses = master_rpc_addresses;
   ExternalMiniCluster migrated_cluster(std::move(opts));
diff --git a/src/kudu/mini-cluster/external_mini_cluster-test.cc 
b/src/kudu/mini-cluster/external_mini_cluster-test.cc
index 8e60bfa..8b3acb3 100644
--- a/src/kudu/mini-cluster/external_mini_cluster-test.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster-test.cc
@@ -40,51 +40,76 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
-namespace kudu {
-namespace cluster {
-
-using std::make_pair;
-using std::pair;
 using std::string;
+using std::tuple;
 using std::vector;
 using strings::Substitute;
 
+namespace kudu {
+namespace cluster {
+
 enum class Kerberos {
-  ENABLED,
   DISABLED,
+  ENABLED,
 };
 
-enum HiveMetastore {
-  ENABLED,
+enum class HiveMetastore {
   DISABLED,
+  ENABLED,
 };
 
-// Beautifies CLI test output.
-std::ostream& operator<<(std::ostream& o, Kerberos k) {
-  switch (k) {
-    case Kerberos::ENABLED: return o << "Kerberos::ENABLED";
-    case Kerberos::DISABLED: return o << "Kerberos::DISABLED";
+enum BuiltInNtp {
+  DISABLED = 0,
+  ENABLED_SINGLE_SERVER = 1,
+  ENABLED_MULTIPLE_SERVERS = 5,
+};
+
+// Beautifies test output if a test scenario fails.
+std::ostream& operator<<(std::ostream& o, Kerberos opt) {
+  switch (opt) {
+    case Kerberos::ENABLED:
+      return o << "Kerberos::ENABLED";
+    case Kerberos::DISABLED:
+      return o << "Kerberos::DISABLED";
   }
   return o;
 }
-std::ostream& operator<<(std::ostream& o, HiveMetastore k) {
-  switch (k) {
-    case HiveMetastore::ENABLED: return o << "HiveMetastore::ENABLED";
-    case HiveMetastore::DISABLED: return o << "HiveMetastore::DISABLED";
+
+std::ostream& operator<<(std::ostream& o, HiveMetastore opt) {
+  switch (opt) {
+    case HiveMetastore::ENABLED:
+      return o << "HiveMetastore::ENABLED";
+    case HiveMetastore::DISABLED:
+      return o << "HiveMetastore::DISABLED";
   }
   return o;
 }
 
-class ExternalMiniClusterTest : public KuduTest,
-                                public 
testing::WithParamInterface<pair<Kerberos, HiveMetastore>> {
+std::ostream& operator<<(std::ostream& o, BuiltInNtp opt) {
+  switch (opt) {
+    case BuiltInNtp::DISABLED:
+      return o << "BuiltInNtp::DISABLED";
+    case BuiltInNtp::ENABLED_SINGLE_SERVER:
+      return o << "BuiltInNtp::ENABLED_SINGLE_SERVER";
+    case BuiltInNtp::ENABLED_MULTIPLE_SERVERS:
+      return o << "BuiltInNtp::ENABLED_MULTIPLE_SERVERS";
+  }
+  return o;
+}
+
+class ExternalMiniClusterTest :
+    public KuduTest,
+    public testing::WithParamInterface<tuple<Kerberos, HiveMetastore, 
BuiltInNtp>> {
 };
 
-INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
-                        ExternalMiniClusterTest,
-                        testing::Values(make_pair(Kerberos::DISABLED, 
HiveMetastore::DISABLED),
-                                        make_pair(Kerberos::ENABLED, 
HiveMetastore::DISABLED),
-                                        make_pair(Kerberos::DISABLED, 
HiveMetastore::ENABLED),
-                                        make_pair(Kerberos::ENABLED, 
HiveMetastore::ENABLED)));
+INSTANTIATE_TEST_CASE_P(,
+    ExternalMiniClusterTest,
+    ::testing::Combine(
+        ::testing::Values(Kerberos::DISABLED, Kerberos::ENABLED),
+        ::testing::Values(HiveMetastore::DISABLED, HiveMetastore::ENABLED),
+        ::testing::Values(BuiltInNtp::DISABLED,
+                          BuiltInNtp::ENABLED_SINGLE_SERVER,
+                          BuiltInNtp::ENABLED_MULTIPLE_SERVERS)));
 
 void SmokeTestKerberizedCluster(ExternalMiniClusterOptions opts) {
   ASSERT_TRUE(opts.enable_kerberos);
@@ -124,11 +149,15 @@ TEST_F(ExternalMiniClusterTest, TestKerberosReacquire) {
 }
 
 TEST_P(ExternalMiniClusterTest, TestBasicOperation) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
   ExternalMiniClusterOptions opts;
-  opts.enable_kerberos = GetParam().first == Kerberos::ENABLED;
-  if (GetParam().second == HiveMetastore::ENABLED) {
+  const auto& param = GetParam();
+  opts.enable_kerberos = std::get<0>(param) == Kerberos::ENABLED;
+  if (std::get<1>(param) == HiveMetastore::ENABLED) {
     opts.hms_mode = HmsMode::ENABLE_HIVE_METASTORE;
   }
+  opts.num_ntp_servers = std::get<2>(param);
 
   opts.num_masters = 3;
   opts.num_tablet_servers = 3;
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc 
b/src/kudu/mini-cluster/external_mini_cluster.cc
index 185f9d3..8e77f67 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -158,6 +158,24 @@ Status ExternalMiniCluster::HandleOptions() {
   return Status::OK();
 }
 
+Status ExternalMiniCluster::AddNtpFlags(std::vector<std::string>* flags) {
+  DCHECK(flags);
+  if (opts_.num_ntp_servers > 0) {
+    vector<string> ntp_endpoints;
+    CHECK_EQ(opts_.num_ntp_servers, ntp_servers_.size());
+    for (const auto& server : ntp_servers_) {
+      const auto& opt = server->options();
+      ntp_endpoints.emplace_back(HostPort(opt.bindaddress, 
opt.port).ToString());
+    }
+    flags->emplace_back(Substitute("--builtin_ntp_servers=$0",
+                                   JoinStrings(ntp_endpoints, ",")));
+    flags->emplace_back(Substitute("--builtin_ntp_poll_interval_ms=100"));
+    flags->emplace_back(Substitute("--ntp_initial_sync_wait_secs=2"));
+    flags->emplace_back("--time_source=builtin");
+  }
+  return Status::OK();
+}
+
 Status ExternalMiniCluster::StartSentry() {
   sentry_->SetDataRoot(opts_.cluster_root);
 
@@ -224,8 +242,24 @@ Status ExternalMiniCluster::Start() {
 
   // Start NTP servers, if requested.
   if (opts_.num_ntp_servers > 0) {
+    // Collect and keep alive the set of sockets bound with SO_REUSEPORT option
+    // until all chronyd processes are started. This helps to avoid port
+    // conflicts: chronyd doesn't support binding to wildcard addresses and
+    // it's necessary to make sure chronyd is able to bind to the port 
specified
+    // in its configuration. So, the mini-cluster reserves a set of ports up
+    // front, then starts the set of chronyd processes, each bound to one
+    // of the reserved ports.
+    vector<unique_ptr<Socket>> reserved_sockets;
     for (auto i = 0; i < opts_.num_ntp_servers; ++i) {
-      RETURN_NOT_OK_PREPEND(AddNtpServer(),
+      unique_ptr<Socket> reserved_socket;
+      RETURN_NOT_OK_PREPEND(ReserveDaemonSocket(
+          DaemonType::EXTERNAL_SERVER, i, opts_.bind_mode, &reserved_socket),
+          "failed to reserve chronyd socket address");
+      Sockaddr addr;
+      RETURN_NOT_OK(reserved_socket->GetSocketAddress(&addr));
+      reserved_sockets.emplace_back(std::move(reserved_socket));
+
+      RETURN_NOT_OK_PREPEND(AddNtpServer(addr),
                             Substitute("failed to start NTP server $0", i));
     }
   }
@@ -489,9 +523,10 @@ Status ExternalMiniCluster::StartMasters() {
     flags.emplace_back("--location_mapping_by_uuid");
 #   endif
   }
-  string exe = GetBinaryPath(kMasterBinaryName);
+  RETURN_NOT_OK(AddNtpFlags(&flags));
 
   // Start the masters.
+  const string& exe = GetBinaryPath(kMasterBinaryName);
   for (int i = 0; i < num_masters; i++) {
     string daemon_id = Substitute("master-$0", i);
 
@@ -556,11 +591,9 @@ Status ExternalMiniCluster::AddTabletServer() {
   CHECK(leader_master() != nullptr)
       << "Must have started at least 1 master before adding tablet servers";
 
-  int idx = tablet_servers_.size();
-  string daemon_id = Substitute("ts-$0", idx);
-
-  vector<HostPort> master_hostports = master_rpc_addrs();
-  string bind_host = GetBindIpForTabletServer(idx);
+  const int idx = tablet_servers_.size();
+  const string daemon_id = Substitute("ts-$0", idx);
+  const string bind_host = GetBindIpForTabletServer(idx);
 
   ExternalDaemonOptions opts;
   opts.messenger = messenger_;
@@ -573,11 +606,14 @@ Status ExternalMiniCluster::AddTabletServer() {
     opts.perf_record_filename =
         Substitute("$0/perf-$1.data", opts.log_dir, daemon_id);
   }
-  opts.extra_flags = SubstituteInFlags(opts_.extra_tserver_flags, idx);
+  auto extra_flags = SubstituteInFlags(opts_.extra_tserver_flags, idx);
+  RETURN_NOT_OK(AddNtpFlags(&extra_flags));
+  opts.extra_flags = extra_flags;
   opts.start_process_timeout = opts_.start_process_timeout;
   opts.rpc_bind_address = HostPort(bind_host, 0);
   opts.logtostderr = opts_.logtostderr;
 
+  vector<HostPort> master_hostports = master_rpc_addrs();
   scoped_refptr<ExternalTabletServer> ts = new ExternalTabletServer(opts, 
master_hostports);
   if (opts_.enable_kerberos) {
     RETURN_NOT_OK_PREPEND(ts->EnableKerberos(kdc_.get(), bind_host),
@@ -589,13 +625,11 @@ Status ExternalMiniCluster::AddTabletServer() {
   return Status::OK();
 }
 
-Status ExternalMiniCluster::AddNtpServer() {
-  const auto idx = ntp_servers_.size();
-  string bind_host = GetBindIpForExternalServer(idx);
-
+Status ExternalMiniCluster::AddNtpServer(const Sockaddr& addr) {
   clock::MiniChronydOptions options;
-  options.port = 10123 + idx;
-  options.bindaddress = bind_host;
+  options.index = ntp_servers_.size();
+  options.bindaddress = addr.host();
+  options.port = static_cast<uint16_t>(addr.port());
   unique_ptr<MiniChronyd> chrony(new MiniChronyd(std::move(options)));
   RETURN_NOT_OK(chrony->Start());
   ntp_servers_.emplace_back(std::move(chrony));
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h 
b/src/kudu/mini-cluster/external_mini_cluster.h
index 05e63c6..79eca1d 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -187,9 +187,11 @@ struct ExternalMiniClusterOptions {
   // Default: empty
   LocationInfo location_info;
 
-  // Number of NTP servers to start as part of the cluster. The servers
-  // are used as reference NTP servers for the built-in NTP client: it uses
-  // them to synchronize its internal clock.
+  // Number of NTP servers to start as part of the cluster. The NTP servers are
+  // used as true time references for the NTP client built into masters and
+  // tablet servers. Specifying a value greater than 0 automatically enables
+  // the built-in NTP client, i.e. switches the clock source from the system
+  // wallclock to the wallclock tracked by the built-in NTP client.
   //
   // Default: 0
   int num_ntp_servers;
@@ -223,8 +225,8 @@ class ExternalMiniCluster : public MiniCluster {
   Status AddTabletServer();
 
   // Add a new NTP server to the cluster. The new NTP server is started upon
-  // adding it.
-  Status AddNtpServer();
+  // adding it, bound to the address and port specified by 'addr'.
+  Status AddNtpServer(const Sockaddr& addr);
 
   // Currently, this uses SIGKILL on each daemon for a non-graceful shutdown.
   void ShutdownNodes(ClusterNodes nodes) override;
@@ -422,6 +424,7 @@ class ExternalMiniCluster : public MiniCluster {
 
   Status DeduceBinRoot(std::string* ret);
   Status HandleOptions();
+  Status AddNtpFlags(std::vector<std::string>* flags);
 
   ExternalMiniClusterOptions opts_;
 

Reply via email to