This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to tag kudu-1.12.0-mdh1.0.0-4c2c075-centos-release in repository https://gitbox.apache.org/repos/asf/kudu.git
commit d9b29d901bcbabd1210064f84d73ece9931baff2 Author: Yingchun Lai <[email protected]> AuthorDate: Wed Jun 19 16:20:06 2019 +0800 [collector] Add CPP implemented collector Change-Id: I498347605a09e832d3398e76d9cefd3e52b1cfc6 --- CMakeLists.txt | 1 + src/kudu/collector/CMakeLists.txt | 63 ++ src/kudu/collector/cluster_rebalancer-test.cc | 50 ++ src/kudu/collector/cluster_rebalancer.cc | 152 +++++ src/kudu/collector/cluster_rebalancer.h | 68 ++ src/kudu/collector/collector-test.cc | 44 ++ src/kudu/collector/collector.cc | 170 +++++ src/kudu/collector/collector.h | 74 +++ src/kudu/collector/collector_main.cc | 73 +++ src/kudu/collector/collector_util-test.cc | 33 + src/kudu/collector/collector_util.cc | 46 ++ src/kudu/collector/collector_util.h | 32 + src/kudu/collector/falcon_reporter-test.cc | 122 ++++ src/kudu/collector/falcon_reporter.cc | 255 ++++++++ src/kudu/collector/falcon_reporter.h | 108 ++++ src/kudu/collector/local_reporter.cc | 84 +++ src/kudu/collector/local_reporter.h | 58 ++ src/kudu/collector/metrics_collector-test.cc | 777 +++++++++++++++++++++++ src/kudu/collector/metrics_collector.cc | 852 ++++++++++++++++++++++++++ src/kudu/collector/metrics_collector.h | 205 +++++++ src/kudu/collector/nodes_checker-test.cc | 55 ++ src/kudu/collector/nodes_checker.cc | 358 +++++++++++ src/kudu/collector/nodes_checker.h | 90 +++ src/kudu/collector/reporter_base.h | 73 +++ src/kudu/master/catalog_manager.cc | 1 + src/kudu/tools/tool_action_table.cc | 2 - src/kudu/util/jsonreader.h | 6 + 27 files changed, 3850 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d2bb441..1797da9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1423,6 +1423,7 @@ add_subdirectory(src/kudu/cfile) add_subdirectory(src/kudu/client) add_subdirectory(src/kudu/clock) add_subdirectory(src/kudu/codegen) +add_subdirectory(src/kudu/collector) add_subdirectory(src/kudu/common) add_subdirectory(src/kudu/consensus) add_subdirectory(src/kudu/experiments) diff --git 
a/src/kudu/collector/CMakeLists.txt b/src/kudu/collector/CMakeLists.txt new file mode 100644 index 0000000..5bbb1cb --- /dev/null +++ b/src/kudu/collector/CMakeLists.txt @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +######################################### +# collector +######################################### + +set(COLLECTOR_SRCS + cluster_rebalancer.cc + collector.cc + collector_util.cc + falcon_reporter.cc + local_reporter.cc + metrics_collector.cc + nodes_checker.cc) + +add_library(collector ${COLLECTOR_SRCS}) +target_link_libraries(collector + kudu_curl_util + kudu_tools_test_util + log + security + server_process) + +######################################### +# kudu-collector +######################################### + +add_executable(kudu-collector collector_main.cc) +target_link_libraries(kudu-collector + ${SANITIZER_OPTIONS_OVERRIDE} + ${KRB5_REALM_OVERRIDE} + collector + ${KUDU_BASE_LIBS}) + +option(KUDU_COLLECTOR_INSTALL "Whether to install the Kudu Collector executable" ON) +if(KUDU_COLLECTOR_INSTALL) + install(TARGETS kudu-collector RUNTIME DESTINATION ${CMAKE_INSTALL_SBINDIR}) +else() + message(STATUS "Skipping install rule for the Kudu Collector executable") +endif() + 
+SET_KUDU_TEST_LINK_LIBS(collector) +ADD_KUDU_TEST(collector-test) +ADD_KUDU_TEST(collector_util-test) +ADD_KUDU_TEST(cluster_rebalancer-test) +ADD_KUDU_TEST(falcon_reporter-test) +ADD_KUDU_TEST(metrics_collector-test) +ADD_KUDU_TEST(nodes_checker-test) diff --git a/src/kudu/collector/cluster_rebalancer-test.cc b/src/kudu/collector/cluster_rebalancer-test.cc new file mode 100644 index 0000000..0a1e0a7 --- /dev/null +++ b/src/kudu/collector/cluster_rebalancer-test.cc @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/collector/cluster_rebalancer.h" + +#include <gtest/gtest.h> + +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" + +namespace kudu { +namespace collector { + +TEST(TestClusterRebalancer, TestValidateHMTime) { + // 'time' in error format. 
+ ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12:34:56").IsInvalidArgument()); + ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("1:23").IsInvalidArgument()); + ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12:3").IsInvalidArgument()); + ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12:").IsInvalidArgument()); + ASSERT_TRUE(ClusterRebalancer::ValidateHMTime(":3").IsInvalidArgument()); + ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12.34").IsInvalidArgument()); + ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("-1:30").IsInvalidArgument()); + ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("24:30").IsInvalidArgument()); + ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12:-1").IsInvalidArgument()); + ASSERT_TRUE(ClusterRebalancer::ValidateHMTime("12:60").IsInvalidArgument()); + + // 'time' in valid format. + ASSERT_OK(ClusterRebalancer::ValidateHMTime("12:34")); + ASSERT_OK(ClusterRebalancer::ValidateHMTime("00:00")); + ASSERT_OK(ClusterRebalancer::ValidateHMTime("00:59")); + ASSERT_OK(ClusterRebalancer::ValidateHMTime("23:00")); + ASSERT_OK(ClusterRebalancer::ValidateHMTime("23:59")); +} +} // namespace collector +} // namespace kudu + diff --git a/src/kudu/collector/cluster_rebalancer.cc b/src/kudu/collector/cluster_rebalancer.cc new file mode 100644 index 0000000..0015544 --- /dev/null +++ b/src/kudu/collector/cluster_rebalancer.cc @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/collector/cluster_rebalancer.h" + +#include <stdio.h> +#include <time.h> + +#include <ostream> +#include <vector> + +#include <gflags/gflags.h> +#include <gflags/gflags_declare.h> +#include <glog/logging.h> + +#include "kudu/collector/collector_util.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/walltime.h" +#include "kudu/tools/tool_test_util.h" +#include "kudu/util/monotime.h" +#include "kudu/util/status.h" +#include "kudu/util/thread.h" + +DEFINE_bool(auto_rebalance, true, "Whether to rebalance cluster automatically"); +DEFINE_string(rebalance_time, "00:00", + "Time to perform cluster rebalance, format in HH:MM"); + +DECLARE_string(collector_cluster_name); +DECLARE_string(collector_master_addrs); +DECLARE_int32(collector_interval_sec); +DECLARE_int32(collector_timeout_sec); + +using std::string; +using std::vector; +using strings::Substitute; + +namespace kudu { +namespace collector { + +ClusterRebalancer::ClusterRebalancer() + : initialized_(false), + stop_background_threads_latch_(1) { +} + +ClusterRebalancer::~ClusterRebalancer() { + Shutdown(); +} + +Status ClusterRebalancer::Init() { + CHECK(!initialized_); + + RETURN_NOT_OK(ValidateHMTime(FLAGS_rebalance_time)); + + initialized_ = true; + return Status::OK(); +} + +Status ClusterRebalancer::Start() { + CHECK(initialized_); + + if (!FLAGS_auto_rebalance) { + return Status::OK(); + } + + RETURN_NOT_OK(StartClusterRebalancerThread()); + + return Status::OK(); +} + +void ClusterRebalancer::Shutdown() { + if (initialized_) { + string name = ToString(); + 
LOG(INFO) << name << " shutting down..."; + + stop_background_threads_latch_.CountDown(); + + if (cluster_rebalancer_thread_) { + cluster_rebalancer_thread_->Join(); + } + + LOG(INFO) << name << " shutdown complete."; + } +} + +string ClusterRebalancer::ToString() const { + return "ClusterRebalancer"; +} + +Status ClusterRebalancer::StartClusterRebalancerThread() { + return Thread::Create("server", "cluster-rebalancer", &ClusterRebalancer::ClusterRebalancerThread, + this, &cluster_rebalancer_thread_); +} + +void ClusterRebalancer::ClusterRebalancerThread() { + const MonoDelta kWait = MonoDelta::FromSeconds(60); + while (!RunOnceMode() && !stop_background_threads_latch_.WaitFor(kWait)) { + string dst; + StringAppendStrftime(&dst, "%H:%M", time(nullptr), true); + if (dst == FLAGS_rebalance_time) { + WARN_NOT_OK(RebalanceCluster(), "Unable to rebalance cluster"); + } + } +} + +Status ClusterRebalancer::RebalanceCluster() { + vector<string> args = { + "cluster", + "rebalance", + FLAGS_collector_master_addrs + }; + string tool_stdout; + string tool_stderr; + RETURN_NOT_OK_PREPEND(tools::RunKuduTool(args, &tool_stdout, &tool_stderr), + Substitute("out: $0, err: $1", tool_stdout, tool_stderr)); + LOG(INFO) << std::endl + << tool_stdout; + return Status::OK(); +} + +Status ClusterRebalancer::ValidateHMTime(const string& time) { + Status err = Status::InvalidArgument( + Substitute("Invalid time format '$0', should in format 'HH:MM'", time)); + if (time.size() != 5) { + return err; + } + + int hour, minute; + int count = sscanf(time.c_str(), "%d:%d", &hour, &minute); + if (count == 2 && + 0 <= hour && hour < 24 && + 0 <= minute && minute < 60) { + return Status::OK(); + } + + return err; +} +} // namespace collector +} // namespace kudu diff --git a/src/kudu/collector/cluster_rebalancer.h b/src/kudu/collector/cluster_rebalancer.h new file mode 100644 index 0000000..ed08736 --- /dev/null +++ b/src/kudu/collector/cluster_rebalancer.h @@ -0,0 +1,68 @@ +// Licensed to the Apache 
Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <string> + +#include <gtest/gtest_prod.h> + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/status.h" + +namespace kudu { +class Thread; +} // namespace kudu + +namespace kudu { + +namespace collector { + +class ClusterRebalancer : public RefCounted<ClusterRebalancer> { + public: + ClusterRebalancer(); + ~ClusterRebalancer(); + + Status Init(); + Status Start(); + void Shutdown(); + + std::string ToString() const; + + private: + friend class RefCounted<ClusterRebalancer>; + + FRIEND_TEST(TestClusterRebalancer, TestValidateHMTime); + + // Start thread to rebalance cluster. 
+ Status StartClusterRebalancerThread(); + void ClusterRebalancerThread(); + static Status RebalanceCluster(); + + static Status ValidateHMTime(const std::string& time); + + bool initialized_; + + CountDownLatch stop_background_threads_latch_; + + scoped_refptr<Thread> cluster_rebalancer_thread_; + + DISALLOW_COPY_AND_ASSIGN(ClusterRebalancer); +}; +} // namespace collector +} // namespace kudu diff --git a/src/kudu/collector/collector-test.cc b/src/kudu/collector/collector-test.cc new file mode 100644 index 0000000..e8e1298 --- /dev/null +++ b/src/kudu/collector/collector-test.cc @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/collector/collector.h" + +#include <gtest/gtest.h> + +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" + +namespace kudu { +namespace collector { + +TEST(TestCollector, TestValidateIntervalAndTimeout) { + // 'interval' in error range. + ASSERT_TRUE(Collector::ValidateIntervalAndTimeout(9, 1).IsInvalidArgument()); + ASSERT_TRUE(Collector::ValidateIntervalAndTimeout(61, 1).IsInvalidArgument()); + + // 'timeout' in error range. 
+ ASSERT_TRUE(Collector::ValidateIntervalAndTimeout(10, 0).IsInvalidArgument()); + ASSERT_TRUE(Collector::ValidateIntervalAndTimeout(10, 10).IsInvalidArgument()); + + // Both 'interval' and 'timeout' are in valid range. + ASSERT_OK(Collector::ValidateIntervalAndTimeout(10, 9)); + ASSERT_OK(Collector::ValidateIntervalAndTimeout(60, 9)); + ASSERT_OK(Collector::ValidateIntervalAndTimeout(60, 59)); +} +} // namespace collector +} // namespace kudu + diff --git a/src/kudu/collector/collector.cc b/src/kudu/collector/collector.cc new file mode 100644 index 0000000..1c930e9 --- /dev/null +++ b/src/kudu/collector/collector.cc @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/collector/collector.h" + +#include <ostream> + +#include <gflags/gflags.h> +#include <gflags/gflags_declare.h> +#include <glog/logging.h> + +#include "kudu/collector/cluster_rebalancer.h" +#include "kudu/collector/falcon_reporter.h" +#include "kudu/collector/local_reporter.h" +#include "kudu/collector/metrics_collector.h" +#include "kudu/collector/nodes_checker.h" +#include "kudu/collector/reporter_base.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/security/init.h" +#include "kudu/util/env.h" +#include "kudu/util/logging.h" +#include "kudu/util/monotime.h" +#include "kudu/util/status.h" +#include "kudu/util/thread.h" + +DEFINE_string(collector_cluster_name, "", + "Cluster name of this collector to operate"); +DEFINE_string(collector_master_addrs, "", + "Comma-separated list of Kudu master addresses where each address is of " + "form 'hostname:port"); +DEFINE_int32(collector_interval_sec, 60, + "Number of interval seconds to collect metrics"); +DEFINE_string(collector_report_method, "", + "Which monitor system the metrics reported to. 
Now supported system: falcon"); +DEFINE_int32(collector_timeout_sec, 10, + "Number of seconds to wait for a master, tserver, or CLI tool to return metrics"); +DEFINE_int32(collector_warn_threshold_ms, 1000, + "If a task takes more than this number of milliseconds, issue a warning with a " + "trace."); + +DECLARE_string(principal); +DECLARE_string(keytab_file); + +using std::string; +using strings::Substitute; + +namespace kudu { +namespace collector { + +Collector::Collector() + : initialized_(false), + stop_background_threads_latch_(1) { +} + +Collector::~Collector() { + Shutdown(); +} + +Status Collector::Init() { + CHECK(!initialized_); + + RETURN_NOT_OK(ValidateIntervalAndTimeout(FLAGS_collector_interval_sec, + FLAGS_collector_timeout_sec)); + RETURN_NOT_OK(security::InitKerberosForServer(FLAGS_principal, FLAGS_keytab_file)); + + if (FLAGS_collector_report_method == "falcon") { + reporter_.reset(new FalconReporter()); + } else if (FLAGS_collector_report_method == "local") { + reporter_.reset(new LocalReporter()); + } else { + LOG(FATAL) << Substitute("Unsupported FLAGS_collector_report_method $0", + FLAGS_collector_report_method); + } + CHECK_OK(reporter_->Init()); + nodes_checker_.reset(new NodesChecker(reporter_)); + CHECK_OK(nodes_checker_->Init()); + metrics_collector_.reset(new MetricsCollector(nodes_checker_, reporter_)); + CHECK_OK(metrics_collector_->Init()); + cluster_rebalancer_.reset(new ClusterRebalancer()); + CHECK_OK(cluster_rebalancer_->Init()); + + initialized_ = true; + return Status::OK(); +} + +Status Collector::Start() { + CHECK(initialized_); + + google::FlushLogFiles(google::INFO); // Flush the startup messages. 
+ + RETURN_NOT_OK(StartExcessLogFileDeleterThread()); + + reporter_->Start(); + nodes_checker_->Start(); + metrics_collector_->Start(); + cluster_rebalancer_->Start(); + + return Status::OK(); +} + +void Collector::Shutdown() { + if (initialized_) { + string name = ToString(); + LOG(INFO) << name << " shutting down..."; + + reporter_->Shutdown(); + metrics_collector_->Shutdown(); + nodes_checker_->Shutdown(); + cluster_rebalancer_->Shutdown(); + + stop_background_threads_latch_.CountDown(); + + if (excess_log_deleter_thread_) { + excess_log_deleter_thread_->Join(); + } + + LOG(INFO) << name << " shutdown complete."; + } +} + +string Collector::ToString() const { + return "Collector"; +} + +Status Collector::StartExcessLogFileDeleterThread() { + // Try synchronously deleting excess log files once at startup to make sure it + // works, then start a background thread to continue deleting them in the + // future. + if (!FLAGS_logtostderr) { + RETURN_NOT_OK_PREPEND(DeleteExcessLogFiles(Env::Default()), + "Unable to delete excess log files"); + } + return Thread::Create("server", "excess-log-deleter", &Collector::ExcessLogFileDeleterThread, + this, &excess_log_deleter_thread_); +} + +void Collector::ExcessLogFileDeleterThread() { + // How often to attempt to clean up excess glog files. 
+ const MonoDelta kWait = MonoDelta::FromSeconds(60); + while (!stop_background_threads_latch_.WaitFor(kWait)) { + WARN_NOT_OK(DeleteExcessLogFiles(Env::Default()), "Unable to delete excess log files"); + } +} + +Status Collector::ValidateIntervalAndTimeout(int interval, int timeout) { + if (10 <= interval && interval <= 60 && + 0 < timeout && timeout < interval) { + return Status::OK(); + } + + return Status::InvalidArgument( + Substitute("Invalid interval '$0'(should in range [10, 60]), " + "or invalid timeout '$1'(should in range (0, interval))", interval, timeout)); +} +} // namespace collector +} // namespace kudu diff --git a/src/kudu/collector/collector.h b/src/kudu/collector/collector.h new file mode 100644 index 0000000..8e4e236 --- /dev/null +++ b/src/kudu/collector/collector.h @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#pragma once + +#include <string> + +#include <gtest/gtest_prod.h> + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/status.h" + +namespace kudu { +class Thread; +} // namespace kudu + +namespace kudu { + +namespace collector { + +class ClusterRebalancer; +class MetricsCollector; +class NodesChecker; +class ReporterBase; + +class Collector { + public: + Collector(); + ~Collector(); + + Status Init(); + Status Start(); + void Shutdown(); + + std::string ToString() const; + + private: + FRIEND_TEST(TestCollector, TestValidateIntervalAndTimeout); + + // Start thread to remove excess glog files. + Status StartExcessLogFileDeleterThread(); + void ExcessLogFileDeleterThread(); + + static Status ValidateIntervalAndTimeout(int interval, int timeout); + + bool initialized_; + + scoped_refptr<ReporterBase> reporter_; + scoped_refptr<MetricsCollector> metrics_collector_; + scoped_refptr<NodesChecker> nodes_checker_; + scoped_refptr<ClusterRebalancer> cluster_rebalancer_; + + CountDownLatch stop_background_threads_latch_; + scoped_refptr<Thread> excess_log_deleter_thread_; + + DISALLOW_COPY_AND_ASSIGN(Collector); +}; +} // namespace collector +} // namespace kudu diff --git a/src/kudu/collector/collector_main.cc b/src/kudu/collector/collector_main.cc new file mode 100644 index 0000000..afc0768 --- /dev/null +++ b/src/kudu/collector/collector_main.cc @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <iostream> +#include <string> + +#include <glog/logging.h> + +#include "kudu/collector/collector.h" +#include "kudu/collector/collector_util.h" +#include "kudu/util/flags.h" +#include "kudu/util/init.h" +#include "kudu/util/logging.h" +#include "kudu/util/monotime.h" +#include "kudu/util/status.h" +#include "kudu/util/version_info.h" + +namespace kudu { +namespace collector { + +static int CollectorMain(int argc, char** argv) { + InitKuduOrDie(); + + GFlagsMap default_flags = GetFlagsMap(); + + ParseCommandLineFlags(&argc, &argv, true); + if (argc != 1) { + std::cerr << "usage: " << argv[0] << std::endl; + return 1; + } + std::string nondefault_flags = GetNonDefaultFlags(default_flags); + InitGoogleLoggingSafe(argv[0]); + + LOG(INFO) << "Collector non-default flags:\n" + << nondefault_flags << '\n' + << "Collector version:\n" + << VersionInfo::GetAllVersionInfo(); + + Collector collector; + LOG(INFO) << "Initializing collector..."; + CHECK_OK(collector.Init()); + + LOG(INFO) << "Starting collector..."; + CHECK_OK(collector.Start()); + + LOG(INFO) << "Collector successfully started."; + while (!RunOnceMode()) { + SleepFor(MonoDelta::FromSeconds(60)); + } + + return 0; +} + +} // namespace collector +} // namespace kudu + +int main(int argc, char** argv) { + return kudu::collector::CollectorMain(argc, argv); +} diff --git a/src/kudu/collector/collector_util-test.cc b/src/kudu/collector/collector_util-test.cc new file mode 100644 index 0000000..8b29c56 --- /dev/null +++ b/src/kudu/collector/collector_util-test.cc @@ -0,0 +1,33 @@ +// 
Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/collector/collector_util.h" + +#include <gtest/gtest.h> + +namespace kudu { +namespace collector { + +TEST(TestCollectorUtil, TestExtractHostName) { + ASSERT_EQ(ExtractHostName("1.2.3.4:5555"), "1.2.3.4"); + ASSERT_EQ(ExtractHostName("host-name.bj:5555"), "host-name.bj"); + ASSERT_EQ(ExtractHostName("1.2.3.4"), "1.2.3.4"); + ASSERT_EQ(ExtractHostName("host-name.bj"), "host-name.bj"); +} +} // namespace collector +} // namespace kudu + diff --git a/src/kudu/collector/collector_util.cc b/src/kudu/collector/collector_util.cc new file mode 100644 index 0000000..aa79c40 --- /dev/null +++ b/src/kudu/collector/collector_util.cc @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Utility functions for generating data for use by tools and tests. + +#include "kudu/collector/collector_util.h" + +#include <stddef.h> + +#include <gflags/gflags_declare.h> + +DECLARE_string(collector_report_method); + +using std::string; + +namespace kudu { +namespace collector { + +string ExtractHostName(const string& url) { + size_t pos = url.find(':'); + if (pos == string::npos) { + return url; + } + return url.substr(0, pos); +} + +bool RunOnceMode() { + static bool run_once = (FLAGS_collector_report_method == "local"); + return run_once; +} +} // namespace collector +} // namespace kudu diff --git a/src/kudu/collector/collector_util.h b/src/kudu/collector/collector_util.h new file mode 100644 index 0000000..f9badc8 --- /dev/null +++ b/src/kudu/collector/collector_util.h @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Utility functions for generating data for use by tools and tests. + +#pragma once + +#include <string> + +namespace kudu { +namespace collector { + +std::string ExtractHostName(const std::string& url); + +bool RunOnceMode(); + +} // namespace collector +} // namespace kudu diff --git a/src/kudu/collector/falcon_reporter-test.cc b/src/kudu/collector/falcon_reporter-test.cc new file mode 100644 index 0000000..85810c7 --- /dev/null +++ b/src/kudu/collector/falcon_reporter-test.cc @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
#include "kudu/collector/falcon_reporter.h"

