This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch cpp_dev in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 2f1925431e56a13f817f47139991b6e5184a364e Author: Li Zhanhui <[email protected]> AuthorDate: Mon Jul 11 11:08:31 2022 +0800 Support client metrics exporting --- cpp/bazel/rocketmq_deps.bzl | 8 +-- cpp/src/main/cpp/client/BUILD.bazel | 1 + cpp/src/main/cpp/client/ClientManagerImpl.cpp | 7 +- cpp/src/main/cpp/client/LogInterceptor.cpp | 4 +- cpp/src/main/cpp/client/include/ClientManager.h | 6 ++ cpp/src/main/cpp/rocketmq/ClientImpl.cpp | 47 +++++++++++++ cpp/src/main/cpp/rocketmq/include/ClientImpl.h | 1 + cpp/src/main/cpp/stats/MetricBidiReactor.cpp | 23 +++--- cpp/src/main/cpp/stats/OpencensusExporter.cpp | 10 ++- .../{include/Exporter.h => OpencensusHandler.cpp} | 17 ++--- cpp/src/main/cpp/stats/StdoutHandler.cpp | 81 ++++++++++++++++++++++ cpp/src/main/cpp/stats/include/MetricBidiReactor.h | 1 + .../main/cpp/stats/include/OpencensusExporter.h | 7 +- .../include/{Exporter.h => OpencensusHandler.h} | 12 ++-- cpp/src/main/cpp/stats/include/StdoutHandler.h | 76 ++++++++++++++++++++ 15 files changed, 265 insertions(+), 36 deletions(-) diff --git a/cpp/bazel/rocketmq_deps.bzl b/cpp/bazel/rocketmq_deps.bzl index 983a476..2530d52 100644 --- a/cpp/bazel/rocketmq_deps.bzl +++ b/cpp/bazel/rocketmq_deps.bzl @@ -114,11 +114,11 @@ def rocketmq_deps(): maybe( http_archive, name = "com_github_grpc_grpc", - strip_prefix = "grpc-1.46.3", - sha256 = "d6cbf22cb5007af71b61c6be316a79397469c58c82a942552a62e708bce60964", + strip_prefix = "grpc-1.46.4", + sha256 = "e266aa0d9d9cddb876484a370b94f468248594a96ca0b6f87c21f969db2b8c5b", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/grpc/grpc-1.46.3.tar.gz", - "https://github.com/grpc/grpc/archive/refs/tags/v1.46.3.tar.gz", + "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/grpc/grpc-1.46.4.tar.gz", + "https://github.com/grpc/grpc/archive/refs/tags/v1.46.4.tar.gz", ], ) diff --git a/cpp/src/main/cpp/client/BUILD.bazel b/cpp/src/main/cpp/client/BUILD.bazel index c56f279..0af283e 100644 --- a/cpp/src/main/cpp/client/BUILD.bazel +++ b/cpp/src/main/cpp/client/BUILD.bazel @@ -37,5 +37,6 @@ cc_library( "//external:gtest", ], defines = [ + "DEBUG_METRIC_EXPORTING", ], ) \ No newline at end of file diff --git a/cpp/src/main/cpp/client/ClientManagerImpl.cpp b/cpp/src/main/cpp/client/ClientManagerImpl.cpp index 1f9eec6..78b13f7 100644 --- a/cpp/src/main/cpp/client/ClientManagerImpl.cpp +++ b/cpp/src/main/cpp/client/ClientManagerImpl.cpp @@ -331,7 +331,7 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met } if (State::STARTED != client_manager_ptr->state()) { - // TODO: Would this leak some memroy? + // TODO: Would this leak some memory? return; } @@ -501,10 +501,7 @@ RpcClientSharedPtr ClientManagerImpl::getRpcClient(const std::string& target_hos } else if (!search->second->ok()) { SPDLOG_INFO("Prior RPC client to {} is not OK. Re-create one", target_host); } - std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories; - interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>()); - auto channel = grpc::experimental::CreateCustomChannelWithInterceptors( - target_host, channel_credential_, channel_arguments_, std::move(interceptor_factories)); + auto channel = createChannel(target_host); std::weak_ptr<ClientManager> client_manager(shared_from_this()); client = std::make_shared<RpcClientImpl>(client_manager, channel, target_host, need_heartbeat); rpc_clients_.insert_or_assign(target_host, client); diff --git a/cpp/src/main/cpp/client/LogInterceptor.cpp b/cpp/src/main/cpp/client/LogInterceptor.cpp index 09dbc9f..7702864 100644 --- a/cpp/src/main/cpp/client/LogInterceptor.cpp +++ b/cpp/src/main/cpp/client/LogInterceptor.cpp @@ -15,13 +15,15 @@ * limitations under the License. */ #include "LogInterceptor.h" + +#include <cstddef> + #include "InterceptorContinuation.h" #include "absl/container/flat_hash_map.h" #include "absl/strings/str_join.h" #include "google/protobuf/message.h" #include "rocketmq/Logger.h" #include "spdlog/spdlog.h" -#include <cstddef> ROCKETMQ_NAMESPACE_BEGIN diff --git a/cpp/src/main/cpp/client/include/ClientManager.h b/cpp/src/main/cpp/client/include/ClientManager.h index 99ec6ad..2ec5b2e 100644 --- a/cpp/src/main/cpp/client/include/ClientManager.h +++ b/cpp/src/main/cpp/client/include/ClientManager.h @@ -50,6 +50,12 @@ public: virtual std::shared_ptr<RpcClient> getRpcClient(const std::string& target_host, bool need_heartbeat) = 0; + /** + * @brief Create a Channel object + * + * @param target_host gRPC naming targets, following https://github.com/grpc/grpc/blob/master/doc/naming.md + * @return std::shared_ptr<grpc::Channel> + */ virtual std::shared_ptr<grpc::Channel> createChannel(const std::string& target_host) = 0; virtual void resolveRoute(const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request, diff --git a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp index 8275922..61e6ed9 100644 --- a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp +++ b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp @@ -16,6 +16,8 @@ */ #include "ClientImpl.h" +#include <apache/rocketmq/v2/definition.pb.h> + #include <algorithm> #include <atomic> #include <chrono> @@ -37,10 +39,12 @@ #include "NamingScheme.h" #include "SessionImpl.h" #include "Signature.h" +#include "StdoutHandler.h" #include "UtilAll.h" #include "absl/strings/numbers.h" #include "absl/strings/str_join.h" #include "absl/strings/str_split.h" +#include "opencensus/stats/stats.h" #include "rocketmq/Message.h" #include "rocketmq/MessageListener.h" @@ -165,6 +169,49 @@ void ClientImpl::start() { route_update_handle_ = client_manager_->getScheduler()->schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME, std::chrono::seconds(10), std::chrono::seconds(30)); + + auto endpoints = client_config_.metric.endpoints; + std::string target; + switch (endpoints.scheme()) { + case rmq::AddressScheme::IPv4: { + target.append("ipv4:"); + break; + } + case rmq::AddressScheme::IPv6: { + target.append("ipv6:"); + break; + } + case rmq::AddressScheme::DOMAIN_NAME: { + target.append("dns:"); + break; + } + default: { + SPDLOG_ERROR("Unknown metric address scheme"); + } + } + + bool first = true; + for (const auto& address : endpoints.addresses()) { + if (!first) { + target.push_back(','); + } else { + first = false; + } + target.append(address.host()); + target.push_back(':'); + target.append(std::to_string(address.port())); + } + + std::weak_ptr<Client> client_weak_ptr(self()); + +#ifdef DEBUG_METRIC_EXPORTING + opencensus::stats::StatsExporter::SetInterval(absl::Seconds(1)); + opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<StdoutHandler>()); +#else + opencensus::stats::StatsExporter::SetInterval(absl::Minutes(1)); +#endif + SPDLOG_INFO("Export client metrics to {}", target); + opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<OpencensusHandler>(target, client_weak_ptr)); } void ClientImpl::shutdown() { diff --git a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h index c136cc9..b9912dc 100644 --- a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h +++ b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h @@ -30,6 +30,7 @@ #include "InvocationContext.h" #include "MessageExt.h" #include "NameServerResolver.h" +#include "OpencensusHandler.h" #include "RpcClient.h" #include "Session.h" #include "TelemetryBidiReactor.h" diff --git a/cpp/src/main/cpp/stats/MetricBidiReactor.cpp b/cpp/src/main/cpp/stats/MetricBidiReactor.cpp index 62983ee..5c9f87e 100644 --- a/cpp/src/main/cpp/stats/MetricBidiReactor.cpp +++ b/cpp/src/main/cpp/stats/MetricBidiReactor.cpp @@ -24,23 +24,22 @@ ROCKETMQ_NAMESPACE_BEGIN MetricBidiReactor::MetricBidiReactor(std::weak_ptr<Client> client, std::weak_ptr<OpencensusExporter> exporter) : client_(client), exporter_(exporter) { - grpc::ClientContext context; auto ptr = client_.lock(); Metadata metadata; Signature::sign(ptr->config(), metadata); for (const auto& entry : metadata) { - context.AddMetadata(entry.first, entry.second); + context_.AddMetadata(entry.first, entry.second); } - context.set_deadline(std::chrono::system_clock::now() + absl::ToChronoMilliseconds(ptr->config().request_timeout)); + context_.set_deadline(std::chrono::system_clock::now() + absl::ToChronoMilliseconds(ptr->config().request_timeout)); auto exporter_ptr = exporter_.lock(); if (!exporter_ptr) { + SPDLOG_WARN("Exporter has already been destructed"); return; } - - exporter_ptr->stub()->async()->Export(&context, this); + exporter_ptr->stub()->async()->Export(&context_, this); StartCall(); } @@ -49,7 +48,7 @@ void MetricBidiReactor::OnReadDone(bool ok) { SPDLOG_WARN("Failed to read response"); return; } - + SPDLOG_DEBUG("OnReadDone OK"); StartRead(&response_); } @@ -58,7 +57,8 @@ void MetricBidiReactor::OnWriteDone(bool ok) { SPDLOG_WARN("Failed to report metrics"); return; } - + SPDLOG_DEBUG("OnWriteDone OK"); + fireRead(); bool expected = true; if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) { fireWrite(); @@ -75,10 +75,15 @@ void MetricBidiReactor::OnDone(const grpc::Status& s) { SPDLOG_DEBUG("Bi-directional stream ended. status.code={}, status.message={}", s.error_code(), s.error_message()); } else { SPDLOG_WARN("Bi-directional stream ended. status.code={}, status.message={}", s.error_code(), s.error_message()); + auto exporter = exporter_.lock(); + if (exporter) { + exporter->resetStream(); + } } } void MetricBidiReactor::write(ExportMetricsServiceRequest request) { + SPDLOG_DEBUG("Append ExportMetricsServiceRequest to buffer"); { absl::MutexLock lk(&requests_mtx_); requests_.emplace_back(std::move(request)); @@ -99,10 +104,10 @@ void MetricBidiReactor::fireWrite() { bool expected = false; if (inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) { absl::MutexLock lk(&requests_mtx_); - request_.CopyFrom(requests_[0]); + request_ = std::move(*requests_.begin()); requests_.erase(requests_.begin()); + SPDLOG_DEBUG("MetricBidiReactor#StartWrite"); StartWrite(&request_); - fireRead(); } } diff --git a/cpp/src/main/cpp/stats/OpencensusExporter.cpp b/cpp/src/main/cpp/stats/OpencensusExporter.cpp index 0bf13a4..651965d 100644 --- a/cpp/src/main/cpp/stats/OpencensusExporter.cpp +++ b/cpp/src/main/cpp/stats/OpencensusExporter.cpp @@ -17,6 +17,7 @@ #include "OpencensusExporter.h" +#include "ClientManager.h" #include "MetricBidiReactor.h" #include "google/protobuf/util/time_util.h" @@ -25,6 +26,13 @@ ROCKETMQ_NAMESPACE_BEGIN namespace opencensus_proto = opencensus::proto::metrics::v1; OpencensusExporter::OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client) : client_(client) { + auto client_shared_ptr = client.lock(); + if (client_shared_ptr) { + auto channel = client_shared_ptr->manager()->createChannel(endpoints); + stub_ = opencensus::proto::agent::metrics::v1::MetricsService::NewStub(channel); + } else { + SPDLOG_ERROR("Failed to initialize OpencensusExporter. weak_ptr to Client is nullptr"); + } } void OpencensusExporter::wrap(const MetricData& data, ExportMetricsServiceRequest& request) { @@ -171,7 +179,7 @@ void OpencensusExporter::wrap(const MetricData& data, ExportMetricsServiceReques } } -void OpencensusExporter::exportMetrics( +void OpencensusExporter::ExportViewData( const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) { opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest request; wrap(data, request); diff --git a/cpp/src/main/cpp/stats/include/Exporter.h b/cpp/src/main/cpp/stats/OpencensusHandler.cpp similarity index 68% copy from cpp/src/main/cpp/stats/include/Exporter.h copy to cpp/src/main/cpp/stats/OpencensusHandler.cpp index 7f2f24a..422108d 100644 --- a/cpp/src/main/cpp/stats/include/Exporter.h +++ b/cpp/src/main/cpp/stats/OpencensusHandler.cpp @@ -14,17 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#pragma once +#include "OpencensusHandler.h" -#include "opencensus/stats/stats.h" -#include "rocketmq/RocketMQ.h" +#include "MetricBidiReactor.h" ROCKETMQ_NAMESPACE_BEGIN -class Exporter { -public: - virtual void exportMetrics( - const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) = 0; -}; +OpencensusHandler::OpencensusHandler(std::string endpoints, std::weak_ptr<Client> client) + : exporter_(std::make_shared<OpencensusExporter>(std::move(endpoints), std::move(client))) { +} + +void OpencensusHandler::ExportViewData(const MetricData& data) { + exporter_->ExportViewData(data); +} ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/src/main/cpp/stats/StdoutHandler.cpp b/cpp/src/main/cpp/stats/StdoutHandler.cpp new file mode 100644 index 0000000..9d18d78 --- /dev/null +++ b/cpp/src/main/cpp/stats/StdoutHandler.cpp @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "StdoutHandler.h" + +#include "rocketmq/RocketMQ.h" + +ROCKETMQ_NAMESPACE_BEGIN + +void StdoutHandler::ExportViewData( + const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) { + for (const auto& datum : data) { + const auto& view_data = datum.second; + const auto& descriptor = datum.first; + auto start_times = view_data.start_times(); + auto columns = descriptor.columns(); + + switch (view_data.type()) { + case opencensus::stats::ViewData::Type::kInt64: { + auto data_map = view_data.int_data(); + for (const auto& entry : data_map) { + absl::Time time = start_times[entry.first]; + std::string line; + line.append(absl::FormatTime(time)).append(" "); + line.append(descriptor.name()); + line.append("{"); + for (std::size_t i = 0; i < columns.size(); i++) { + line.append(columns[i].name()).append("=").append(entry.first[i]); + if (i < columns.size() - 1) { + line.append(", "); + } else { + line.append("} ==> "); + } + } + line.append(std::to_string(entry.second)); + println(line); + } + break; + } + case opencensus::stats::ViewData::Type::kDouble: { + exportDatum(datum.first, view_data.start_time(), view_data.end_time(), view_data.double_data()); + break; + } + case opencensus::stats::ViewData::Type::kDistribution: { + for (const auto& entry : view_data.distribution_data()) { + std::string line(descriptor.name()); + line.append("{"); + for (std::size_t i = 0; i < columns.size(); i++) { + line.append(columns[i].name()).append("=").append(entry.first[i]); + if (i < columns.size() - 1) { + line.append(", "); + } else { + line.append("} ==> "); + } + } + line.append(entry.second.DebugString()); + println(line); + + println(absl::StrJoin(entry.second.bucket_boundaries().lower_boundaries(), ",")); + } + break; + } + } + } +} + +ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/src/main/cpp/stats/include/MetricBidiReactor.h b/cpp/src/main/cpp/stats/include/MetricBidiReactor.h index cc08c1d..0a10cd8 100644 --- a/cpp/src/main/cpp/stats/include/MetricBidiReactor.h +++ b/cpp/src/main/cpp/stats/include/MetricBidiReactor.h @@ -59,6 +59,7 @@ public: private: std::weak_ptr<Client> client_; std::weak_ptr<OpencensusExporter> exporter_; + grpc::ClientContext context_; ExportMetricsServiceRequest request_; diff --git a/cpp/src/main/cpp/stats/include/OpencensusExporter.h b/cpp/src/main/cpp/stats/include/OpencensusExporter.h index 7920ff5..161843c 100644 --- a/cpp/src/main/cpp/stats/include/OpencensusExporter.h +++ b/cpp/src/main/cpp/stats/include/OpencensusExporter.h @@ -17,9 +17,9 @@ #pragma once #include "Client.h" -#include "Exporter.h" #include "grpcpp/grpcpp.h" #include "opencensus/proto/agent/metrics/v1/metrics_service.grpc.pb.h" +#include "opencensus/stats/stats.h" ROCKETMQ_NAMESPACE_BEGIN @@ -30,11 +30,12 @@ using StubPtr = std::unique_ptr<Stub>; using MetricData = std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>; using ExportMetricsServiceRequest = opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest; -class OpencensusExporter : public Exporter, public std::enable_shared_from_this<OpencensusExporter> { +class OpencensusExporter : public opencensus::stats::StatsExporter::Handler, + public std::enable_shared_from_this<OpencensusExporter> { public: OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client); - void exportMetrics(const MetricData& data) override; + void ExportViewData(const MetricData& data) override; static void wrap(const MetricData& data, ExportMetricsServiceRequest& request); diff --git a/cpp/src/main/cpp/stats/include/Exporter.h b/cpp/src/main/cpp/stats/include/OpencensusHandler.h similarity index 73% rename from cpp/src/main/cpp/stats/include/Exporter.h rename to cpp/src/main/cpp/stats/include/OpencensusHandler.h index 7f2f24a..5c6ec1f 100644 --- a/cpp/src/main/cpp/stats/include/Exporter.h +++ b/cpp/src/main/cpp/stats/include/OpencensusHandler.h @@ -16,15 +16,17 @@ */ #pragma once -#include "opencensus/stats/stats.h" -#include "rocketmq/RocketMQ.h" +#include "OpencensusExporter.h" ROCKETMQ_NAMESPACE_BEGIN -class Exporter { +class OpencensusHandler : public opencensus::stats::StatsExporter::Handler { public: - virtual void exportMetrics( - const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) = 0; + OpencensusHandler(std::string endpoints, std::weak_ptr<Client> client); + + void ExportViewData(const MetricData& data) override; + + std::shared_ptr<OpencensusExporter> exporter_; }; ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/src/main/cpp/stats/include/StdoutHandler.h b/cpp/src/main/cpp/stats/include/StdoutHandler.h new file mode 100644 index 0000000..1bfc3dc --- /dev/null +++ b/cpp/src/main/cpp/stats/include/StdoutHandler.h @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <mutex> +#include <string> + +#include "opencensus/stats/stats.h" +#include "rocketmq/RocketMQ.h" + +ROCKETMQ_NAMESPACE_BEGIN + +class StdoutHandler : public opencensus::stats::StatsExporter::Handler { +public: + void ExportViewData( + const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) override; + +private: + template <typename T> + void exportDatum(const opencensus::stats::ViewDescriptor& descriptor, + absl::Time start_time, + absl::Time end_time, + const opencensus::stats::ViewData::DataMap<T>& data) { + if (data.empty()) { + // std::cout << "No data for " << descriptor.name() << std::endl; + return; + } + + for (const auto& row : data) { + for (std::size_t column = 0; column < descriptor.columns().size(); column++) { + std::cout << descriptor.name() << "[" << descriptor.columns()[column].name() << "=" << row.first[column] << "]" + << dataToString(row.second) << std::endl; + } + } + } + + std::mutex console_mtx_; + + void println(const std::string& line) { + std::lock_guard<std::mutex> lk(console_mtx_); + std::cout << line << std::endl; + } + + // Functions to format data for different aggregation types. + std::string dataToString(double data) { + return absl::StrCat(": ", data, "\n"); + } + std::string dataToString(int64_t data) { + return absl::StrCat(": ", data, "\n"); + } + std::string dataToString(const opencensus::stats::Distribution& data) { + std::string output = "\n"; + std::vector<std::string> lines = absl::StrSplit(data.DebugString(), '\n'); + // Add indent. + for (const auto& line : lines) { + absl::StrAppend(&output, " ", line, "\n"); + } + return output; + } +}; + +ROCKETMQ_NAMESPACE_END \ No newline at end of file
