This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch cpp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 2f1925431e56a13f817f47139991b6e5184a364e
Author: Li Zhanhui <[email protected]>
AuthorDate: Mon Jul 11 11:08:31 2022 +0800

    Support client metrics exporting
---
 cpp/bazel/rocketmq_deps.bzl                        |  8 +--
 cpp/src/main/cpp/client/BUILD.bazel                |  1 +
 cpp/src/main/cpp/client/ClientManagerImpl.cpp      |  7 +-
 cpp/src/main/cpp/client/LogInterceptor.cpp         |  4 +-
 cpp/src/main/cpp/client/include/ClientManager.h    |  6 ++
 cpp/src/main/cpp/rocketmq/ClientImpl.cpp           | 47 +++++++++++++
 cpp/src/main/cpp/rocketmq/include/ClientImpl.h     |  1 +
 cpp/src/main/cpp/stats/MetricBidiReactor.cpp       | 23 +++---
 cpp/src/main/cpp/stats/OpencensusExporter.cpp      | 10 ++-
 .../{include/Exporter.h => OpencensusHandler.cpp}  | 17 ++---
 cpp/src/main/cpp/stats/StdoutHandler.cpp           | 81 ++++++++++++++++++++++
 cpp/src/main/cpp/stats/include/MetricBidiReactor.h |  1 +
 .../main/cpp/stats/include/OpencensusExporter.h    |  7 +-
 .../include/{Exporter.h => OpencensusHandler.h}    | 12 ++--
 cpp/src/main/cpp/stats/include/StdoutHandler.h     | 76 ++++++++++++++++++++
 15 files changed, 265 insertions(+), 36 deletions(-)

diff --git a/cpp/bazel/rocketmq_deps.bzl b/cpp/bazel/rocketmq_deps.bzl
index 983a476..2530d52 100644
--- a/cpp/bazel/rocketmq_deps.bzl
+++ b/cpp/bazel/rocketmq_deps.bzl
@@ -114,11 +114,11 @@ def rocketmq_deps():
     maybe(
         http_archive,
         name = "com_github_grpc_grpc",
-        strip_prefix = "grpc-1.46.3",
-        sha256 = 
"d6cbf22cb5007af71b61c6be316a79397469c58c82a942552a62e708bce60964",
+        strip_prefix = "grpc-1.46.4",
+        sha256 = 
"e266aa0d9d9cddb876484a370b94f468248594a96ca0b6f87c21f969db2b8c5b",
         urls = [
-            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/grpc/grpc-1.46.3.tar.gz",
-            "https://github.com/grpc/grpc/archive/refs/tags/v1.46.3.tar.gz",
+            
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/grpc/grpc-1.46.4.tar.gz",
+            "https://github.com/grpc/grpc/archive/refs/tags/v1.46.4.tar.gz",
         ],
     )
 
diff --git a/cpp/src/main/cpp/client/BUILD.bazel 
b/cpp/src/main/cpp/client/BUILD.bazel
index c56f279..0af283e 100644
--- a/cpp/src/main/cpp/client/BUILD.bazel
+++ b/cpp/src/main/cpp/client/BUILD.bazel
@@ -37,5 +37,6 @@ cc_library(
         "//external:gtest",
     ],
     defines = [
+        "DEBUG_METRIC_EXPORTING",
     ],
 )
\ No newline at end of file
diff --git a/cpp/src/main/cpp/client/ClientManagerImpl.cpp 
b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
index 1f9eec6..78b13f7 100644
--- a/cpp/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
@@ -331,7 +331,7 @@ bool ClientManagerImpl::send(const std::string& 
target_host, const Metadata& met
     }
 
     if (State::STARTED != client_manager_ptr->state()) {
-      // TODO: Would this leak some memroy?
+      // TODO: Would this leak some memory?
       return;
     }
 
