This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 92c182a [fix] Redeliver messages that can't be decrypted. (#160)
92c182a is described below
commit 92c182acf7d1212218cb67e315656a23ee32a904
Author: Baodi Shi <[email protected]>
AuthorDate: Wed Feb 22 19:16:48 2023 +0800
[fix] Redeliver messages that can't be decrypted. (#160)
* [fix] Redeliver messages that can't be decrypted.
* Fix unit test.
* Fix code reviews.
* Just send once loop msg
---
lib/ConsumerImpl.cc | 4 +++
lib/ConsumerImpl.h | 1 +
tests/ConsumerTest.cc | 80 +++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 85 insertions(+)
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index c63147f..1f26a70 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -747,6 +747,8 @@ bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const
} else {
LOG_ERROR(getName() << "Message delivery failed since CryptoKeyReader is not implemented to "
"consume encrypted message");
+ auto messageId = MessageIdBuilder::from(msg.message_id()).build();
+ unAckedMessageTrackerPtr_->add(messageId);
}
return false;
}
@@ -767,6 +769,8 @@ bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const
discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError);
} else {
LOG_ERROR(getName() << "Message delivery failed since unable to decrypt incoming message");
+ auto messageId = MessageIdBuilder::from(msg.message_id()).build();
+ unAckedMessageTrackerPtr_->add(messageId);
}
return false;
}
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 66e0cd0..8e09bb6 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -329,6 +329,7 @@ class ConsumerImpl : public ConsumerImplBase {
// these two declared friend to access setNegativeAcknowledgeEnabledForTesting
friend class MultiTopicsConsumerImpl;
+ FRIEND_TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages);
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index cf4d8f8..944b87b 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -27,6 +27,7 @@
#include <vector>
#include "HttpHelper.h"
+#include "NoOpsCryptoKeyReader.h"
#include "PulsarFriend.h"
#include "lib/ClientConnection.h"
#include "lib/Future.h"
@@ -942,6 +943,85 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
ASSERT_GE(elapsed.seconds(), operationTimeout);
}
+TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) {
+ ClientConfiguration config;
+ Client client(lookupUrl);
+ std::string topicName = "testRedeliveryOfDecryptionFailedMessages" + std::to_string(time(nullptr));
+ std::string subName = "sub-test";
+
+ std::string PUBLIC_CERT_FILE_PATH = "../test-conf/public-key.client-rsa.pem";
+ std::string PRIVATE_CERT_FILE_PATH = "../test-conf/private-key.client-rsa.pem";
+ std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
+ std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH);
+
+ ProducerConfiguration conf;
+ conf.setCompressionType(CompressionLZ4);
+ conf.addEncryptionKey("client-rsa.pem");
+ conf.setCryptoKeyReader(keyReader);
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
+
+ ConsumerConfiguration consConfig1;
+ consConfig1.setCryptoKeyReader(keyReader);
+ consConfig1.setConsumerType(pulsar::ConsumerShared);
+ consConfig1.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+ Consumer consumer1;
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig1, consumer1));
+
+ ConsumerConfiguration consConfig2;
+ consConfig2.setCryptoKeyReader(std::make_shared<NoOpsCryptoKeyReader>());
+ consConfig2.setConsumerType(pulsar::ConsumerShared);
+ consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+ consConfig2.setUnAckedMessagesTimeoutMs(10000);
+ Consumer consumer2;
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig2, consumer2));
+ auto consumer2ImplPtr = PulsarFriend::getConsumerImplPtr(consumer2);
+ consumer2ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+ 100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer2ImplPtr)));
+
+ ConsumerConfiguration consConfig3;
+ consConfig3.setConsumerType(pulsar::ConsumerShared);
+ consConfig3.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+ consConfig3.setUnAckedMessagesTimeoutMs(10000);
+ Consumer consumer3;
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consConfig3, consumer3));
+ auto consumer3ImplPtr = PulsarFriend::getConsumerImplPtr(consumer3);
+ consumer3ImplPtr->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+ 100, 100, PulsarFriend::getClientImplPtr(client), static_cast<ConsumerImplBase&>(*consumer3ImplPtr)));
+
+ int numberOfMessages = 20;
+ std::string msgContent = "msg-content";
+ std::set<std::string> valuesSent;
+ Message msg;
+ for (int i = 0; i < numberOfMessages; i++) {
+ auto value = msgContent + std::to_string(i);
+ valuesSent.emplace(value);
+ msg = MessageBuilder().setContent(value).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+
+ // Consuming from consumer 2 and 3
+ // no message should be returned since they can't decrypt the message
+ ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000));
+ ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000));
+
+ // All messages would be received by consumer 1
+ std::set<std::string> valuesReceived;
+ for (int i = 0; i < numberOfMessages; i++) {
+ ASSERT_EQ(ResultOk, consumer1.receive(msg, 2000));
+ ASSERT_EQ(ResultOk, consumer1.acknowledge(msg));
+ valuesReceived.emplace(msg.getDataAsString());
+ }
+ ASSERT_EQ(valuesSent, valuesReceived);
+
+ // Consuming from consumer 2 and 3 again just to be sure
+ // no message should be returned since they can't decrypt the message
+ ASSERT_EQ(ResultTimeout, consumer2.receive(msg, 1000));
+ ASSERT_EQ(ResultTimeout, consumer3.receive(msg, 1000));
+
+ ASSERT_EQ(ResultOk, client.close());
+}
+
class ConsumerSeekTest : public ::testing::TestWithParam<bool> {
public:
void SetUp() override { producerConf_ = ProducerConfiguration().setBatchingEnabled(GetParam()); }