BewareMyPower commented on code in PR #160:
URL: https://github.com/apache/pulsar-client-cpp/pull/160#discussion_r1108076041


##########
tests/ConsumerTest.cc:
##########
@@ -895,6 +896,90 @@ TEST(ConsumerTest, 
testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + 
std::to_string(time(nullptr));
+    std::string subName = "sub-test";
+
+    std::string PUBLIC_CERT_FILE_PATH = 
"../test-conf//public-key.client-rsa.pem";
+    std::string PRIVATE_CERT_FILE_PATH = 
"../test-conf//private-key.client-rsa.pem";
+    std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
+        
std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, 
PRIVATE_CERT_FILE_PATH);
+
+    ProducerConfiguration conf;
+    conf.setCompressionType(CompressionLZ4);
+    conf.addEncryptionKey("client-rsa.pem");
+    conf.setCryptoKeyReader(keyReader);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
+
+    ConsumerConfiguration consConfig1;
+    consConfig1.setCryptoKeyReader(keyReader);
+    consConfig1.setConsumerType(pulsar::ConsumerShared);
+    consConfig1.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    Consumer consumer1;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig1, 
consumer1));
+
+    ConsumerConfiguration consConfig2;
+    consConfig2.setCryptoKeyReader(std::make_shared<NoOpsCryptoKeyReader>());
+    consConfig2.setConsumerType(pulsar::ConsumerShared);
+    consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig2.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer2;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig2, 
consumer2));
+    auto consumer2ImplPtr = PulsarFriend::getConsumerImplPtr(consumer2);
+    consumer2ImplPtr->unAckedMessageTrackerPtr_.reset(new 
UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), 
static_cast<ConsumerImplBase&>(*consumer2ImplPtr)));
+
+    ConsumerConfiguration consConfig3;
+    consConfig3.setConsumerType(pulsar::ConsumerShared);
+    consConfig3.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig3.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer3;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig3, 
consumer3));
+    auto consumer3ImplPtr = PulsarFriend::getConsumerImplPtr(consumer3);
+    consumer3ImplPtr->unAckedMessageTrackerPtr_.reset(new 
UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), 
static_cast<ConsumerImplBase&>(*consumer3ImplPtr)));
+
+    int numberOfMessages = 20;
+    std::string msgContent = "msg-content";
+    Message msg;
+    for (int i = 0; i < numberOfMessages; i++) {
+        std::stringstream stream;
+        stream << msgContent << i;
+        msg = MessageBuilder().setContent(stream.str()).build();

Review Comment:
   We can concentrate an integer and a string by converting the integer to 
string, e.g.
   
   ```c++
            msg = MessageBuilder().setContent(msgContent + 
std::to_string(i)).build();
   ```
   
   The code above could simplify the code.



##########
tests/ConsumerTest.cc:
##########
@@ -895,6 +896,90 @@ TEST(ConsumerTest, 
testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + 
std::to_string(time(nullptr));
+    std::string subName = "sub-test";
+
+    std::string PUBLIC_CERT_FILE_PATH = 
"../test-conf//public-key.client-rsa.pem";
+    std::string PRIVATE_CERT_FILE_PATH = 
"../test-conf//private-key.client-rsa.pem";

Review Comment:
   ```suggestion
       std::string PUBLIC_CERT_FILE_PATH = 
"../test-conf/public-key.client-rsa.pem";
       std::string PRIVATE_CERT_FILE_PATH = 
"../test-conf/private-key.client-rsa.pem";
   ```



##########
tests/ConsumerTest.cc:
##########
@@ -895,6 +896,90 @@ TEST(ConsumerTest, 
testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + 
std::to_string(time(nullptr));
+    std::string subName = "sub-test";
+
+    std::string PUBLIC_CERT_FILE_PATH = 
"../test-conf//public-key.client-rsa.pem";
+    std::string PRIVATE_CERT_FILE_PATH = 
"../test-conf//private-key.client-rsa.pem";
+    std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
+        
std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, 
PRIVATE_CERT_FILE_PATH);
+
+    ProducerConfiguration conf;
+    conf.setCompressionType(CompressionLZ4);
+    conf.addEncryptionKey("client-rsa.pem");
+    conf.setCryptoKeyReader(keyReader);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
+
+    ConsumerConfiguration consConfig1;
+    consConfig1.setCryptoKeyReader(keyReader);
+    consConfig1.setConsumerType(pulsar::ConsumerShared);
+    consConfig1.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    Consumer consumer1;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig1, 
consumer1));
+
+    ConsumerConfiguration consConfig2;
+    consConfig2.setCryptoKeyReader(std::make_shared<NoOpsCryptoKeyReader>());
+    consConfig2.setConsumerType(pulsar::ConsumerShared);
+    consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig2.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer2;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig2, 
consumer2));
+    auto consumer2ImplPtr = PulsarFriend::getConsumerImplPtr(consumer2);
+    consumer2ImplPtr->unAckedMessageTrackerPtr_.reset(new 
UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), 
static_cast<ConsumerImplBase&>(*consumer2ImplPtr)));
+
+    ConsumerConfiguration consConfig3;
+    consConfig3.setConsumerType(pulsar::ConsumerShared);
+    consConfig3.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig3.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer3;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig3, 
consumer3));
+    auto consumer3ImplPtr = PulsarFriend::getConsumerImplPtr(consumer3);
+    consumer3ImplPtr->unAckedMessageTrackerPtr_.reset(new 
UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), 
static_cast<ConsumerImplBase&>(*consumer3ImplPtr)));

Review Comment:
   Do we need another consumer for this test? I think only one consumer is 
enough to verify the case that messages cannot be decrypted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to