@@ -501,10 +501,7 @@ RpcClientSharedPtr ClientManagerImpl::getRpcClient(const 
std::string& target_hos
       } else if (!search->second->ok()) {
         SPDLOG_INFO("Prior RPC client to {} is not OK. Re-create one", 
target_host);
       }
-      
std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
 interceptor_factories;
-      
interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
-      auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
-          target_host, channel_credential_, channel_arguments_, 
std::move(interceptor_factories));
+      auto channel = createChannel(target_host);
       std::weak_ptr<ClientManager> client_manager(shared_from_this());
       client = std::make_shared<RpcClientImpl>(client_manager, channel, 
target_host, need_heartbeat);
       rpc_clients_.insert_or_assign(target_host, client);
diff --git a/cpp/src/main/cpp/client/LogInterceptor.cpp 
b/cpp/src/main/cpp/client/LogInterceptor.cpp
index 09dbc9f..7702864 100644
--- a/cpp/src/main/cpp/client/LogInterceptor.cpp
+++ b/cpp/src/main/cpp/client/LogInterceptor.cpp
@@ -15,13 +15,15 @@
  * limitations under the License.
  */
 #include "LogInterceptor.h"
+
+#include <cstddef>
+
 #include "InterceptorContinuation.h"
 #include "absl/container/flat_hash_map.h"
 #include "absl/strings/str_join.h"
 #include "google/protobuf/message.h"
 #include "rocketmq/Logger.h"
 #include "spdlog/spdlog.h"
-#include <cstddef>
 
 ROCKETMQ_NAMESPACE_BEGIN
 
diff --git a/cpp/src/main/cpp/client/include/ClientManager.h 
b/cpp/src/main/cpp/client/include/ClientManager.h
index 99ec6ad..2ec5b2e 100644
--- a/cpp/src/main/cpp/client/include/ClientManager.h
+++ b/cpp/src/main/cpp/client/include/ClientManager.h
@@ -50,6 +50,12 @@ public:
 
   virtual std::shared_ptr<RpcClient> getRpcClient(const std::string& 
target_host, bool need_heartbeat) = 0;
 
