BewareMyPower commented on code in PR #160:
URL: https://github.com/apache/pulsar-client-cpp/pull/160#discussion_r1108087040


##########
tests/ConsumerTest.cc:
##########
@@ -895,6 +896,90 @@ TEST(ConsumerTest, 
testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + 
std::to_string(time(nullptr));
+    std::string subName = "sub-test";
+
+    std::string PUBLIC_CERT_FILE_PATH = 
"../test-conf//public-key.client-rsa.pem";
+    std::string PRIVATE_CERT_FILE_PATH = 
"../test-conf//private-key.client-rsa.pem";
+    std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
+        
std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, 
PRIVATE_CERT_FILE_PATH);
+
+    ProducerConfiguration conf;
+    conf.setCompressionType(CompressionLZ4);
+    conf.addEncryptionKey("client-rsa.pem");
+    conf.setCryptoKeyReader(keyReader);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
+
+    ConsumerConfiguration consConfig1;
+    consConfig1.setCryptoKeyReader(keyReader);
+    consConfig1.setConsumerType(pulsar::ConsumerShared);
+    consConfig1.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    Consumer consumer1;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig1, 
consumer1));
+
+    ConsumerConfiguration consConfig2;
+    consConfig2.setCryptoKeyReader(std::make_shared<NoOpsCryptoKeyReader>());
+    consConfig2.setConsumerType(pulsar::ConsumerShared);
+    consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig2.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer2;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig2, 
consumer2));
+    auto consumer2ImplPtr = PulsarFriend::getConsumerImplPtr(consumer2);
+    consumer2ImplPtr->unAckedMessageTrackerPtr_.reset(new 
UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), 
static_cast<ConsumerImplBase&>(*consumer2ImplPtr)));
+
+    ConsumerConfiguration consConfig3;
+    consConfig3.setConsumerType(pulsar::ConsumerShared);
+    consConfig3.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig3.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer3;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig3, 
consumer3));
+    auto consumer3ImplPtr = PulsarFriend::getConsumerImplPtr(consumer3);
+    consumer3ImplPtr->unAckedMessageTrackerPtr_.reset(new 
UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), 
static_cast<ConsumerImplBase&>(*consumer3ImplPtr)));
+
+    int numberOfMessages = 20;
+    std::string msgContent = "msg-content";
+    Message msg;
+    for (int i = 0; i < numberOfMessages; i++) {
+        std::stringstream stream;
+        stream << msgContent << i;
+        msg = MessageBuilder().setContent(stream.str()).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    // Consuming from consumer 2 and 3
+    // no message should be returned since they can't decrypt the message
+    ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000));
+    ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000));
+
+    // All messages would be received by consumer 1
+    std::unordered_set<std::string> receivedMsgs;
+    for (int i = 0; i < numberOfMessages; i++) {
+        ASSERT_EQ(ResultOk, consumer1.receive(msg, 2000));
+        ASSERT_EQ(ResultOk, consumer1.acknowledge(msg));
+        receivedMsgs.insert(msg.getDataAsString());
+    }
+
+    ASSERT_EQ(receivedMsgs.size(), numberOfMessages);
+    for (int i = 0; i < numberOfMessages; i++) {
+        std::stringstream expected;
+        expected << msgContent << i;
+        ASSERT_TRUE(receivedMsgs.count(expected.str()));

Review Comment:
   It's better not to rely on the implicit conversion from `size_t` to `bool`; 
check the result explicitly instead, i.e. `xxx.count(key) > 0`.
   
   BTW, in this case, we can store both the sent and the received message values 
in ordered sets and then simply compare the two sets.
   
   ```c++
       std::set<std::string> valuesSent;
       for (int i = 0; i < numberOfMessages; i++) {
           auto value = msgContent + std::to_string(i);
           valuesSent.emplace(value);
           msg = MessageBuilder().setContent(value).build();
           ASSERT_EQ(ResultOk, producer.send(msg));
       }
   
       /* ... */
   
       // All messages would be received by consumer 1
       std::set<std::string> valuesReceived;
       for (int i = 0; i < numberOfMessages; i++) {
           ASSERT_EQ(ResultOk, consumer1.receive(msg, 2000));
           ASSERT_EQ(ResultOk, consumer1.acknowledge(msg));
           valuesReceived.emplace(msg.getDataAsString());
       }
       ASSERT_EQ(valuesSent, valuesReceived);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to