This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new ba7f59d [feat] Add `onNegativeAcksSend` to the consumer interceptor
(#220)
ba7f59d is described below
commit ba7f59d2bb28b72fa6d1d01abada71090537b45e
Author: Zike Yang <[email protected]>
AuthorDate: Tue Mar 21 19:33:53 2023 +0800
[feat] Add `onNegativeAcksSend` to the consumer interceptor (#220)
### Motivation
This PR is the C++ implementation of
https://github.com/apache/pulsar/pull/3962
### Modifications
* Add `onNegativeAcksSend` to the consumer interceptor
---
include/pulsar/ConsumerInterceptor.h | 12 +++++++
lib/ConsumerImpl.cc | 4 +++
lib/ConsumerImpl.h | 1 +
lib/ConsumerInterceptors.cc | 12 +++++++
lib/ConsumerInterceptors.h | 3 ++
lib/NegativeAcksTracker.cc | 1 +
tests/InterceptorsTest.cc | 68 ++++++++++++++++++++++++++++++++++++
7 files changed, 101 insertions(+)
diff --git a/include/pulsar/ConsumerInterceptor.h
b/include/pulsar/ConsumerInterceptor.h
index 0eecf20..eb3db32 100644
--- a/include/pulsar/ConsumerInterceptor.h
+++ b/include/pulsar/ConsumerInterceptor.h
@@ -24,6 +24,8 @@
#include <pulsar/Result.h>
#include <pulsar/defines.h>
+#include <set>
+
namespace pulsar {
class Consumer;
@@ -101,6 +103,16 @@ class PULSAR_PUBLIC ConsumerInterceptor {
*/
virtual void onAcknowledgeCumulative(const Consumer& consumer, Result
result,
const MessageId& messageID) = 0;
+
+    /**
+     * This method will be called right before the consumer requests redelivery of messages that were
+     * negatively acknowledged.
+     *
+     * <p>An exception derived from std::exception thrown by this method is caught and logged by the
+     * caller; it is not propagated and the redelivery request is still issued.
+     *
+     * @param consumer the consumer which contains the interceptor
+     * @param messageIds the set of negatively acknowledged message ids that are about to be redelivered
+     */
+    virtual void onNegativeAcksSend(const Consumer& consumer, const std::set<MessageId>& messageIds) = 0;
};
typedef std::shared_ptr<ConsumerInterceptor> ConsumerInterceptorPtr;
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 3393faf..5400e1a 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -200,6 +200,10 @@ void ConsumerImpl::start() {
void ConsumerImpl::beforeConnectionChange(ClientConnection& cnx) {
cnx.removeConsumer(consumerId_); }
+// Hands the given message ids to this consumer's interceptor chain, wrapping
+// this implementation object in the public Consumer handle the interceptors expect.
+void ConsumerImpl::onNegativeAcksSend(const std::set<MessageId>& messageIds) {
+    const Consumer self(shared_from_this());
+    interceptors_->onNegativeAcksSend(self, messageIds);
+}
+
void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
if (state_ == Closed) {
LOG_DEBUG(getName() << "connectionOpened : Consumer is already
closed");
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index ed8d0df..158132f 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -136,6 +136,7 @@ class ConsumerImpl : public ConsumerImplBase {
virtual bool isReadCompacted();
void beforeConnectionChange(ClientConnection& cnx) override;
+    // Forwards the given message ids to the interceptors' onNegativeAcksSend hook.
+    void onNegativeAcksSend(const std::set<MessageId>& messageIds);
protected:
// overridden methods from HandlerBase
diff --git a/lib/ConsumerInterceptors.cc b/lib/ConsumerInterceptors.cc
index 3b26175..08625b4 100644
--- a/lib/ConsumerInterceptors.cc
+++ b/lib/ConsumerInterceptors.cc
@@ -64,6 +64,18 @@ void ConsumerInterceptors::onAcknowledgeCumulative(const
Consumer &consumer, Res
}
}
+// Invokes onNegativeAcksSend on every registered interceptor, in registration order.
+//
+// The ConsumerInterceptor contract states that any exception thrown by the hook is
+// ignored by the caller, so nothing thrown by user code may escape from here: a
+// failing interceptor is logged as a warning and the remaining interceptors still run.
+void ConsumerInterceptors::onNegativeAcksSend(const Consumer &consumer,
+                                              const std::set<MessageId> &messageIds) const {
+    for (const ConsumerInterceptorPtr &interceptor : interceptors_) {
+        try {
+            interceptor->onNegativeAcksSend(consumer, messageIds);
+        } catch (const std::exception &e) {
+            LOG_WARN("Error executing interceptor onNegativeAcksSend callback for topic: "
+                     << consumer.getTopic() << ", exception: " << e.what());
+        } catch (...) {
+            // User code may throw types unrelated to std::exception; honour the
+            // "any exception is ignored" contract for those as well.
+            LOG_WARN("Error executing interceptor onNegativeAcksSend callback for topic: "
+                     << consumer.getTopic() << ", exception: unknown");
+        }
+    }
+}
+
void ConsumerInterceptors::close() {
State state = Ready;
if (!state_.compare_exchange_strong(state, Closing)) {
diff --git a/lib/ConsumerInterceptors.h b/lib/ConsumerInterceptors.h
index d3a768d..c825140 100644
--- a/lib/ConsumerInterceptors.h
+++ b/lib/ConsumerInterceptors.h
@@ -22,6 +22,7 @@
#include <pulsar/ConsumerInterceptor.h>
#include <atomic>
+#include <set>
#include <utility>
#include <vector>
@@ -39,6 +40,8 @@ class ConsumerInterceptors {
void onAcknowledgeCumulative(const Consumer& consumer, Result result,
const MessageId& messageID) const;
+    // Calls onNegativeAcksSend on each registered interceptor. A std::exception thrown by an
+    // interceptor is logged as a warning and does not prevent the remaining interceptors from running.
+    void onNegativeAcksSend(const Consumer& consumer, const std::set<MessageId>& messageIds) const;
+
private:
enum State
{
diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc
index 7807808..451a13f 100644
--- a/lib/NegativeAcksTracker.cc
+++ b/lib/NegativeAcksTracker.cc
@@ -80,6 +80,7 @@ void NegativeAcksTracker::handleTimer(const
boost::system::error_code &ec) {
}
if (!messagesToRedeliver.empty()) {
+ consumer_.onNegativeAcksSend(messagesToRedeliver);
consumer_.redeliverUnacknowledgedMessages(messagesToRedeliver);
}
scheduleTimer();
diff --git a/tests/InterceptorsTest.cc b/tests/InterceptorsTest.cc
index 71fdf27..42e2b19 100644
--- a/tests/InterceptorsTest.cc
+++ b/tests/InterceptorsTest.cc
@@ -220,6 +220,8 @@ class ConsumerExceptionInterceptor : public
ConsumerInterceptor {
throw std::runtime_error("expected exception");
}
+    // No-op: this interceptor takes no action for negative-ack redeliveries.
+    void onNegativeAcksSend(const Consumer& consumer, const std::set<MessageId>& messageIds) override {}
+
private:
Latch latch_;
};
@@ -260,6 +262,8 @@ class ConsumerTestInterceptor : public ConsumerInterceptor {
latch_.countdown();
}
+    // No-op: this interceptor takes no action for negative-ack redeliveries.
+    void onNegativeAcksSend(const Consumer& consumer, const std::set<MessageId>& messageIds) override {}
+
private:
Latch latch_;
std::string key_;
@@ -380,6 +384,70 @@ TEST_P(ConsumerInterceptorsTest,
testConsumerInterceptorWithExceptions) {
ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
}
+/**
+ * Test interceptor that counts down a latch once for every message id passed to
+ * onNegativeAcksSend. All other hooks are no-ops (beforeConsume returns the message unchanged).
+ */
+class NegativeAcksSendInterceptors : public ConsumerInterceptor {
+   public:
+    explicit NegativeAcksSendInterceptors(Latch& latch) : latch_(latch) {}
+
+    void close() override {}
+
+    Message beforeConsume(const Consumer& consumer, const Message& message) override { return message; }
+
+    void onAcknowledge(const Consumer& consumer, Result result, const MessageId& messageID) override {}
+
+    void onAcknowledgeCumulative(const Consumer& consumer, Result result,
+                                 const MessageId& messageID) override {}
+
+    void onNegativeAcksSend(const Consumer& consumer, const std::set<MessageId>& messageIds) override {
+        // Only the number of ids matters here, so count them instead of iterating
+        // by value, which would copy every MessageId into an unused variable.
+        for (auto remaining = messageIds.size(); remaining > 0; --remaining) {
+            latch_.countdown();
+        }
+    }
+
+   private:
+    // NOTE(review): stored by value although constructed from a reference; presumably copies of
+    // Latch share the same underlying counter (the test waits on the caller's instance) - confirm
+    // against the Latch definition.
+    Latch latch_;
+};
+
+// Verifies that the onNegativeAcksSend interceptor hook fires for negatively acknowledged
+// messages: numMsgs messages are produced, every other received message is negatively
+// acknowledged, and the latch must reach zero within five seconds.
+TEST_P(ConsumerInterceptorsTest, TestNegativeAcksSend) {
+    const int numMsgs = 100;
+    // Half of the receive iterations below issue a negative ack, one countdown is expected for each.
+    Latch latch(numMsgs / 2);
+
+    Consumer consumer;
+    consumerConf_.intercept({std::make_shared<NegativeAcksSendInterceptors>(latch)});
+    // Short redelivery delay so the negative-ack tracker fires well inside the latch timeout.
+    consumerConf_.setNegativeAckRedeliveryDelayMs(100);
+
+    Result result;
+    if (std::get<0>(GetParam()) == Pattern) {
+        result = client_.subscribeWithRegex(topic_, "sub", consumerConf_, consumer);
+    } else {
+        result = client_.subscribe(topic_, "sub", consumerConf_, consumer);
+    }
+    ASSERT_EQ(result, ResultOk);
+
+    // Alternate between the two producers of the fixture.
+    for (int i = 0; i < numMsgs; i++) {
+        Message msg = MessageBuilder().setContent("content").build();
+        if (i % 2) {
+            result = producer1_.send(msg);
+        } else {
+            result = producer2_.send(msg);
+        }
+        ASSERT_EQ(result, ResultOk);
+    }
+
+    Message recvMsg;
+    for (int i = 0; i < numMsgs; i++) {
+        // Bounded receive: fail fast instead of blocking forever (and instead of
+        // acknowledging a stale message) if a message never arrives.
+        ASSERT_EQ(consumer.receive(recvMsg, 5000), ResultOk);
+        LOG_INFO("RECEIVE: " << i);
+        if (i % 2) {
+            ASSERT_EQ(consumer.acknowledge(recvMsg), ResultOk);
+        } else {
+            consumer.negativeAcknowledge(recvMsg);
+        }
+    }
+
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
+    consumer.close();
+}
+
INSTANTIATE_TEST_CASE_P(Pulsar, ProducerInterceptorsTest,
::testing::Values(true, false));
INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerInterceptorsTest,
testing::Values(