This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new ba7f59d  [feat] Add `onNegativeAcksSend` to the consumer interceptor 
(#220)
ba7f59d is described below

commit ba7f59d2bb28b72fa6d1d01abada71090537b45e
Author: Zike Yang <[email protected]>
AuthorDate: Tue Mar 21 19:33:53 2023 +0800

    [feat] Add `onNegativeAcksSend` to the consumer interceptor (#220)
    
    ### Motivation
    
    This PR is the C++ implementation of 
https://github.com/apache/pulsar/pull/3962
    
    ### Modifications
    
    * Add `onNegativeAcksSend` to the consumer interceptor
---
 include/pulsar/ConsumerInterceptor.h | 12 +++++++
 lib/ConsumerImpl.cc                  |  4 +++
 lib/ConsumerImpl.h                   |  1 +
 lib/ConsumerInterceptors.cc          | 12 +++++++
 lib/ConsumerInterceptors.h           |  3 ++
 lib/NegativeAcksTracker.cc           |  1 +
 tests/InterceptorsTest.cc            | 68 ++++++++++++++++++++++++++++++++++++
 7 files changed, 101 insertions(+)

diff --git a/include/pulsar/ConsumerInterceptor.h 
b/include/pulsar/ConsumerInterceptor.h
index 0eecf20..eb3db32 100644
--- a/include/pulsar/ConsumerInterceptor.h
+++ b/include/pulsar/ConsumerInterceptor.h
@@ -24,6 +24,8 @@
 #include <pulsar/Result.h>
 #include <pulsar/defines.h>
 
+#include <set>
+
 namespace pulsar {
 
 class Consumer;
@@ -101,6 +103,16 @@ class PULSAR_PUBLIC ConsumerInterceptor {
      */
     virtual void onAcknowledgeCumulative(const Consumer& consumer, Result 
result,
                                          const MessageId& messageID) = 0;
+
+    /**
+     * This method will be called when a redelivery from a negative 
acknowledge occurs.
+     *
+     * <p>Any exception thrown by this method will be ignored by the caller.
+     *
+     * @param consumer the consumer which contains the interceptor
+     * @param messageIds the set of message ids to negative ack
+     */
+    virtual void onNegativeAcksSend(const Consumer& consumer, const 
std::set<MessageId>& messageIds) = 0;
 };
 
 typedef std::shared_ptr<ConsumerInterceptor> ConsumerInterceptorPtr;
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 3393faf..5400e1a 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -200,6 +200,10 @@ void ConsumerImpl::start() {
 
 void ConsumerImpl::beforeConnectionChange(ClientConnection& cnx) { 
cnx.removeConsumer(consumerId_); }
 
+void ConsumerImpl::onNegativeAcksSend(const std::set<MessageId>& messageIds) {
+    interceptors_->onNegativeAcksSend(Consumer(shared_from_this()), 
messageIds);
+}
+
 void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     if (state_ == Closed) {
         LOG_DEBUG(getName() << "connectionOpened : Consumer is already 
closed");
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index ed8d0df..158132f 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -136,6 +136,7 @@ class ConsumerImpl : public ConsumerImplBase {
 
     virtual bool isReadCompacted();
     void beforeConnectionChange(ClientConnection& cnx) override;
+    void onNegativeAcksSend(const std::set<MessageId>& messageIds);
 
    protected:
     // overrided methods from HandlerBase
diff --git a/lib/ConsumerInterceptors.cc b/lib/ConsumerInterceptors.cc
index 3b26175..08625b4 100644
--- a/lib/ConsumerInterceptors.cc
+++ b/lib/ConsumerInterceptors.cc
@@ -64,6 +64,18 @@ void ConsumerInterceptors::onAcknowledgeCumulative(const 
Consumer &consumer, Res
     }
 }
 
+void ConsumerInterceptors::onNegativeAcksSend(const Consumer &consumer,
+                                              const std::set<MessageId> 
&messageIds) const {
+    for (const ConsumerInterceptorPtr &interceptor : interceptors_) {
+        try {
+            interceptor->onNegativeAcksSend(consumer, messageIds);
+        } catch (const std::exception &e) {
+            LOG_WARN("Error executing interceptor onNegativeAcksSend callback 
for topic: "
+                     << consumer.getTopic() << ", exception: " << e.what());
+        }
+    }
+}
+
 void ConsumerInterceptors::close() {
     State state = Ready;
     if (!state_.compare_exchange_strong(state, Closing)) {
diff --git a/lib/ConsumerInterceptors.h b/lib/ConsumerInterceptors.h
index d3a768d..c825140 100644
--- a/lib/ConsumerInterceptors.h
+++ b/lib/ConsumerInterceptors.h
@@ -22,6 +22,7 @@
 #include <pulsar/ConsumerInterceptor.h>
 
 #include <atomic>
+#include <set>
 #include <utility>
 #include <vector>
 
@@ -39,6 +40,8 @@ class ConsumerInterceptors {
 
     void onAcknowledgeCumulative(const Consumer& consumer, Result result, 
const MessageId& messageID) const;
 
+    void onNegativeAcksSend(const Consumer& consumer, const 
std::set<MessageId>& messageIds) const;
+
    private:
     enum State
     {
diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc
index 7807808..451a13f 100644
--- a/lib/NegativeAcksTracker.cc
+++ b/lib/NegativeAcksTracker.cc
@@ -80,6 +80,7 @@ void NegativeAcksTracker::handleTimer(const 
boost::system::error_code &ec) {
     }
 
     if (!messagesToRedeliver.empty()) {
+        consumer_.onNegativeAcksSend(messagesToRedeliver);
         consumer_.redeliverUnacknowledgedMessages(messagesToRedeliver);
     }
     scheduleTimer();
diff --git a/tests/InterceptorsTest.cc b/tests/InterceptorsTest.cc
index 71fdf27..42e2b19 100644
--- a/tests/InterceptorsTest.cc
+++ b/tests/InterceptorsTest.cc
@@ -220,6 +220,8 @@ class ConsumerExceptionInterceptor : public 
ConsumerInterceptor {
         throw std::runtime_error("expected exception");
     }
 
+    void onNegativeAcksSend(const Consumer& consumer, const 
std::set<MessageId>& messageIds) override {}
+
    private:
     Latch latch_;
 };
@@ -260,6 +262,8 @@ class ConsumerTestInterceptor : public ConsumerInterceptor {
         latch_.countdown();
     }
 
+    void onNegativeAcksSend(const Consumer& consumer, const 
std::set<MessageId>& messageIds) override {}
+
    private:
     Latch latch_;
     std::string key_;
@@ -380,6 +384,70 @@ TEST_P(ConsumerInterceptorsTest, 
testConsumerInterceptorWithExceptions) {
     ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
 }
 
+class NegativeAcksSendInterceptors : public ConsumerInterceptor {
+   public:
+    explicit NegativeAcksSendInterceptors(Latch& latch) : latch_(latch) {}
+
+    void close() override {}
+
+    Message beforeConsume(const Consumer& consumer, const Message& message) 
override { return message; }
+
+    void onAcknowledge(const Consumer& consumer, Result result, const 
MessageId& messageID) override {}
+
+    void onAcknowledgeCumulative(const Consumer& consumer, Result result,
+                                 const MessageId& messageID) override {}
+
+    void onNegativeAcksSend(const Consumer& consumer, const 
std::set<MessageId>& messageIds) override {
+        for (auto _ : messageIds) {
+            latch_.countdown();
+        }
+    }
+
+   private:
+    Latch latch_;
+};
+
+TEST_P(ConsumerInterceptorsTest, TestNegativeAcksSend) {
+    int numMsgs = 100;
+    Latch latch(numMsgs / 2);
+
+    Consumer consumer;
+    
consumerConf_.intercept({std::make_shared<NegativeAcksSendInterceptors>(latch)});
+    consumerConf_.setNegativeAckRedeliveryDelayMs(100);
+
+    Result result;
+    if (std::get<0>(GetParam()) == Pattern) {
+        result = client_.subscribeWithRegex(topic_, "sub", consumerConf_, 
consumer);
+    } else {
+        result = client_.subscribe(topic_, "sub", consumerConf_, consumer);
+    }
+    ASSERT_EQ(result, ResultOk);
+
+    for (int i = 0; i < numMsgs; i++) {
+        Message msg = MessageBuilder().setContent("content").build();
+        if (i % 2) {
+            result = producer1_.send(msg);
+        } else {
+            result = producer2_.send(msg);
+        }
+        ASSERT_EQ(result, ResultOk);
+    }
+
+    Message recvMsg;
+    for (int i = 0; i < numMsgs; i++) {
+        consumer.receive(recvMsg);
+        LOG_INFO("RECEIVE: " << i);
+        if (i % 2) {
+            consumer.acknowledge(recvMsg);
+        } else {
+            consumer.negativeAcknowledge(recvMsg);
+        }
+    }
+
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
+    consumer.close();
+}
+
 INSTANTIATE_TEST_CASE_P(Pulsar, ProducerInterceptorsTest, 
::testing::Values(true, false));
 INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerInterceptorsTest,
                         testing::Values(

Reply via email to