This is an automated email from the ASF dual-hosted git repository. adar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit e110cb88b145edb6d536ae0806e02736f5bf51e5 Author: Alexey Serbin <[email protected]> AuthorDate: Tue Sep 24 22:11:31 2019 -0700 [clock] move clock sync wait into HybridClock::Init() This patch moves the logic to await on clock synchronisation from SystemNtp::Init() into HybridClock::Init(). This is to consolidate and reuse the logic for other clock sources, such as built-in NTP client coming with one of follow-up patches. Change-Id: I4bc5d3160a68932a31cde5ff221f74452bb14a21 Reviewed-on: http://gerrit.cloudera.org:8080/14301 Tested-by: Alexey Serbin <[email protected]> Reviewed-by: Adar Dembo <[email protected]> --- src/kudu/clock/hybrid_clock.cc | 54 +++++++++++++++++++++++++++++++++++------- src/kudu/clock/system_ntp.cc | 54 +----------------------------------------- src/kudu/clock/time_service.h | 8 +++++-- 3 files changed, 52 insertions(+), 64 deletions(-) diff --git a/src/kudu/clock/hybrid_clock.cc b/src/kudu/clock/hybrid_clock.cc index bec8cba..6db1990 100644 --- a/src/kudu/clock/hybrid_clock.cc +++ b/src/kudu/clock/hybrid_clock.cc @@ -63,14 +63,22 @@ DEFINE_string(time_source, "system", "'system' or 'mock' (for tests only)"); TAG_FLAG(time_source, experimental); DEFINE_validator(time_source, [](const char* /* flag_name */, const string& value) { - if (boost::iequals(value, "system") || - boost::iequals(value, "mock")) { - return true; - } - LOG(ERROR) << "unknown value for 'time_source': '" << value << "'" - << " (expected one of 'system' or 'mock')"; - return false; - }); + if (boost::iequals(value, "system") || + boost::iequals(value, "mock")) { + return true; + } + LOG(ERROR) << "unknown value for 'time_source': '" << value << "'" + << " (expected one of 'system' or 'mock')"; + return false; +}); + +DEFINE_int32(ntp_initial_sync_wait_secs, 60, + "Amount of time in seconds to wait for clock synchronisation at " + "startup. A value of zero means Kudu will fail to start " + "if the clock is unsynchronized. This flag can prevent Kudu from " + "crashing if it starts before NTP can synchronize the clock."); +TAG_FLAG(ntp_initial_sync_wait_secs, evolving); +TAG_FLAG(ntp_initial_sync_wait_secs, advanced); METRIC_DEFINE_gauge_uint64(server, hybrid_clock_timestamp, "Hybrid Clock Timestamp", @@ -126,7 +134,35 @@ Status HybridClock::Init() { } else { return Status::InvalidArgument("invalid NTP source", FLAGS_time_source); } - RETURN_NOT_OK(time_service_->Init()); + + const auto wait_s = FLAGS_ntp_initial_sync_wait_secs; + const auto deadline = MonoTime::Now() + MonoDelta::FromSeconds(wait_s); + bool need_log = true; + Status s; + do { + s = time_service_->Init(); + if (!s.IsServiceUnavailable()) { + break; + } + if (need_log) { + // Log about what's going on, just once. + if (wait_s > 0) { + LOG(INFO) << Substitute("waiting up to --ntp_initial_sync_wait_secs=$0 " + "seconds for the clock to synchronize", wait_s); + } else { + LOG(INFO) << Substitute("not waiting for clock synchronization: " + "--ntp_initial_sync_wait_secs=$0 is nonpositive", + wait_s); + } + need_log = false; + } + SleepFor(MonoDelta::FromSeconds(1)); + } while (MonoTime::Now() < deadline); + + if (!s.ok()) { + time_service_->DumpDiagnostics(/* log= */nullptr); + return s.CloneAndPrepend("timed out waiting for clock synchronisation"); + } state_ = kInitialized; diff --git a/src/kudu/clock/system_ntp.cc b/src/kudu/clock/system_ntp.cc index 9993838..9163175 100644 --- a/src/kudu/clock/system_ntp.cc +++ b/src/kudu/clock/system_ntp.cc @@ -25,7 +25,6 @@ #include <string> #include <vector> -#include <gflags/gflags.h> #include <gflags/gflags_declare.h> #include <glog/logging.h> @@ -33,23 +32,13 @@ #include "kudu/gutil/strings/join.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/util/errno.h" -#include "kudu/util/flag_tags.h" #include "kudu/util/logging.h" -#include "kudu/util/monotime.h" #include "kudu/util/path_util.h" #include "kudu/util/status.h" #include "kudu/util/subprocess.h" DECLARE_bool(inject_unsync_time_errors); -DEFINE_int32(ntp_initial_sync_wait_secs, 60, - "Amount of time in seconds to wait for NTP to synchronize the " - "clock at startup. A value of zero means Kudu will fail to start " - "if the clock is unsynchronized. This flag can prevent Kudu from " - "crashing if it starts before NTP can synchronize the clock."); -TAG_FLAG(ntp_initial_sync_wait_secs, evolving); -TAG_FLAG(ntp_initial_sync_wait_secs, advanced); - using std::string; using std::vector; using strings::Substitute; @@ -115,36 +104,6 @@ void TryRun(vector<string> cmd, vector<string>* log) { } -Status WaitForNtp() { - int32_t wait_secs = FLAGS_ntp_initial_sync_wait_secs; - if (wait_secs <= 0) { - LOG(INFO) << Substitute("Not waiting for clock synchronization: " - "--ntp_initial_sync_wait_secs=$0 is nonpositive", - wait_secs); - return Status::OK(); - } - LOG(INFO) << Substitute("Waiting up to --ntp_initial_sync_wait_secs=$0 " - "seconds for the clock to synchronize", wait_secs); - - // We previously relied on ntpd/chrony support tools to wait, but that - // approach doesn't work in environments where ntpd is unreachable but the - // clock is still synchronized (i.e. running inside a Linux container). - // - // Now we just interrogate the kernel directly. - Status s; - for (int i = 0; i < wait_secs; i++) { - timex timex; - s = CallAdjTime(&timex); - if (s.ok() || !s.IsServiceUnavailable()) { - return s; - } - SleepFor(MonoDelta::FromSeconds(1)); - } - - // Return the last failure. - return s.CloneAndPrepend("Timed out waiting for clock sync"); -} - } // anonymous namespace void SystemNtp::DumpDiagnostics(vector<string>* log) const { @@ -172,19 +131,9 @@ void SystemNtp::DumpDiagnostics(vector<string>* log) const { TryRun({"chronyc", "-n", "sources"}, log); } - Status SystemNtp::Init() { timex timex; - Status s = CallAdjTime(&timex); - if (s.IsServiceUnavailable()) { - s = WaitForNtp().AndThen([&timex]() { - return CallAdjTime(&timex); - }); - } - if (!s.ok()) { - DumpDiagnostics(/* log= */nullptr); - return s; - } + RETURN_NOT_OK(CallAdjTime(&timex)); // Calculate the sleep skew adjustment according to the max tolerance of the clock. // Tolerance comes in parts per million but needs to be applied a scaling factor. @@ -197,7 +146,6 @@ Status SystemNtp::Init() { return Status::OK(); } - Status SystemNtp::WalltimeWithError(uint64_t *now_usec, uint64_t *error_usec) { // Read the time. This will return an error if the clock is not synchronized. diff --git a/src/kudu/clock/time_service.h b/src/kudu/clock/time_service.h index aa64255..cc28714 100644 --- a/src/kudu/clock/time_service.h +++ b/src/kudu/clock/time_service.h @@ -32,8 +32,12 @@ class TimeService { TimeService() = default; virtual ~TimeService() = default; - // Initialize the NTP source, validating that it is available and - // properly synchronized. + // Initialize the NTP source, validating that it is available and properly + // synchronized: Status::OK() is returned in such case. If the source + // is not yet synchronized, then Status::ServiceUnavailable() is returned: + // a caller may call this method again to eventually get Status::OK(). + // In case of other non-OK() return statuses, the caller should not invoke + // this method again. virtual Status Init() = 0; // Return the current wall time in microseconds since the Unix epoch in '*now_usec'.