#include <list>
#include <string>
#include <utility>

#include <gflags/gflags_declare.h>
#include <gtest/gtest.h>

#include "kudu/collector/reporter_base.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/test_macros.h"

DECLARE_string(collector_cluster_name);
DECLARE_int32(collector_falcon_metrics_version);
DECLARE_int32(collector_interval_sec);

using std::list;
using std::string;
using strings::Substitute;

namespace kudu {
namespace collector {

// Verifies FalconReporter::SerializeItems() emits exactly the JSON the
// falcon agent expects, for an empty list, a single item, and two items.
// The expected strings mirror the field order written by SerializeItems():
// endpoint, metric, timestamp, step, value, counterType, tags.
TEST(TestFalconReporter, TestSerializeItems) {
  // Pin the flags that ConstructItem() folds into 'step' and 'tags' so the
  // expected strings below are deterministic.
  FLAGS_collector_interval_sec = 30;
  FLAGS_collector_cluster_name = "test";
  FLAGS_collector_falcon_metrics_version = 8;
  scoped_refptr<FalconReporter> reporter(new FalconReporter());
  list<scoped_refptr<ItemBase>> falcon_items;
  string data;
  // An empty item list serializes to an empty string (not "[]").
  ASSERT_OK(FalconReporter::SerializeItems(falcon_items, &data));
  ASSERT_EQ(data, "");

  falcon_items.emplace_back(reporter->ConstructItem(
      "tserver1",
      "scan_count",
      "host",
      1234567890,
      12345,
      "COUNTER",
      ""));
  ASSERT_OK(FalconReporter::SerializeItems(falcon_items, &data));
  ASSERT_EQ(data, Substitute(
      R"*([{"endpoint":"tserver1","metric":"scan_count","timestamp":1234567890,)*"
      R"*("step":$0,"value":12345,"counterType":"COUNTER",)*"
      R"*("tags":"service=kudu,cluster=$1,level=host,v=$2"}])*",
      FLAGS_collector_interval_sec,
      FLAGS_collector_cluster_name,
      FLAGS_collector_falcon_metrics_version));

  // A second item of a different level/counter type is appended to the
  // same JSON array, in insertion order.
  falcon_items.emplace_back(reporter->ConstructItem(
      "table1",
      "disk_size",
      "table",
      1234567891,
      67890,
      "GAUGE",
      ""));
  ASSERT_OK(FalconReporter::SerializeItems(falcon_items, &data));
  ASSERT_EQ(data, Substitute(
      R"*([{"endpoint":"tserver1","metric":"scan_count","timestamp":1234567890,)*"
      R"*("step":$0,"value":12345,"counterType":"COUNTER",)*"
      R"*("tags":"service=kudu,cluster=$1,level=host,v=$2"},)*"
      R"*({"endpoint":"table1","metric":"disk_size","timestamp":1234567891,)*"
      R"*("step":$0,"value":67890,"counterType":"GAUGE",)*"
      R"*("tags":"service=kudu,cluster=$1,level=table,v=$2"}])*",
      FLAGS_collector_interval_sec,
      FLAGS_collector_cluster_name,
      FLAGS_collector_falcon_metrics_version));
}

// Pushes 'count' dummy items into 'reporter' via PushItems().
void GenerateItems(const scoped_refptr<FalconReporter>& reporter, int count) {
  list<scoped_refptr<ItemBase>> items;
  for (int i = 0; i < count; ++i) {
    items.emplace_back(reporter->ConstructItem("endpoint", "metric", "level", 0, i, "GAUGE", ""));
  }
  reporter->PushItems(std::move(items));
}

// Verifies the buffering behavior of PushItems()/PopItems()/HasItems():
// pushed batches accumulate, and a single PopItems() drains everything
// buffered so far (batch sizes here are well under the pop batch limit).
TEST(TestFalconReporter, TestPushAndPopItems) {
  scoped_refptr<FalconReporter> reporter(new FalconReporter());
  ASSERT_FALSE(reporter->HasItems());
  NO_FATALS(GenerateItems(reporter, 1));
  ASSERT_TRUE(reporter->HasItems());
  NO_FATALS(GenerateItems(reporter, 9));
  ASSERT_TRUE(reporter->HasItems());

  list<scoped_refptr<ItemBase>> falcon_items;
  reporter->PopItems(&falcon_items);
  ASSERT_FALSE(reporter->HasItems());
  ASSERT_EQ(falcon_items.size(), 10);

  NO_FATALS(GenerateItems(reporter, 5));
  ASSERT_TRUE(reporter->HasItems());

  list<scoped_refptr<ItemBase>> falcon_items2;
  reporter->PopItems(&falcon_items2);
  ASSERT_FALSE(reporter->HasItems());
  ASSERT_EQ(falcon_items2.size(), 5);
}
} // namespace collector
} // namespace kudu
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/collector/falcon_reporter.h" + +#include <kudu/util/curl_util.h> +#include <stddef.h> + +#include <algorithm> +#include <functional> +#include <iterator> +#include <mutex> +#include <ostream> +#include <utility> + +#include <gflags/gflags.h> +#include <gflags/gflags_declare.h> +#include <glog/logging.h> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/debug/trace_event.h" +#include "kudu/util/faststring.h" +#include "kudu/util/jsonwriter.h" +#include "kudu/util/monotime.h" +#include "kudu/util/status.h" +#include "kudu/util/threadpool.h" +#include "kudu/util/trace.h" + +DEFINE_bool(collector_direct_push, false, + "Whether to push collected items to falcon agent directly, " + "otherwise items will be cached and then pushed to falcon " + "agent asynchronous"); +DEFINE_string(collector_falcon_agent, "http://127.0.0.1:1988/v1/push", + "The falcon agent URL to push metrics to"); +DEFINE_int32(collector_falcon_metrics_version, 4, + "Version of metrics pushed to falcon, it will be tagged in " + "'tag' section of an item"); +DEFINE_int32(collector_falcon_pusher_count, 4, + "Thread count to push collected items to falcon agent"); +DEFINE_int32(collector_report_batch_size, 1000, + "Count of items will be pushed to falcon agent by batch"); +DEFINE_int32(collector_push_timeout_ms, 20, + "Timeout for pushing items to falcon agent"); + +DECLARE_string(collector_cluster_name); +DECLARE_int32(collector_interval_sec); +DECLARE_int32(collector_timeout_sec); +DECLARE_int32(collector_warn_threshold_ms); + +using std::list; +using 
std::string; +using strings::Substitute; + +namespace kudu { +namespace collector { + +FalconReporter::FalconReporter() + : initialized_(false), + stop_background_threads_latch_(1) { +} + +FalconReporter::~FalconReporter() { + Shutdown(); +} + +Status FalconReporter::Init() { + CHECK(!initialized_); + + // Simple test falcon agent. + EasyCurl curl; + faststring dst; + RETURN_NOT_OK(curl.PostToURL(FLAGS_collector_falcon_agent, "", &dst)); + + initialized_ = true; + return Status::OK(); +} + +Status FalconReporter::Start() { + CHECK(initialized_); + + if (!FLAGS_collector_direct_push) { + RETURN_NOT_OK(StartFalconPusherThreadPool()); + } + + return Status::OK(); +} + +void FalconReporter::Shutdown() { + if (initialized_) { + string name = ToString(); + LOG(INFO) << name << " shutting down..."; + + stop_background_threads_latch_.CountDown(); + + pusher_thread_pool_->Wait(); + + LOG(INFO) << name << " shutdown complete."; + } +} + +string FalconReporter::ToString() const { + return "FalconReporter"; +} + +scoped_refptr<ItemBase> FalconReporter::ConstructItem(string endpoint, + string metric, + string level, + uint64_t timestamp, + int64_t value, + string counter_type, + string extra_tags) { + scoped_refptr<ItemBase> tmp(new FalconItem(std::move(endpoint), + std::move(metric), + Substitute("service=kudu,cluster=$0,level=$1,v=$2$3", + FLAGS_collector_cluster_name, level, + FLAGS_collector_falcon_metrics_version, + extra_tags.empty() ? 
"" : "," + extra_tags), + timestamp, + FLAGS_collector_interval_sec, + value, + std::move(counter_type))); + return tmp; +} + +Status FalconReporter::PushItems(list<scoped_refptr<ItemBase>> items) { + if (FLAGS_collector_direct_push) { + RETURN_NOT_OK(PushToAgent(std::move(items))); + } else { + std::lock_guard<RWMutex> l(items_lock_); + buffer_items_.splice(buffer_items_.end(), std::move(items)); + } + return Status::OK(); +} + +Status FalconReporter::StartFalconPusherThreadPool() { + RETURN_NOT_OK(ThreadPoolBuilder("falcon-pusher") + .set_min_threads(FLAGS_collector_falcon_pusher_count) + .set_max_threads(FLAGS_collector_falcon_pusher_count) + .set_idle_timeout(MonoDelta::FromMilliseconds(1)) + .Build(&pusher_thread_pool_)); + for (int i = 0; i < FLAGS_collector_falcon_pusher_count; ++i) { + RETURN_NOT_OK(pusher_thread_pool_->SubmitFunc(std::bind(&FalconReporter::FalconPusher, + this))); + } + + return Status::OK(); +} + +void FalconReporter::FalconPusher() { + while (HasItems() || !stop_background_threads_latch_.WaitFor(MonoDelta::FromSeconds(1))) { + ReportItems(); + } +} + +void FalconReporter::ReportItems() { + MonoTime start(MonoTime::Now()); + scoped_refptr<Trace> trace(new Trace); + ADOPT_TRACE(trace.get()); + TRACE_EVENT0("collector", "FalconReporter::ReportItems"); + TRACE("init"); + + list<scoped_refptr<ItemBase>> falcon_items; + PopItems(&falcon_items); + WARN_NOT_OK(PushToAgent(std::move(falcon_items)), "PushToAgent failed"); + int64_t elapsed_ms = (MonoTime::Now() - start).ToMilliseconds(); + if (elapsed_ms > FLAGS_collector_warn_threshold_ms) { + if (Trace::CurrentTrace()) { + LOG(WARNING) << "Trace:" << std::endl + << Trace::CurrentTrace()->DumpToString(); + } + } +} + +bool FalconReporter::HasItems() const { + std::lock_guard<RWMutex> l(items_lock_); + return !buffer_items_.empty(); +} + +void FalconReporter::PopItems(list<scoped_refptr<ItemBase>>* falcon_items) { + int items_left = 0; + CHECK(falcon_items); + { + std::lock_guard<RWMutex> 
l(items_lock_); + auto end_item = buffer_items_.begin(); + std::advance(end_item, std::min(buffer_items_.size(), + static_cast<size_t>(FLAGS_collector_report_batch_size))); + falcon_items->splice(falcon_items->end(), buffer_items_, buffer_items_.begin(), end_item); + items_left = buffer_items_.size(); + } + if (items_left > 1000000) { + LOG(INFO) << "Items left " << items_left << std::endl; + } + TRACE(Substitute("Pop items, count $0", falcon_items->size())); +} + +Status FalconReporter::PushToAgent(list<scoped_refptr<ItemBase>> falcon_items) { + string data; + RETURN_NOT_OK(SerializeItems(std::move(falcon_items), &data)); + + EasyCurl curl; + faststring dst; + curl.set_timeout(MonoDelta::FromMilliseconds(FLAGS_collector_push_timeout_ms)); + RETURN_NOT_OK(curl.PostToURL(FLAGS_collector_falcon_agent, data, &dst)); + TRACE(Substitute("Pushed items to agent, size $0", data.size())); + return Status::OK(); +} + +Status FalconReporter::SerializeItems(list<scoped_refptr<ItemBase>> items, string* data) { + CHECK(data); + if (items.empty()) { + return Status::OK(); + } + std::ostringstream str; + JsonWriter jw(&str, JsonWriter::COMPACT); + jw.StartArray(); + for (const auto& item : items) { + scoped_refptr<FalconItem> falcon_item = dynamic_cast<FalconItem*>(item.get()); + jw.StartObject(); + jw.String("endpoint"); + jw.String(falcon_item->endpoint); + jw.String("metric"); + jw.String(falcon_item->metric); + jw.String("timestamp"); + jw.Uint64(falcon_item->timestamp); + jw.String("step"); + jw.Int(falcon_item->step); + jw.String("value"); + jw.Int64(falcon_item->value); + jw.String("counterType"); + jw.String(falcon_item->counter_type); + jw.String("tags"); + jw.String(falcon_item->tags); + jw.EndObject(); + } + jw.EndArray(); + *data = str.str(); + TRACE(Substitute("SerializeItems done, count $0, size $1", items.size(), data->size())); + return Status::OK(); +} +} // namespace collector +} // namespace kudu diff --git a/src/kudu/collector/falcon_reporter.h 
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <cstdint>
#include <list>
#include <memory>
#include <string>
#include <utility>

