This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 69cf2065805163799d85dc38b812eb73388d3a02
Author: Alexey Serbin <[email protected]>
AuthorDate: Sat Jan 18 00:04:10 2020 -0800

    [clock] auto-config of built-in NTP client in cloud
    
    This patch introduces auto-configuration of the built-in NTP client
    in public cloud environments.  Currently, AWS and GCE public cloud types
    are supported: Kudu masters and tablet servers are now capable of
    auto-detecting the per-instance NTP server and using it as the reference
    server for the built-in NTP client.
    
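    The auto-configuration is controlled by the
    --builtin_ntp_client_enable_auto_config_in_cloud boolean flag (disabled by
    default) and gated by the --time_source flag (i.e., the latter must be set
    to 'builtin' for the auto-configuration to take effect).  If the
    auto-detection fails, the built-in NTP client falls back to the servers
    listed in the --builtin_ntp_servers flag.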
    
    Change-Id: I0590c0b731a4da2f968e720dea0410d46ab62beb
    Reviewed-on: http://gerrit.cloudera.org:8080/15070
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <[email protected]>
---
 src/kudu/clock/CMakeLists.txt                      |  1 +
 src/kudu/clock/builtin_ntp.cc                      | 45 +++++++++++---
 src/kudu/clock/builtin_ntp.h                       |  3 +
 src/kudu/clock/ntp-test.cc                         | 70 ++++++++++++++++++++++
 .../mini-cluster/external_mini_cluster-test.cc     | 13 +++-
 5 files changed, 121 insertions(+), 11 deletions(-)

diff --git a/src/kudu/clock/CMakeLists.txt b/src/kudu/clock/CMakeLists.txt
index b0e9d85..c47a15d 100644
--- a/src/kudu/clock/CMakeLists.txt
+++ b/src/kudu/clock/CMakeLists.txt
@@ -30,6 +30,7 @@ endif()
 add_library(clock ${CLOCK_SRCS})
 
 target_link_libraries(clock
+  kudu_cloud_util
   kudu_common
   kudu_util)
 
diff --git a/src/kudu/clock/builtin_ntp.cc b/src/kudu/clock/builtin_ntp.cc
index dd13581..2f5922c 100644
--- a/src/kudu/clock/builtin_ntp.cc
+++ b/src/kudu/clock/builtin_ntp.cc
@@ -22,10 +22,12 @@
 #include <sys/socket.h>
 #include <sys/types.h>
 
+#include <algorithm>
 #include <cerrno>
 #include <cstdint>
 #include <cstring>
 #include <deque>
+#include <memory>
 #include <mutex>
 #include <ostream>
 #include <string>
@@ -41,6 +43,8 @@
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/walltime.h"
+#include "kudu/util/cloud/instance_detector.h"
+#include "kudu/util/cloud/instance_metadata.h"
 #include "kudu/util/errno.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/locks.h"
@@ -67,7 +71,6 @@ DEFINE_string(builtin_ntp_servers,
               "The NTP servers used by the built-in NTP client, in format "
               "<FQDN|IP>[:PORT]. These will only be used if the built-in NTP "
               "client is enabled.");
-TAG_FLAG(builtin_ntp_servers, evolving);
 
 // In the 'Best practices' section, RFC 4330 states that 15 seconds is the
 // minimum allowed polling interval.
@@ -82,7 +85,6 @@ DEFINE_uint32(builtin_ntp_poll_interval_ms, 16000,
               "The time between successive polls of a single NTP server "
               "(in milliseconds)");
 TAG_FLAG(builtin_ntp_poll_interval_ms, advanced);
-TAG_FLAG(builtin_ntp_poll_interval_ms, evolving);
 TAG_FLAG(builtin_ntp_poll_interval_ms, runtime);
 
 DEFINE_string(builtin_ntp_client_bind_address, "0.0.0.0",
@@ -93,7 +95,6 @@ DEFINE_string(builtin_ntp_client_bind_address, "0.0.0.0",
               "to customize this flag if getting through a firewall to "
               "reach public NTP servers specified by --builtin_ntp_servers.");
 TAG_FLAG(builtin_ntp_client_bind_address, advanced);
-TAG_FLAG(builtin_ntp_client_bind_address, evolving);
 
 DEFINE_uint32(builtin_ntp_request_timeout_ms, 3000,
               "Timeout for requests sent to NTP servers (in milliseconds)");
@@ -106,9 +107,18 @@ DEFINE_uint32(builtin_ntp_true_time_refresh_max_interval_s, 3600,
 TAG_FLAG(builtin_ntp_true_time_refresh_max_interval_s, experimental);
 TAG_FLAG(builtin_ntp_true_time_refresh_max_interval_s, runtime);
 
+DEFINE_bool(builtin_ntp_client_enable_auto_config_in_cloud, false,
+            "Whether to attempt the automatic configuration of the built-in "
+            "NTP client, pointing it to the internal NTP server accessible "
+            "from within a cloud instance");
+TAG_FLAG(builtin_ntp_client_enable_auto_config_in_cloud, advanced);
+TAG_FLAG(builtin_ntp_client_enable_auto_config_in_cloud, experimental);
+
 using kudu::clock::internal::Interval;
 using kudu::clock::internal::kIntervalNone;
 using kudu::clock::internal::RecordedResponse;
+using kudu::cloud::InstanceDetector;
+using kudu::cloud::InstanceMetadata;
 using std::deque;
 using std::lock_guard;
 using std::string;
@@ -119,10 +129,8 @@ using strings::Substitute;
 namespace kudu {
 namespace clock {
 
-constexpr int kStandardNtpPort = 123;
-
 // Number of seconds between Jan 1 1900 and the unix epoch start.
-constexpr uint64_t kNtpTimestampDelta = 2208988800ull;
+constexpr uint64_t kNtpTimestampDelta = 2208988800ULL;
 
 // Keep the last 8 polls from each server.
 constexpr int kResponsesToRememberPerServer = 8;
@@ -562,9 +570,28 @@ Status BuiltInNtp::InitImpl() {
     // That's the case when this object has been created using the default
     // constructor.
     vector<HostPort> hps;
-    RETURN_NOT_OK_PREPEND(HostPort::ParseStrings(FLAGS_builtin_ntp_servers,
-                                                 kStandardNtpPort, &hps),
-                          "could not parse --builtin_ntp_servers flag");
+    if (FLAGS_builtin_ntp_client_enable_auto_config_in_cloud) {
+      // Detect this cloud instance's NTP server; on success it becomes the
+      // only reference server, otherwise 'hps' stays empty (see below).
+      InstanceDetector detector;
+      unique_ptr<InstanceMetadata> md;
+      string ntp_server;
+      auto s = detector.Detect(&md).AndThen([&] {
+        return md->GetNtpServer(&ntp_server);
+      }).AndThen([&] {
+        hps.emplace_back(ntp_server, kStandardNtpPort);
+        return Status::OK();
+      });
+      WARN_NOT_OK(s, "auto-configuration of built-in NTP client "
+                     "for cloud instance failed, switching to "
+                     "the default set of NTP servers provided by "
+                     "the --builtin_ntp_servers flag");
+    }
+    if (hps.empty()) {
+      RETURN_NOT_OK_PREPEND(HostPort::ParseStrings(FLAGS_builtin_ntp_servers,
+                                                   kStandardNtpPort, &hps),
+                            "could not parse --builtin_ntp_servers flag");
+    }
     RETURN_NOT_OK(PopulateServers(std::move(hps)));
   }
   for (const auto& s : servers_) {
diff --git a/src/kudu/clock/builtin_ntp.h b/src/kudu/clock/builtin_ntp.h
index 68dcc91..ffd8b9f 100644
--- a/src/kudu/clock/builtin_ntp.h
+++ b/src/kudu/clock/builtin_ntp.h
@@ -43,6 +43,9 @@ namespace internal {
 struct RecordedResponse;
 } // namespace internal
 
+// The standard NTP port number.
+constexpr const int kStandardNtpPort = 123;
+
 // This time service is based on a simplified NTP client implementation.
 // It's not RFC-compliant yet (RFC 5905). The most important missing pieces are:
 //   * support of iburst/burst operation modes (see KUDU-2937)
diff --git a/src/kudu/clock/ntp-test.cc b/src/kudu/clock/ntp-test.cc
index 50232f6..93f17ed 100644
--- a/src/kudu/clock/ntp-test.cc
+++ b/src/kudu/clock/ntp-test.cc
@@ -23,6 +23,7 @@
 #endif
 #include <iterator>
 #include <memory>
+#include <ostream>
 #include <string>
 #include <utility>
 #include <vector>
@@ -38,7 +39,11 @@
 #include "kudu/clock/builtin_ntp.h"
 #include "kudu/clock/test/mini_chronyd.h"
 #include "kudu/clock/test/mini_chronyd_test_util.h"
+#include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/cloud/instance_detector.h"
+#include "kudu/util/cloud/instance_metadata.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/status.h"
@@ -49,11 +54,14 @@
 DECLARE_int32(ntp_initial_sync_wait_secs);
 DECLARE_string(builtin_ntp_servers);
 DECLARE_uint32(builtin_ntp_poll_interval_ms);
+DECLARE_uint32(cloud_metadata_server_request_timeout_ms);
 
 using kudu::clock::internal::FindIntersection;
 using kudu::clock::internal::Interval;
 using kudu::clock::internal::RecordedResponse;
 using kudu::clock::internal::kIntervalNone;
+using kudu::clock::kStandardNtpPort;
+using kudu::cloud::InstanceDetector;
 using std::back_inserter;
 using std::copy;
 using std::string;
@@ -681,6 +689,68 @@ TEST_F(BuiltinNtpWithMiniChronydTest, SyncAndUnsyncReferenceServers) {
 #endif // #ifndef __APPLE__
 }
 
+// This is a basic scenario to verify that the built-in NTP client is able
+// to synchronize with NTP servers provided by supported cloud environments.
+TEST_F(BuiltinNtpWithMiniChronydTest, CloudInstanceNtpServer) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+#ifdef THREAD_SANITIZER
+  // In case of TSAN builds, it takes longer to spawn threads and, overall,
+  // the sanitized version of libcurl works an order of magnitude slower
+  // than the regular version.
+  FLAGS_cloud_metadata_server_request_timeout_ms = 10000;
+#endif
+  InstanceDetector detector(MonoDelta::FromMilliseconds(
+      FLAGS_cloud_metadata_server_request_timeout_ms));
+  unique_ptr<cloud::InstanceMetadata> md;
+  string ntp_server;
+  {
+    auto s = detector.Detect(&md);
+    if (!s.ok()) {
+      ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+      LOG(WARNING) << "test is skipped: non-supported or non-cloud environment";
+      return;
+    }
+  }
+  {
+    auto s = md->GetNtpServer(&ntp_server);
+    if (!s.ok()) {
+      // The only expected error in this case is Status::NotSupported().
+      ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
+      LOG(WARNING) << strings::Substitute(
+          "test is skipped: $0 cloud instance doesn't provide NTP server",
+          cloud::TypeToString(md->type()));
+      return;
+    }
+  }
+
+  const vector<HostPort> servers_endpoints = {{ ntp_server, kStandardNtpPort }};
+
+  // Make sure the NTP server provided by the cloud instance presents itself
+  // as an NTP source suitable for synchronisation.
+  ASSERT_OK(MiniChronyd::CheckNtpSource(servers_endpoints));
+
+  FLAGS_builtin_ntp_poll_interval_ms = 500;
+  BuiltInNtp c(servers_endpoints);
+  ASSERT_OK(c.Init());
+  ASSERT_EVENTUALLY([&] {
+    uint64_t now_us;
+    uint64_t err_us;
+    ASSERT_OK(c.WalltimeWithError(&now_us, &err_us));
+  });
+
+  // Make sure WalltimeWithError() works with the built-in NTP client once
+  // it has initted/synchronized with the reference NTP server available
+  // from within the cloud instance.
+  for (auto i = 0; i < 5; ++i) {
+    SleepFor(MonoDelta::FromMilliseconds(500));
+    uint64_t now;
+    uint64 error;
+    ASSERT_OK(c.WalltimeWithError(&now, &error));
+    LOG(INFO) << StringPrintf("built-in: " WALLTIME_DIAG_FMT, now, error);
+  }
+}
+
 #endif // #if !defined(NO_CHRONY)
 
 } // namespace clock
diff --git a/src/kudu/mini-cluster/external_mini_cluster-test.cc b/src/kudu/mini-cluster/external_mini_cluster-test.cc
index 70d81bf..66aaebe 100644
--- a/src/kudu/mini-cluster/external_mini_cluster-test.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster-test.cc
@@ -64,6 +64,7 @@ enum BuiltInNtp {
   DISABLED = 0,
   ENABLED_SINGLE_SERVER = 1,
   ENABLED_MULTIPLE_SERVERS = 5,
+  ENABLED_CLOUD_INSTANCE_SERVER = -1,
 };
 
 // Beautifies test output if a test scenario fails.
@@ -95,6 +96,8 @@ std::ostream& operator<<(std::ostream& o, BuiltInNtp opt) {
       return o << "BuiltInNtp::ENABLED_SINGLE_SERVER";
     case BuiltInNtp::ENABLED_MULTIPLE_SERVERS:
       return o << "BuiltInNtp::ENABLED_MULTIPLE_SERVERS";
+    case BuiltInNtp::ENABLED_CLOUD_INSTANCE_SERVER:
+      return o << "BuiltInNtp::ENABLED_CLOUD_INSTANCE_SERVER";
   }
   return o;
 }
@@ -119,7 +122,7 @@ INSTANTIATE_TEST_CASE_P(,
         ::testing::Values(BuiltInNtp::DISABLED,
                           BuiltInNtp::ENABLED_SINGLE_SERVER,
                           BuiltInNtp::ENABLED_MULTIPLE_SERVERS,
-                          BuiltInNtp::DISABLED)
+                          BuiltInNtp::ENABLED_CLOUD_INSTANCE_SERVER)
 #endif // #if !defined(NO_CHRONY) ...
                           ));
 
@@ -274,7 +277,13 @@ TEST_P(ExternalMiniClusterTest, TestBasicOperation) {
     opts.hms_mode = HmsMode::ENABLE_HIVE_METASTORE;
   }
 #if !defined(NO_CHRONY)
-  opts.num_ntp_servers = std::get<2>(param);
+  if (std::get<2>(param) != BuiltInNtp::ENABLED_CLOUD_INSTANCE_SERVER) {
+    opts.num_ntp_servers = std::get<2>(param);
+  } else {
+    opts.num_ntp_servers = 1;
+    opts.extra_master_flags.emplace_back(
+        "--builtin_ntp_client_enable_auto_config_in_cloud=true");
+  }
 #endif
 
   opts.num_masters = 3;

Reply via email to