This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new b5c9033  Cpp dev (#72)
b5c9033 is described below

commit b5c9033cb49a1f15cec3bdb5693accc4d2cd7553
Author: Zhanhui Li <[email protected]>
AuthorDate: Tue Jul 26 17:00:22 2022 +0800

    Cpp dev (#72)
    
    * Extract metric service endpoint parsing
    
    * Update examples, demonstrating usage of CredentialsProvider
---
 cpp/examples/ExampleProducer.cpp                   | 14 ++++++-
 cpp/examples/ExampleProducerWithAsync.cpp          | 18 ++++++++-
 cpp/examples/ExampleProducerWithFifoMessage.cpp    | 12 +++++-
 cpp/examples/ExampleProducerWithTimedMessage.cpp   | 12 +++++-
 .../ExampleProducerWithTransactionalMessage.cpp    | 12 +++++-
 cpp/examples/ExamplePushConsumer.cpp               |  8 ++++
 cpp/examples/ExampleSimpleConsumer.cpp             | 12 +++++-
 cpp/source/rocketmq/ClientImpl.cpp                 | 44 ++++++++++++----------
 cpp/source/rocketmq/include/ClientImpl.h           |  2 +
 9 files changed, 109 insertions(+), 25 deletions(-)

diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp
index 6284011..33e10ee 100644
--- a/cpp/examples/ExampleProducer.cpp
+++ b/cpp/examples/ExampleProducer.cpp
@@ -17,11 +17,13 @@
 #include <algorithm>
 #include <atomic>
 #include <iostream>
+#include <memory>
 #include <random>
 #include <string>
 #include <system_error>
 
 #include "gflags/gflags.h"
+#include "rocketmq/CredentialsProvider.h"
 #include "rocketmq/Logger.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/Producer.h"
@@ -53,6 +55,8 @@ DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are publi
 DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
 DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -62,8 +66,16 @@ int main(int argc, char* argv[]) {
   logger.setLevel(Level::Debug);
   logger.init();
 
+  CredentialsProviderPtr credentials_provider;
+  if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+    credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret);
+  }
+
   auto producer = Producer::newBuilder()
-                      .withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+                      .withConfiguration(Configuration::newBuilder()
+                                             .withEndpoints(FLAGS_access_point)
+                                             .withCredentialsProvider(credentials_provider)
+                                             .build())
                       .build();
 
   std::atomic_bool stopped;
diff --git a/cpp/examples/ExampleProducerWithAsync.cpp b/cpp/examples/ExampleProducerWithAsync.cpp
index 0463cdd..c8f9dfb 100644
--- a/cpp/examples/ExampleProducerWithAsync.cpp
+++ b/cpp/examples/ExampleProducerWithAsync.cpp
@@ -54,11 +54,27 @@ DEFINE_string(topic, "lingchu_normal_topic", "Topic to which messages are publis
 DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
 DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  auto& logger = getLogger();
+  logger.setConsoleLevel(Level::Debug);
+  logger.setLevel(Level::Debug);
+  logger.init();
+
+  CredentialsProviderPtr credentials_provider;
+  if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+    credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret);
+  }
+
   auto producer = Producer::newBuilder()
-                      .withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+                      .withConfiguration(Configuration::newBuilder()
+                                             .withEndpoints(FLAGS_access_point)
+                                             .withCredentialsProvider(credentials_provider)
+                                             .build())
                       .build();
 
   std::atomic_bool stopped;
diff --git a/cpp/examples/ExampleProducerWithFifoMessage.cpp b/cpp/examples/ExampleProducerWithFifoMessage.cpp
index e522ad4..52a377b 100644
--- a/cpp/examples/ExampleProducerWithFifoMessage.cpp
+++ b/cpp/examples/ExampleProducerWithFifoMessage.cpp
@@ -52,6 +52,8 @@ DEFINE_string(topic, "fifo_topic_sample", "Topic to which messages are published
 DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
 DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -62,8 +64,16 @@ int main(int argc, char* argv[]) {
   logger.setLevel(Level::Debug);
   logger.init();
 
+  CredentialsProviderPtr credentials_provider;
+  if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+    credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret);
+  }
+
   auto producer = Producer::newBuilder()
