This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c9752435fe7936382ff5037bff115ab3b657fc5c Author: wangyuwei <[email protected]> AuthorDate: Thu Feb 11 11:37:22 2021 +0800 [Issue 9495][c++ client] add 'encrypted' option in commands.newproducer() (#9542) Fixes #9495 ### Modifications pass an option indicating whether encryption is enabled in the producer when calling `commands.newProducer` ### Verifying this change redo the sample in issue #9495 (cherry picked from commit f65b29701d7aad3b0a4c8e18c1f9392de1994263) --- pulsar-client-cpp/lib/Commands.cc | 3 +- pulsar-client-cpp/lib/Commands.h | 2 +- pulsar-client-cpp/lib/ProducerImpl.cc | 6 +- pulsar-client-cpp/pulsar-test-service-start.sh | 8 +++ pulsar-client-cpp/tests/BasicEndToEndTest.cc | 79 +++++++++++++------------- 5 files changed, 55 insertions(+), 43 deletions(-) diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index 239b9f0..364ea30 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -319,7 +319,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId const std::string& producerName, uint64_t requestId, const std::map<std::string, std::string>& metadata, const SchemaInfo& schemaInfo, uint64_t epoch, - bool userProvidedProducerName) { + bool userProvidedProducerName, bool encrypted) { BaseCommand cmd; cmd.set_type(BaseCommand::PRODUCER); CommandProducer* producer = cmd.mutable_producer(); @@ -328,6 +328,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId producer->set_request_id(requestId); producer->set_epoch(epoch); producer->set_user_provided_producer_name(userProvidedProducerName); + producer->set_encrypted(encrypted); for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end(); it++) { diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h index 18f0049..bbe2a7c 100644 --- a/pulsar-client-cpp/lib/Commands.h +++ b/pulsar-client-cpp/lib/Commands.h @@ -96,7 +96,7 @@ class Commands 
{ const std::string& producerName, uint64_t requestId, const std::map<std::string, std::string>& metadata, const SchemaInfo& schemaInfo, uint64_t epoch, - bool userProvidedProducerName); + bool userProvidedProducerName, bool encrypted); static SharedBuffer newAck(uint64_t consumerId, const proto::MessageIdData& messageId, proto::CommandAck_AckType ackType, int validationError); diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index 232708d..fcd02b5 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -142,9 +142,9 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) { ClientImplPtr client = client_.lock(); int requestId = client->newRequestId(); - SharedBuffer cmd = - Commands::newProducer(topic_, producerId_, producerName_, requestId, conf_.getProperties(), - conf_.getSchema(), epoch_, userProvidedProducerName_); + SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId, + conf_.getProperties(), conf_.getSchema(), epoch_, + userProvidedProducerName_, conf_.isEncryptionEnabled()); cnx->sendRequestWithId(cmd, requestId) .addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx, std::placeholders::_1, std::placeholders::_2)); diff --git a/pulsar-client-cpp/pulsar-test-service-start.sh b/pulsar-client-cpp/pulsar-test-service-start.sh index 89b46df..d6da230 100755 --- a/pulsar-client-cpp/pulsar-test-service-start.sh +++ b/pulsar-client-cpp/pulsar-test-service-start.sh @@ -98,6 +98,14 @@ $PULSAR_DIR/bin/pulsar-admin namespaces grant-permission public/default-3 \ --actions produce,consume \ --role "anonymous" +# Create "public/default-4" with encryption required +$PULSAR_DIR/bin/pulsar-admin namespaces create public/default-4 \ + --clusters standalone +$PULSAR_DIR/bin/pulsar-admin namespaces grant-permission public/default-4 \ + --actions produce,consume \ + --role "anonymous" +$PULSAR_DIR/bin/pulsar-admin 
namespaces set-encryption-required public/default-4 -e + # Create "private" tenant $PULSAR_DIR/bin/pulsar-admin tenants create private -r "" -c "standalone" diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index bf9479f..255a563 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -1284,7 +1284,7 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) { TEST(BasicEndToEndTest, testRSAEncryption) { ClientConfiguration config; Client client(lookupUrl); - std::string topicName = "my-rsaenctopic"; + std::string topicNames[] = {"my-rsaenctopic", "persistent://public/default-4/my-rsaenctopic"}; std::string subName = "my-sub-name"; Producer producer; @@ -1301,48 +1301,51 @@ TEST(BasicEndToEndTest, testRSAEncryption) { conf.addEncryptionKey("client-rsa.pem"); conf.setCryptoKeyReader(keyReader); - Promise<Result, Producer> producerPromise; - client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise)); - Future<Result, Producer> producerFuture = producerPromise.getFuture(); - Result result = producerFuture.get(producer); - ASSERT_EQ(ResultOk, result); + for (const auto &topicName : topicNames) { + Promise<Result, Producer> producerPromise; + client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise)); + Future<Result, Producer> producerFuture = producerPromise.getFuture(); + Result result = producerFuture.get(producer); + ASSERT_EQ(ResultOk, result); - ConsumerConfiguration consConfig; - consConfig.setCryptoKeyReader(keyReader); - // consConfig.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME); + ConsumerConfiguration consConfig; + consConfig.setCryptoKeyReader(keyReader); + // consConfig.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME); - Consumer consumer; - Promise<Result, Consumer> consumerPromise; - client.subscribeAsync(topicName, subName, consConfig, 
WaitForCallbackValue<Consumer>(consumerPromise)); - Future<Result, Consumer> consumerFuture = consumerPromise.getFuture(); - result = consumerFuture.get(consumer); - ASSERT_EQ(ResultOk, result); + Consumer consumer; + Promise<Result, Consumer> consumerPromise; + client.subscribeAsync(topicName, subName, consConfig, + WaitForCallbackValue<Consumer>(consumerPromise)); + Future<Result, Consumer> consumerFuture = consumerPromise.getFuture(); + result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); - // Send 1000 messages synchronously - std::string msgContent = "msg-content"; - LOG_INFO("Publishing 1000 messages synchronously"); - int msgNum = 0; - for (; msgNum < 1000; msgNum++) { - std::stringstream stream; - stream << msgContent << msgNum; - Message msg = MessageBuilder().setContent(stream.str()).build(); - ASSERT_EQ(ResultOk, producer.send(msg)); - } + // Send 1000 messages synchronously + std::string msgContent = "msg-content"; + LOG_INFO("Publishing 1000 messages synchronously"); + int msgNum = 0; + for (; msgNum < 1000; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } - LOG_INFO("Trying to receive 1000 messages"); - Message msgReceived; - for (msgNum = 0; msgNum < 1000; msgNum++) { - consumer.receive(msgReceived, 1000); - LOG_DEBUG("Received message :" << msgReceived.getMessageId()); - std::stringstream expected; - expected << msgContent << msgNum; - ASSERT_EQ(expected.str(), msgReceived.getDataAsString()); - ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived)); - } + LOG_INFO("Trying to receive 1000 messages"); + Message msgReceived; + for (msgNum = 0; msgNum < 1000; msgNum++) { + consumer.receive(msgReceived, 1000); + LOG_DEBUG("Received message :" << msgReceived.getMessageId()); + std::stringstream expected; + expected << msgContent << msgNum; + ASSERT_EQ(expected.str(), msgReceived.getDataAsString()); + 
ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived)); + } - ASSERT_EQ(ResultOk, consumer.unsubscribe()); - ASSERT_EQ(ResultAlreadyClosed, consumer.close()); - ASSERT_EQ(ResultOk, producer.close()); + ASSERT_EQ(ResultOk, consumer.unsubscribe()); + ASSERT_EQ(ResultAlreadyClosed, consumer.close()); + ASSERT_EQ(ResultOk, producer.close()); + } ASSERT_EQ(ResultOk, client.close()); }
