This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 1efa619a22cb48b69de180a067008644dca878f8 Author: Alexey Serbin <[email protected]> AuthorDate: Thu Sep 12 16:22:14 2019 -0700 [mini_cluster] introduce 'builtin' clock source This patch introduces the built-in NTP client into the external minicluster test scaffolding. With this patch, it's possible to synchronize the built-in NTP client of the minicluster's masters and tablet servers with chronyd NTP server which is run as a part of the minicluster as well. The latter runs in server-only mode (i.e. not driving the system clock), using the machine's clock as a reference (for details, see 'man chronyd', '-x' option). Change-Id: I5c334ae6fa1fb12b033de7f8e8584b8dd3aa2d32 Reviewed-on: http://gerrit.cloudera.org:8080/14227 Reviewed-by: Adar Dembo <[email protected]> Tested-by: Alexey Serbin <[email protected]> --- .../integration-tests/master_migration-itest.cc | 6 +- .../mini-cluster/external_mini_cluster-test.cc | 83 +++++++++++++++------- src/kudu/mini-cluster/external_mini_cluster.cc | 62 ++++++++++++---- src/kudu/mini-cluster/external_mini_cluster.h | 13 ++-- 4 files changed, 117 insertions(+), 47 deletions(-) diff --git a/src/kudu/integration-tests/master_migration-itest.cc b/src/kudu/integration-tests/master_migration-itest.cc index bce4745..e6aa38e 100644 --- a/src/kudu/integration-tests/master_migration-itest.cc +++ b/src/kudu/integration-tests/master_migration-itest.cc @@ -197,7 +197,11 @@ TEST_F(MasterMigrationTest, TestEndToEndMigration) { } // Bring down the old cluster configuration and bring up the new one. - cluster->Shutdown(); + // In addition in masters and tablet servers, shut down other helper processes + // as well by destroying the ExternalMiniCluster object wrapped into + // unique_ptr wrapper by calling 'std::unique_ptr::reset()'. 
+ cluster.reset(); + opts.num_masters = 3; opts.master_rpc_addresses = master_rpc_addresses; ExternalMiniCluster migrated_cluster(std::move(opts)); diff --git a/src/kudu/mini-cluster/external_mini_cluster-test.cc b/src/kudu/mini-cluster/external_mini_cluster-test.cc index 8e60bfa..8b3acb3 100644 --- a/src/kudu/mini-cluster/external_mini_cluster-test.cc +++ b/src/kudu/mini-cluster/external_mini_cluster-test.cc @@ -40,51 +40,76 @@ #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" -namespace kudu { -namespace cluster { - -using std::make_pair; -using std::pair; using std::string; +using std::tuple; using std::vector; using strings::Substitute; +namespace kudu { +namespace cluster { + enum class Kerberos { - ENABLED, DISABLED, + ENABLED, }; -enum HiveMetastore { - ENABLED, +enum class HiveMetastore { DISABLED, + ENABLED, }; -// Beautifies CLI test output. -std::ostream& operator<<(std::ostream& o, Kerberos k) { - switch (k) { - case Kerberos::ENABLED: return o << "Kerberos::ENABLED"; - case Kerberos::DISABLED: return o << "Kerberos::DISABLED"; +enum BuiltInNtp { + DISABLED = 0, + ENABLED_SINGLE_SERVER = 1, + ENABLED_MULTIPLE_SERVERS = 5, +}; + +// Beautifies test output if a test scenario fails. 
+std::ostream& operator<<(std::ostream& o, Kerberos opt) { + switch (opt) { + case Kerberos::ENABLED: + return o << "Kerberos::ENABLED"; + case Kerberos::DISABLED: + return o << "Kerberos::DISABLED"; } return o; } -std::ostream& operator<<(std::ostream& o, HiveMetastore k) { - switch (k) { - case HiveMetastore::ENABLED: return o << "HiveMetastore::ENABLED"; - case HiveMetastore::DISABLED: return o << "HiveMetastore::DISABLED"; + +std::ostream& operator<<(std::ostream& o, HiveMetastore opt) { + switch (opt) { + case HiveMetastore::ENABLED: + return o << "HiveMetastore::ENABLED"; + case HiveMetastore::DISABLED: + return o << "HiveMetastore::DISABLED"; } return o; } -class ExternalMiniClusterTest : public KuduTest, - public testing::WithParamInterface<pair<Kerberos, HiveMetastore>> { +std::ostream& operator<<(std::ostream& o, BuiltInNtp opt) { + switch (opt) { + case BuiltInNtp::DISABLED: + return o << "BuiltInNtp::DISABLED"; + case BuiltInNtp::ENABLED_SINGLE_SERVER: + return o << "BuiltInNtp::ENABLED_SINGLE_SERVER"; + case BuiltInNtp::ENABLED_MULTIPLE_SERVERS: + return o << "BuiltInNtp::ENABLED_MULTIPLE_SERVERS"; + } + return o; +} + +class ExternalMiniClusterTest : + public KuduTest, + public testing::WithParamInterface<tuple<Kerberos, HiveMetastore, BuiltInNtp>> { }; -INSTANTIATE_TEST_CASE_P(KerberosOnAndOff, - ExternalMiniClusterTest, - testing::Values(make_pair(Kerberos::DISABLED, HiveMetastore::DISABLED), - make_pair(Kerberos::ENABLED, HiveMetastore::DISABLED), - make_pair(Kerberos::DISABLED, HiveMetastore::ENABLED), - make_pair(Kerberos::ENABLED, HiveMetastore::ENABLED))); +INSTANTIATE_TEST_CASE_P(, + ExternalMiniClusterTest, + ::testing::Combine( + ::testing::Values(Kerberos::DISABLED, Kerberos::ENABLED), + ::testing::Values(HiveMetastore::DISABLED, HiveMetastore::ENABLED), + ::testing::Values(BuiltInNtp::DISABLED, + BuiltInNtp::ENABLED_SINGLE_SERVER, + BuiltInNtp::ENABLED_MULTIPLE_SERVERS))); void SmokeTestKerberizedCluster(ExternalMiniClusterOptions opts) { 
ASSERT_TRUE(opts.enable_kerberos); @@ -124,11 +149,15 @@ TEST_F(ExternalMiniClusterTest, TestKerberosReacquire) { } TEST_P(ExternalMiniClusterTest, TestBasicOperation) { + SKIP_IF_SLOW_NOT_ALLOWED(); + ExternalMiniClusterOptions opts; - opts.enable_kerberos = GetParam().first == Kerberos::ENABLED; - if (GetParam().second == HiveMetastore::ENABLED) { + const auto& param = GetParam(); + opts.enable_kerberos = std::get<0>(param) == Kerberos::ENABLED; + if (std::get<1>(param) == HiveMetastore::ENABLED) { opts.hms_mode = HmsMode::ENABLE_HIVE_METASTORE; } + opts.num_ntp_servers = std::get<2>(param); opts.num_masters = 3; opts.num_tablet_servers = 3; diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc index 185f9d3..8e77f67 100644 --- a/src/kudu/mini-cluster/external_mini_cluster.cc +++ b/src/kudu/mini-cluster/external_mini_cluster.cc @@ -158,6 +158,24 @@ Status ExternalMiniCluster::HandleOptions() { return Status::OK(); } +Status ExternalMiniCluster::AddNtpFlags(std::vector<std::string>* flags) { + DCHECK(flags); + if (opts_.num_ntp_servers > 0) { + vector<string> ntp_endpoints; + CHECK_EQ(opts_.num_ntp_servers, ntp_servers_.size()); + for (const auto& server : ntp_servers_) { + const auto& opt = server->options(); + ntp_endpoints.emplace_back(HostPort(opt.bindaddress, opt.port).ToString()); + } + flags->emplace_back(Substitute("--builtin_ntp_servers=$0", + JoinStrings(ntp_endpoints, ","))); + flags->emplace_back(Substitute("--builtin_ntp_poll_interval_ms=100")); + flags->emplace_back(Substitute("--ntp_initial_sync_wait_secs=2")); + flags->emplace_back("--time_source=builtin"); + } + return Status::OK(); +} + Status ExternalMiniCluster::StartSentry() { sentry_->SetDataRoot(opts_.cluster_root); @@ -224,8 +242,24 @@ Status ExternalMiniCluster::Start() { // Start NTP servers, if requested. 
if (opts_.num_ntp_servers > 0) { + // Collect and keep alive the set of sockets bound with SO_REUSEPORT option + // until all chronyd processes are started. This allows avoiding port + // conflicts: chronyd doesn't support binding to wildcard addresses and + // it's necessary to make sure chronyd is able to bind to the port specified + // in its configuration. So, the mini-cluster reserves a set of ports up + // front, then starts the set of chronyd processes, each bound to one + // of the reserved ports. + vector<unique_ptr<Socket>> reserved_sockets; for (auto i = 0; i < opts_.num_ntp_servers; ++i) { - RETURN_NOT_OK_PREPEND(AddNtpServer(), + unique_ptr<Socket> reserved_socket; + RETURN_NOT_OK_PREPEND(ReserveDaemonSocket( + DaemonType::EXTERNAL_SERVER, i, opts_.bind_mode, &reserved_socket), + "failed to reserve chronyd socket address"); + Sockaddr addr; + RETURN_NOT_OK(reserved_socket->GetSocketAddress(&addr)); + reserved_sockets.emplace_back(std::move(reserved_socket)); + + RETURN_NOT_OK_PREPEND(AddNtpServer(addr), Substitute("failed to start NTP server $0", i)); } } @@ -489,9 +523,10 @@ Status ExternalMiniCluster::StartMasters() { flags.emplace_back("--location_mapping_by_uuid"); # endif } - string exe = GetBinaryPath(kMasterBinaryName); + RETURN_NOT_OK(AddNtpFlags(&flags)); // Start the masters. 
+ const string& exe = GetBinaryPath(kMasterBinaryName); for (int i = 0; i < num_masters; i++) { string daemon_id = Substitute("master-$0", i); @@ -556,11 +591,9 @@ Status ExternalMiniCluster::AddTabletServer() { CHECK(leader_master() != nullptr) << "Must have started at least 1 master before adding tablet servers"; - int idx = tablet_servers_.size(); - string daemon_id = Substitute("ts-$0", idx); - - vector<HostPort> master_hostports = master_rpc_addrs(); - string bind_host = GetBindIpForTabletServer(idx); + const int idx = tablet_servers_.size(); + const string daemon_id = Substitute("ts-$0", idx); + const string bind_host = GetBindIpForTabletServer(idx); ExternalDaemonOptions opts; opts.messenger = messenger_; @@ -573,11 +606,14 @@ Status ExternalMiniCluster::AddTabletServer() { opts.perf_record_filename = Substitute("$0/perf-$1.data", opts.log_dir, daemon_id); } - opts.extra_flags = SubstituteInFlags(opts_.extra_tserver_flags, idx); + auto extra_flags = SubstituteInFlags(opts_.extra_tserver_flags, idx); + RETURN_NOT_OK(AddNtpFlags(&extra_flags)); + opts.extra_flags = extra_flags; opts.start_process_timeout = opts_.start_process_timeout; opts.rpc_bind_address = HostPort(bind_host, 0); opts.logtostderr = opts_.logtostderr; + vector<HostPort> master_hostports = master_rpc_addrs(); scoped_refptr<ExternalTabletServer> ts = new ExternalTabletServer(opts, master_hostports); if (opts_.enable_kerberos) { RETURN_NOT_OK_PREPEND(ts->EnableKerberos(kdc_.get(), bind_host), @@ -589,13 +625,11 @@ Status ExternalMiniCluster::AddTabletServer() { return Status::OK(); } -Status ExternalMiniCluster::AddNtpServer() { - const auto idx = ntp_servers_.size(); - string bind_host = GetBindIpForExternalServer(idx); - +Status ExternalMiniCluster::AddNtpServer(const Sockaddr& addr) { clock::MiniChronydOptions options; - options.port = 10123 + idx; - options.bindaddress = bind_host; + options.index = ntp_servers_.size(); + options.bindaddress = addr.host(); + options.port = 
static_cast<uint16_t>(addr.port()); unique_ptr<MiniChronyd> chrony(new MiniChronyd(std::move(options))); RETURN_NOT_OK(chrony->Start()); ntp_servers_.emplace_back(std::move(chrony)); diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h index 05e63c6..79eca1d 100644 --- a/src/kudu/mini-cluster/external_mini_cluster.h +++ b/src/kudu/mini-cluster/external_mini_cluster.h @@ -187,9 +187,11 @@ struct ExternalMiniClusterOptions { // Default: empty LocationInfo location_info; - // Number of NTP servers to start as part of the cluster. The servers - // are used as reference NTP servers for the built-in NTP client: it uses - // them to synchronize its internal clock. + // Number of NTP servers to start as part of the cluster. The NTP servers are + // used as true time references for the NTP client built into masters and + // tablet servers. Specifying a value greater than 0 automatically enables + // the built-in NTP client, i.e. switches the clock source from the system + // wallclock to the wallclock tracked by the built-in NTP client. // // Default: 0 int num_ntp_servers; @@ -223,8 +225,8 @@ class ExternalMiniCluster : public MiniCluster { Status AddTabletServer(); // Add a new NTP server to the cluster. The new NTP server is started upon - // adding it. - Status AddNtpServer(); + // adding it, bound to the address and port specified by 'addr'. + Status AddNtpServer(const Sockaddr& addr); // Currently, this uses SIGKILL on each daemon for a non-graceful shutdown. void ShutdownNodes(ClusterNodes nodes) override; @@ -422,6 +424,7 @@ class ExternalMiniCluster : public MiniCluster { Status DeduceBinRoot(std::string* ret); Status HandleOptions(); + Status AddNtpFlags(std::vector<std::string>* flags); ExternalMiniClusterOptions opts_;