-                      .withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+                      .withConfiguration(Configuration::newBuilder()
+                                             .withEndpoints(FLAGS_access_point)
+                                             .withCredentialsProvider(credentials_provider)
+                                             .build())
                       .build();
 
   std::atomic_bool stopped;
diff --git a/cpp/examples/ExampleProducerWithTimedMessage.cpp b/cpp/examples/ExampleProducerWithTimedMessage.cpp
index c44da05..30d3ea3 100644
--- a/cpp/examples/ExampleProducerWithTimedMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp
@@ -54,6 +54,8 @@ DEFINE_string(topic, "lingchu_normal_topic", "Topic to which messages are publis
 DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
 DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -63,8 +65,16 @@ int main(int argc, char* argv[]) {
   logger.setLevel(Level::Debug);
   logger.init();
 
+  CredentialsProviderPtr credentials_provider;
+  if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+    credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret);
+  }
+
   auto producer = Producer::newBuilder()
-                      .withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+                      .withConfiguration(Configuration::newBuilder()
+                                             .withEndpoints(FLAGS_access_point)
+                                             .withCredentialsProvider(credentials_provider)
+                                             .build())
                       .build();
 
   std::atomic_bool stopped;
diff --git a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
index c740533..c220cf7 100644
--- a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
@@ -52,6 +52,8 @@ DEFINE_string(topic, "tx_topic_sample", "Topic to which messages are published")
 DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
 DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -67,8 +69,16 @@ int main(int argc, char* argv[]) {
     return TransactionState::COMMIT;
   };
 
+  CredentialsProviderPtr credentials_provider;
+  if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+    credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret);
+  }
+
   auto producer = Producer::newBuilder()
-                      .withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+                      .withConfiguration(Configuration::newBuilder()
+                                             .withEndpoints(FLAGS_access_point)
+                                             .withCredentialsProvider(credentials_provider)
+                                             .build())
                       .withTransactionChecker(checker)
                       .build();
 
diff --git a/cpp/examples/ExamplePushConsumer.cpp b/cpp/examples/ExamplePushConsumer.cpp
index 793436c..a10a1e2 100644
--- a/cpp/examples/ExamplePushConsumer.cpp
+++ b/cpp/examples/ExamplePushConsumer.cpp
@@ -28,6 +28,8 @@ using namespace ROCKETMQ_NAMESPACE;
 DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
 DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
 DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -44,11 +46,17 @@ int main(int argc, char* argv[]) {
     return ConsumeResult::SUCCESS;
   };
 
+  CredentialsProviderPtr credentials_provider;
+  if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+    credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret);
+  }
+
   auto push_consumer = PushConsumer::newBuilder()
                            .withGroup(FLAGS_group)
                            .withConfiguration(Configuration::newBuilder()
                                                   .withEndpoints(FLAGS_access_point)
                                                   .withRequestTimeout(std::chrono::seconds(3))
+                                                  .withCredentialsProvider(credentials_provider)
                                                   .build())
                            .withConsumeThreads(4)
                            .withListener(listener)
diff --git a/cpp/examples/ExampleSimpleConsumer.cpp b/cpp/examples/ExampleSimpleConsumer.cpp
index 4cbea38..94c8d6c 100644
--- a/cpp/examples/ExampleSimpleConsumer.cpp
+++ b/cpp/examples/ExampleSimpleConsumer.cpp
@@ -27,6 +27,8 @@ using namespace ROCKETMQ_NAMESPACE;
 DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
 DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
 DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -38,9 +40,17 @@ int main(int argc, char* argv[]) {
 
   std::string tag = "*";
 
+  CredentialsProviderPtr credentials_provider;
+  if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+    credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret);
+  }
+
   auto simple_consumer = SimpleConsumer::newBuilder()
                              .withGroup(FLAGS_group)