#include <gtest/gtest_prod.h>

#include "kudu/collector/reporter_base.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/rw_mutex.h"
#include "kudu/util/status.h"

namespace kudu {
class ThreadPool;
} // namespace kudu

namespace kudu {

namespace collector {

// Open-Falcon is a distributed and high-performance monitoring system,
// see more details http://open-falcon.com
//
// One monitoring item in the falcon data model; field names mirror the
// JSON keys serialized by FalconReporter::SerializeItems().
struct FalconItem : public ItemBase {
  FalconItem(std::string ep, std::string m, std::string t,
             uint64_t ts, int32_t s, int64_t v, std::string ct)
    : endpoint(std::move(ep)),
      metric(std::move(m)),
      tags(std::move(t)),
      timestamp(ts),
      step(s),
      value(v),
      counter_type(std::move(ct)) {
  }
  ~FalconItem() override = default;

  std::string endpoint;      // host/table name the metric belongs to
  std::string metric;        // metric name
  std::string tags;          // "service=kudu,cluster=...,level=...,v=..." tag string
  uint64_t timestamp;        // collection time, in seconds
  int32_t step;              // reporting period (the collector interval), in seconds
  int64_t value;             // metric value
  std::string counter_type;  // falcon counter type, e.g. "COUNTER" or "GAUGE"
};

// Reporter that pushes collected items to a local falcon agent over HTTP,
// either directly or buffered and drained by a background thread pool
// (see falcon_reporter.cc for the controlling flags).
class FalconReporter : public ReporterBase {
 public:
  FalconReporter();
  ~FalconReporter() override;

  Status Init() override;
  Status Start() override;
  void Shutdown() override;

  std::string ToString() const override;

  // Builds a FalconItem from the given fields; 'extra_tags', when
  // non-empty, is appended to the standard tag string.
  scoped_refptr<ItemBase> ConstructItem(std::string endpoint,
                                        std::string metric,
                                        std::string level,
                                        uint64_t timestamp,
                                        int64_t value,
                                        std::string counter_type,
                                        std::string extra_tags) override;

  // Pushes items to the agent directly, or buffers them for the pusher
  // threads, depending on configuration.
  Status PushItems(std::list<scoped_refptr<ItemBase>> items) override;

 private:
  FRIEND_TEST(TestFalconReporter, TestSerializeItems);
  FRIEND_TEST(TestFalconReporter, TestPushAndPopItems);

  Status StartFalconPusherThreadPool();
  // Background pusher thread loop.
  void FalconPusher();

  bool HasItems() const;
  void ReportItems();
  // Moves one batch of buffered items into '*falcon_items'.
  void PopItems(std::list<scoped_refptr<ItemBase>>* falcon_items);
  static Status PushToAgent(std::list<scoped_refptr<ItemBase>> falcon_items);
  static Status SerializeItems(std::list<scoped_refptr<ItemBase>> items, std::string* data);

  bool initialized_;

  // Signals the pusher threads to exit.
  CountDownLatch stop_background_threads_latch_;
  std::unique_ptr<ThreadPool> pusher_thread_pool_;

  // Protects buffer_items_.
  mutable RWMutex items_lock_;
  std::list<scoped_refptr<ItemBase>> buffer_items_;
};
} // namespace collector
} // namespace kudu
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/collector/local_reporter.h" + +#include <iostream> + +#include <glog/logging.h> + +#include "kudu/util/status.h" + +using std::list; +using std::string; + +namespace kudu { +namespace collector { + +LocalReporter::LocalReporter() + : initialized_(false) { +} + +LocalReporter::~LocalReporter() { + Shutdown(); +} + +Status LocalReporter::Init() { + CHECK(!initialized_); + + initialized_ = true; + return Status::OK(); +} + +Status LocalReporter::Start() { + CHECK(initialized_); + + return Status::OK(); +} + +void LocalReporter::Shutdown() { + if (initialized_) { + string name = ToString(); + LOG(INFO) << name << " shutting down..."; + + LOG(INFO) << name << " shutdown complete."; + } +} + +string LocalReporter::ToString() const { + return "LocalReporter"; +} + +scoped_refptr<ItemBase> LocalReporter::ConstructItem(string endpoint, + string metric, + string level, + uint64_t /*timestamp*/, + int64_t value, + string /*counter_type*/, + string extra_tags) { + MutexLock l(output_lock_); + std::cout << level << " " << metric << " " << endpoint << " " + << (extra_tags.empty() ? 
"" : extra_tags + " ") << value << std::endl; + return nullptr; +} + +Status LocalReporter::PushItems(list<scoped_refptr<ItemBase>> /*items*/) { + return Status::OK(); +} + +} // namespace collector +} // namespace kudu diff --git a/src/kudu/collector/local_reporter.h b/src/kudu/collector/local_reporter.h new file mode 100644 index 0000000..d796b3b --- /dev/null +++ b/src/kudu/collector/local_reporter.h @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
#pragma once

#include <cstdint>
#include <list>
#include <string>

#include "kudu/collector/reporter_base.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/mutex.h"
#include "kudu/util/status.h"

namespace kudu {

namespace collector {

// Reporter that writes collected items to stdout instead of pushing them
// to a monitoring system; useful for debugging and one-shot runs.
class LocalReporter : public ReporterBase {
 public:
  LocalReporter();
  ~LocalReporter() override;

  Status Init() override;
  Status Start() override;
  void Shutdown() override;

  std::string ToString() const override;

  // Prints the item to stdout and returns null (no item object is built;
  // see local_reporter.cc).
  scoped_refptr<ItemBase> ConstructItem(std::string endpoint,
                                        std::string metric,
                                        std::string level,
                                        uint64_t timestamp,
                                        int64_t value,
                                        std::string counter_type,
                                        std::string extra_tags) override;

  // No-op: items are already emitted at construction time.
  Status PushItems(std::list<scoped_refptr<ItemBase>> items) override;

