This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 92c182a  [fix] Redeliver messages that can't be decrypted. (#160)
92c182a is described below

commit 92c182acf7d1212218cb67e315656a23ee32a904
Author: Baodi Shi <[email protected]>
AuthorDate: Wed Feb 22 19:16:48 2023 +0800

    [fix] Redeliver messages that can't be decrypted. (#160)
    
    * [fix] Redeliver messages that can't be decrypted.
    
    * Fix unit test.
    
    * Fix code reviews.
    
    * Just send once loop msg
---
 lib/ConsumerImpl.cc   |  4 +++
 lib/ConsumerImpl.h    |  1 +
 tests/ConsumerTest.cc | 80 +++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 85 insertions(+)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index c63147f..1f26a70 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -747,6 +747,8 @@ bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const
         } else {
             LOG_ERROR(getName() << "Message delivery failed since CryptoKeyReader is not implemented to "
                                    "consume encrypted message");
+            auto messageId = MessageIdBuilder::from(msg.message_id()).build();
+            unAckedMessageTrackerPtr_->add(messageId);
         }
         return false;
     }
@@ -767,6 +769,8 @@ bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const
         discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError);
     } else {
         LOG_ERROR(getName() << "Message delivery failed since unable to decrypt incoming message");
+        auto messageId = MessageIdBuilder::from(msg.message_id()).build();
+        unAckedMessageTrackerPtr_->add(messageId);
     }
     return false;
 }
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 66e0cd0..8e09bb6 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -329,6 +329,7 @@ class ConsumerImpl : public ConsumerImplBase {
     // these two declared friend to access setNegativeAcknowledgeEnabledForTesting
     friend class MultiTopicsConsumerImpl;
 
+    FRIEND_TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages);
     FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index cf4d8f8..944b87b 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -27,6 +27,7 @@
 #include <vector>
 
 #include "HttpHelper.h"
+#include "NoOpsCryptoKeyReader.h"
 #include "PulsarFriend.h"
 #include "lib/ClientConnection.h"
 #include "lib/Future.h"
@@ -942,6 +943,85 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + std::to_string(time(nullptr));
+    std::string subName = "sub-test";
+
+    std::string PUBLIC_CERT_FILE_PATH = "../test-conf/public-key.client-rsa.pem";
+    std::string PRIVATE_CERT_FILE_PATH = "../test-conf/private-key.client-rsa.pem";
+    std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
+        std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH);
+
+    ProducerConfiguration conf;
+    conf.setCompressionType(CompressionLZ4);
+    conf.addEncryptionKey("client-rsa.pem");
+    conf.setCryptoKeyReader(keyReader);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
+
+    ConsumerConfiguration consConfig1;
+    consConfig1.setCryptoKeyReader(keyReader);
+    consConfig1.setConsumerType(pulsar::ConsumerShared);
+    consConfig1.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    Consumer consumer1;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig1, consumer1));
+
+    ConsumerConfiguration consConfig2;
+    consConfig2.setCryptoKeyReader(std::make_shared<NoOpsCryptoKeyReader>());
+    consConfig2.setConsumerType(pulsar::ConsumerShared);
+    consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig2.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer2;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig2, consumer2));
+    auto consumer2ImplPtr = PulsarFriend::getConsumerImplPtr(consumer2);
+    consumer2ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer2ImplPtr)));
+
+    ConsumerConfiguration consConfig3;
+    consConfig3.setConsumerType(pulsar::ConsumerShared);
+    consConfig3.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    consConfig3.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer3;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig3, consumer3));
+    auto consumer3ImplPtr = PulsarFriend::getConsumerImplPtr(consumer3);
+    consumer3ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+        100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer3ImplPtr)));
+
+    int numberOfMessages = 20;
+    std::string msgContent = "msg-content";
+    std::set<std::string> valuesSent;
+    Message msg;
+    for (int i = 0; i < numberOfMessages; i++) {
+        auto value = msgContent + std::to_string(i);
+        valuesSent.emplace(value);
+        msg = MessageBuilder().setContent(value).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    // Consuming from consumer 2 and 3
+    // no message should be returned since they can't decrypt the message
+    ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000));
+    ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000));
+
+    // All messages would be received by consumer 1
+    std::set<std::string> valuesReceived;
+    for (int i = 0; i < numberOfMessages; i++) {
+        ASSERT_EQ(ResultOk, consumer1.receive(msg, 2000));
+        ASSERT_EQ(ResultOk, consumer1.acknowledge(msg));
+        valuesReceived.emplace(msg.getDataAsString());
+    }
+    ASSERT_EQ(valuesSent, valuesReceived);
+
+    // Consuming from consumer 2 and 3 again just to be sure
+    // no message should be returned since they can't decrypt the message
+    ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000));
+    ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000));
+
+    ASSERT_EQ(ResultOk, client.close());
+}
+
 class ConsumerSeekTest : public ::testing::TestWithParam<bool> {
    public:
     void SetUp() override { producerConf_ = ProducerConfiguration().setBatchingEnabled(GetParam()); }

Reply via email to