This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 8b25896d Allow user to disable TLS in cpp. (#542)
8b25896d is described below
commit 8b25896daa22effb6504c04fb411c883b6ab6db0
Author: yulangz <[email protected]>
AuthorDate: Fri Jul 14 10:55:23 2023 +0800
Allow user to disable TLS in cpp. (#542)
* Allow user to disable TLS in cpp.
* Rename the option to withSsl
* Enable the SSL switch for the push consumer
---------
Co-authored-by: htaowang <[email protected]>
Co-authored-by: Aaron Ai <[email protected]>
---
cpp/examples/ExampleProducer.cpp | 1 +
cpp/examples/ExampleProducerWithAsync.cpp | 1 +
cpp/examples/ExampleProducerWithFifoMessage.cpp | 1 +
cpp/examples/ExampleProducerWithTimedMessage.cpp | 1 +
cpp/examples/ExampleProducerWithTransactionalMessage.cpp | 1 +
cpp/examples/ExamplePushConsumer.cpp | 1 +
cpp/examples/ExampleSimpleConsumer.cpp | 1 +
cpp/include/rocketmq/Configuration.h | 7 +++++++
cpp/source/base/Configuration.cpp | 5 +++++
cpp/source/client/ClientManagerImpl.cpp | 7 ++++---
cpp/source/client/include/ClientConfig.h | 1 +
cpp/source/client/include/ClientManagerImpl.h | 3 ++-
cpp/source/rocketmq/ClientImpl.cpp | 2 +-
cpp/source/rocketmq/Producer.cpp | 1 +
cpp/source/rocketmq/PushConsumer.cpp | 1 +
cpp/source/rocketmq/SimpleConsumer.cpp | 1 +
cpp/source/rocketmq/include/ClientImpl.h | 4 ++++
cpp/source/rocketmq/include/SimpleConsumerImpl.h | 4 ++++
18 files changed, 38 insertions(+), 5 deletions(-)
diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp
index 452b4ce1..ca5fc7d7 100644
--- a/cpp/examples/ExampleProducer.cpp
+++ b/cpp/examples/ExampleProducer.cpp
@@ -77,6 +77,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
+ .withSsl(true)
.build())
.build();
diff --git a/cpp/examples/ExampleProducerWithAsync.cpp b/cpp/examples/ExampleProducerWithAsync.cpp
index 102af2f4..d8846b49 100644
--- a/cpp/examples/ExampleProducerWithAsync.cpp
+++ b/cpp/examples/ExampleProducerWithAsync.cpp
@@ -116,6 +116,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
+ .withSsl(true)
.build())
.build();
diff --git a/cpp/examples/ExampleProducerWithFifoMessage.cpp b/cpp/examples/ExampleProducerWithFifoMessage.cpp
index caede189..e8a6f209 100644
--- a/cpp/examples/ExampleProducerWithFifoMessage.cpp
+++ b/cpp/examples/ExampleProducerWithFifoMessage.cpp
@@ -74,6 +74,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
+ .withSsl(true)
.build())
.build();
diff --git a/cpp/examples/ExampleProducerWithTimedMessage.cpp b/cpp/examples/ExampleProducerWithTimedMessage.cpp
index 1379cd1b..c4623852 100644
--- a/cpp/examples/ExampleProducerWithTimedMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp
@@ -75,6 +75,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
+ .withSsl(true)
.build())
.build();
diff --git a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
index abff6e4f..befb18ca 100644
--- a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
@@ -79,6 +79,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
+ .withSsl(true)
.build())
.withTopics({FLAGS_topic})
.withTransactionChecker(checker)
diff --git a/cpp/examples/ExamplePushConsumer.cpp b/cpp/examples/ExamplePushConsumer.cpp
index 2a3d3fe2..1e20b2ee 100644
--- a/cpp/examples/ExamplePushConsumer.cpp
+++ b/cpp/examples/ExamplePushConsumer.cpp
@@ -58,6 +58,7 @@ int main(int argc, char* argv[]) {
.withEndpoints(FLAGS_access_point)
.withRequestTimeout(std::chrono::seconds(3))
.withCredentialsProvider(credentials_provider)
+ .withSsl(true)
.build())
.withConsumeThreads(4)
.withListener(listener)
diff --git a/cpp/examples/ExampleSimpleConsumer.cpp b/cpp/examples/ExampleSimpleConsumer.cpp
index 69644282..4c30214f 100644
--- a/cpp/examples/ExampleSimpleConsumer.cpp
+++ b/cpp/examples/ExampleSimpleConsumer.cpp
@@ -51,6 +51,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
+ .withSsl(true)
.build())
.subscribe(FLAGS_topic, tag)
.build();
diff --git a/cpp/include/rocketmq/Configuration.h b/cpp/include/rocketmq/Configuration.h
index 90cdf7d2..0037c270 100644
--- a/cpp/include/rocketmq/Configuration.h
+++ b/cpp/include/rocketmq/Configuration.h
@@ -43,6 +43,10 @@ public:
return request_timeout_;
}
+ bool withSsl() const {
+ return withSsl_;
+ }
+
protected:
friend class ConfigurationBuilder;
@@ -52,6 +56,7 @@ private:
std::string endpoints_;
CredentialsProviderPtr credentials_provider_;
std::chrono::milliseconds
request_timeout_{ConfigurationDefaults::RequestTimeout};
+ bool withSsl_ = true;
};
class ConfigurationBuilder {
@@ -62,6 +67,8 @@ public:
ConfigurationBuilder& withRequestTimeout(std::chrono::milliseconds
request_timeout);
+ ConfigurationBuilder& withSsl(bool enable);
+
Configuration build();
private:
diff --git a/cpp/source/base/Configuration.cpp b/cpp/source/base/Configuration.cpp
index cf0f4bd3..2a136d5d 100644
--- a/cpp/source/base/Configuration.cpp
+++ b/cpp/source/base/Configuration.cpp
@@ -38,6 +38,11 @@ ConfigurationBuilder& ConfigurationBuilder::withRequestTimeout(std::chrono::mill
return *this;
}
+ConfigurationBuilder& ConfigurationBuilder::withSsl(bool enable) {
+ configuration_.withSsl_ = enable;
+ return *this;
+}
+
Configuration ConfigurationBuilder::build() {
return std::move(configuration_);
}
diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp
index a39bb738..5865dbb2 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -47,10 +47,11 @@
ROCKETMQ_NAMESPACE_BEGIN
-ClientManagerImpl::ClientManagerImpl(std::string resource_namespace)
+ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool
withSsl)
: scheduler_(std::make_shared<SchedulerImpl>()),
resource_namespace_(std::move(resource_namespace)),
state_(State::CREATED),
-
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency()))
{
+
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
+ withSsl_(withSsl){
certificate_verifier_ =
grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>();
tls_channel_credential_options_.set_verify_server_certs(false);
tls_channel_credential_options_.set_check_call_host(false);
@@ -469,7 +470,7 @@ std::shared_ptr<grpc::Channel> ClientManagerImpl::createChannel(const std::strin
std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
interceptor_factories;
interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
- target_host, channel_credential_, channel_arguments_,
std::move(interceptor_factories));
+ target_host, withSsl_ ? channel_credential_ :
grpc::InsecureChannelCredentials(), channel_arguments_,
std::move(interceptor_factories));
return channel;
}
diff --git a/cpp/source/client/include/ClientConfig.h b/cpp/source/client/include/ClientConfig.h
index 58cd1fe7..e0a7fbf6 100644
--- a/cpp/source/client/include/ClientConfig.h
+++ b/cpp/source/client/include/ClientConfig.h
@@ -60,6 +60,7 @@ struct ClientConfig {
PublisherConfig publisher;
SubscriberConfig subscriber;
Metric metric;
+ bool withSsl;
std::unique_ptr<opencensus::trace::Sampler> sampler_;
};
diff --git a/cpp/source/client/include/ClientManagerImpl.h b/cpp/source/client/include/ClientManagerImpl.h
index 0769769a..653fcad3 100644
--- a/cpp/source/client/include/ClientManagerImpl.h
+++ b/cpp/source/client/include/ClientManagerImpl.h
@@ -54,7 +54,7 @@ public:
* effectively.
* @param resource_namespace Abstract resource namespace, in which this
client manager lives.
*/
- explicit ClientManagerImpl(std::string resource_namespace);
+ explicit ClientManagerImpl(std::string resource_namespace, bool withSsl =
true);
~ClientManagerImpl() override;
@@ -242,6 +242,7 @@ private:
grpc::ChannelArguments channel_arguments_;
bool trace_{false};
+ bool withSsl_;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/ClientImpl.cpp b/cpp/source/rocketmq/ClientImpl.cpp
index 15518614..e2f401e3 100644
--- a/cpp/source/rocketmq/ClientImpl.cpp
+++ b/cpp/source/rocketmq/ClientImpl.cpp
@@ -113,7 +113,7 @@ void ClientImpl::start() {
client_config_.client_id = clientId();
if (!client_manager_) {
- client_manager_ =
std::make_shared<ClientManagerImpl>(client_config_.resource_namespace);
+ client_manager_ =
std::make_shared<ClientManagerImpl>(client_config_.resource_namespace,
client_config_.withSsl);
}
client_manager_->start();
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 49b9f71c..8620f681 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -83,6 +83,7 @@ ProducerBuilder& ProducerBuilder::withConfiguration(Configuration configuration)
impl_->withNameServerResolver(std::move(name_server_resolver));
impl_->withCredentialsProvider(configuration.credentialsProvider());
impl_->withRequestTimeout(configuration.requestTimeout());
+ impl_->withSsl(configuration.withSsl());
return *this;
}
diff --git a/cpp/source/rocketmq/PushConsumer.cpp b/cpp/source/rocketmq/PushConsumer.cpp
index 17ea8ca8..2b1c1566 100644
--- a/cpp/source/rocketmq/PushConsumer.cpp
+++ b/cpp/source/rocketmq/PushConsumer.cpp
@@ -43,6 +43,7 @@ PushConsumer PushConsumerBuilder::build() {
}
impl->consumeThreadPoolSize(consume_thread_);
impl->withNameServerResolver(std::make_shared<StaticNameServerResolver>(configuration_.endpoints()));
+ impl->withSsl(configuration_.withSsl());
impl->withCredentialsProvider(configuration_.credentialsProvider());
impl->withRequestTimeout(configuration_.requestTimeout());
impl->start();
diff --git a/cpp/source/rocketmq/SimpleConsumer.cpp b/cpp/source/rocketmq/SimpleConsumer.cpp
index d7e94ae9..a48a0e49 100644
--- a/cpp/source/rocketmq/SimpleConsumer.cpp
+++ b/cpp/source/rocketmq/SimpleConsumer.cpp
@@ -130,6 +130,7 @@ SimpleConsumer SimpleConsumerBuilder::build() {
simple_consumer.impl_->withNameServerResolver(std::make_shared<StaticNameServerResolver>(configuration_.endpoints()));
simple_consumer.impl_->withCredentialsProvider(configuration_.credentialsProvider());
simple_consumer.impl_->withReceiveMessageTimeout(await_duration_);
+ simple_consumer.impl_->withSsl(configuration_.withSsl());
for (const auto& entry : subscriptions_) {
simple_consumer.impl_->subscribe(entry.first, entry.second);
diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h
index e472c590..70dc5382 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -94,6 +94,10 @@ public:
client_config_.request_timeout = absl::FromChrono(request_timeout);
}
+ void withSsl(bool enable) {
+ client_config_.withSsl = enable;
+ }
+
/**
* Expose for test purpose only.
*/
diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
index 7ef3d8e3..a20cce56 100644
--- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
+++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
@@ -61,6 +61,10 @@ public:
long_polling_duration_ = receive_timeout;
}
+ void withSsl(bool enable) {
+ client_config_.withSsl = enable;
+ }
+
protected:
void topicsOfInterest(std::vector<std::string> topics) override;