-                             .withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+                             .withConfiguration(Configuration::newBuilder()
+                                                    .withEndpoints(FLAGS_access_point)
+                                                    .withCredentialsProvider(credentials_provider)
+                                                    .build())
                              .subscribe(FLAGS_topic, tag)
                              .build();
   std::vector<MessageConstSharedPtr> messages;
diff --git a/cpp/source/rocketmq/ClientImpl.cpp b/cpp/source/rocketmq/ClientImpl.cpp
index b90035e..d47133a 100644
--- a/cpp/source/rocketmq/ClientImpl.cpp
+++ b/cpp/source/rocketmq/ClientImpl.cpp
@@ -179,19 +179,35 @@ void ClientImpl::start() {
   telemetry_handle_ = client_manager_->getScheduler()->schedule(telemetry_functor, TELEMETRY_TASK_NAME,
                                                                 std::chrono::minutes(5), std::chrono::minutes(5));
 
+  auto&& metric_service_endpoint = metricServiceEndpoint();
+  if (!metric_service_endpoint.empty()) {
+    std::weak_ptr<Client> client_weak_ptr(self());
+#ifdef DEBUG_METRIC_EXPORTING
+    opencensus::stats::StatsExporter::SetInterval(absl::Seconds(1));
+    opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<StdoutHandler>());
+#else
+    opencensus::stats::StatsExporter::SetInterval(absl::Minutes(1));
+#endif
+    SPDLOG_INFO("Export client metrics to {}", metric_service_endpoint);
+    opencensus::stats::StatsExporter::RegisterPushHandler(
+        absl::make_unique<OpencensusHandler>(metric_service_endpoint, client_weak_ptr));
+  }
+}
+
+std::string ClientImpl::metricServiceEndpoint() const {
   auto endpoints = client_config_.metric.endpoints;
-  std::string target;
+  std::string service_endpoint;
   switch (endpoints.scheme()) {
     case rmq::AddressScheme::IPv4: {
-      target.append("ipv4:");
+      service_endpoint.append("ipv4:");
       break;
     }
     case rmq::AddressScheme::IPv6: {
-      target.append("ipv6:");
+      service_endpoint.append("ipv6:");
       break;
     }
     case rmq::AddressScheme::DOMAIN_NAME: {
-      target.append("dns:");
+      service_endpoint.append("dns:");
       break;
     }
     default: {
@@ -202,25 +218,15 @@ void ClientImpl::start() {
   bool first = true;
   for (const auto& address : endpoints.addresses()) {
     if (!first) {
-      target.push_back(',');
+      service_endpoint.push_back(',');
     } else {
       first = false;
     }
-    target.append(address.host());
-    target.push_back(':');
-    target.append(std::to_string(address.port()));
+    service_endpoint.append(address.host());
+    service_endpoint.push_back(':');
+    service_endpoint.append(std::to_string(address.port()));
   }
-
-  std::weak_ptr<Client> client_weak_ptr(self());
-
-#ifdef DEBUG_METRIC_EXPORTING
-  opencensus::stats::StatsExporter::SetInterval(absl::Seconds(1));
-  opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<StdoutHandler>());
-#else
-  opencensus::stats::StatsExporter::SetInterval(absl::Minutes(1));
-#endif
-  SPDLOG_INFO("Export client metrics to {}", target);
-  opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<OpencensusHandler>(target, client_weak_ptr));
+  return service_endpoint;
 }
 
 void ClientImpl::shutdown() {
diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h
index 2a06a0f..f0b29f7 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -229,6 +229,8 @@ private:
   void doVerify(std::string target, std::string command_id, MessageConstPtr message);
 
   static std::string clientId();
+
+  std::string metricServiceEndpoint() const;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file

Reply via email to