This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch cpp_dev in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit aad2ba8c53ab9a51bb9fa668bf073d12136b2101 Author: Li Zhanhui <[email protected]> AuthorDate: Mon Jul 11 11:08:31 2022 +0800 Support client metrics exporting --- cpp/src/main/cpp/client/include/ClientManager.h | 6 ++++ cpp/src/main/cpp/rocketmq/ClientImpl.cpp | 40 ++++++++++++++++++++++ cpp/src/main/cpp/rocketmq/include/ClientImpl.h | 1 + cpp/src/main/cpp/stats/OpencensusExporter.cpp | 8 ++++- cpp/src/main/cpp/stats/include/Exporter.h | 30 ---------------- .../main/cpp/stats/include/OpencensusExporter.h | 7 ++-- 6 files changed, 58 insertions(+), 34 deletions(-) diff --git a/cpp/src/main/cpp/client/include/ClientManager.h b/cpp/src/main/cpp/client/include/ClientManager.h index 99ec6ad..2ec5b2e 100644 --- a/cpp/src/main/cpp/client/include/ClientManager.h +++ b/cpp/src/main/cpp/client/include/ClientManager.h @@ -50,6 +50,12 @@ public: virtual std::shared_ptr<RpcClient> getRpcClient(const std::string& target_host, bool need_heartbeat) = 0; + /** + * @brief Create a Channel object + * + * @param target_host gRPC naming targets, following https://github.com/grpc/grpc/blob/master/doc/naming.md + * @return std::shared_ptr<grpc::Channel> + */ virtual std::shared_ptr<grpc::Channel> createChannel(const std::string& target_host) = 0; virtual void resolveRoute(const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request, diff --git a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp index 8275922..a5e7b62 100644 --- a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp +++ b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp @@ -16,6 +16,8 @@ */ #include "ClientImpl.h" +#include <apache/rocketmq/v2/definition.pb.h> + #include <algorithm> #include <atomic> #include <chrono> @@ -34,6 +36,7 @@ #include "InvocationContext.h" #include "LoggerImpl.h" #include "MessageExt.h" +#include "MetricBidiReactor.h" #include "NamingScheme.h" #include "SessionImpl.h" #include "Signature.h" @@ -41,6 +44,7 @@ #include "absl/strings/numbers.h" #include "absl/strings/str_join.h" #include "absl/strings/str_split.h" +#include "opencensus/stats/stats.h" #include "rocketmq/Message.h" #include "rocketmq/MessageListener.h" @@ -165,6 +169,42 @@ void ClientImpl::start() { route_update_handle_ = client_manager_->getScheduler()->schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME, std::chrono::seconds(10), std::chrono::seconds(30)); + + auto endpoints = client_config_.metric.endpoints; + std::string target; + switch (endpoints.scheme()) { + case rmq::AddressScheme::IPv4: { + target.append("ipv4:"); + break; + } + case rmq::AddressScheme::IPv6: { + target.append("ipv6:"); + break; + } + case rmq::AddressScheme::DOMAIN_NAME: { + target.append("dns:"); + break; + } + default: { + SPDLOG_ERROR("Unknown address scheme"); + } + } + + bool first = true; + for (const auto& address : endpoints.addresses()) { + if (!first) { + target.push_back(','); + } else { + first = false; + } + target.append(address.host()); + target.push_back(':'); + target.append(std::to_string(address.port())); + } + + std::weak_ptr<Client> client_weak_ptr(self()); + opencensus::stats::StatsExporter::SetInterval(absl::Minutes(1)); + opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<OpencensusExporter>(target, client_weak_ptr)); } void ClientImpl::shutdown() { diff --git a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h index c136cc9..e820351 100644 --- a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h +++ b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h @@ -30,6 +30,7 @@ #include "InvocationContext.h" #include "MessageExt.h" #include "NameServerResolver.h" +#include "OpencensusExporter.h" #include "RpcClient.h" #include "Session.h" #include "TelemetryBidiReactor.h" diff --git a/cpp/src/main/cpp/stats/OpencensusExporter.cpp b/cpp/src/main/cpp/stats/OpencensusExporter.cpp index 0bf13a4..cb76298 100644 --- a/cpp/src/main/cpp/stats/OpencensusExporter.cpp +++ b/cpp/src/main/cpp/stats/OpencensusExporter.cpp @@ -17,6 +17,7 @@ #include "OpencensusExporter.h" +#include "ClientManager.h" #include "MetricBidiReactor.h" #include "google/protobuf/util/time_util.h" @@ -25,6 +26,11 @@ ROCKETMQ_NAMESPACE_BEGIN namespace opencensus_proto = opencensus::proto::metrics::v1; OpencensusExporter::OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client) : client_(client) { + auto client_shared_ptr = client.lock(); + if (client_shared_ptr) { + auto channel = client_shared_ptr->manager()->createChannel(endpoints); + stub_ = opencensus::proto::agent::metrics::v1::MetricsService::NewStub(channel); + } } void OpencensusExporter::wrap(const MetricData& data, ExportMetricsServiceRequest& request) { @@ -171,7 +177,7 @@ void OpencensusExporter::wrap(const MetricData& data, ExportMetricsServiceReques } } -void OpencensusExporter::exportMetrics( +void OpencensusExporter::ExportViewData( const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) { opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest request; wrap(data, request); diff --git a/cpp/src/main/cpp/stats/include/Exporter.h b/cpp/src/main/cpp/stats/include/Exporter.h deleted file mode 100644 index 7f2f24a..0000000 --- a/cpp/src/main/cpp/stats/include/Exporter.h +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include "opencensus/stats/stats.h" -#include "rocketmq/RocketMQ.h" - -ROCKETMQ_NAMESPACE_BEGIN - -class Exporter { -public: - virtual void exportMetrics( - const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) = 0; -}; - -ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/src/main/cpp/stats/include/OpencensusExporter.h b/cpp/src/main/cpp/stats/include/OpencensusExporter.h index 7920ff5..161843c 100644 --- a/cpp/src/main/cpp/stats/include/OpencensusExporter.h +++ b/cpp/src/main/cpp/stats/include/OpencensusExporter.h @@ -17,9 +17,9 @@ #pragma once #include "Client.h" -#include "Exporter.h" #include "grpcpp/grpcpp.h" #include "opencensus/proto/agent/metrics/v1/metrics_service.grpc.pb.h" +#include "opencensus/stats/stats.h" ROCKETMQ_NAMESPACE_BEGIN @@ -30,11 +30,12 @@ using StubPtr = std::unique_ptr<Stub>; using MetricData = std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>; using ExportMetricsServiceRequest = opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest; -class OpencensusExporter : public Exporter, public std::enable_shared_from_this<OpencensusExporter> { +class OpencensusExporter : public opencensus::stats::StatsExporter::Handler, + public std::enable_shared_from_this<OpencensusExporter> { public: OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client); - void exportMetrics(const MetricData& data) override; + void ExportViewData(const MetricData& data) override; static void wrap(const MetricData& data, ExportMetricsServiceRequest& request);
