shibd commented on code in PR #160:
URL: https://github.com/apache/pulsar-client-cpp/pull/160#discussion_r1113050168
##########
tests/ConsumerTest.cc:
##########
@@ -895,6 +896,91 @@ TEST(ConsumerTest,
testGetLastMessageIdBlockWhenConnectionDisconnected) {
ASSERT_GE(elapsed.seconds(), operationTimeout);
}
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+ ClientConfiguration config;
+ Client client(lookupUrl);
+ std::string topicName = "testRedeliveryOfDecryptionFailedMessages" +
std::to_string(time(nullptr));
+ std::string subName = "sub-test";
+
+ std::string PUBLIC_CERT_FILE_PATH =
"../test-conf/public-key.client-rsa.pem";
+ std::string PRIVATE_CERT_FILE_PATH =
"../test-conf/private-key.client-rsa.pem";
+ std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
+
std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH,
PRIVATE_CERT_FILE_PATH);
+
+ ProducerConfiguration conf;
+ conf.setCompressionType(CompressionLZ4);
+ conf.addEncryptionKey("client-rsa.pem");
+ conf.setCryptoKeyReader(keyReader);
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
+
+ ConsumerConfiguration consConfig1;
+ consConfig1.setCryptoKeyReader(keyReader);
+ consConfig1.setConsumerType(pulsar::ConsumerShared);
+ consConfig1.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+ Consumer consumer1;
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig1,
consumer1));
+
+ ConsumerConfiguration consConfig2;
+ consConfig2.setCryptoKeyReader(std::make_shared<NoOpsCryptoKeyReader>());
+ consConfig2.setConsumerType(pulsar::ConsumerShared);
+ consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+ consConfig2.setUnAckedMessagesTimeoutMs(10000);
+ Consumer consumer2;
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig2,
consumer2));
+ auto consumer2ImplPtr = PulsarFriend::getConsumerImplPtr(consumer2);
+ consumer2ImplPtr->unAckedMessageTrackerPtr_.reset(new
UnAckedMessageTrackerEnabled(
+ 100, 100, PulsarFriend::getClientImplPtr(client),
static_cast<ConsumerImplBase&>(*consumer2ImplPtr)));
+
+ ConsumerConfiguration consConfig3;
+ consConfig3.setConsumerType(pulsar::ConsumerShared);
+ consConfig3.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+ consConfig3.setUnAckedMessagesTimeoutMs(10000);
+ Consumer consumer3;
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig3,
consumer3));
+ auto consumer3ImplPtr = PulsarFriend::getConsumerImplPtr(consumer3);
+ consumer3ImplPtr->unAckedMessageTrackerPtr_.reset(new
UnAckedMessageTrackerEnabled(
+ 100, 100, PulsarFriend::getClientImplPtr(client),
static_cast<ConsumerImplBase&>(*consumer3ImplPtr)));
+
+ int numberOfMessages = 20;
+ std::string msgContent = "msg-content";
+ Message msg;
+ for (int i = 0; i < numberOfMessages; i++) {
+ msg = MessageBuilder().setContent(msgContent +
std::to_string(i)).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+
+ // Consuming from consumer 2 and 3
+ // no message should be returned since they can't decrypt the message
+ ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000));
+ ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000));
+
+ // All messages would be received by consumer 1
+ std::set<std::string> valuesSent;
+ for (int i = 0; i < numberOfMessages; i++) {
+ auto value = msgContent + std::to_string(i);
+ valuesSent.emplace(value);
+ msg = MessageBuilder().setContent(value).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+
+ // All messages would be received by consumer 1
+ std::set<std::string> valuesReceived;
+ for (int i = 0; i < numberOfMessages; i++) {
+ ASSERT_EQ(ResultOk, consumer1.receive(msg, 2000));
+ ASSERT_EQ(ResultOk, consumer1.acknowledge(msg));
+ valuesReceived.emplace(msg.getDataAsString());
+ }
+ ASSERT_EQ(valuesSent, valuesReceived);
+
+ // Consuming from consumer 2 and 3 again just to be sure
+ // no message should be returned since they can't decrypt the message
+ ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000));
+ ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000));
+
+ ASSERT_EQ(ResultOk, client.close());
+}
Review Comment:
When `consumer2` and `consumer3` receive a msg and the decrypted message
failed, it will be added to `unAckedMessageTracker` and redelivery. So, just
`consumer1` will receive all messages.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]