 private:
  bool initialized_;
  // Serializes stdout output across threads.
  Mutex output_lock_;
};
} // namespace collector
} // namespace kudu
+ +#include "kudu/collector/metrics_collector.h" + +#include <stdint.h> + +#include <map> +#include <set> +#include <string> +#include <unordered_map> +#include <vector> + +#include <gflags/gflags_declare.h> +#include <gtest/gtest.h> +#include <rapidjson/document.h> + +#include "kudu/collector/local_reporter.h" +#include "kudu/collector/nodes_checker.h" +#include "kudu/collector/reporter_base.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/jsonreader.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" + +DECLARE_bool(collector_request_merged_metrics); +DECLARE_string(collector_attributes); +DECLARE_string(collector_cluster_level_metrics); +DECLARE_string(collector_metrics); +DECLARE_string(collector_table_names); +DECLARE_string(collector_metrics_types_for_test); + +using std::map; +using std::set; +using std::string; +using std::unordered_map; +using std::vector; + +namespace kudu { +namespace collector { + +scoped_refptr<MetricsCollector> BuildCollector() { + scoped_refptr<ReporterBase> reporter(new LocalReporter()); + scoped_refptr<NodesChecker> nodes_checker(new NodesChecker(reporter)); + return new MetricsCollector(nodes_checker, reporter); +} + +TEST(TestMetricsCollector, TestConvertStateToInt) { + int64_t result = 1; + ASSERT_OK(MetricsCollector::ConvertStateToInt("", &result)); + ASSERT_EQ(result, 0); + ASSERT_OK(MetricsCollector::ConvertStateToInt("STOPPED", &result)); + ASSERT_EQ(result, 0); + ASSERT_OK(MetricsCollector::ConvertStateToInt("RUNNINGSTOPPED", &result)); + ASSERT_EQ(result, 0); + ASSERT_OK(MetricsCollector::ConvertStateToInt("RUNNINGBOOTSTRAPPING", &result)); + ASSERT_EQ(result, 0); + ASSERT_OK(MetricsCollector::ConvertStateToInt("RUNNING", &result)); + ASSERT_EQ(result, 1); + ASSERT_OK(MetricsCollector::ConvertStateToInt("RUNNINGRUNNING", &result)); + ASSERT_EQ(result, 1); +} + +TEST(TestMetricsCollector, TestGetHistValue) { + { + vector<MetricsCollector::SimpleHistogram> hist_values({{10, 100}}); + 
ASSERT_EQ(MetricsCollector::GetHistValue(hist_values), 100); + } + { + vector<MetricsCollector::SimpleHistogram> hist_values({{10, 100}, + {20, 200}}); + ASSERT_EQ(MetricsCollector::GetHistValue(hist_values), 167); + } +} + +TEST(TestMetricsCollector, TestMergeToTableLevelMetrics) { + // Merge empty metrics. + { + vector<MetricsCollector::TablesMetrics> hosts_tables_metrics; + vector<MetricsCollector::TablesHistMetrics> hosts_tables_hist_metrics; + MetricsCollector::TablesMetrics tables_metrics; + MetricsCollector::TablesHistMetrics tables_hist_metrics; + ASSERT_OK(MetricsCollector::MergeToTableLevelMetrics( + hosts_tables_metrics, hosts_tables_hist_metrics, + &tables_metrics, &tables_hist_metrics)); + ASSERT_TRUE(tables_metrics.empty()); + ASSERT_TRUE(tables_hist_metrics.empty()); + } + // Merge multi metrics. + { + vector<MetricsCollector::TablesMetrics> hosts_tables_metrics{ + { // host-1 + { + "table1", + { + {"metric1", 1}, + {"metric2", 2} + } + }, + { + "table2", + { + {"metric1", 100}, + {"metric3", 200} + } + } + }, + { // host-2 + { + "table1", + { + {"metric1", 100}, + {"metric2", 200} + } + }, + { + "table2", + { + {"metric1", 1}, + {"metric2", 2} + } + }, + { + "table3", + { + {"metric1", 1}, + {"metric2", 2} + } + } + } + }; + vector<MetricsCollector::TablesHistMetrics> hosts_tables_hist_metrics{ + { // host-1 + { + "table1", + { + { + "metric3", + { + {10, 100}, + {20, 200} + } + }, + { + "metric4", + { + {30, 300}, + {40, 400} + } + } + } + }, + { + "table2", + { + { + "metric3", + { + {10, 200}, + {20, 300} + } + }, + { + "metric4", + { + {40, 300}, + {50, 400} + } + } + } + } + }, + { // host-2 + { + "table1", + { + { + "metric3", + { + {10, 100}, + {20, 200} + } + }, + { + "metric4", + { + {30, 300}, + {40, 400} + } + } + } + }, + { + "table2", + { + { + "metric3", + { + {10, 200}, + {20, 300} + } + }, + { + "metric4", + { + {40, 300}, + {50, 400} + } + } + } + }, + { + "table3", + { + { + "metric3", + { + {10, 200}, + {20, 300} + } + }, + { + 
"metric4", + { + {40, 300}, + {50, 400} + } + } + } + } + } + }; + MetricsCollector::TablesMetrics tables_metrics; + MetricsCollector::TablesHistMetrics tables_hist_metrics; + ASSERT_OK(MetricsCollector::MergeToTableLevelMetrics( + hosts_tables_metrics, hosts_tables_hist_metrics, + &tables_metrics, &tables_hist_metrics)); + ASSERT_EQ(tables_metrics, MetricsCollector::TablesMetrics({ + { + "table1", + { + {"metric1", 101}, + {"metric2", 202} + } + }, + { + "table2", + { + {"metric1", 101}, + {"metric2", 2}, + {"metric3", 200}, + } + }, + { + "table3", + { + {"metric1", 1}, + {"metric2", 2} + } + } + })); + ASSERT_EQ(tables_hist_metrics, MetricsCollector::TablesHistMetrics({ + { + "table1", + { + { + "metric3", + { + {10, 100}, + {20, 200}, + {10, 100}, + {20, 200} + } + }, + { + "metric4", + { + {30, 300}, + {40, 400}, + {30, 300}, + {40, 400} + } + } + } + }, + { + "table2", + { + { + "metric3", + { + {10, 200}, + {20, 300}, + {10, 200}, + {20, 300} + } + }, + { + "metric4", + { + {40, 300}, + {50, 400}, + {40, 300}, + {50, 400} + } + } + } + }, + { + "table3", + { + { + "metric3", + { + {10, 200}, + {20, 300} + } + }, + { + "metric4", + { + {40, 300}, + {50, 400} + } + } + } + } + })); + } +} + +TEST(TestMetricsCollector, TestMergeToClusterLevelMetrics) { + // Merge empty metrics. + { + MetricsCollector::TablesMetrics tables_metrics; + MetricsCollector::TablesHistMetrics tables_hist_metrics; + MetricsCollector::Metrics cluster_metrics; + ASSERT_OK(MetricsCollector::MergeToClusterLevelMetrics(tables_metrics, tables_hist_metrics, + &cluster_metrics)); + ASSERT_TRUE(cluster_metrics.empty()); + } + // Merge multi metrics. + { + MetricsCollector::TablesMetrics tables_metrics( + { + { + "table1", + { + {"metric1", 100} + } + }, + { + "table2", + { + {"metric1", 10}, + {"metric2", 20} + } + }, + { + "table3", + { + {"metric1", 1}, + {"metric2", 2}, + {"metric3", 3} + } + } + } + ); + MetricsCollector::TablesHistMetrics tables_hist_metrics; // TODO(yingchun) not used now. 
+ MetricsCollector::Metrics cluster_metrics({{"metric2", 0}}); + ASSERT_OK(MetricsCollector::MergeToClusterLevelMetrics(tables_metrics, tables_hist_metrics, + &cluster_metrics)); + ASSERT_EQ(cluster_metrics, MetricsCollector::Metrics({ + { + {"metric2", 22} + } + })); + } +} + +TEST(TestMetricsCollector, TestParseMetrics) { + // Check ParseServerMetrics and ParseTabletMetrics. + { + string data; + JsonReader r(data); + const rapidjson::Value entity; + ASSERT_TRUE(MetricsCollector::ParseServerMetrics(r, &entity).IsNotSupported()); + ASSERT_TRUE(MetricsCollector::ParseTabletMetrics(r, &entity).IsNotSupported()); + } + // Check ParseTableMetrics. + { + auto collector = BuildCollector(); + collector->metric_types_by_entity_type_["tablet"] = { + {"test_metric", "COUNTER"}, + {"metric_counter1", "COUNTER"}, + {"metric_counter2", "COUNTER"}, + {"metric_histogram1", "HISTOGRAM"}, + {"metric_histogram2", "HISTOGRAM"} + }; + string data( + R"*([ )*" + R"*( { )*" + R"*( "type": "server", )*" + R"*( "id": "server1", )*" + R"*( "attributes": { )*" + R"*( "attrA": "val1", )*" + R"*( "attrB": "val2" )*" + R"*( }, )*" + R"*( "metrics": [ )*" + R"*( { )*" + R"*( "name": "test_metric", )*" + R"*( "value": 123 )*" + R"*( } )*" + R"*( ] )*" + R"*( }, )*" + R"*( { )*" + R"*( "type": "tablet", )*" + R"*( "id": "tablet1", )*" + R"*( "attributes": { )*" + R"*( "attr1": "val1", )*" + R"*( "attr2": "val2" )*" + R"*( }, )*" + R"*( "metrics": [ )*" + R"*( { )*" + R"*( "name": "test_metric", )*" + R"*( "value": 321 )*" + R"*( } )*" + R"*( ] )*" + R"*( }, )*" + R"*( { )*" + R"*( "type": "table", )*" + R"*( "id": "table1", )*" + R"*( "attributes": { )*" + R"*( "attr1": "val2", )*" + R"*( "attr2": "val3" )*" + R"*( }, )*" + R"*( "metrics": [ )*" + R"*( { )*" + R"*( "name": "metric_counter1", )*" + R"*( "value": 10 )*" + R"*( }, )*" + R"*( { )*" + R"*( "name": "metric_counter2", )*" + R"*( "value": 20 )*" + R"*( }, )*" + R"*( { )*" + R"*( "name": "metric_histogram1", )*" + R"*( "total_count": 17, 
)*" + R"*( "min": 6, )*" + R"*( "mean": 47.8235, )*" + R"*( "percentile_75": 62, )*" + R"*( "percentile_95": 72, )*" + R"*( "percentile_99": 73, )*" + R"*( "percentile_99_9": 73, )*" + R"*( "percentile_99_99": 73, )*" + R"*( "max": 73, )*" + R"*( "total_sum": 813 )*" + R"*( } )*" + R"*( ] )*" + R"*( }, )*" + R"*( { )*" + R"*( "type": "table", )*" + R"*( "id": "table2", )*" + R"*( "attributes": { )*" + R"*( "attr1": "val3", )*" + R"*( "attr2": "val2" )*" + R"*( }, )*" + R"*( "metrics": [ )*" + R"*( { )*" + R"*( "name": "metric_counter1", )*" + R"*( "value": 100 )*" + R"*( }, )*" + R"*( { )*" + R"*( "name": "metric_histogram1", )*" + R"*( "total_count": 170, )*" + R"*( "min": 60, )*" + R"*( "mean": 478.235, )*" + R"*( "percentile_75": 620, )*" + R"*( "percentile_95": 720, )*" + R"*( "percentile_99": 730, )*" + R"*( "percentile_99_9": 735, )*" + R"*( "percentile_99_99": 735, )*" + R"*( "max": 735, )*" + R"*( "total_sum": 8130 )*" + R"*( }, )*" + R"*( { )*" + R"*( "name": "metric_histogram2", )*" + R"*( "total_count": 34, )*" + R"*( "min": 6, )*" + R"*( "mean": 47.8235, )*" + R"*( "percentile_75": 62, )*" + R"*( "percentile_95": 72, )*" + R"*( "percentile_99": 72, )*" + R"*( "percentile_99_9": 73, )*" + R"*( "percentile_99_99": 73, )*" + R"*( "max": 73, )*" + R"*( "total_sum": 813 )*" + R"*( } )*" + R"*( ] )*" + R"*( } )*" + R"*(] )*"); + + // Attribute filter is empty. 
+ { + MetricsCollector::TablesMetrics tables_metrics; + MetricsCollector::TablesHistMetrics tables_hist_metrics; + MetricsCollector::Metrics host_metrics; + MetricsCollector::HistMetrics host_hist_metrics; + ASSERT_OK(collector->ParseMetrics(data, + &tables_metrics, &host_metrics, + &tables_hist_metrics, &host_hist_metrics)); + ASSERT_EQ(tables_metrics, MetricsCollector::TablesMetrics({ + { + "table1", + { + {"metric_counter1", 10}, + {"metric_counter2", 20}, + } + }, + { + "table2", + { + {"metric_counter1", 100} + } + } + })); + ASSERT_EQ(tables_hist_metrics, MetricsCollector::TablesHistMetrics({ + { + "table1", + { + { + "metric_histogram1_percentile_99", + { + {17, 73} + } + } + } + }, + { + "table2", + { + { + "metric_histogram1_percentile_99", + { + {170, 730} + } + }, + { + "metric_histogram2_percentile_99", + { + {34, 72} + } + } + } + } + })); + ASSERT_EQ(host_metrics, MetricsCollector::Metrics({ + {"metric_counter1", 110}, + {"metric_counter2", 20} + })); + ASSERT_EQ(host_hist_metrics, MetricsCollector::HistMetrics({ + { + "metric_histogram1_percentile_99", + { + {17, 73}, + {170, 730} + } + }, + { + "metric_histogram2_percentile_99", + { + {34, 72} + } + } + })); + } + + // Attribute filter is not empty. 
+ { + collector->attributes_filter_ = {{"attr1", {"val1", "val2"}}}; + + MetricsCollector::TablesMetrics tables_metrics; + MetricsCollector::TablesHistMetrics tables_hist_metrics; + MetricsCollector::Metrics host_metrics; + MetricsCollector::HistMetrics host_hist_metrics; + ASSERT_OK(collector->ParseMetrics(data, + &tables_metrics, &host_metrics, + &tables_hist_metrics, &host_hist_metrics)); + ASSERT_EQ(tables_metrics, MetricsCollector::TablesMetrics({ + { + "table1", + { + {"metric_counter1", 10}, + {"metric_counter2", 20}, + } + } + })); + ASSERT_EQ(tables_hist_metrics, MetricsCollector::TablesHistMetrics({ + { + "table1", + { + { + "metric_histogram1_percentile_99", + { + {17, 73} + } + } + } + } + })); + ASSERT_EQ(host_metrics, MetricsCollector::Metrics({ + {"metric_counter1", 10}, + {"metric_counter2", 20} + })); + ASSERT_EQ(host_hist_metrics, MetricsCollector::HistMetrics({ + { + "metric_histogram1_percentile_99", + { + {17, 73}, + } + } + })); + } + } +} + +TEST(TestMetricsCollector, TestInitMetrics) { + FLAGS_collector_metrics_types_for_test = + R"*([ )*" + R"*( { )*" + R"*( "type": "tablet", )*" + R"*( "id": "table1", )*" + R"*( "metrics": [ )*" + R"*( { )*" + R"*( "name": "counter_metric1", )*" + R"*( "type": "counter" )*" + R"*( }, )*" + R"*( { )*" + R"*( "name": "histogram_metric1", )*" + R"*( "type": "histogram" )*" + R"*( }, )*" + R"*( { )*" + R"*( "name": "gauge_metric1", )*" + R"*( "type": "gauge" )*" + R"*( } )*" + R"*( ] )*" + R"*( }, )*" + R"*( { )*" + R"*( "type": "tablet", )*" + R"*( "id": "table2", )*" + R"*( "metrics": [ )*" + R"*( { )*" + R"*( "name": "counter_metric1", )*" + R"*( "type": "counter" )*" + R"*( }, )*" + R"*( { )*" + R"*( "name": "histogram_metric1", )*" + R"*( "type": "histogram" )*" + R"*( }, )*" + R"*( { )*" + R"*( "name": "gauge_metric1", )*" + R"*( "type": "gauge" )*" + R"*( } )*" + R"*( ] )*" + R"*( }, )*" + R"*( { )*" + R"*( "type": "server", )*" + R"*( "metrics": [ )*" + R"*( { )*" + R"*( "name": "counter_metric2", )*" 
+ R"*( "type": "counter" )*" + R"*( }, )*" + R"*( { )*" + R"*( "name": "histogram_metric2", )*" + R"*( "type": "histogram" )*" + R"*( }, )*" + R"*( { )*" + R"*( "name": "gauge_metric2", )*" + R"*( "type": "gauge" )*" + R"*( } )*" + R"*( ] )*" + R"*( } )*" + R"*(] )*"; + auto collector = BuildCollector(); + ASSERT_OK(collector->InitMetrics()); + map<string, MetricsCollector::MetricTypes> expect_metric_types({ + { + "tablet", + { + {"counter_metric1", "COUNTER"}, + {"histogram_metric1", "HISTOGRAM"}, + {"gauge_metric1", "GAUGE"}, + } + }, + { + "server", + { + {"counter_metric2", "COUNTER"}, + {"histogram_metric2", "HISTOGRAM"}, + {"gauge_metric2", "GAUGE"}, + } + } + }); + ASSERT_EQ(collector->metric_types_by_entity_type_, expect_metric_types); +} + +TEST(TestMetricsCollector, TestInitFilters) { + FLAGS_collector_attributes = "attr1:val1,val2;attr2:val1"; + auto collector = BuildCollector(); + ASSERT_OK(collector->InitFilters()); + unordered_map<string, set<string>> expect_attributes_filter({ + { + "attr1", + {"val1", "val2"} + }, + { + "attr2", + {"val1"} + } + }); + ASSERT_EQ(collector->attributes_filter_, expect_attributes_filter); +} + +#define CHECK_URL_PARAMETERS(metrics, request_merged, attributes, table_names, expect_url) \ +do { \ + FLAGS_collector_metrics = metrics; \ + FLAGS_collector_request_merged_metrics = request_merged; \ + FLAGS_collector_attributes = attributes; \ + FLAGS_collector_table_names = table_names; \ + auto collector = BuildCollector(); \ + ASSERT_OK(collector->InitFilters()); \ + ASSERT_OK(collector->InitMetricsUrlParameters()); \ + ASSERT_EQ(collector->metric_url_parameters_, expect_url); \ +} while (false) + +TEST(TestMetricsCollector, TestInitMetricsUrlParameters) { + CHECK_URL_PARAMETERS("", true, "", "", + "/metrics?compact=1&origin=false&merge=true"); + CHECK_URL_PARAMETERS("m1,m2,m3", true, "", "", + "/metrics?compact=1&metrics=m1,m2,m3&origin=false&merge=true"); + // TODO(yingchun): now FLAGS_collector_request_merged_metrics must 
be true + //CHECK_URL_PARAMETERS("", false, "", "", + // "/metrics?compact=1"); + CHECK_URL_PARAMETERS("", true, "attr1:a1,a2;attr2:a3", "", + "/metrics?compact=1&origin=false&merge=true&attributes=attr2,a3,attr1,a1,attr1,a2,"); + CHECK_URL_PARAMETERS("", true, "", "t1,t2,t3", + "/metrics?compact=1&origin=false&merge=true&table_names=t1,t2,t3"); +} + +TEST(TestMetricsCollector, TestInitClusterLevelMetrics) { + FLAGS_collector_cluster_level_metrics = "m1,m2,m3"; + auto collector = BuildCollector(); + ASSERT_OK(collector->InitClusterLevelMetrics()); + MetricsCollector::Metrics cluster_metrics({ + {"m1", 0}, + {"m2", 0}, + {"m3", 0}, + }); + ASSERT_EQ(collector->cluster_metrics_, cluster_metrics); +} +} // namespace collector +} // namespace kudu + diff --git a/src/kudu/collector/metrics_collector.cc b/src/kudu/collector/metrics_collector.cc new file mode 100644 index 0000000..e5adfcb --- /dev/null +++ b/src/kudu/collector/metrics_collector.cc @@ -0,0 +1,852 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/collector/metrics_collector.h" + +#include <string.h> + +#include <cmath> +#include <functional> +#include <list> +#include <ostream> +#include <set> +#include <utility> +#include <vector> + +#include <gflags/gflags.h> +#include <gflags/gflags_declare.h> +#include <glog/logging.h> +#include <rapidjson/rapidjson.h> + +#include "kudu/collector/collector_util.h" +#include "kudu/collector/nodes_checker.h" +#include "kudu/collector/reporter_base.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/port.h" +#include "kudu/gutil/strings/split.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/walltime.h" +#include "kudu/util/curl_util.h" +#include "kudu/util/debug/trace_event.h" +#include "kudu/util/faststring.h" +#include "kudu/util/jsonreader.h" +#include "kudu/util/monotime.h" +#include "kudu/util/slice.h" +#include "kudu/util/status.h" +#include "kudu/util/string_case.h" +#include "kudu/util/thread.h" +#include "kudu/util/threadpool.h" +#include "kudu/util/trace.h" +#include "kudu/util/zlib.h" + +DEFINE_string(collector_attributes, "", + "Entity attributes to collect (semicolon-separated list of entity attribute " + "name and values). e.g. 
attr_name1:attr_val1,attr_val2;attr_name2:attr_val3"); +DEFINE_string(collector_cluster_level_metrics, "on_disk_size,on_disk_data_size", + "Metric names which should be merged and pushed to cluster level view " + "(comma-separated list of metric names)"); +DEFINE_bool(collector_ignore_hosttable_level_metrics, false, + "Whether to ignore to report host-table level metrics."); +DEFINE_string(collector_metrics, "", + "Metrics to collect (comma-separated list of metric names)"); +DEFINE_string(collector_metrics_types_for_test, "", + "Only for test, used to initialize metric_types_by_entity_type_"); +DEFINE_bool(collector_request_merged_metrics, true, + "Whether to request merged metrics and exclude unmerged metrics from server"); +DEFINE_string(collector_table_names, "", + "Table names to collect (comma-separated list of table names)"); + +DECLARE_string(collector_cluster_name); +DECLARE_int32(collector_interval_sec); +DECLARE_int32(collector_timeout_sec); +DECLARE_int32(collector_warn_threshold_ms); + +using rapidjson::Value; +using std::list; +using std::map; +using std::set; +using std::string; +using std::vector; +using std::unordered_map; +using strings::Substitute; + +namespace kudu { +namespace collector { + +const set<string> MetricsCollector::kRegisterPercentiles = {"percentile_99"}; + +MetricsCollector::MetricsCollector(scoped_refptr<NodesChecker> nodes_checker, + scoped_refptr<ReporterBase> reporter) + : initialized_(false), + nodes_checker_(std::move(nodes_checker)), + reporter_(std::move(reporter)), + stop_background_threads_latch_(1) { +} + +MetricsCollector::~MetricsCollector() { + Shutdown(); +} + +Status MetricsCollector::Init() { + CHECK(!initialized_); + + RETURN_NOT_OK(ValidateTableFilter(FLAGS_collector_attributes, FLAGS_collector_table_names)); + RETURN_NOT_OK(InitMetrics()); + RETURN_NOT_OK(InitFilters()); + CHECK(attributes_filter_.empty()); // TODO(yingchun) disable now + RETURN_NOT_OK(InitMetricsUrlParameters()); + 
RETURN_NOT_OK(InitClusterLevelMetrics()); + + initialized_ = true; + return Status::OK(); +} + +Status MetricsCollector::Start() { + CHECK(initialized_); + + RETURN_NOT_OK(StartMetricCollectorThread()); + + return Status::OK(); +} + +void MetricsCollector::Shutdown() { + if (initialized_) { + string name = ToString(); + LOG(INFO) << name << " shutting down..."; + + stop_background_threads_latch_.CountDown(); + + if (metric_collector_thread_) { + metric_collector_thread_->Join(); + } + + LOG(INFO) << name << " shutdown complete."; + } +} + +string MetricsCollector::ToString() const { + return "MetricsCollector"; +} + +Status MetricsCollector::StartMetricCollectorThread() { + return Thread::Create("server", "metric-collector", &MetricsCollector::MetricCollectorThread, + this, &metric_collector_thread_); +} + +void MetricsCollector::MetricCollectorThread() { + MonoTime collect_time; + do { + collect_time = MonoTime::Now(); + WARN_NOT_OK(CollectAndReportMetrics(), "Unable to collect metrics"); + collect_time += MonoDelta::FromSeconds(FLAGS_collector_interval_sec); + } while (!RunOnceMode() && !stop_background_threads_latch_.WaitUntil(collect_time)); + LOG(INFO) << "MetricCollectorThread exit"; +} + +Status MetricsCollector::UpdateThreadPool(int32_t thread_count) { + if (host_metric_collector_thread_pool_ && + host_metric_collector_thread_pool_->num_threads() == thread_count) { + return Status::OK(); + } + + if (host_metric_collector_thread_pool_) { + host_metric_collector_thread_pool_->Shutdown(); + } + TRACE("Old thread pool shutdown"); + + RETURN_NOT_OK(ThreadPoolBuilder("host-metric-collector") + .set_min_threads(thread_count) + .set_max_threads(thread_count) + .set_idle_timeout(MonoDelta::FromMilliseconds(1)) + .Build(&host_metric_collector_thread_pool_)); + TRACE("New thread pool built"); + + return Status::OK(); +} + +Status MetricsCollector::ValidateTableFilter(const string& attribute_filter, + const string& /*table_filter*/) { + if (attribute_filter.empty()) { 
+ return Status::OK(); + } + + return Status::InvalidArgument("attribute filter is not supported now"); +} + +Status MetricsCollector::InitMetrics() { + string resp; + if (PREDICT_TRUE(FLAGS_collector_metrics_types_for_test.empty())) { + RETURN_NOT_OK(GetMetrics( + nodes_checker_->GetFirstMaster() + "/metrics?include_schema=1", &resp)); + } else { + resp = FLAGS_collector_metrics_types_for_test; + } + JsonReader r(resp); + RETURN_NOT_OK(r.Init()); + vector<const Value*> entities; + RETURN_NOT_OK(r.ExtractObjectArray(r.root(), nullptr, &entities)); + + map<string, MetricTypes> metric_types_by_entity_type; + bool tablet_entity_inited = false; + bool server_entity_inited = false; + for (const Value* entity : entities) { + string entity_type; + CHECK_OK(r.ExtractString(entity, "type", &entity_type)); + if (entity_type == "tablet") { + if (tablet_entity_inited) continue; + EmplaceOrDie(&metric_types_by_entity_type, std::make_pair("tablet", MetricTypes())); + auto& tablet_metric_types = FindOrDie(metric_types_by_entity_type, "tablet"); + ExtractMetricTypes(r, entity, &tablet_metric_types); + tablet_entity_inited = true; + } else if (entity_type == "server") { + if (server_entity_inited) continue; + EmplaceOrDie(&metric_types_by_entity_type, std::make_pair("server", MetricTypes())); + auto& server_metric_types = FindOrDie(metric_types_by_entity_type, "server"); + ExtractMetricTypes(r, entity, &server_metric_types); + server_entity_inited = true; + } else { + LOG(WARNING) << "unhandled entity type " << entity_type; + } + } + metric_types_by_entity_type_.swap(metric_types_by_entity_type); + return Status::OK(); +} + +Status MetricsCollector::ExtractMetricTypes(const JsonReader& r, + const Value* entity, + MetricTypes* metric_types) { + CHECK(metric_types); + vector<const Value*> metrics; + RETURN_NOT_OK(r.ExtractObjectArray(entity, "metrics", &metrics)); + for (const Value* metric : metrics) { + string name; + RETURN_NOT_OK(r.ExtractString(metric, "name", &name)); + string 
type; + RETURN_NOT_OK(r.ExtractString(metric, "type", &type)); + string upper_type; + ToUpperCase(type, &upper_type); + EmplaceOrDie(metric_types, std::make_pair(name, upper_type)); + } + return Status::OK(); +} + +Status MetricsCollector::InitFilters() { + unordered_map<string, set<string>> attributes_filter; + vector<string> attribute_values_by_name_list = + Split(FLAGS_collector_attributes, ";", strings::SkipEmpty()); + for (const auto& attribute_values_by_name : attribute_values_by_name_list) { + vector<string> attribute_name_and_values = + Split(attribute_values_by_name, ":", strings::SkipEmpty()); + CHECK_EQ(attribute_name_and_values.size(), 2); + set<string> values(Split(attribute_name_and_values[1], ",", strings::SkipEmpty())); + CHECK(!values.empty()); + EmplaceOrDie(&attributes_filter, std::make_pair(attribute_name_and_values[0], values)); + } + attributes_filter_.swap(attributes_filter); + return Status::OK(); +} + +Status MetricsCollector::InitMetricsUrlParameters() { + metric_url_parameters_ = "/metrics?compact=1"; + if (!FLAGS_collector_metrics.empty()) { + metric_url_parameters_ += "&metrics=" + FLAGS_collector_metrics; + } + if (FLAGS_collector_request_merged_metrics) { + metric_url_parameters_ += "&origin=false&merge=true"; + } else { + LOG(FATAL) << "Non-merge mode is not supported now, you should set " + "FLAGS_collector_request_merged_metrics to true if you " + "want collector work well"; + } + + // TODO(yingchun) This is supported since version 1.10 + if (!attributes_filter_.empty()) { + metric_url_parameters_ += "&attributes="; + } + for (const auto& attribute_filter : attributes_filter_) { + for (const auto& value : attribute_filter.second) { + metric_url_parameters_ += Substitute("$0,$1,", attribute_filter.first, value); + } + } + // TODO(yingchun) This is supported since internal version 1.8.0 + if (!FLAGS_collector_table_names.empty()) { + metric_url_parameters_ += "&table_names=" + FLAGS_collector_table_names; + } + return Status::OK(); 
+} + +Status MetricsCollector::InitClusterLevelMetrics() { + Metrics cluster_metrics; + vector<string> metric_names = + Split(FLAGS_collector_cluster_level_metrics, ",", strings::SkipEmpty()); + for (const auto& metric_name : metric_names) { + cluster_metrics[metric_name] = 0; + } + cluster_metrics_.swap(cluster_metrics); + return Status::OK(); +} + +Status MetricsCollector::CollectAndReportMetrics() { + LOG(INFO) << "Start to CollectAndReportMetrics"; + MonoTime start(MonoTime::Now()); + scoped_refptr<Trace> trace(new Trace); + ADOPT_TRACE(trace.get()); + TRACE_EVENT0("collector", "MetricsCollector::CollectAndReportMetrics"); + TRACE("init"); + vector<string> tserver_http_addrs = nodes_checker_->GetNodes(); + TRACE("Nodes got"); + if (tserver_http_addrs.empty()) { + return Status::OK(); + } + RETURN_NOT_OK(UpdateThreadPool(static_cast<int32_t>(tserver_http_addrs.size()))); + vector<TablesMetrics> hosts_metrics_by_table_name(tserver_http_addrs.size()); + vector<TablesHistMetrics> hosts_hist_metrics_by_table_name(tserver_http_addrs.size()); + for (int i = 0; i < tserver_http_addrs.size(); ++i) { + RETURN_NOT_OK(host_metric_collector_thread_pool_->SubmitFunc( + std::bind(&MetricsCollector::CollectAndReportHostLevelMetrics, + this, + tserver_http_addrs[i] + metric_url_parameters_, + &hosts_metrics_by_table_name[i], + &hosts_hist_metrics_by_table_name[i]))); + } + TRACE("Thead pool jobs submitted"); + host_metric_collector_thread_pool_->Wait(); + TRACE("Thead pool jobs done"); + + // Merge to table level metrics. + TablesMetrics metrics_by_table_name; + TablesHistMetrics hist_metrics_by_table_name; + RETURN_NOT_OK(MergeToTableLevelMetrics(hosts_metrics_by_table_name, + hosts_hist_metrics_by_table_name, + &metrics_by_table_name, + &hist_metrics_by_table_name)); + + // Merge to cluster level metrics. 
+ Metrics cluster_metrics(cluster_metrics_); + RETURN_NOT_OK(MergeToClusterLevelMetrics(metrics_by_table_name, + hist_metrics_by_table_name, + &cluster_metrics)); + + auto timestamp = static_cast<uint64_t>(WallTime_Now()); + + // Push table level metrics. + RETURN_NOT_OK(ReportTableLevelMetrics(timestamp, + metrics_by_table_name, + hist_metrics_by_table_name)); + + // Push cluster level metrics. + RETURN_NOT_OK(ReportClusterLevelMetrics(timestamp, cluster_metrics)); + + int64_t elapsed_ms = (MonoTime::Now() - start).ToMilliseconds(); + if (elapsed_ms > FLAGS_collector_warn_threshold_ms) { + if (Trace::CurrentTrace()) { + LOG(WARNING) << "Trace:" << std::endl + << Trace::CurrentTrace()->DumpToString(); + } + } + + return Status::OK(); +} + +Status MetricsCollector::MergeToTableLevelMetrics( + const vector<TablesMetrics>& hosts_metrics_by_table_name, + const vector<TablesHistMetrics>& hosts_hist_metrics_by_table_name, + TablesMetrics* metrics_by_table_name, + TablesHistMetrics* hist_metrics_by_table_name) { + CHECK(metrics_by_table_name); + CHECK(hist_metrics_by_table_name); + + // GAUGE/COUNTER type metrics. + int metrics_count = 0; + for (const auto& host_metrics_by_table_name : hosts_metrics_by_table_name) { + for (const auto& table_metrics1 : host_metrics_by_table_name) { + const auto& table_name = table_metrics1.first; + const auto& metrics = table_metrics1.second; + metrics_count += metrics.size(); + if (EmplaceIfNotPresent(metrics_by_table_name, std::make_pair(table_name, metrics))) { + continue; + } + // This table has been fetched by some other tserver. + auto& table_metrics = FindOrDie(*metrics_by_table_name, table_name); + for (const auto& metric_value : metrics) { + const auto& metric = metric_value.first; + const auto& value = metric_value.second; + if (EmplaceIfNotPresent(&table_metrics, std::make_pair(metric, value))) { + continue; + } + // This metric has been fetched by some other tserver. 
+ auto& old_value = FindOrDie(table_metrics, metric); + old_value += value; + } + } + } + TRACE(Substitute("Table GAUGE/COUNTER type metrics merged, count $0", metrics_count)); + + // HISTOGRAM type metrics. + metrics_count = 0; + for (const auto& host_hist_metrics_by_table_name : hosts_hist_metrics_by_table_name) { + for (const auto& table_hist_metrics1 : host_hist_metrics_by_table_name) { + const auto& table_name = table_hist_metrics1.first; + const auto& metrics = table_hist_metrics1.second; + metrics_count += metrics.size(); + if (EmplaceIfNotPresent(hist_metrics_by_table_name, std::make_pair(table_name, metrics))) { + continue; + } + // This table has been fetched by some other tserver. + auto& table_hist_metrics = FindOrDie(*hist_metrics_by_table_name, table_name); + for (const auto& metric_hist_values : metrics) { + const auto& metric = metric_hist_values.first; + const auto& hist_values = metric_hist_values.second; + if (EmplaceIfNotPresent(&table_hist_metrics, std::make_pair(metric, hist_values))) { + continue; + } + // This metric has been fetched by some other tserver. 
+ auto& old_hist_values = FindOrDie(table_hist_metrics, metric); + for (auto& hist_value : hist_values) { + old_hist_values.emplace_back(hist_value); + } + } + } + } + TRACE(Substitute("Table HISTOGRAM type metrics merged, count $0", metrics_count)); + + return Status::OK(); +} + +Status MetricsCollector::MergeToClusterLevelMetrics( + const TablesMetrics& metrics_by_table_name, + const TablesHistMetrics& /*hist_metrics_by_table_name*/, + Metrics* cluster_metrics) { + CHECK(cluster_metrics); + if (!cluster_metrics->empty()) { + for (const auto& table_metrics : metrics_by_table_name) { + for (auto& cluster_metric : *cluster_metrics) { + auto *find = FindOrNull(table_metrics.second, cluster_metric.first); + if (find) { + cluster_metric.second += *find; + } + } + } + } + TRACE(Substitute("Cluster metrics merged, count $0", cluster_metrics->size())); + + return Status::OK(); +} + +Status MetricsCollector::GetNumberMetricValue(const rapidjson::Value* metric, + const string& metric_name /*metric_name*/, + int64_t* result) const { + CHECK(result); + if (metric->IsUint64() || metric->IsInt64() || metric->IsUint() || metric->IsInt()) { + *result = metric->GetInt64(); + return Status::OK(); + } + + if (metric->IsDouble()) { + double result_temp = metric->GetDouble(); + // Multiply by 1000000 and convert to int64_t to avoid much data loss and keep compatibility + // with monitor system like Falcon. 
+ *result = static_cast<int64_t>(result_temp * 1000000); + return Status::OK(); + } + + return Status::NotSupported(Substitute("unsupported metric $0", metric_name)); +} + +Status MetricsCollector::GetStringMetricValue(const Value* metric, + const string& metric_name, + int64_t* result) const { + CHECK(result); + string value(metric->GetString()); + if (metric_name == "state") { + return ConvertStateToInt(value, result); + } + return Status::NotSupported(Substitute("unsupported metric $0", metric_name)); +} + +Status MetricsCollector::ConvertStateToInt(const string& value, int64_t* result) { + CHECK(result); + // TODO(yingchun) Here, table state is merged by several original tablet states, which is + // contacted by several sub-strings, like 'RUNNING', 'BOOTSTRAPPING', etc. It's tricky to + // fetch state now, we will improve in server side later. + const char* running = "RUNNING"; + if (value.empty() || value.size() % strlen(running) != 0) { + *result = 0; + return Status::OK(); + } + for (int i = 0; i < value.size(); i += strlen(running)) { + if (0 != strncmp(running, value.c_str() + i, strlen(running))) { + *result = 0; + return Status::OK(); + } + } + *result = 1; + return Status::OK(); +} + +bool MetricsCollector::FilterByAttribute(const JsonReader& r, + const rapidjson::Value* entity) const { + if (attributes_filter_.empty()) { + return false; + } + const Value* attributes; + CHECK_OK(r.ExtractObject(entity, "attributes", &attributes)); + for (const auto& name_values : attributes_filter_) { + string value; + Status s = r.ExtractString(attributes, name_values.first.c_str(), &value); + if (s.ok() && ContainsKey(name_values.second, value)) { + return false; + } + } + return true; +} + +Status MetricsCollector::ParseServerMetrics(const JsonReader& /*r*/, + const rapidjson::Value* /*entity*/) { + return Status::NotSupported("server entity is not supported"); +} + +Status MetricsCollector::ParseTableMetrics(const JsonReader& r, + const rapidjson::Value* entity, + 
TablesMetrics* metrics_by_table_name, + Metrics* host_metrics, + TablesHistMetrics* hist_metrics_by_table_name, + HistMetrics* host_hist_metrics) const { + CHECK(metrics_by_table_name); + CHECK(host_metrics); + CHECK(hist_metrics_by_table_name); + CHECK(host_hist_metrics); + + string table_name; + CHECK_OK(r.ExtractString(entity, "id", &table_name)); + CHECK(!ContainsKey(*metrics_by_table_name, table_name)); + CHECK(!ContainsKey(*hist_metrics_by_table_name, table_name)); + + EmplaceOrDie(metrics_by_table_name, std::make_pair(table_name, Metrics())); + auto& table_metrics = FindOrDie(*metrics_by_table_name, table_name); + + EmplaceOrDie(hist_metrics_by_table_name, std::make_pair(table_name, HistMetrics())); + auto& table_hist_metrics = FindOrDie(*hist_metrics_by_table_name, table_name); + + vector<const Value*> metrics; + CHECK_OK(r.ExtractObjectArray(entity, "metrics", &metrics)); + for (const Value* metric : metrics) { + string name; + CHECK_OK(r.ExtractString(metric, "name", &name)); + const auto* tablet_metric_types = FindOrNull(metric_types_by_entity_type_, "tablet"); + CHECK(tablet_metric_types); + const auto* known_type = FindOrNull(*tablet_metric_types, name); + if (!known_type) { + LOG(ERROR) << Substitute("metric $0 has unknown type, ignore it", name); + continue; + } + + if (*known_type == "GAUGE" || *known_type == "COUNTER") { + int64_t value = 0; + const Value* val; + RETURN_NOT_OK(r.ExtractField(metric, "value", &val)); + rapidjson::Type type = val->GetType(); + switch (type) { + case rapidjson::Type::kStringType: + CHECK_OK(GetStringMetricValue(val, name, &value)); + break; + case rapidjson::Type::kNumberType: + CHECK_OK(GetNumberMetricValue(val, name, &value)); + break; + default: + LOG(FATAL) << "Unknown type, metrics name: " << name; + } + + EmplaceOrDie(&table_metrics, std::make_pair(name, value)); + if (!EmplaceIfNotPresent(host_metrics, std::make_pair(name, value))) { + auto& host_metric = FindOrDie(*host_metrics, name); + host_metric += value; 
+ } + } else if (*known_type == "HISTOGRAM") { + for (const auto& percentile : kRegisterPercentiles) { + string hist_metric_name(name); + hist_metric_name += "_" + percentile; + int64_t total_count; + CHECK_OK(r.ExtractInt64(metric, "total_count", &total_count)); + int64_t percentile_value; + CHECK_OK(r.ExtractInt64(metric, percentile.c_str(), &percentile_value)); + vector<SimpleHistogram> tmp({{total_count, percentile_value}}); + EmplaceOrDie(&table_hist_metrics, std::make_pair(hist_metric_name, tmp)); + if (!EmplaceIfNotPresent(host_hist_metrics, std::make_pair(hist_metric_name, tmp))) { + auto& host_hist_metric = FindOrDie(*host_hist_metrics, hist_metric_name); + host_hist_metric.emplace_back(tmp[0]); + } + } + } else { + LOG(FATAL) << "Unknown metric type: " << *known_type; + } + } + + return Status::OK(); +} + +Status MetricsCollector::ParseTabletMetrics(const JsonReader& /*r*/, + const rapidjson::Value* /*entity*/) { + return Status::NotSupported("tablet entity is not supported"); +} + +Status MetricsCollector::CollectAndReportHostLevelMetrics( + const string& url, + TablesMetrics* metrics_by_table_name, + TablesHistMetrics* hist_metrics_by_table_name) { + MonoTime start(MonoTime::Now()); + scoped_refptr<Trace> trace(new Trace); + ADOPT_TRACE(trace.get()); + TRACE_EVENT1("collector", "MetricsCollector::CollectAndReportHostLevelMetrics", + "url", url); + TRACE("init"); + CHECK(metrics_by_table_name); + CHECK(hist_metrics_by_table_name); + + // Get metrics from server. + string resp; + RETURN_NOT_OK(GetMetrics(url, &resp)); + + // Merge metrics by table and metric type. + Metrics host_metrics; + HistMetrics host_hist_metrics; + RETURN_NOT_OK(ParseMetrics(resp, metrics_by_table_name, &host_metrics, + hist_metrics_by_table_name, &host_hist_metrics)); + + string host_name = ExtractHostName(url); + auto timestamp = static_cast<uint64_t>(WallTime_Now()); + + // Host table level. 
Status MetricsCollector::ParseMetrics(const string& data,
                                      TablesMetrics* metrics_by_table_name,
                                      Metrics* host_metrics,
                                      TablesHistMetrics* hist_metrics_by_table_name,
                                      HistMetrics* host_hist_metrics) {
  // Parse one tserver's /metrics JSON payload (an array of entities) and
  // dispatch each entity by its "type" field:
  //  - "table": parsed into the four output maps.
  //  - "server" / "tablet": their parsers currently return NotSupported;
  //    we only assert that they do so.
  // Entities whose attributes match the configured filter are skipped.
  JsonReader r(data);
  RETURN_NOT_OK(r.Init());
  vector<const Value*> entities;
  RETURN_NOT_OK(r.ExtractObjectArray(r.root(), nullptr, &entities));

  for (const Value* entity : entities) {
    if (FilterByAttribute(r, entity)) {
      continue;
    }
    string entity_type;
    // NOTE(review): CHECK_OK/LOG(FATAL) below crash the collector on
    // unexpected data from a remote server — confirm that is intended
    // rather than returning a Status.
    CHECK_OK(r.ExtractString(entity, "type", &entity_type));
    if (entity_type == "server") {
      CHECK(ParseServerMetrics(r, entity).IsNotSupported());
    } else if (entity_type == "table") {
      CHECK_OK(ParseTableMetrics(r, entity,
                                 metrics_by_table_name, host_metrics,
                                 hist_metrics_by_table_name, host_hist_metrics));
    } else if (entity_type == "tablet") {
      CHECK(ParseTabletMetrics(r, entity).IsNotSupported());
    } else {
      LOG(FATAL) << "Unknown entity_type: " << entity_type;
    }
  }
  TRACE(Substitute("Metrics parsed, entity count $0", entities.size()));

  return Status::OK();
}
timestamp, + metric.second, + FindOrDie(metric_types_by_entity_type_["tablet"], metric.first), + extra_tags)); + } +} + +void MetricsCollector::CollectMetrics(const string& endpoint, + const HistMetrics& metrics, + const string& level, + uint64_t timestamp, + const string& extra_tags, + list<scoped_refptr<ItemBase>>* items) { + for (const auto& metric : metrics) { + items->emplace_back( + reporter_->ConstructItem(endpoint, + metric.first, + level, + timestamp, + GetHistValue(metric.second), + "GAUGE", + extra_tags)); + } +} + +Status MetricsCollector::ReportHostTableLevelMetrics( + const string& host_name, + uint64_t timestamp, + const TablesMetrics& metrics_by_table_name, + const TablesHistMetrics& hist_metrics_by_table_name) { + list<scoped_refptr<ItemBase>> items; + // GAUGE/COUNTER type metrics. + int metrics_count = 0; + for (const auto& table_metrics : metrics_by_table_name) { + const auto extra_tag = Substitute("table=$0", table_metrics.first); + metrics_count += table_metrics.second.size(); + CollectMetrics(host_name, table_metrics.second, "host_table", timestamp, extra_tag, &items); + } + TRACE(Substitute("Host-table GAUGE/COUNTER type metrics collected, count $0", metrics_count)); + + // HISTOGRAM type metrics. 
+ int hist_metrics_count = 0; + for (const auto& table_hist_metrics : hist_metrics_by_table_name) { + const auto extra_tag = Substitute("table=$0", table_hist_metrics.first); + hist_metrics_count += table_hist_metrics.second.size(); + CollectMetrics(host_name, table_hist_metrics.second, + "host_table", timestamp, extra_tag, + &items); + } + TRACE(Substitute("Host-table HISTOGRAM type metrics collected, count $0", hist_metrics_count)); + + reporter_->PushItems(std::move(items)); + TRACE(Substitute("Host-table metrics reported, count $0", metrics_count + hist_metrics_count)); + + return Status::OK(); +} + +Status MetricsCollector::ReportHostLevelMetrics( + const string& host_name, + uint64_t timestamp, + const Metrics& host_metrics, + const HistMetrics& host_hist_metrics) { + list<scoped_refptr<ItemBase>> items; + // GAUGE/COUNTER type metrics. + CollectMetrics(host_name, host_metrics, "host", timestamp, "", &items); + TRACE(Substitute("Host GAUGE/COUNTER type metrics collected, count $0", host_metrics.size())); + + // HISTOGRAM type metrics. + CollectMetrics(host_name, host_hist_metrics, "host", timestamp, "", &items); + TRACE(Substitute("Host HISTOGRAM type metrics collected, count $0", host_hist_metrics.size())); + + reporter_->PushItems(std::move(items)); + TRACE(Substitute("Host metrics reported, count $0", + host_metrics.size() + host_hist_metrics.size())); + + return Status::OK(); +} + +Status MetricsCollector::ReportTableLevelMetrics( + uint64_t timestamp, + const TablesMetrics& metrics_by_table_name, + const TablesHistMetrics& hist_metrics_by_table_name) { + list<scoped_refptr<ItemBase>> items; + // GAUGE/COUNTER type metrics. 
+ int metrics_count = 0; + for (const auto& table_metrics : metrics_by_table_name) { + metrics_count += table_metrics.second.size(); + CollectMetrics(table_metrics.first, + table_metrics.second, + "table", timestamp, "", &items); + } + TRACE(Substitute("Table GAUGE/COUNTER type metrics collected, count $0", metrics_count)); + + // HISTOGRAM type metrics. + int hist_metrics_count = 0; + for (const auto& table_hist_metrics : hist_metrics_by_table_name) { + hist_metrics_count += table_hist_metrics.second.size(); + CollectMetrics(table_hist_metrics.first, + table_hist_metrics.second, + "table", timestamp, "", &items); + } + TRACE(Substitute("Table HISTOGRAM type metrics collected, count $0", hist_metrics_count)); + + reporter_->PushItems(std::move(items)); + TRACE(Substitute("Table metrics reported, count $0", metrics_count + hist_metrics_count)); + + return Status::OK(); +} + +Status MetricsCollector::ReportClusterLevelMetrics(uint64_t timestamp, + const Metrics& cluster_metrics) { + list<scoped_refptr<ItemBase>> items; + CollectMetrics(FLAGS_collector_cluster_name, cluster_metrics, "cluster", timestamp, "", &items); + TRACE(Substitute("Cluster metrics collected, count $0", cluster_metrics.size())); + + reporter_->PushItems(std::move(items)); + TRACE(Substitute("Cluster metrics reported, count $0", cluster_metrics.size())); + + return Status::OK(); +} + +int64_t MetricsCollector::GetHistValue(const vector<SimpleHistogram>& hist_values) { + int64_t total_count = 0; + double total_value = 0.0; + for (const auto& hist_value : hist_values) { + total_count += hist_value.count; + total_value += hist_value.count * hist_value.value; + } + int64_t value = 0; + if (total_count != 0) { + value = std::llround(total_value / total_count); + } + return value; +} + +Status MetricsCollector::GetMetrics(const string& url, string* resp) { + CHECK(resp); + EasyCurl curl; + faststring dst; + //curl.set_return_headers(true); + RETURN_NOT_OK(curl.FetchURL(url, &dst, {"Accept-Encoding: 
gzip"})); + std::ostringstream oss; + string dst_str = dst.ToString(); + if (zlib::Uncompress(Slice(dst_str), &oss).ok()) { + *resp = oss.str(); + } else { + *resp = dst_str; + } + TRACE(Substitute("Metrics got from server: $0", url)); + + return Status::OK(); +} +} // namespace collector +} // namespace kudu diff --git a/src/kudu/collector/metrics_collector.h b/src/kudu/collector/metrics_collector.h new file mode 100644 index 0000000..1435e52 --- /dev/null +++ b/src/kudu/collector/metrics_collector.h @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#pragma once + +#include <cstdint> +#include <list> +#include <map> +#include <memory> +#include <set> +#include <string> +#include <unordered_map> +#include <vector> + +#include <gtest/gtest_prod.h> +#include <rapidjson/document.h> + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/status.h" + +namespace kudu { +class JsonReader; +class Thread; +class ThreadPool; + +namespace collector { +struct ItemBase; +} // namespace collector +} // namespace kudu + +namespace kudu { + +namespace collector { + +class NodesChecker; +class ReporterBase; + +class MetricsCollector : public RefCounted<MetricsCollector> { + public: + MetricsCollector(scoped_refptr<NodesChecker> nodes_checker, + scoped_refptr<ReporterBase> reporter); + ~MetricsCollector(); + + Status Init(); + Status Start(); + void Shutdown(); + + std::string ToString() const; + + private: + friend class RefCounted<MetricsCollector>; + + FRIEND_TEST(TestMetricsCollector, TestConvertStateToInt); + FRIEND_TEST(TestMetricsCollector, TestGetHistValue); + FRIEND_TEST(TestMetricsCollector, TestMergeToTableLevelMetrics); + FRIEND_TEST(TestMetricsCollector, TestMergeToClusterLevelMetrics); + FRIEND_TEST(TestMetricsCollector, TestParseMetrics); + FRIEND_TEST(TestMetricsCollector, TestInitMetrics); + FRIEND_TEST(TestMetricsCollector, TestInitFilters); + FRIEND_TEST(TestMetricsCollector, TestInitMetricsUrlParameters); + FRIEND_TEST(TestMetricsCollector, TestInitClusterLevelMetrics); + + typedef std::unordered_map<std::string, int64_t> Metrics; + typedef std::unordered_map<std::string, Metrics> TablesMetrics; + struct SimpleHistogram { + int64_t count; + int64_t value; + SimpleHistogram(int64_t c, int64_t v) : count(c), value(v) { + } + inline bool operator==(const SimpleHistogram& rhs) const { + return count == rhs.count && value == rhs.value; + } + }; + + typedef std::unordered_map<std::string, std::vector<SimpleHistogram>> HistMetrics; + 
typedef std::unordered_map<std::string, HistMetrics> TablesHistMetrics; + + typedef std::unordered_map<std::string, std::string> MetricTypes; + + Status ValidateTableFilter(const std::string& attribute_filter, const std::string& table_filter); + Status InitMetrics(); + static Status ExtractMetricTypes(const JsonReader& r, + const rapidjson::Value* entity, + MetricTypes* metric_types); + Status InitFilters(); + Status InitMetricsUrlParameters(); + Status InitClusterLevelMetrics(); + + Status StartMetricCollectorThread(); + void MetricCollectorThread(); + Status CollectAndReportMetrics(); + + Status UpdateThreadPool(int32_t thread_count); + + Status CollectAndReportHostLevelMetrics(const std::string& url, + TablesMetrics* metrics_by_table_name, + TablesHistMetrics* hist_metrics_by_table_name); + + static Status MergeToTableLevelMetrics( + const std::vector<TablesMetrics>& hosts_metrics_by_table_name, + const std::vector<TablesHistMetrics>& hosts_hist_metrics_by_table_name, + TablesMetrics* metrics_by_table_name, + TablesHistMetrics* hist_metrics_by_table_name); + static Status MergeToClusterLevelMetrics(const TablesMetrics& metrics_by_table_name, + const TablesHistMetrics& hist_metrics_by_table_name, + Metrics* cluster_metrics); + + // Report metrics to third-party monitor system. 
+ void CollectMetrics(const std::string& endpoint, + const Metrics& metrics, + const std::string& level, + uint64_t timestamp, + const std::string& extra_tags, + std::list<scoped_refptr<ItemBase>>* items); + void CollectMetrics(const std::string& endpoint, + const HistMetrics& metrics, + const std::string& level, + uint64_t timestamp, + const std::string& extra_tags, + std::list<scoped_refptr<ItemBase>>* items); + + Status ReportHostTableLevelMetrics(const std::string& host_name, + uint64_t timestamp, + const TablesMetrics& metrics_by_table_name, + const TablesHistMetrics& hist_metrics_by_table_name); + Status ReportHostLevelMetrics(const std::string& host_name, + uint64_t timestamp, + const Metrics& host_metrics, + const HistMetrics& host_hist_metrics); + Status ReportTableLevelMetrics(uint64_t timestamp, + const TablesMetrics& metrics_by_table_name, + const TablesHistMetrics& hist_metrics_by_table_name); + Status ReportClusterLevelMetrics(uint64_t timestamp, + const Metrics& cluster_metrics); + static int64_t GetHistValue(const std::vector<SimpleHistogram>& hist_values); + + // Get metrics from server by http method. + static Status GetMetrics(const std::string& url, std::string* resp); + + // Parse metrics from http response, entities may be in different types. + Status ParseMetrics(const std::string& data, + TablesMetrics* metrics_by_table_name, + Metrics* host_metrics, + TablesHistMetrics* hist_metrics_by_table_name, + HistMetrics* host_hist_metrics); + static Status ParseServerMetrics(const JsonReader& r, + const rapidjson::Value* entity); + Status ParseTableMetrics(const JsonReader& r, + const rapidjson::Value* entity, + TablesMetrics* metrics_by_table_name, + Metrics* host_metrics, + TablesHistMetrics* hist_metrics_by_table_name, + HistMetrics* host_hist_metrics) const; + static Status ParseTabletMetrics(const JsonReader& r, + const rapidjson::Value* entity); + + // Return true when this entity could be filtered. 
+ // When server side support attributes filter, this function has no effect. + bool FilterByAttribute(const JsonReader& r, + const rapidjson::Value* entity) const; + Status GetNumberMetricValue(const rapidjson::Value* metric, + const std::string& metric_name, + int64_t* result) const; + Status GetStringMetricValue(const rapidjson::Value* metric, + const std::string& metric_name, + int64_t* result) const; + static Status ConvertStateToInt(const std::string& value, int64_t* result); + + static const std::set<std::string> kRegisterPercentiles; + + bool initialized_; + + scoped_refptr<NodesChecker> nodes_checker_; + scoped_refptr<ReporterBase> reporter_; + + std::map<std::string, MetricTypes> metric_types_by_entity_type_; + // Attribute filter, attributes not in this map will be filtered if it's not empty. + // attribute name ---> attribute values + std::unordered_map<std::string, std::set<std::string>> attributes_filter_; + std::string metric_url_parameters_; + Metrics cluster_metrics_; + + CountDownLatch stop_background_threads_latch_; + scoped_refptr<Thread> metric_collector_thread_; + std::unique_ptr<ThreadPool> host_metric_collector_thread_pool_; + + DISALLOW_COPY_AND_ASSIGN(MetricsCollector); +}; +} // namespace collector +} // namespace kudu diff --git a/src/kudu/collector/nodes_checker-test.cc b/src/kudu/collector/nodes_checker-test.cc new file mode 100644 index 0000000..2390a52 --- /dev/null +++ b/src/kudu/collector/nodes_checker-test.cc @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/collector/nodes_checker.h" + +#include <gtest/gtest.h> + +#include "kudu/rebalance/cluster_status.h" + +using kudu::cluster_summary::ServerHealth; +using kudu::cluster_summary::HealthCheckResult; + +namespace kudu { +namespace collector { + +TEST(TestNodesChecker, TestExtractServerHealthStatus) { + ASSERT_EQ(ServerHealth::HEALTHY, + NodesChecker::ExtractServerHealthStatus("HEALTHY")); + ASSERT_EQ(ServerHealth::UNAUTHORIZED, + NodesChecker::ExtractServerHealthStatus("UNAUTHORIZED")); + ASSERT_EQ(ServerHealth::UNAVAILABLE, + NodesChecker::ExtractServerHealthStatus("UNAVAILABLE")); + ASSERT_EQ(ServerHealth::WRONG_SERVER_UUID, + NodesChecker::ExtractServerHealthStatus("WRONG_SERVER_UUID")); +} + +TEST(TestNodesChecker, TestExtractTableHealthStatus) { + ASSERT_EQ(HealthCheckResult::HEALTHY, + NodesChecker::ExtractTableHealthStatus("HEALTHY")); + ASSERT_EQ(HealthCheckResult::RECOVERING, + NodesChecker::ExtractTableHealthStatus("RECOVERING")); + ASSERT_EQ(HealthCheckResult::UNDER_REPLICATED, + NodesChecker::ExtractTableHealthStatus("UNDER_REPLICATED")); + ASSERT_EQ(HealthCheckResult::UNAVAILABLE, + NodesChecker::ExtractTableHealthStatus("UNAVAILABLE")); + ASSERT_EQ(HealthCheckResult::CONSENSUS_MISMATCH, + NodesChecker::ExtractTableHealthStatus("CONSENSUS_MISMATCH")); +} +} // namespace collector +} // namespace kudu + diff --git a/src/kudu/collector/nodes_checker.cc b/src/kudu/collector/nodes_checker.cc new file mode 100644 index 0000000..8b64c29 --- /dev/null +++ b/src/kudu/collector/nodes_checker.cc @@ -0,0 +1,358 @@ +// Licensed to 
the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/collector/nodes_checker.h" + +#include <cstdint> +#include <list> +#include <mutex> +#include <ostream> +#include <utility> +#include <vector> + +#include <gflags/gflags_declare.h> +#include <glog/logging.h> +#include <rapidjson/document.h> + +#include "kudu/collector/collector_util.h" +#include "kudu/collector/reporter_base.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/walltime.h" +#include "kudu/tools/tool_test_util.h" +#include "kudu/util/debug/trace_event.h" +#include "kudu/util/jsonreader.h" +#include "kudu/util/locks.h" +#include "kudu/util/monotime.h" +#include "kudu/util/status.h" +#include "kudu/util/thread.h" +#include "kudu/util/trace.h" + +DECLARE_string(collector_cluster_name); +DECLARE_string(collector_master_addrs); +DECLARE_int32(collector_interval_sec); +DECLARE_int32(collector_timeout_sec); +DECLARE_int32(collector_warn_threshold_ms); + +using rapidjson::Value; +using std::list; +using std::string; +using std::vector; +using strings::Substitute; +using kudu::cluster_summary::ServerHealth; +using kudu::cluster_summary::HealthCheckResult; + +namespace kudu { +namespace collector { + +const std::string NodesChecker::kMaster = 
"master"; +const std::string NodesChecker::kTserver = "tserver"; + +NodesChecker::NodesChecker(scoped_refptr<ReporterBase> reporter) + : initialized_(false), + reporter_(std::move(reporter)), + stop_background_threads_latch_(1) { +} + +NodesChecker::~NodesChecker() { + Shutdown(); +} + +Status NodesChecker::Init() { + CHECK(!initialized_); + + RETURN_NOT_OK(UpdateNodes()); + CHECK(!master_http_addrs_.empty()); + + initialized_ = true; + return Status::OK(); +} + +Status NodesChecker::Start() { + CHECK(initialized_); + + RETURN_NOT_OK(StartNodesCheckerThread()); + + return Status::OK(); +} + +void NodesChecker::Shutdown() { + if (initialized_) { + string name = ToString(); + LOG(INFO) << name << " shutting down..."; + + stop_background_threads_latch_.CountDown(); + + if (nodes_checker_thread_) { + nodes_checker_thread_->Join(); + } + + LOG(INFO) << name << " shutdown complete."; + } +} + +string NodesChecker::ToString() const { + return "NodesChecker"; +} + +vector<string> NodesChecker::GetNodes() { + shared_lock<RWMutex> l(nodes_lock_); + return tserver_http_addrs_; +} + +string NodesChecker::GetFirstMaster() { + shared_lock<RWMutex> l(nodes_lock_); + CHECK(!master_http_addrs_.empty()); + return master_http_addrs_[0]; +} + +Status NodesChecker::StartNodesCheckerThread() { + return Thread::Create("collector", "nodes-checker", &NodesChecker::NodesCheckerThread, + this, &nodes_checker_thread_); +} + +void NodesChecker::NodesCheckerThread() { + MonoTime check_time; + do { + check_time = MonoTime::Now(); + UpdateAndCheckNodes(); + check_time += MonoDelta::FromSeconds(FLAGS_collector_interval_sec); + } while (!RunOnceMode() && !stop_background_threads_latch_.WaitUntil(check_time)); + LOG(INFO) << "FalconPusherThread exit"; +} + +void NodesChecker::UpdateAndCheckNodes() { + LOG(INFO) << "Start to UpdateAndCheckNodes"; + MonoTime start(MonoTime::Now()); + scoped_refptr<Trace> trace(new Trace); + ADOPT_TRACE(trace.get()); + TRACE_EVENT0("collector", 
"NodesChecker::UpdateAndCheckNodes"); + WARN_NOT_OK(UpdateNodes(), "Unable to update nodes"); + WARN_NOT_OK(CheckNodes(), "Unable to check nodes"); + int64_t elapsed_ms = (MonoTime::Now() - start).ToMilliseconds(); + if (elapsed_ms > FLAGS_collector_warn_threshold_ms) { + if (Trace::CurrentTrace()) { + LOG(WARNING) << "Trace:" << std::endl + << Trace::CurrentTrace()->DumpToString(); + } + } +} + +Status NodesChecker::UpdateNodes() { + RETURN_NOT_OK(UpdateServers(kMaster)); + RETURN_NOT_OK(UpdateServers(kTserver)); + return Status::OK(); +} + +Status NodesChecker::UpdateServers(const std::string& role) { + DCHECK(role == kTserver || role == kMaster); + vector<string> args = { + role, + "list", + FLAGS_collector_master_addrs, + "-columns=http-addresses", + "-format=json", + Substitute("-timeout_ms=$0", FLAGS_collector_timeout_sec*1000) + }; + string tool_stdout; + string tool_stderr; + RETURN_NOT_OK_PREPEND(tools::RunKuduTool(args, &tool_stdout, &tool_stderr), + Substitute("out: $0, err: $1", tool_stdout, tool_stderr)); + TRACE(Substitute("'$0 list' done", role)); + + JsonReader r(tool_stdout); + RETURN_NOT_OK(r.Init()); + vector<const Value*> servers; + CHECK_OK(r.ExtractObjectArray(r.root(), nullptr, &servers)); + vector<string> server_http_addrs; + for (const Value* server : servers) { + string http_address; + CHECK_OK(r.ExtractString(server, "http-addresses", &http_address)); + server_http_addrs.emplace_back(http_address); + } + TRACE(Substitute("Result parsed, nodes count $0", server_http_addrs.size())); + + if (role == kTserver) { + std::lock_guard<RWMutex> l(nodes_lock_); + tserver_http_addrs_.swap(server_http_addrs); + } else { + std::lock_guard<RWMutex> l(nodes_lock_); + master_http_addrs_.swap(server_http_addrs); + } + TRACE("Nodes updated"); + + return Status::OK(); +} + +Status NodesChecker::CheckNodes() const { + vector<string> args = { + "cluster", + "ksck", + FLAGS_collector_master_addrs, + "-consensus=false", + "-ksck_format=json_compact", + 
"-color=never", + "-sections=MASTER_SUMMARIES,TSERVER_SUMMARIES,TABLE_SUMMARIES,TOTAL_COUNT", + Substitute("-timeout_ms=$0", FLAGS_collector_timeout_sec*1000) + }; + string tool_stdout; + string tool_stderr; + WARN_NOT_OK(tools::RunKuduTool(args, &tool_stdout, &tool_stderr), + Substitute("out: $0, err: $1", tool_stdout, tool_stderr)); + + TRACE("'cluster ksck' done"); + + RETURN_NOT_OK(ReportNodesMetrics(tool_stdout)); + return Status::OK(); +} + +Status NodesChecker::ReportNodesMetrics(const string& data) const { + JsonReader r(data); + RETURN_NOT_OK(r.Init()); + const Value* ksck; + CHECK_OK(r.ExtractObject(r.root(), nullptr, &ksck)); + auto timestamp = static_cast<uint64_t>(WallTime_Now()); + + list<scoped_refptr<ItemBase>> items; + // Maters health info. + vector<const Value*> masters; + CHECK_OK(r.ExtractObjectArray(ksck, "master_summaries", &masters)); + for (const Value* master : masters) { + string address; + CHECK_OK(r.ExtractString(master, "address", &address)); + string health; + CHECK_OK(r.ExtractString(master, "health", &health)); + items.emplace_back(reporter_->ConstructItem( + ExtractHostName(address), + "kudu-master-health", + "host", + timestamp, + static_cast<int64_t>(ExtractServerHealthStatus(health)), + "GAUGE", + "")); + } + TRACE(Substitute("Maters health info reported, count $0", masters.size())); + + // Tservers health info. 
+ vector<const Value*> tservers; + Status s = r.ExtractObjectArray(ksck, "tserver_summaries", &tservers); + CHECK(s.ok() || s.IsNotFound()); + if (s.ok()) { + for (const Value* tserver : tservers) { + string address; + CHECK_OK(r.ExtractString(tserver, "address", &address)); + string health; + CHECK_OK(r.ExtractString(tserver, "health", &health)); + items.emplace_back(reporter_->ConstructItem( + ExtractHostName(address), + "kudu-tserver-health", + "host", + timestamp, + static_cast<int64_t>(ExtractServerHealthStatus(health)), + "GAUGE", + "")); + } + TRACE(Substitute("Tservers health info reported, count $0", tservers.size())); + } + + // Tables health info. + uint32_t health_table_count = 0; + vector<const Value*> tables; + s = r.ExtractObjectArray(ksck, "table_summaries", &tables); + CHECK(s.ok() || s.IsNotFound()); + if (s.ok()) { + for (const Value* table : tables) { + string name; + CHECK_OK(r.ExtractString(table, "name", &name)); + string health; + CHECK_OK(r.ExtractString(table, "health", &health)); + HealthCheckResult health_status = ExtractTableHealthStatus(health); + items.emplace_back(reporter_->ConstructItem( + name, + "kudu-table-health", + "table", + timestamp, + static_cast<int64_t>(health_status), + "GAUGE", + "")); + if (health_status == HealthCheckResult::HEALTHY) { + health_table_count += 1; + } + } + TRACE(Substitute("Tables health info reported, count $0", tables.size())); + } + + // Healthy table ratio. + if (!tables.empty()) { + items.emplace_back(reporter_->ConstructItem( + FLAGS_collector_cluster_name, + "healthy_table_proportion", + "cluster", + timestamp, + 100 * health_table_count / tables.size(), + "GAUGE", + "")); + TRACE("Healthy table ratio reported"); + } + + // Count summaries. 
+ vector<const Value*> count_summaries; + CHECK_OK(r.ExtractObjectArray(ksck, "count_summaries", &count_summaries)); + for (const Value* count_summarie : count_summaries) { + // TODO(yingchun) should auto iterate items + static const vector<string> + count_names({"masters", "tservers", "tables", "tablets", "replicas"}); + for (const auto& name : count_names) { + int64_t count; + CHECK_OK(r.ExtractInt64(count_summarie, name.c_str(), &count)); + items.emplace_back(reporter_->ConstructItem( + FLAGS_collector_cluster_name, + name + "_count", + "cluster", + timestamp, + count, + "GAUGE", + "")); + } + } + TRACE("Count summaries reported"); + + reporter_->PushItems(std::move(items)); + TRACE("Pushed"); + + return Status::OK(); +} + +ServerHealth NodesChecker::ExtractServerHealthStatus(const string& health) { + if (health == "HEALTHY") return ServerHealth::HEALTHY; + if (health == "UNAUTHORIZED") return ServerHealth::UNAUTHORIZED; + if (health == "UNAVAILABLE") return ServerHealth::UNAVAILABLE; + if (health == "WRONG_SERVER_UUID") return ServerHealth::WRONG_SERVER_UUID; + CHECK(false) << "Unknown server health: " << health; + __builtin_unreachable(); +} + +HealthCheckResult NodesChecker::ExtractTableHealthStatus(const string& health) { + if (health == "HEALTHY") return HealthCheckResult::HEALTHY; + if (health == "RECOVERING") return HealthCheckResult::RECOVERING; + if (health == "UNDER_REPLICATED") return HealthCheckResult::UNDER_REPLICATED; + if (health == "UNAVAILABLE") return HealthCheckResult::UNAVAILABLE; + if (health == "CONSENSUS_MISMATCH") return HealthCheckResult::CONSENSUS_MISMATCH; + CHECK(false) << "Unknown table health: " << health; + __builtin_unreachable(); +} +} // namespace collector +} // namespace kudu diff --git a/src/kudu/collector/nodes_checker.h b/src/kudu/collector/nodes_checker.h new file mode 100644 index 0000000..26aee89 --- /dev/null +++ b/src/kudu/collector/nodes_checker.h @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) 
under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <string> +#include <vector> + +#include <gtest/gtest_prod.h> + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/rebalance/cluster_status.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/rw_mutex.h" +#include "kudu/util/status.h" + +namespace kudu { +class Thread; +} // namespace kudu + +namespace kudu { + +namespace collector { + +class ReporterBase; + +class NodesChecker : public RefCounted<NodesChecker> { + public: + explicit NodesChecker(scoped_refptr<ReporterBase> reporter); + ~NodesChecker(); + + Status Init(); + Status Start(); + void Shutdown(); + + std::string ToString() const; + + std::vector<std::string> GetNodes(); + std::string GetFirstMaster(); + + private: + friend class RefCounted<NodesChecker>; + + FRIEND_TEST(TestNodesChecker, TestExtractServerHealthStatus); + FRIEND_TEST(TestNodesChecker, TestExtractTableHealthStatus); + + Status StartNodesCheckerThread(); + void NodesCheckerThread(); + + void UpdateAndCheckNodes(); + Status UpdateNodes(); + Status UpdateServers(const std::string& role); + Status CheckNodes() const; + Status ReportNodesMetrics(const std::string& data) const; + + static cluster_summary::ServerHealth 
ExtractServerHealthStatus(const std::string& health); + static cluster_summary::HealthCheckResult ExtractTableHealthStatus(const std::string& health); + + static const std::string kMaster; + static const std::string kTserver; + + bool initialized_; + + scoped_refptr<ReporterBase> reporter_; + + CountDownLatch stop_background_threads_latch_; + scoped_refptr<Thread> nodes_checker_thread_; + + mutable RWMutex nodes_lock_; + std::vector<std::string> tserver_http_addrs_; + std::vector<std::string> master_http_addrs_; + + DISALLOW_COPY_AND_ASSIGN(NodesChecker); +}; +} // namespace collector +} // namespace kudu diff --git a/src/kudu/collector/reporter_base.h b/src/kudu/collector/reporter_base.h new file mode 100644 index 0000000..d03b80c --- /dev/null +++ b/src/kudu/collector/reporter_base.h @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#pragma once + +#include <cstdint> +#include <list> +#include <memory> +#include <string> +#include <unordered_map> + +#include <rapidjson/document.h> + +#include "kudu/gutil/atomicops.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/macros.h" +#include "kudu/server/server_base.h" +#include "kudu/tools/ksck_results.h" +#include "kudu/util/jsonreader.h" +#include "kudu/util/rw_mutex.h" +#include "kudu/util/status.h" +#include "kudu/util/threadpool.h" + +namespace kudu { + +namespace collector { + +struct ItemBase : public RefCounted<ItemBase> { + virtual ~ItemBase() = default; + + private: + friend class RefCounted<ItemBase>; +}; + +class ReporterBase : public RefCounted<ReporterBase> { + public: + virtual ~ReporterBase() = default; + + virtual Status Init() = 0; + virtual Status Start() = 0; + virtual void Shutdown() = 0; + + virtual std::string ToString() const = 0; + + // TODO(yingchun) This function is not generic enough for base class. + virtual scoped_refptr<ItemBase> ConstructItem(std::string endpoint, + std::string metric, + std::string level, + uint64_t timestamp, + int64_t value, + std::string counter_type, + std::string extra_tags) = 0; + virtual Status PushItems(std::list<scoped_refptr<ItemBase>> items) = 0; + + protected: + friend class RefCounted<ReporterBase>; +}; +} // namespace collector +} // namespace kudu + diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc index 6aefb42..21de53d 100644 --- a/src/kudu/master/catalog_manager.cc +++ b/src/kudu/master/catalog_manager.cc @@ -61,6 +61,7 @@ #include <boost/optional/optional.hpp> #include <gflags/gflags.h> +#include <gflags/gflags_declare.h> #include <glog/logging.h> #include <google/protobuf/stubs/common.h> diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc index 8e98c5d..7834ea4 100644 --- a/src/kudu/tools/tool_action_table.cc +++ b/src/kudu/tools/tool_action_table.cc @@ -15,8 +15,6 @@ // specific language governing 
permissions and limitations // under the License. -#include <stdlib.h> - #include <algorithm> #include <cstdint> #include <functional> diff --git a/src/kudu/util/jsonreader.h b/src/kudu/util/jsonreader.h index 125b762..dcc6603 100644 --- a/src/kudu/util/jsonreader.h +++ b/src/kudu/util/jsonreader.h @@ -28,6 +28,10 @@ namespace kudu { +namespace collector { +class MetricsCollector; +} // namespace collector + // Wraps the JSON parsing functionality of rapidjson::Document. // // Unlike JsonWriter, this class does not hide rapidjson internals from @@ -93,6 +97,8 @@ class JsonReader { const rapidjson::Value* root() const { return &document_; } private: + friend class collector::MetricsCollector; + Status ExtractField(const rapidjson::Value* object, const char* field, const rapidjson::Value** result) const;
