This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new b5c9033 Cpp dev (#72)
b5c9033 is described below
commit b5c9033cb49a1f15cec3bdb5693accc4d2cd7553
Author: Zhanhui Li <[email protected]>
AuthorDate: Tue Jul 26 17:00:22 2022 +0800
Cpp dev (#72)
* Extract metric service endpoint parsing
* Update examples, demonstrating usage of CredentialsProvider
---
cpp/examples/ExampleProducer.cpp | 14 ++++++-
cpp/examples/ExampleProducerWithAsync.cpp | 18 ++++++++-
cpp/examples/ExampleProducerWithFifoMessage.cpp | 12 +++++-
cpp/examples/ExampleProducerWithTimedMessage.cpp | 12 +++++-
.../ExampleProducerWithTransactionalMessage.cpp | 12 +++++-
cpp/examples/ExamplePushConsumer.cpp | 8 ++++
cpp/examples/ExampleSimpleConsumer.cpp | 12 +++++-
cpp/source/rocketmq/ClientImpl.cpp | 44 ++++++++++++----------
cpp/source/rocketmq/include/ClientImpl.h | 2 +
9 files changed, 109 insertions(+), 25 deletions(-)
diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp
index 6284011..33e10ee 100644
--- a/cpp/examples/ExampleProducer.cpp
+++ b/cpp/examples/ExampleProducer.cpp
@@ -17,11 +17,13 @@
#include <algorithm>
#include <atomic>
#include <iostream>
+#include <memory>
#include <random>
#include <string>
#include <system_error>
#include "gflags/gflags.h"
+#include "rocketmq/CredentialsProvider.h"
#include "rocketmq/Logger.h"
#include "rocketmq/Message.h"
#include "rocketmq/Producer.h"
@@ -53,6 +55,8 @@ DEFINE_string(topic, "standard_topic_sample", "Topic to which
messages are publi
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -62,8 +66,16 @@ int main(int argc, char* argv[]) {
logger.setLevel(Level::Debug);
logger.init();
+ CredentialsProviderPtr credentials_provider;
+ if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+ credentials_provider =
std::make_shared<StaticCredentialsProvider>(FLAGS_access_key,
FLAGS_access_secret);
+ }
+
auto producer = Producer::newBuilder()
-
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+ .withConfiguration(Configuration::newBuilder()
+ .withEndpoints(FLAGS_access_point)
+
.withCredentialsProvider(credentials_provider)
+ .build())
.build();
std::atomic_bool stopped;
diff --git a/cpp/examples/ExampleProducerWithAsync.cpp
b/cpp/examples/ExampleProducerWithAsync.cpp
index 0463cdd..c8f9dfb 100644
--- a/cpp/examples/ExampleProducerWithAsync.cpp
+++ b/cpp/examples/ExampleProducerWithAsync.cpp
@@ -54,11 +54,27 @@ DEFINE_string(topic, "lingchu_normal_topic", "Topic to
which messages are publis
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+ auto& logger = getLogger();
+ logger.setConsoleLevel(Level::Debug);
+ logger.setLevel(Level::Debug);
+ logger.init();
+
+ CredentialsProviderPtr credentials_provider;
+ if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+ credentials_provider =
std::make_shared<StaticCredentialsProvider>(FLAGS_access_key,
FLAGS_access_secret);
+ }
+
auto producer = Producer::newBuilder()
-
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+ .withConfiguration(Configuration::newBuilder()
+ .withEndpoints(FLAGS_access_point)
+
.withCredentialsProvider(credentials_provider)
+ .build())
.build();
std::atomic_bool stopped;
diff --git a/cpp/examples/ExampleProducerWithFifoMessage.cpp
b/cpp/examples/ExampleProducerWithFifoMessage.cpp
index e522ad4..52a377b 100644
--- a/cpp/examples/ExampleProducerWithFifoMessage.cpp
+++ b/cpp/examples/ExampleProducerWithFifoMessage.cpp
@@ -52,6 +52,8 @@ DEFINE_string(topic, "fifo_topic_sample", "Topic to which
messages are published
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -62,8 +64,16 @@ int main(int argc, char* argv[]) {
logger.setLevel(Level::Debug);
logger.init();
+ CredentialsProviderPtr credentials_provider;
+ if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+ credentials_provider =
std::make_shared<StaticCredentialsProvider>(FLAGS_access_key,
FLAGS_access_secret);
+ }
+
auto producer = Producer::newBuilder()
-
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+ .withConfiguration(Configuration::newBuilder()
+ .withEndpoints(FLAGS_access_point)
+
.withCredentialsProvider(credentials_provider)
+ .build())
.build();
std::atomic_bool stopped;
diff --git a/cpp/examples/ExampleProducerWithTimedMessage.cpp
b/cpp/examples/ExampleProducerWithTimedMessage.cpp
index c44da05..30d3ea3 100644
--- a/cpp/examples/ExampleProducerWithTimedMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp
@@ -54,6 +54,8 @@ DEFINE_string(topic, "lingchu_normal_topic", "Topic to which
messages are publis
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -63,8 +65,16 @@ int main(int argc, char* argv[]) {
logger.setLevel(Level::Debug);
logger.init();
+ CredentialsProviderPtr credentials_provider;
+ if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+ credentials_provider =
std::make_shared<StaticCredentialsProvider>(FLAGS_access_key,
FLAGS_access_secret);
+ }
+
auto producer = Producer::newBuilder()
-
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+ .withConfiguration(Configuration::newBuilder()
+ .withEndpoints(FLAGS_access_point)
+
.withCredentialsProvider(credentials_provider)
+ .build())
.build();
std::atomic_bool stopped;
diff --git a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
index c740533..c220cf7 100644
--- a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
@@ -52,6 +52,8 @@ DEFINE_string(topic, "tx_topic_sample", "Topic to which
messages are published")
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -67,8 +69,16 @@ int main(int argc, char* argv[]) {
return TransactionState::COMMIT;
};
+ CredentialsProviderPtr credentials_provider;
+ if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+ credentials_provider =
std::make_shared<StaticCredentialsProvider>(FLAGS_access_key,
FLAGS_access_secret);
+ }
+
auto producer = Producer::newBuilder()
-
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+ .withConfiguration(Configuration::newBuilder()
+ .withEndpoints(FLAGS_access_point)
+
.withCredentialsProvider(credentials_provider)
+ .build())
.withTransactionChecker(checker)
.build();
diff --git a/cpp/examples/ExamplePushConsumer.cpp
b/cpp/examples/ExamplePushConsumer.cpp
index 793436c..a10a1e2 100644
--- a/cpp/examples/ExamplePushConsumer.cpp
+++ b/cpp/examples/ExamplePushConsumer.cpp
@@ -28,6 +28,8 @@ using namespace ROCKETMQ_NAMESPACE;
DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are
published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through
your instance management console");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -44,11 +46,17 @@ int main(int argc, char* argv[]) {
return ConsumeResult::SUCCESS;
};
+ CredentialsProviderPtr credentials_provider;
+ if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+ credentials_provider =
std::make_shared<StaticCredentialsProvider>(FLAGS_access_key,
FLAGS_access_secret);
+ }
+
auto push_consumer = PushConsumer::newBuilder()
.withGroup(FLAGS_group)
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withRequestTimeout(std::chrono::seconds(3))
+
.withCredentialsProvider(credentials_provider)
.build())
.withConsumeThreads(4)
.withListener(listener)
diff --git a/cpp/examples/ExampleSimpleConsumer.cpp
b/cpp/examples/ExampleSimpleConsumer.cpp
index 4cbea38..94c8d6c 100644
--- a/cpp/examples/ExampleSimpleConsumer.cpp
+++ b/cpp/examples/ExampleSimpleConsumer.cpp
@@ -27,6 +27,8 @@ using namespace ROCKETMQ_NAMESPACE;
DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are
published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through
your instance management console");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -38,9 +40,17 @@ int main(int argc, char* argv[]) {
std::string tag = "*";
+ CredentialsProviderPtr credentials_provider;
+ if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+ credentials_provider =
std::make_shared<StaticCredentialsProvider>(FLAGS_access_key,
FLAGS_access_secret);
+ }
+
auto simple_consumer = SimpleConsumer::newBuilder()
.withGroup(FLAGS_group)
-
.withConfiguration(Configuration::newBuilder().withEndpoints(FLAGS_access_point).build())
+ .withConfiguration(Configuration::newBuilder()
+
.withEndpoints(FLAGS_access_point)
+
.withCredentialsProvider(credentials_provider)
+ .build())
.subscribe(FLAGS_topic, tag)
.build();
std::vector<MessageConstSharedPtr> messages;
diff --git a/cpp/source/rocketmq/ClientImpl.cpp
b/cpp/source/rocketmq/ClientImpl.cpp
index b90035e..d47133a 100644
--- a/cpp/source/rocketmq/ClientImpl.cpp
+++ b/cpp/source/rocketmq/ClientImpl.cpp
@@ -179,19 +179,35 @@ void ClientImpl::start() {
telemetry_handle_ =
client_manager_->getScheduler()->schedule(telemetry_functor,
TELEMETRY_TASK_NAME,
std::chrono::minutes(5), std::chrono::minutes(5));
+ auto&& metric_service_endpoint = metricServiceEndpoint();
+ if (!metric_service_endpoint.empty()) {
+ std::weak_ptr<Client> client_weak_ptr(self());
+#ifdef DEBUG_METRIC_EXPORTING
+ opencensus::stats::StatsExporter::SetInterval(absl::Seconds(1));
+
opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<StdoutHandler>());
+#else
+ opencensus::stats::StatsExporter::SetInterval(absl::Minutes(1));
+#endif
+ SPDLOG_INFO("Export client metrics to {}", metric_service_endpoint);
+ opencensus::stats::StatsExporter::RegisterPushHandler(
+ absl::make_unique<OpencensusHandler>(metric_service_endpoint,
client_weak_ptr));
+ }
+}
+
+std::string ClientImpl::metricServiceEndpoint() const {
auto endpoints = client_config_.metric.endpoints;
- std::string target;
+ std::string service_endpoint;
switch (endpoints.scheme()) {
case rmq::AddressScheme::IPv4: {
- target.append("ipv4:");
+ service_endpoint.append("ipv4:");
break;
}
case rmq::AddressScheme::IPv6: {
- target.append("ipv6:");
+ service_endpoint.append("ipv6:");
break;
}
case rmq::AddressScheme::DOMAIN_NAME: {
- target.append("dns:");
+ service_endpoint.append("dns:");
break;
}
default: {
@@ -202,25 +218,15 @@ void ClientImpl::start() {
bool first = true;
for (const auto& address : endpoints.addresses()) {
if (!first) {
- target.push_back(',');
+ service_endpoint.push_back(',');
} else {
first = false;
}
- target.append(address.host());
- target.push_back(':');
- target.append(std::to_string(address.port()));
+ service_endpoint.append(address.host());
+ service_endpoint.push_back(':');
+ service_endpoint.append(std::to_string(address.port()));
}
-
- std::weak_ptr<Client> client_weak_ptr(self());
-
-#ifdef DEBUG_METRIC_EXPORTING
- opencensus::stats::StatsExporter::SetInterval(absl::Seconds(1));
-
opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<StdoutHandler>());
-#else
- opencensus::stats::StatsExporter::SetInterval(absl::Minutes(1));
-#endif
- SPDLOG_INFO("Export client metrics to {}", target);
-
opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<OpencensusHandler>(target,
client_weak_ptr));
+ return service_endpoint;
}
void ClientImpl::shutdown() {
diff --git a/cpp/source/rocketmq/include/ClientImpl.h
b/cpp/source/rocketmq/include/ClientImpl.h
index 2a06a0f..f0b29f7 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -229,6 +229,8 @@ private:
void doVerify(std::string target, std::string command_id, MessageConstPtr
message);
static std::string clientId();
+
+ std::string metricServiceEndpoint() const;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file