+  /**
+   * @brief Create a Channel object
+   *
+   * @param target_host gRPC naming targets, following 
https://github.com/grpc/grpc/blob/master/doc/naming.md
+   * @return std::shared_ptr<grpc::Channel>
+   */
   virtual std::shared_ptr<grpc::Channel> createChannel(const std::string& 
target_host) = 0;
 
   virtual void resolveRoute(const std::string& target_host, const Metadata& 
metadata, const QueryRouteRequest& request,
diff --git a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp 
b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
index 8275922..61e6ed9 100644
--- a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -16,6 +16,8 @@
  */
 #include "ClientImpl.h"
 
+#include <apache/rocketmq/v2/definition.pb.h>
+
 #include <algorithm>
 #include <atomic>
 #include <chrono>
@@ -37,10 +39,12 @@
 #include "NamingScheme.h"
 #include "SessionImpl.h"
 #include "Signature.h"
+#include "StdoutHandler.h"
 #include "UtilAll.h"
 #include "absl/strings/numbers.h"
 #include "absl/strings/str_join.h"
 #include "absl/strings/str_split.h"
+#include "opencensus/stats/stats.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/MessageListener.h"
 
@@ -165,6 +169,49 @@ void ClientImpl::start() {
 
   route_update_handle_ = 
client_manager_->getScheduler()->schedule(route_update_functor, 
UPDATE_ROUTE_TASK_NAME,
                                                                    
std::chrono::seconds(10), std::chrono::seconds(30));
+
+  auto endpoints = client_config_.metric.endpoints;
+  std::string target;
+  switch (endpoints.scheme()) {
+    case rmq::AddressScheme::IPv4: {
+      target.append("ipv4:");
+      break;
+    }
+    case rmq::AddressScheme::IPv6: {
+      target.append("ipv6:");
+      break;
+    }
+    case rmq::AddressScheme::DOMAIN_NAME: {
+      target.append("dns:");
+      break;
+    }
+    default: {
+      SPDLOG_ERROR("Unknown metric address scheme");
+    }
+  }
+
+  bool first = true;
+  for (const auto& address : endpoints.addresses()) {
+    if (!first) {
+      target.push_back(',');
+    } else {
+      first = false;
+    }
+    target.append(address.host());
+    target.push_back(':');
+    target.append(std::to_string(address.port()));
+  }
+
+  std::weak_ptr<Client> client_weak_ptr(self());
+
+#ifdef DEBUG_METRIC_EXPORTING
+  opencensus::stats::StatsExporter::SetInterval(absl::Seconds(1));
+  
opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<StdoutHandler>());
+#else
+  opencensus::stats::StatsExporter::SetInterval(absl::Minutes(1));
+#endif
+  SPDLOG_INFO("Export client metrics to {}", target);
+  
opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<OpencensusHandler>(target,
 client_weak_ptr));
 }
 
 void ClientImpl::shutdown() {
diff --git a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h 
b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
index c136cc9..b9912dc 100644
--- a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
+++ b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
@@ -30,6 +30,7 @@
 #include "InvocationContext.h"
 #include "MessageExt.h"
 #include "NameServerResolver.h"
+#include "OpencensusHandler.h"
 #include "RpcClient.h"
 #include "Session.h"
 #include "TelemetryBidiReactor.h"
diff --git a/cpp/src/main/cpp/stats/MetricBidiReactor.cpp 
b/cpp/src/main/cpp/stats/MetricBidiReactor.cpp
index 62983ee..5c9f87e 100644
--- a/cpp/src/main/cpp/stats/MetricBidiReactor.cpp
+++ b/cpp/src/main/cpp/stats/MetricBidiReactor.cpp
@@ -24,23 +24,22 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 MetricBidiReactor::MetricBidiReactor(std::weak_ptr<Client> client, 
std::weak_ptr<OpencensusExporter> exporter)
     : client_(client), exporter_(exporter) {
-  grpc::ClientContext context;
   auto ptr = client_.lock();
 
   Metadata metadata;
   Signature::sign(ptr->config(), metadata);
 
   for (const auto& entry : metadata) {
-    context.AddMetadata(entry.first, entry.second);
+    context_.AddMetadata(entry.first, entry.second);
   }
-  context.set_deadline(std::chrono::system_clock::now() + 
absl::ToChronoMilliseconds(ptr->config().request_timeout));
+  context_.set_deadline(std::chrono::system_clock::now() + 
absl::ToChronoMilliseconds(ptr->config().request_timeout));
 
   auto exporter_ptr = exporter_.lock();
   if (!exporter_ptr) {
+    SPDLOG_WARN("Exporter has already been destructed");
     return;
   }
-
-  exporter_ptr->stub()->async()->Export(&context, this);
+  exporter_ptr->stub()->async()->Export(&context_, this);
   StartCall();
 }
 
@@ -49,7 +48,7 @@ void MetricBidiReactor::OnReadDone(bool ok) {
     SPDLOG_WARN("Failed to read response");
     return;
   }
-
+  SPDLOG_DEBUG("OnReadDone OK");
   StartRead(&response_);
 }
 
@@ -58,7 +57,8 @@ void MetricBidiReactor::OnWriteDone(bool ok) {
     SPDLOG_WARN("Failed to report metrics");
     return;
   }
