This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 69cf2065805163799d85dc38b812eb73388d3a02 Author: Alexey Serbin <[email protected]> AuthorDate: Sat Jan 18 00:04:10 2020 -0800 [clock] auto-config of built-in NTP client in cloud This patch introduces auto-configuration of the built-in NTP client in public cloud environment. Currently, AWS and GCE public cloud types are supported: Kudu masters and tablet servers are now capable of auto-detecting per-instance NTP server and using it as reference server for the built-in NTP client. The auto-configuration is controlled by the --builtin_ntp_client_enable_auto_config_in_cloud boolean flag and gated by the --time_source flag (i.e. the latter should be set to 'builtin' to allow the auto-configuration to work). Change-Id: I0590c0b731a4da2f968e720dea0410d46ab62beb Reviewed-on: http://gerrit.cloudera.org:8080/15070 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <[email protected]> --- src/kudu/clock/CMakeLists.txt | 1 + src/kudu/clock/builtin_ntp.cc | 45 +++++++++++--- src/kudu/clock/builtin_ntp.h | 3 + src/kudu/clock/ntp-test.cc | 70 ++++++++++++++++++++++ .../mini-cluster/external_mini_cluster-test.cc | 13 +++- 5 files changed, 121 insertions(+), 11 deletions(-) diff --git a/src/kudu/clock/CMakeLists.txt b/src/kudu/clock/CMakeLists.txt index b0e9d85..c47a15d 100644 --- a/src/kudu/clock/CMakeLists.txt +++ b/src/kudu/clock/CMakeLists.txt @@ -30,6 +30,7 @@ endif() add_library(clock ${CLOCK_SRCS}) target_link_libraries(clock + kudu_cloud_util kudu_common kudu_util) diff --git a/src/kudu/clock/builtin_ntp.cc b/src/kudu/clock/builtin_ntp.cc index dd13581..2f5922c 100644 --- a/src/kudu/clock/builtin_ntp.cc +++ b/src/kudu/clock/builtin_ntp.cc @@ -22,10 +22,12 @@ #include <sys/socket.h> #include <sys/types.h> +#include <algorithm> #include <cerrno> #include <cstdint> #include <cstring> #include <deque> +#include <memory> #include <mutex> #include <ostream> #include <string> @@ -41,6 +43,8 @@ #include "kudu/gutil/strings/strcat.h" #include "kudu/gutil/strings/substitute.h" 
#include "kudu/gutil/walltime.h" +#include "kudu/util/cloud/instance_detector.h" +#include "kudu/util/cloud/instance_metadata.h" #include "kudu/util/errno.h" #include "kudu/util/flag_tags.h" #include "kudu/util/locks.h" @@ -67,7 +71,6 @@ DEFINE_string(builtin_ntp_servers, "The NTP servers used by the built-in NTP client, in format " "<FQDN|IP>[:PORT]. These will only be used if the built-in NTP " "client is enabled."); -TAG_FLAG(builtin_ntp_servers, evolving); // In the 'Best practices' section, RFC 4330 states that 15 seconds is the // minimum allowed polling interval. @@ -82,7 +85,6 @@ DEFINE_uint32(builtin_ntp_poll_interval_ms, 16000, "The time between successive polls of a single NTP server " "(in milliseconds)"); TAG_FLAG(builtin_ntp_poll_interval_ms, advanced); -TAG_FLAG(builtin_ntp_poll_interval_ms, evolving); TAG_FLAG(builtin_ntp_poll_interval_ms, runtime); DEFINE_string(builtin_ntp_client_bind_address, "0.0.0.0", @@ -93,7 +95,6 @@ DEFINE_string(builtin_ntp_client_bind_address, "0.0.0.0", "to customize this flag if getting through a firewall to " "reach public NTP servers specified by --builtin_ntp_servers."); TAG_FLAG(builtin_ntp_client_bind_address, advanced); -TAG_FLAG(builtin_ntp_client_bind_address, evolving); DEFINE_uint32(builtin_ntp_request_timeout_ms, 3000, "Timeout for requests sent to NTP servers (in milliseconds)"); @@ -106,9 +107,18 @@ DEFINE_uint32(builtin_ntp_true_time_refresh_max_interval_s, 3600, TAG_FLAG(builtin_ntp_true_time_refresh_max_interval_s, experimental); TAG_FLAG(builtin_ntp_true_time_refresh_max_interval_s, runtime); +DEFINE_bool(builtin_ntp_client_enable_auto_config_in_cloud, false, + "Whether to attempt the automatic configuration of the built-in " + "NTP client, pointing it to the internal NTP server accessible " + "from within a cloud instance"); +TAG_FLAG(builtin_ntp_client_enable_auto_config_in_cloud, advanced); +TAG_FLAG(builtin_ntp_client_enable_auto_config_in_cloud, experimental); + using 
kudu::clock::internal::Interval; using kudu::clock::internal::kIntervalNone; using kudu::clock::internal::RecordedResponse; +using kudu::cloud::InstanceDetector; +using kudu::cloud::InstanceMetadata; using std::deque; using std::lock_guard; using std::string; @@ -119,10 +129,8 @@ using strings::Substitute; namespace kudu { namespace clock { -constexpr int kStandardNtpPort = 123; - // Number of seconds between Jan 1 1900 and the unix epoch start. -constexpr uint64_t kNtpTimestampDelta = 2208988800ull; +constexpr uint64_t kNtpTimestampDelta = 2208988800ULL; // Keep the last 8 polls from each server. constexpr int kResponsesToRememberPerServer = 8; @@ -562,9 +570,28 @@ Status BuiltInNtp::InitImpl() { // That's the case when this object has been created using the default // constructor. vector<HostPort> hps; - RETURN_NOT_OK_PREPEND(HostPort::ParseStrings(FLAGS_builtin_ntp_servers, - kStandardNtpPort, &hps), - "could not parse --builtin_ntp_servers flag"); + if (FLAGS_builtin_ntp_client_enable_auto_config_in_cloud) { + // Try to find the instance-only NTP server and configure the built-in + // NTP client with it. 
+ InstanceDetector detector; + unique_ptr<InstanceMetadata> md; + string ntp_server; + auto s = detector.Detect(&md).AndThen([&] { + return md->GetNtpServer(&ntp_server); + }).AndThen([&] { + hps.emplace_back(ntp_server, 123); + return Status::OK(); + }); + WARN_NOT_OK(s, Substitute("auto-configuration of built-in NTP client " + "for cloud instance failed, switching to " + "the default set of NTP servers provided by " + "the --builtin_ntp_servers flag")); + } + if (hps.empty()) { + RETURN_NOT_OK_PREPEND(HostPort::ParseStrings(FLAGS_builtin_ntp_servers, + kStandardNtpPort, &hps), + "could not parse --builtin_ntp_servers flag"); + } RETURN_NOT_OK(PopulateServers(std::move(hps))); } for (const auto& s : servers_) { diff --git a/src/kudu/clock/builtin_ntp.h b/src/kudu/clock/builtin_ntp.h index 68dcc91..ffd8b9f 100644 --- a/src/kudu/clock/builtin_ntp.h +++ b/src/kudu/clock/builtin_ntp.h @@ -43,6 +43,9 @@ namespace internal { struct RecordedResponse; } // namespace internal +// The standard NTP port number. +constexpr const int kStandardNtpPort = 123; + // This time service is based on a simplified NTP client implementation. // It's not RFC-compliant yet (RFC 5905). 
The most important missing pieces are: // * support of iburst/burst operation modes (see KUDU-2937) diff --git a/src/kudu/clock/ntp-test.cc b/src/kudu/clock/ntp-test.cc index 50232f6..93f17ed 100644 --- a/src/kudu/clock/ntp-test.cc +++ b/src/kudu/clock/ntp-test.cc @@ -23,6 +23,7 @@ #endif #include <iterator> #include <memory> +#include <ostream> #include <string> #include <utility> #include <vector> @@ -38,7 +39,11 @@ #include "kudu/clock/builtin_ntp.h" #include "kudu/clock/test/mini_chronyd.h" #include "kudu/clock/test/mini_chronyd_test_util.h" +#include "kudu/gutil/integral_types.h" #include "kudu/gutil/stringprintf.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/cloud/instance_detector.h" +#include "kudu/util/cloud/instance_metadata.h" #include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" #include "kudu/util/status.h" @@ -49,11 +54,14 @@ DECLARE_int32(ntp_initial_sync_wait_secs); DECLARE_string(builtin_ntp_servers); DECLARE_uint32(builtin_ntp_poll_interval_ms); +DECLARE_uint32(cloud_metadata_server_request_timeout_ms); using kudu::clock::internal::FindIntersection; using kudu::clock::internal::Interval; using kudu::clock::internal::RecordedResponse; using kudu::clock::internal::kIntervalNone; +using kudu::clock::kStandardNtpPort; +using kudu::cloud::InstanceDetector; using std::back_inserter; using std::copy; using std::string; @@ -681,6 +689,68 @@ TEST_F(BuiltinNtpWithMiniChronydTest, SyncAndUnsyncReferenceServers) { #endif // #ifndef __APPLE__ } +// This is a basic scenario to verify that the built-in NTP client is able +// to synchronize with NTP servers provided by supported cloud environments. +TEST_F(BuiltinNtpWithMiniChronydTest, CloudInstanceNtpServer) { + SKIP_IF_SLOW_NOT_ALLOWED(); + +#ifdef THREAD_SANITIZER + // In case of TSAN builds, it takes longer to spawn threads and, overall, + // the sanitized version of libcurl works an order of magnitude slower + // than the regular version. 
+ FLAGS_cloud_metadata_server_request_timeout_ms = 10000; +#endif + InstanceDetector detector(MonoDelta::FromMilliseconds( + FLAGS_cloud_metadata_server_request_timeout_ms)); + unique_ptr<cloud::InstanceMetadata> md; + string ntp_server; + { + auto s = detector.Detect(&md); + if (!s.ok()) { + ASSERT_TRUE(s.IsNotFound()) << s.ToString(); + LOG(WARNING) << "test is skipped: non-supported or non-cloud environment"; + return; + } + } + { + auto s = md->GetNtpServer(&ntp_server); + if (!s.ok()) { + // The only expected error in this case is Status::NotSupported(). + ASSERT_TRUE(s.IsNotSupported()) << s.ToString(); + LOG(WARNING) << strings::Substitute( + "test is skipped: $0 cloud instance doesn't provide NTP server", + cloud::TypeToString(md->type())); + return; + } + } + + const vector<HostPort> servers_endpoints = {{ ntp_server, kStandardNtpPort }}; + + // All chronyd servers that use the system clock as a reference clock should + // present themselves as a set of NTP servers suitable for synchronisation. + ASSERT_OK(MiniChronyd::CheckNtpSource(servers_endpoints)); + + FLAGS_builtin_ntp_poll_interval_ms = 500; + BuiltInNtp c(servers_endpoints); + ASSERT_OK(c.Init()); + ASSERT_EVENTUALLY([&] { + uint64_t now_us; + uint64_t err_us; + ASSERT_OK(c.WalltimeWithError(&now_us, &err_us)); + }); + + // Make sure WalltimeWithError() works with the built-in NTP client once + // it has initted/synchronized with the reference NTP server available + // from within the cloud instance. 
+ for (auto i = 0; i < 5; ++i) { + SleepFor(MonoDelta::FromMilliseconds(500)); + uint64_t now; + uint64 error; + ASSERT_OK(c.WalltimeWithError(&now, &error)); + LOG(INFO) << StringPrintf("built-in: " WALLTIME_DIAG_FMT, now, error); + } +} + #endif // #if !defined(NO_CHRONY) } // namespace clock diff --git a/src/kudu/mini-cluster/external_mini_cluster-test.cc b/src/kudu/mini-cluster/external_mini_cluster-test.cc index 70d81bf..66aaebe 100644 --- a/src/kudu/mini-cluster/external_mini_cluster-test.cc +++ b/src/kudu/mini-cluster/external_mini_cluster-test.cc @@ -64,6 +64,7 @@ enum BuiltInNtp { DISABLED = 0, ENABLED_SINGLE_SERVER = 1, ENABLED_MULTIPLE_SERVERS = 5, + ENABLED_CLOUD_INSTANCE_SERVER = -1, }; // Beautifies test output if a test scenario fails. @@ -95,6 +96,8 @@ std::ostream& operator<<(std::ostream& o, BuiltInNtp opt) { return o << "BuiltInNtp::ENABLED_SINGLE_SERVER"; case BuiltInNtp::ENABLED_MULTIPLE_SERVERS: return o << "BuiltInNtp::ENABLED_MULTIPLE_SERVERS"; + case BuiltInNtp::ENABLED_CLOUD_INSTANCE_SERVER: + return o << "BuiltInNtp::ENABLED_CLOUD_INSTANCE_SERVER"; } return o; } @@ -119,7 +122,7 @@ INSTANTIATE_TEST_CASE_P(, ::testing::Values(BuiltInNtp::DISABLED, BuiltInNtp::ENABLED_SINGLE_SERVER, BuiltInNtp::ENABLED_MULTIPLE_SERVERS, - BuiltInNtp::DISABLED) + BuiltInNtp::ENABLED_CLOUD_INSTANCE_SERVER) #endif // #if !defined(NO_CHRONY) ... )); @@ -274,7 +277,13 @@ TEST_P(ExternalMiniClusterTest, TestBasicOperation) { opts.hms_mode = HmsMode::ENABLE_HIVE_METASTORE; } #if !defined(NO_CHRONY) - opts.num_ntp_servers = std::get<2>(param); + if (std::get<2>(param) != BuiltInNtp::ENABLED_CLOUD_INSTANCE_SERVER) { + opts.num_ntp_servers = std::get<2>(param); + } else { + opts.num_ntp_servers = 1; + opts.extra_master_flags.emplace_back( + "--builtin_ntp_client_enable_auto_config_in_cloud=true"); + } #endif opts.num_masters = 3;