-
+  SPDLOG_DEBUG("OnWriteDone OK");
+  fireRead();
   bool expected = true;
   if (inflight_.compare_exchange_strong(expected, false, 
std::memory_order_relaxed)) {
     fireWrite();
@@ -75,10 +75,15 @@ void MetricBidiReactor::OnDone(const grpc::Status& s) {
     SPDLOG_DEBUG("Bi-directional stream ended. status.code={}, 
status.message={}", s.error_code(), s.error_message());
   } else {
     SPDLOG_WARN("Bi-directional stream ended. status.code={}, 
status.message={}", s.error_code(), s.error_message());
+    auto exporter = exporter_.lock();
+    if (exporter) {
+      exporter->resetStream();
+    }
   }
 }
 
 void MetricBidiReactor::write(ExportMetricsServiceRequest request) {
+  SPDLOG_DEBUG("Append ExportMetricsServiceRequest to buffer");
   {
     absl::MutexLock lk(&requests_mtx_);
     requests_.emplace_back(std::move(request));
@@ -99,10 +104,10 @@ void MetricBidiReactor::fireWrite() {
   bool expected = false;
   if (inflight_.compare_exchange_strong(expected, true, 
std::memory_order_relaxed)) {
     absl::MutexLock lk(&requests_mtx_);
-    request_.CopyFrom(requests_[0]);
+    request_ = std::move(*requests_.begin());
     requests_.erase(requests_.begin());
+    SPDLOG_DEBUG("MetricBidiReactor#StartWrite");
     StartWrite(&request_);
-    fireRead();
   }
 }
 
diff --git a/cpp/src/main/cpp/stats/OpencensusExporter.cpp 
b/cpp/src/main/cpp/stats/OpencensusExporter.cpp
index 0bf13a4..651965d 100644
--- a/cpp/src/main/cpp/stats/OpencensusExporter.cpp
+++ b/cpp/src/main/cpp/stats/OpencensusExporter.cpp
@@ -17,6 +17,7 @@
 
 #include "OpencensusExporter.h"
 
+#include "ClientManager.h"
 #include "MetricBidiReactor.h"
 #include "google/protobuf/util/time_util.h"
 
@@ -25,6 +26,13 @@ ROCKETMQ_NAMESPACE_BEGIN
 namespace opencensus_proto = opencensus::proto::metrics::v1;
 
 OpencensusExporter::OpencensusExporter(std::string endpoints, 
std::weak_ptr<Client> client) : client_(client) {
+  auto client_shared_ptr = client.lock();
+  if (client_shared_ptr) {
+    auto channel = client_shared_ptr->manager()->createChannel(endpoints);
+    stub_ = 
opencensus::proto::agent::metrics::v1::MetricsService::NewStub(channel);
+  } else {
+    SPDLOG_ERROR("Failed to initialize OpencensusExporter. weak_ptr to Client 
is nullptr");
+  }
 }
 
 void OpencensusExporter::wrap(const MetricData& data, 
ExportMetricsServiceRequest& request) {
@@ -171,7 +179,7 @@ void OpencensusExporter::wrap(const MetricData& data, 
ExportMetricsServiceReques
   }
 }
 
-void OpencensusExporter::exportMetrics(
+void OpencensusExporter::ExportViewData(
     const std::vector<std::pair<opencensus::stats::ViewDescriptor, 
opencensus::stats::ViewData>>& data) {
   opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest request;
   wrap(data, request);
diff --git a/cpp/src/main/cpp/stats/include/Exporter.h 
b/cpp/src/main/cpp/stats/OpencensusHandler.cpp
similarity index 68%
copy from cpp/src/main/cpp/stats/include/Exporter.h
copy to cpp/src/main/cpp/stats/OpencensusHandler.cpp
index 7f2f24a..422108d 100644
--- a/cpp/src/main/cpp/stats/include/Exporter.h
+++ b/cpp/src/main/cpp/stats/OpencensusHandler.cpp
@@ -14,17 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#pragma once
+#include "OpencensusHandler.h"
 
-#include "opencensus/stats/stats.h"
-#include "rocketmq/RocketMQ.h"
+#include "MetricBidiReactor.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-class Exporter {
-public:
-  virtual void exportMetrics(
-      const std::vector<std::pair<opencensus::stats::ViewDescriptor, 
opencensus::stats::ViewData>>& data) = 0;
-};
+OpencensusHandler::OpencensusHandler(std::string endpoints, 
std::weak_ptr<Client> client)
+    : exporter_(std::make_shared<OpencensusExporter>(std::move(endpoints), 
std::move(client))) {
+}
+
+void OpencensusHandler::ExportViewData(const MetricData& data) {
+  exporter_->ExportViewData(data);
+}
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/StdoutHandler.cpp 
b/cpp/src/main/cpp/stats/StdoutHandler.cpp
new file mode 100644
index 0000000..9d18d78
--- /dev/null
+++ b/cpp/src/main/cpp/stats/StdoutHandler.cpp
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StdoutHandler.h"
+
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+void StdoutHandler::ExportViewData(
+    const std::vector<std::pair<opencensus::stats::ViewDescriptor, 
opencensus::stats::ViewData>>& data) {
+  for (const auto& datum : data) {
+    const auto& view_data = datum.second;
+    const auto& descriptor = datum.first;
+    auto start_times = view_data.start_times();
+    auto columns = descriptor.columns();
+
+    switch (view_data.type()) {
+      case opencensus::stats::ViewData::Type::kInt64: {
+        auto data_map = view_data.int_data();
+        for (const auto& entry : data_map) {
+          absl::Time time = start_times[entry.first];
+          std::string line;
+          line.append(absl::FormatTime(time)).append(" ");
+          line.append(descriptor.name());
+          line.append("{");
+          for (std::size_t i = 0; i < columns.size(); i++) {
+            line.append(columns[i].name()).append("=").append(entry.first[i]);
+            if (i < columns.size() - 1) {
+              line.append(", ");
+            } else {
+              line.append("} ==> ");
+            }
+          }
+          line.append(std::to_string(entry.second));
+          println(line);
+        }
+        break;
+      }
+      case opencensus::stats::ViewData::Type::kDouble: {
+        exportDatum(datum.first, view_data.start_time(), view_data.end_time(), 
view_data.double_data());
+        break;
+      }
+      case opencensus::stats::ViewData::Type::kDistribution: {
+        for (const auto& entry : view_data.distribution_data()) {
+          std::string line(descriptor.name());
+          line.append("{");
+          for (std::size_t i = 0; i < columns.size(); i++) {
+            line.append(columns[i].name()).append("=").append(entry.first[i]);
+            if (i < columns.size() - 1) {
+              line.append(", ");
+            } else {
+              line.append("} ==> ");
+            }
+          }
+          line.append(entry.second.DebugString());
+          println(line);
+
+          
println(absl::StrJoin(entry.second.bucket_boundaries().lower_boundaries(), 
","));
+        }
+        break;
+      }
+    }
+  }
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/include/MetricBidiReactor.h 
b/cpp/src/main/cpp/stats/include/MetricBidiReactor.h
index cc08c1d..0a10cd8 100644
--- a/cpp/src/main/cpp/stats/include/MetricBidiReactor.h
+++ b/cpp/src/main/cpp/stats/include/MetricBidiReactor.h
@@ -59,6 +59,7 @@ public:
 private:
   std::weak_ptr<Client> client_;
   std::weak_ptr<OpencensusExporter> exporter_;
+  grpc::ClientContext context_;
 
   ExportMetricsServiceRequest request_;
 
diff --git a/cpp/src/main/cpp/stats/include/OpencensusExporter.h 
b/cpp/src/main/cpp/stats/include/OpencensusExporter.h
index 7920ff5..161843c 100644
--- a/cpp/src/main/cpp/stats/include/OpencensusExporter.h
+++ b/cpp/src/main/cpp/stats/include/OpencensusExporter.h
@@ -17,9 +17,9 @@
 #pragma once
 
 #include "Client.h"
-#include "Exporter.h"
 #include "grpcpp/grpcpp.h"
 #include "opencensus/proto/agent/metrics/v1/metrics_service.grpc.pb.h"
+#include "opencensus/stats/stats.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -30,11 +30,12 @@ using StubPtr = std::unique_ptr<Stub>;
 using MetricData = std::vector<std::pair<opencensus::stats::ViewDescriptor, 
opencensus::stats::ViewData>>;
 using ExportMetricsServiceRequest = 
opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
 
-class OpencensusExporter : public Exporter, public 
std::enable_shared_from_this<OpencensusExporter> {
+class OpencensusExporter : public opencensus::stats::StatsExporter::Handler,
+                           public 
std::enable_shared_from_this<OpencensusExporter> {
 public:
   OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client);
 
-  void exportMetrics(const MetricData& data) override;
+  void ExportViewData(const MetricData& data) override;
 
   static void wrap(const MetricData& data, ExportMetricsServiceRequest& 
request);
 
diff --git a/cpp/src/main/cpp/stats/include/Exporter.h 
b/cpp/src/main/cpp/stats/include/OpencensusHandler.h
similarity index 73%
rename from cpp/src/main/cpp/stats/include/Exporter.h
rename to cpp/src/main/cpp/stats/include/OpencensusHandler.h
index 7f2f24a..5c6ec1f 100644
--- a/cpp/src/main/cpp/stats/include/Exporter.h
+++ b/cpp/src/main/cpp/stats/include/OpencensusHandler.h
@@ -16,15 +16,17 @@
  */
 #pragma once
 
-#include "opencensus/stats/stats.h"
-#include "rocketmq/RocketMQ.h"
+#include "OpencensusExporter.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-class Exporter {
+class OpencensusHandler : public opencensus::stats::StatsExporter::Handler {
 public:
-  virtual void exportMetrics(
-      const std::vector<std::pair<opencensus::stats::ViewDescriptor, 
opencensus::stats::ViewData>>& data) = 0;
+  OpencensusHandler(std::string endpoints, std::weak_ptr<Client> client);
+
+  void ExportViewData(const MetricData& data) override;
+
+  std::shared_ptr<OpencensusExporter> exporter_;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/include/StdoutHandler.h 
b/cpp/src/main/cpp/stats/include/StdoutHandler.h
new file mode 100644
index 0000000..1bfc3dc
--- /dev/null
+++ b/cpp/src/main/cpp/stats/include/StdoutHandler.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <mutex>
+#include <string>
+
+#include "opencensus/stats/stats.h"
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class StdoutHandler : public opencensus::stats::StatsExporter::Handler {
+public:
+  void ExportViewData(
+      const std::vector<std::pair<opencensus::stats::ViewDescriptor, 
opencensus::stats::ViewData>>& data) override;
+
+private:
+  template <typename T>
+  void exportDatum(const opencensus::stats::ViewDescriptor& descriptor,
+                   absl::Time start_time,
+                   absl::Time end_time,
+                   const opencensus::stats::ViewData::DataMap<T>& data) {
+    if (data.empty()) {
+      // std::cout << "No data for " << descriptor.name() << std::endl;
+      return;
+    }
+
+    for (const auto& row : data) {
+      for (std::size_t column = 0; column < descriptor.columns().size(); 
column++) {
+        std::cout << descriptor.name() << "[" << 
descriptor.columns()[column].name() << "=" << row.first[column] << "]"
+                  << dataToString(row.second) << std::endl;
+      }
+    }
+  }
+
+  std::mutex console_mtx_;
+
+  void println(const std::string& line) {
+    std::lock_guard<std::mutex> lk(console_mtx_);
+    std::cout << line << std::endl;
+  }
+
+  // Functions to format data for different aggregation types.
+  std::string dataToString(double data) {
+    return absl::StrCat(": ", data, "\n");
+  }
+  std::string dataToString(int64_t data) {
+    return absl::StrCat(": ", data, "\n");
+  }
+  std::string dataToString(const opencensus::stats::Distribution& data) {
+    std::string output = "\n";
+    std::vector<std::string> lines = absl::StrSplit(data.DebugString(), '\n');
+    // Add indent.
+    for (const auto& line : lines) {
+      absl::StrAppend(&output, "    ", line, "\n");
+    }
+    return output;
+  }
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file

Reply via email to