This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 998c73d  [feat] Add consumer interceptor  (#210)
998c73d is described below

commit 998c73df164f7e08a5e4e4b33f9d979db4518e07
Author: Zike Yang <z...@apache.org>
AuthorDate: Thu Mar 16 17:10:54 2023 +0800

    [feat] Add consumer interceptor  (#210)
---
 include/pulsar/ConsumerConfiguration.h |  11 ++
 include/pulsar/ConsumerInterceptor.h   | 109 +++++++++++++++
 include/pulsar/ProducerInterceptor.h   |   8 +-
 lib/ClientImpl.cc                      |  22 ++-
 lib/ConsumerConfiguration.cc           |  10 ++
 lib/ConsumerConfigurationImpl.h        |   1 +
 lib/ConsumerImpl.cc                    |  27 +++-
 lib/ConsumerImpl.h                     |   5 +-
 lib/ConsumerInterceptors.cc            |  82 ++++++++++++
 lib/ConsumerInterceptors.h             |  54 ++++++++
 lib/MultiTopicsConsumerImpl.cc         |  29 ++--
 lib/MultiTopicsConsumerImpl.h          |   5 +-
 lib/PatternMultiTopicsConsumerImpl.cc  |  12 +-
 lib/PatternMultiTopicsConsumerImpl.h   |   3 +-
 lib/ReaderImpl.cc                      |   5 +-
 tests/InterceptorsTest.cc              | 238 ++++++++++++++++++++++++++++++---
 16 files changed, 566 insertions(+), 55 deletions(-)

diff --git a/include/pulsar/ConsumerConfiguration.h 
b/include/pulsar/ConsumerConfiguration.h
index 1977415..ee0c634 100644
--- a/include/pulsar/ConsumerConfiguration.h
+++ b/include/pulsar/ConsumerConfiguration.h
@@ -21,6 +21,7 @@
 
 #include <pulsar/ConsumerCryptoFailureAction.h>
 #include <pulsar/ConsumerEventListener.h>
+#include <pulsar/ConsumerInterceptor.h>
 #include <pulsar/ConsumerType.h>
 #include <pulsar/CryptoKeyReader.h>
 #include <pulsar/InitialPosition.h>
@@ -618,6 +619,16 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     bool isBatchIndexAckEnabled() const;
 
+    /**
+     * Intercept the consumer
+     *
+     * @param interceptors the list of interceptors to intercept the consumer
+     * @return Consumer Configuration
+     */
+    ConsumerConfiguration& intercept(const 
std::vector<ConsumerInterceptorPtr>& interceptors);
+
+    const std::vector<ConsumerInterceptorPtr>& getInterceptors() const;
+
     friend class PulsarWrapper;
     friend class PulsarFriend;
 
diff --git a/include/pulsar/ConsumerInterceptor.h 
b/include/pulsar/ConsumerInterceptor.h
new file mode 100644
index 0000000..0eecf20
--- /dev/null
+++ b/include/pulsar/ConsumerInterceptor.h
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PULSAR_CPP_CONSUMER_INTERCEPTOR_H
+#define PULSAR_CPP_CONSUMER_INTERCEPTOR_H
+
+#include <pulsar/Message.h>
+#include <pulsar/Result.h>
+#include <pulsar/defines.h>
+
+namespace pulsar {
+
+class Consumer;
+
+/**
+ * A plugin interface that allows you to intercept (and possibly mutate)
+ * messages received by the consumer.
+ *
+ * <p>A primary use case is to hook into consumer applications for custom
+ * monitoring, logging, etc.
+ *
+ * <p>Exceptions thrown by interceptor methods will be caught, logged, but
+ * not propagated further.
+ */
+class PULSAR_PUBLIC ConsumerInterceptor {
+   public:
+    virtual ~ConsumerInterceptor() {}
+    /**
+     * Close the interceptor
+     */
+    virtual void close() {}
+
+    /**
+     * This is called just before the message is consumed.
+     *
+     * <p>This method is allowed to modify message, in which case the new 
message
+     * will be returned.
+     *
+     * <p>Any exception thrown by this method will be caught by the caller, 
logged,
+     * but not propagated to client.
+     *
+     * <p>Since the consumer may run multiple interceptors, each interceptor's
+     * <tt>beforeConsume</tt> callback is called in the order in which the
+     * interceptors appear in the list. The first interceptor in the list gets
+     * the consumed message, the following interceptor is passed the message
+     * returned by the previous interceptor, and so on. Since interceptors are
+     * allowed to modify the message, an interceptor may receive a message that
+     * has already been modified by other interceptors. However, building a
+     * pipeline of mutating interceptors that depend on the output of the
+     * previous interceptor is discouraged, because of the side effects that
+     * arise when an interceptor fails to modify the message and throws an
+     * exception. If one of the interceptors in the list throws an exception
+     * from <tt>beforeConsume</tt>, the exception is caught and logged, and the
+     * next interceptor is called with the message returned by the last
+     * successful interceptor in the list, or otherwise with the original
+     * consumed message.
+     *
+     * @param consumer the consumer which contains the interceptor
+     * @param message the message to be consumed by the client
+     * @return message that is either modified by the interceptor or same 
message
+     *         passed into the method.
+     */
+    virtual Message beforeConsume(const Consumer& consumer, const Message& 
message) = 0;
+
+    /**
+     *
+     * This is called before consumer sends the acknowledgment to the broker.
+     *
+     * <p>Any exception thrown by this method will be ignored by the caller.
+     *
+     * @param consumer the consumer which contains the interceptor
+     * @param result the result of the acknowledgement
+     * @param messageID the message id to be acknowledged
+     */
+    virtual void onAcknowledge(const Consumer& consumer, Result result, const 
MessageId& messageID) = 0;
+
+    /**
+     *
+     * This is called before consumer sends the cumulative acknowledgment to 
the broker.
+     *
+     * <p>Any exception thrown by this method will be ignored by the caller.
+     *
+     * @param consumer the consumer which contains the interceptor
+     * @param result the result of the cumulative acknowledgement
+     * @param messageID the message id to be acknowledged cumulatively
+     */
+    virtual void onAcknowledgeCumulative(const Consumer& consumer, Result 
result,
+                                         const MessageId& messageID) = 0;
+};
+
+typedef std::shared_ptr<ConsumerInterceptor> ConsumerInterceptorPtr;
+}  // namespace pulsar
+
+#endif  // PULSAR_CPP_CONSUMER_INTERCEPTOR_H
diff --git a/include/pulsar/ProducerInterceptor.h 
b/include/pulsar/ProducerInterceptor.h
index 45f55b5..0f40ce4 100644
--- a/include/pulsar/ProducerInterceptor.h
+++ b/include/pulsar/ProducerInterceptor.h
@@ -24,6 +24,10 @@
 #include <pulsar/Result.h>
 #include <pulsar/defines.h>
 
+namespace pulsar {
+
+class Producer;
+
 /**
  * An interface that allows you to intercept (and possibly mutate) the
  * messages received by the producer before they are published to the Pulsar
@@ -35,10 +39,6 @@
  * <p>ProducerInterceptor callbacks may be called from multiple threads. 
Interceptor
  * implementation must ensure thread-safety, if needed.
  */
-namespace pulsar {
-
-class Producer;
-
 class PULSAR_PUBLIC ProducerInterceptor {
    public:
     virtual ~ProducerInterceptor() {}
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index a9bca11..08e4a10 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -26,6 +26,7 @@
 #include "ClientConfigurationImpl.h"
 #include "Commands.h"
 #include "ConsumerImpl.h"
+#include "ConsumerInterceptors.h"
 #include "ExecutorService.h"
 #include "HTTPLookupService.h"
 #include "LogUtils.h"
@@ -358,8 +359,11 @@ void ClientImpl::createPatternMultiTopicsConsumer(const 
Result result, const Nam
         NamespaceTopicsPtr matchTopics =
             PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, 
pattern);
 
-        consumer = std::make_shared<PatternMultiTopicsConsumerImpl>(
-            shared_from_this(), regexPattern, *matchTopics, subscriptionName, 
conf, lookupServicePtr_);
+        auto interceptors = 
std::make_shared<ConsumerInterceptors>(conf.getInterceptors());
+
+        consumer = 
std::make_shared<PatternMultiTopicsConsumerImpl>(shared_from_this(), 
regexPattern,
+                                                                    
*matchTopics, subscriptionName, conf,
+                                                                    
lookupServicePtr_, interceptors);
 
         consumer->getConsumerCreatedFuture().addListener(
             std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), 
std::placeholders::_1,
@@ -396,8 +400,10 @@ void ClientImpl::subscribeAsync(const 
std::vector<std::string>& topics, const st
         topicNamePtr = TopicName::get(consumerTopicNameStream.str());
     }
 
+    auto interceptors = 
std::make_shared<ConsumerInterceptors>(conf.getInterceptors());
+
     ConsumerImplBasePtr consumer = std::make_shared<MultiTopicsConsumerImpl>(
-        shared_from_this(), topics, subscriptionName, topicNamePtr, conf, 
lookupServicePtr_);
+        shared_from_this(), topics, subscriptionName, topicNamePtr, conf, 
lookupServicePtr_, interceptors);
 
     
consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated,
                                                                
shared_from_this(), std::placeholders::_1,
@@ -441,6 +447,8 @@ void ClientImpl::handleSubscribe(const Result result, const 
LookupDataResultPtr
             conf.setConsumerName(generateRandomName());
         }
         ConsumerImplBasePtr consumer;
+        auto interceptors = 
std::make_shared<ConsumerInterceptors>(conf.getInterceptors());
+
         try {
             if (partitionMetadata->getPartitions() > 0) {
                 if (conf.getReceiverQueueSize() == 0) {
@@ -450,11 +458,11 @@ void ClientImpl::handleSubscribe(const Result result, 
const LookupDataResultPtr
                 }
                 consumer = std::make_shared<MultiTopicsConsumerImpl>(
                     shared_from_this(), topicName, 
partitionMetadata->getPartitions(), subscriptionName, conf,
-                    lookupServicePtr_);
+                    lookupServicePtr_, interceptors);
             } else {
-                auto consumerImpl =
-                    std::make_shared<ConsumerImpl>(shared_from_this(), 
topicName->toString(),
-                                                   subscriptionName, conf, 
topicName->isPersistent());
+                auto consumerImpl = 
std::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(),
+                                                                   
subscriptionName, conf,
+                                                                   
topicName->isPersistent(), interceptors);
                 
consumerImpl->setPartitionIndex(topicName->getPartitionIndex());
                 consumer = consumerImpl;
             }
diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc
index 800be57..1497a2d 100644
--- a/lib/ConsumerConfiguration.cc
+++ b/lib/ConsumerConfiguration.cc
@@ -300,4 +300,14 @@ void ConsumerConfiguration::setDeadLetterPolicy(const 
DeadLetterPolicy& deadLett
 
 const DeadLetterPolicy& ConsumerConfiguration::getDeadLetterPolicy() const { 
return impl_->deadLetterPolicy; }
 
+ConsumerConfiguration& ConsumerConfiguration::intercept(
+    const std::vector<ConsumerInterceptorPtr>& interceptors) {
+    impl_->interceptors.insert(impl_->interceptors.end(), 
interceptors.begin(), interceptors.end());
+    return *this;
+}
+
+const std::vector<ConsumerInterceptorPtr>& 
ConsumerConfiguration::getInterceptors() const {
+    return impl_->interceptors;
+}
+
 }  // namespace pulsar
diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h
index 8b64b0d..4ada424 100644
--- a/lib/ConsumerConfigurationImpl.h
+++ b/lib/ConsumerConfigurationImpl.h
@@ -58,6 +58,7 @@ struct ConsumerConfigurationImpl {
     bool startMessageIdInclusive{false};
     long expireTimeOfIncompleteChunkedMessageMs{60000};
     bool batchIndexAckEnabled{false};
+    std::vector<ConsumerInterceptorPtr> interceptors;
 };
 }  // namespace pulsar
 #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 996ffbf..583cbcc 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -56,7 +56,7 @@ DECLARE_LOG_OBJECT()
 
 ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& 
topic,
                            const std::string& subscriptionName, const 
ConsumerConfiguration& conf,
-                           bool isPersistent,
+                           bool isPersistent, const ConsumerInterceptorsPtr& 
interceptors,
                            const ExecutorServicePtr listenerExecutor /* = NULL 
by default */,
                            bool hasParent /* = false by default */,
                            const ConsumerTopicType consumerTopicType /* = 
NonPartitioned by default */,
@@ -90,7 +90,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const 
std::string& topic,
       startMessageId_(startMessageId),
       maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
       
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
-      
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs())
 {
+      
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()),
+      interceptors_(interceptors) {
     std::stringstream consumerStrStream;
     consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << 
consumerId_ << "] ";
     consumerStr_ = consumerStrStream.str();
@@ -646,7 +647,8 @@ void ConsumerImpl::notifyBatchPendingReceivedCallback(const 
BatchReceiveCallback
     Message peekMsg;
     while (incomingMessages_.pop(peekMsg, std::chrono::milliseconds(0)) && 
messages->canAdd(peekMsg)) {
         messageProcessed(peekMsg);
-        messages->add(peekMsg);
+        Message interceptMsg = 
interceptors_->beforeConsume(Consumer(shared_from_this()), peekMsg);
+        messages->add(interceptMsg);
     }
     auto self = get_shared_this_ptr();
     listenerExecutor_->postWork(
@@ -657,6 +659,7 @@ void ConsumerImpl::notifyPendingReceivedCallback(Result 
result, Message& msg,
                                                  const ReceiveCallback& 
callback) {
     if (result == ResultOk && config_.getReceiverQueueSize() != 0) {
         messageProcessed(msg);
+        msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
     }
     callback(result, msg);
@@ -838,7 +841,8 @@ void ConsumerImpl::internalListener() {
         consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
         lastDequedMessageId_ = msg.getMessageId();
         Consumer consumer{get_shared_this_ptr()};
-        messageListener_(consumer, msg);
+        Message interceptMsg = 
interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
+        messageListener_(consumer, interceptMsg);
     } catch (const std::exception& e) {
         LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
     }
@@ -879,6 +883,9 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& 
msg) {
             if (msg.impl_->cnx_ == currentCnx.get()) {
                 waitingForZeroQueueSizeMessage = false;
                 // Can't use break here else it may trigger a race with 
connection opened.
+
+                localLock.unlock();
+                msg = 
interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
                 return ResultOk;
             }
         }
@@ -904,6 +911,7 @@ void ConsumerImpl::receiveAsync(ReceiveCallback callback) {
     if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
         lock.unlock();
         messageProcessed(msg);
+        msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
         callback(ResultOk, msg);
     } else {
         pendingReceives_.push(callback);
@@ -934,6 +942,7 @@ Result ConsumerImpl::receiveHelper(Message& msg) {
     }
 
     messageProcessed(msg);
+    msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
     return ResultOk;
 }
 
@@ -960,6 +969,7 @@ Result ConsumerImpl::receiveHelper(Message& msg, int 
timeout) {
 
     if (incomingMessages_.pop(msg, std::chrono::milliseconds(timeout))) {
         messageProcessed(msg);
+        msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
         return ResultOk;
     } else {
         if (state_ != Ready) {
@@ -1076,6 +1086,7 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& 
msgId, ResultCallback callb
     if (readyToAck) {
         ackGroupingTrackerPtr_->addAcknowledge(msgIdToAck);
     }
+    interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultOk, 
msgId);
     if (callback) {
         callback(ResultOk);
     }
@@ -1083,6 +1094,7 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& 
msgId, ResultCallback callb
 
 void ConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, 
ResultCallback callback) {
     MessageIdList messageIdListToAck;
+    // TODO: Need to check if the consumer is ready. Same to all other public 
methods
     for (auto&& msgId : messageIdList) {
         auto pair = prepareIndividualAck(msgId);
         const auto& msgIdToAck = pair.first;
@@ -1090,6 +1102,9 @@ void ConsumerImpl::acknowledgeAsync(const MessageIdList& 
messageIdList, ResultCa
         if (readyToAck) {
             messageIdListToAck.emplace_back(msgIdToAck);
         }
+        // Invoking `onAcknowledge` for all message ids no matter if it's 
ready to ack. This is consistent
+        // with the Java client.
+        interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultOk, 
msgId);
     }
     this->ackGroupingTrackerPtr_->addAcknowledgeList(messageIdListToAck);
     if (callback) {
@@ -1099,6 +1114,8 @@ void ConsumerImpl::acknowledgeAsync(const MessageIdList& 
messageIdList, ResultCa
 
 void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, 
ResultCallback callback) {
     if (!isCumulativeAcknowledgementAllowed(config_.getConsumerType())) {
+        interceptors_->onAcknowledgeCumulative(Consumer(shared_from_this()),
+                                               
ResultCumulativeAcknowledgementNotAllowedError, msgId);
         if (callback) {
             callback(ResultCumulativeAcknowledgementNotAllowedError);
         }
@@ -1112,6 +1129,7 @@ void ConsumerImpl::acknowledgeCumulativeAsync(const 
MessageId& msgId, ResultCall
         unAckedMessageTrackerPtr_->removeMessagesTill(msgIdToAck);
         ackGroupingTrackerPtr_->addAcknowledgeCumulative(msgIdToAck);
     }
+    interceptors_->onAcknowledgeCumulative(Consumer(shared_from_this()), 
ResultOk, msgId);
     if (callback) {
         callback(ResultOk);
     }
@@ -1226,6 +1244,7 @@ void ConsumerImpl::shutdown() {
     incomingMessages_.clear();
     possibleSendToDeadLetterTopicMessages_.clear();
     resetCnx();
+    interceptors_->close();
     auto client = client_.lock();
     if (client) {
         client->cleanupConsumer(this);
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 8e09bb6..ed8d0df 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -30,6 +30,7 @@
 #include "Commands.h"
 #include "CompressionCodec.h"
 #include "ConsumerImplBase.h"
+#include "ConsumerInterceptors.h"
 #include "MapCache.h"
 #include "MessageIdImpl.h"
 #include "NegativeAcksTracker.h"
@@ -75,7 +76,7 @@ const static std::string DLQ_GROUP_TOPIC_SUFFIX = "-DLQ";
 class ConsumerImpl : public ConsumerImplBase {
    public:
     ConsumerImpl(const ClientImplPtr client, const std::string& topic, const 
std::string& subscriptionName,
-                 const ConsumerConfiguration&, bool isPersistent,
+                 const ConsumerConfiguration&, bool isPersistent, const 
ConsumerInterceptorsPtr& interceptors,
                  const ExecutorServicePtr listenerExecutor = 
ExecutorServicePtr(), bool hasParent = false,
                  const ConsumerTopicType consumerTopicType = NonPartitioned,
                  Commands::SubscriptionMode = 
Commands::SubscriptionModeDurable,
@@ -303,6 +304,8 @@ class ConsumerImpl : public ConsumerImplBase {
     DeadlineTimerPtr checkExpiredChunkedTimer_;
     std::atomic_bool expireChunkMessageTaskScheduled_{false};
 
+    ConsumerInterceptorsPtr interceptors_;
+
     void triggerCheckExpiredChunkedTimer();
     void discardChunkMessages(std::string uuid, MessageId messageId, bool 
autoAck);
 
diff --git a/lib/ConsumerInterceptors.cc b/lib/ConsumerInterceptors.cc
new file mode 100644
index 0000000..3b26175
--- /dev/null
+++ b/lib/ConsumerInterceptors.cc
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "ConsumerInterceptors.h"
+
+#include <pulsar/Consumer.h>
+
+#include "LogUtils.h"
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+/**
+ * Passes a message through the beforeConsume hook of every stored interceptor.
+ *
+ * Interceptors are visited in the order they are stored. Each one receives the
+ * message produced by the previous successful hook; when a hook throws a
+ * std::exception, a warning is logged and the message is left as it was.
+ *
+ * @param consumer the consumer handed to each interceptor
+ * @param message the message the chain starts from
+ * @return the message after the last successful hook
+ */
+Message ConsumerInterceptors::beforeConsume(const Consumer &consumer, const Message &message) const {
+    Message current = message;
+    for (const auto &hook : interceptors_) {
+        try {
+            current = hook->beforeConsume(consumer, current);
+        } catch (const std::exception &e) {
+            // A failing interceptor only costs its own modification; the chain continues.
+            LOG_WARN("Error executing interceptor beforeConsume callback for topic: "
+                     << consumer.getTopic() << ", exception: " << e.what());
+        }
+    }
+    return current;
+}
+
+/**
+ * Forwards an individual-acknowledgement event to every stored interceptor.
+ *
+ * A std::exception thrown by one interceptor is logged as a warning and does
+ * not stop the remaining interceptors from being called.
+ *
+ * @param consumer the consumer handed to each interceptor
+ * @param result the acknowledgement result handed to each interceptor
+ * @param messageID the id of the message being acknowledged
+ */
+void ConsumerInterceptors::onAcknowledge(const Consumer &consumer, Result result,
+                                         const MessageId &messageID) const {
+    for (const auto &hook : interceptors_) {
+        try {
+            hook->onAcknowledge(consumer, result, messageID);
+        } catch (const std::exception &e) {
+            LOG_WARN("Error executing interceptor onAcknowledge callback for topic: "
+                     << consumer.getTopic() << ", exception: " << e.what());
+        }
+    }
+}
+
+/**
+ * Forwards a cumulative-acknowledgement event to every stored interceptor.
+ *
+ * A std::exception thrown by one interceptor is logged as a warning and does
+ * not stop the remaining interceptors from being called.
+ *
+ * @param consumer the consumer handed to each interceptor
+ * @param result the acknowledgement result handed to each interceptor
+ * @param messageID the id of the message being acknowledged cumulatively
+ */
+void ConsumerInterceptors::onAcknowledgeCumulative(const Consumer &consumer, Result result,
+                                                   const MessageId &messageID) const {
+    for (const ConsumerInterceptorPtr &interceptor : interceptors_) {
+        try {
+            interceptor->onAcknowledgeCumulative(consumer, result, messageID);
+        } catch (const std::exception &e) {
+            // Name the cumulative hook explicitly so this warning can be told apart
+            // from failures of the individual onAcknowledge hook.
+            LOG_WARN("Error executing interceptor onAcknowledgeCumulative callback for topic: "
+                     << consumer.getTopic() << ", exception: " << e.what());
+        }
+    }
+}
+
+/**
+ * Closes every stored interceptor.
+ *
+ * Only the caller that moves the state from Ready to Closing performs the
+ * close; any other call returns without touching the interceptors. A
+ * std::exception thrown by one interceptor's close() is logged as a warning
+ * and the remaining interceptors are still closed.
+ */
+void ConsumerInterceptors::close() {
+    State expected = Ready;
+    const bool wonTransition = state_.compare_exchange_strong(expected, Closing);
+    if (wonTransition) {
+        for (const auto &hook : interceptors_) {
+            try {
+                hook->close();
+            } catch (const std::exception &e) {
+                LOG_WARN("Failed to close consumer interceptor: " << e.what());
+            }
+        }
+        state_.store(Closed);
+    }
+}
+
+}  // namespace pulsar
diff --git a/lib/ConsumerInterceptors.h b/lib/ConsumerInterceptors.h
new file mode 100644
index 0000000..d3a768d
--- /dev/null
+++ b/lib/ConsumerInterceptors.h
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/ConsumerInterceptor.h>
+
+#include <atomic>
+#include <utility>
+#include <vector>
+
+namespace pulsar {
+/**
+ * Holds the interceptors attached to a consumer and fans each hook out to all
+ * of them in the order they are stored.
+ */
+class ConsumerInterceptors {
+   public:
+    /** Takes ownership of the given interceptor list. */
+    explicit ConsumerInterceptors(std::vector<ConsumerInterceptorPtr> interceptors)
+        : interceptors_(std::move(interceptors)) {}
+
+    /** Runs the beforeConsume hooks and returns the resulting message. */
+    Message beforeConsume(const Consumer& consumer, const Message& message) const;
+
+    /** Runs the onAcknowledge hooks for an individual acknowledgement. */
+    void onAcknowledge(const Consumer& consumer, Result result, const MessageId& messageID) const;
+
+    /** Runs the onAcknowledgeCumulative hooks for a cumulative acknowledgement. */
+    void onAcknowledgeCumulative(const Consumer& consumer, Result result, const MessageId& messageID) const;
+
+    /** Closes the interceptors. */
+    void close();
+
+   private:
+    /** Lifecycle of this holder, advanced by close(). */
+    enum State
+    {
+        Ready,
+        Closing,
+        Closed
+    };
+
+    std::vector<ConsumerInterceptorPtr> interceptors_;
+    std::atomic<State> state_{Ready};
+};
+
+typedef std::shared_ptr<ConsumerInterceptors> ConsumerInterceptorsPtr;
+}  // namespace pulsar
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 52af40f..e443d9a 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -41,10 +41,11 @@ 
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, TopicName
                                                  int numPartitions, const 
std::string& subscriptionName,
                                                  const ConsumerConfiguration& 
conf,
                                                  LookupServicePtr 
lookupServicePtr,
+                                                 const 
ConsumerInterceptorsPtr& interceptors,
                                                  const 
Commands::SubscriptionMode subscriptionMode,
                                                  boost::optional<MessageId> 
startMessageId)
     : MultiTopicsConsumerImpl(client, {topicName->toString()}, 
subscriptionName, topicName, conf,
-                              lookupServicePtr, subscriptionMode, 
startMessageId) {
+                              lookupServicePtr, interceptors, 
subscriptionMode, startMessageId) {
     topicsPartitions_[topicName->toString()] = numPartitions;
 }
 
@@ -52,6 +53,7 @@ 
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
                                                  const std::string& 
subscriptionName, TopicNamePtr topicName,
                                                  const ConsumerConfiguration& 
conf,
                                                  LookupServicePtr 
lookupServicePtr,
+                                                 const 
ConsumerInterceptorsPtr& interceptors,
                                                  const 
Commands::SubscriptionMode subscriptionMode,
                                                  boost::optional<MessageId> 
startMessageId)
     : ConsumerImplBase(client, topicName ? topicName->toString() : 
"EmptyTopics",
@@ -66,7 +68,8 @@ 
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
       numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)),
       topics_(topics),
       subscriptionMode_(subscriptionMode),
-      startMessageId_(startMessageId) {
+      startMessageId_(startMessageId),
+      interceptors_(interceptors) {
     std::stringstream consumerStrStream;
     consumerStrStream << "[Muti Topics Consumer: "
                       << "TopicName - " << topic_ << " - Subscription - " << 
subscriptionName << "]";
@@ -231,9 +234,10 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int 
numPartitions, TopicN
     if (numPartitions == 0) {
         // We don't have to add partition-n suffix
         try {
-            consumer = std::make_shared<ConsumerImpl>(
-                client, topicName->toString(), subscriptionName_, config, 
topicName->isPersistent(),
-                internalListenerExecutor, true, NonPartitioned, 
subscriptionMode_, startMessageId_);
+            consumer = std::make_shared<ConsumerImpl>(client, 
topicName->toString(), subscriptionName_,
+                                                      config, 
topicName->isPersistent(), interceptors_,
+                                                      
internalListenerExecutor, true, NonPartitioned,
+                                                      subscriptionMode_, 
startMessageId_);
         } catch (const std::runtime_error& e) {
             LOG_ERROR("Failed to create ConsumerImpl for " << 
topicName->toString() << ": " << e.what());
             topicSubResultPromise->setFailed(ResultConnectError);
@@ -251,9 +255,10 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int 
numPartitions, TopicN
         for (int i = 0; i < numPartitions; i++) {
             std::string topicPartitionName = 
topicName->getTopicPartitionName(i);
             try {
-                consumer = std::make_shared<ConsumerImpl>(
-                    client, topicPartitionName, subscriptionName_, config, 
topicName->isPersistent(),
-                    internalListenerExecutor, true, Partitioned, 
subscriptionMode_, startMessageId_);
+                consumer = std::make_shared<ConsumerImpl>(client, 
topicPartitionName, subscriptionName_,
+                                                          config, 
topicName->isPersistent(), interceptors_,
+                                                          
internalListenerExecutor, true, Partitioned,
+                                                          subscriptionMode_, 
startMessageId_);
             } catch (const std::runtime_error& e) {
                 LOG_ERROR("Failed to create ConsumerImpl for " << 
topicPartitionName << ": " << e.what());
                 topicSubResultPromise->setFailed(ResultConnectError);
@@ -650,6 +655,7 @@ void 
MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, const
 
 void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, 
ResultCallback callback) {
     if (state_ != Ready) {
+        interceptors_->onAcknowledge(Consumer(shared_from_this()), 
ResultAlreadyClosed, msgId);
         callback(ResultAlreadyClosed);
         return;
     }
@@ -737,6 +743,7 @@ void MultiTopicsConsumerImpl::shutdown() {
     incomingMessages_.clear();
     topicsPartitions_.clear();
     unAckedMessageTrackerPtr_->clear();
+    interceptors_->close();
     auto client = client_.lock();
     if (client) {
         client->cleanupConsumer(this);
@@ -1005,9 +1012,9 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
 
     std::string topicPartitionName = 
topicName->getTopicPartitionName(partitionIndex);
 
-    auto consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, 
subscriptionName_, config,
-                                                   topicName->isPersistent(), 
internalListenerExecutor, true,
-                                                   Partitioned, 
subscriptionMode_, startMessageId_);
+    auto consumer = std::make_shared<ConsumerImpl>(
+        client, topicPartitionName, subscriptionName_, config, 
topicName->isPersistent(), interceptors_,
+        internalListenerExecutor, true, Partitioned, subscriptionMode_, 
startMessageId_);
     consumer->getConsumerCreatedFuture().addListener(
         [this, weakSelf, partitionsNeedCreate, topicSubResultPromise](
             Result result, const ConsumerImplBaseWeakPtr& 
consumerImplBaseWeakPtr) {
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index a9ee160..35e1504 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -27,6 +27,7 @@
 #include "BlockingQueue.h"
 #include "Commands.h"
 #include "ConsumerImplBase.h"
+#include "ConsumerInterceptors.h"
 #include "Future.h"
 #include "Latch.h"
 #include "LookupDataResult.h"
@@ -54,13 +55,14 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
    public:
     MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int 
numPartitions,
                             const std::string& subscriptionName, const 
ConsumerConfiguration& conf,
-                            LookupServicePtr lookupServicePtr,
+                            LookupServicePtr lookupServicePtr, const 
ConsumerInterceptorsPtr& interceptors,
                             Commands::SubscriptionMode = 
Commands::SubscriptionModeDurable,
                             boost::optional<MessageId> startMessageId = 
boost::none);
 
     MultiTopicsConsumerImpl(ClientImplPtr client, const 
std::vector<std::string>& topics,
                             const std::string& subscriptionName, TopicNamePtr 
topicName,
                             const ConsumerConfiguration& conf, 
LookupServicePtr lookupServicePtr_,
+                            const ConsumerInterceptorsPtr& interceptors,
                             Commands::SubscriptionMode = 
Commands::SubscriptionModeDurable,
                             boost::optional<MessageId> startMessageId = 
boost::none);
 
@@ -127,6 +129,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     std::queue<ReceiveCallback> pendingReceives_;
     const Commands::SubscriptionMode subscriptionMode_;
     boost::optional<MessageId> startMessageId_;
+    ConsumerInterceptorsPtr interceptors_;
 
     /* methods */
     void handleSinglePartitionConsumerCreated(Result result, 
ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
diff --git a/lib/PatternMultiTopicsConsumerImpl.cc 
b/lib/PatternMultiTopicsConsumerImpl.cc
index 657f869..02a7703 100644
--- a/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/lib/PatternMultiTopicsConsumerImpl.cc
@@ -27,14 +27,12 @@ DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
 
-PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(ClientImplPtr 
client,
-                                                               const 
std::string pattern,
-                                                               const 
std::vector<std::string>& topics,
-                                                               const 
std::string& subscriptionName,
-                                                               const 
ConsumerConfiguration& conf,
-                                                               const 
LookupServicePtr lookupServicePtr_)
+PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(
+    ClientImplPtr client, const std::string pattern, const 
std::vector<std::string>& topics,
+    const std::string& subscriptionName, const ConsumerConfiguration& conf,
+    const LookupServicePtr lookupServicePtr_, const ConsumerInterceptorsPtr 
interceptors)
     : MultiTopicsConsumerImpl(client, topics, subscriptionName, 
TopicName::get(pattern), conf,
-                              lookupServicePtr_),
+                              lookupServicePtr_, interceptors),
       patternString_(pattern),
       pattern_(PULSAR_REGEX_NAMESPACE::regex(pattern)),
       
autoDiscoveryTimer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()),
diff --git a/lib/PatternMultiTopicsConsumerImpl.h 
b/lib/PatternMultiTopicsConsumerImpl.h
index 28ad23e..87e301e 100644
--- a/lib/PatternMultiTopicsConsumerImpl.h
+++ b/lib/PatternMultiTopicsConsumerImpl.h
@@ -50,7 +50,8 @@ class PatternMultiTopicsConsumerImpl : public 
MultiTopicsConsumerImpl {
     PatternMultiTopicsConsumerImpl(ClientImplPtr client, const std::string 
patternString,
                                    const std::vector<std::string>& topics,
                                    const std::string& subscriptionName, const 
ConsumerConfiguration& conf,
-                                   const LookupServicePtr lookupServicePtr_);
+                                   const LookupServicePtr lookupServicePtr_,
+                                   const ConsumerInterceptorsPtr interceptors);
 
     const PULSAR_REGEX_NAMESPACE::regex getPattern();
 
diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc
index a06c0ec..d6a8f1d 100644
--- a/lib/ReaderImpl.cc
+++ b/lib/ReaderImpl.cc
@@ -89,11 +89,14 @@ void ReaderImpl::start(const MessageId& startMessageId,
     if (partitions_ > 0) {
         auto consumerImpl = std::make_shared<MultiTopicsConsumerImpl>(
             client_.lock(), TopicName::get(topic_), partitions_, subscription, 
consumerConf,
-            client_.lock()->getLookup(), Commands::SubscriptionModeNonDurable, 
startMessageId);
+            client_.lock()->getLookup(),
+            
std::make_shared<ConsumerInterceptors>(std::vector<ConsumerInterceptorPtr>()),
+            Commands::SubscriptionModeNonDurable, startMessageId);
         consumer_ = consumerImpl;
     } else {
         auto consumerImpl = std::make_shared<ConsumerImpl>(
             client_.lock(), topic_, subscription, consumerConf, 
TopicName::get(topic_)->isPersistent(),
+            
std::make_shared<ConsumerInterceptors>(std::vector<ConsumerInterceptorPtr>()),
             ExecutorServicePtr(), false, NonPartitioned, 
Commands::SubscriptionModeNonDurable,
             startMessageId);
         consumerImpl->setPartitionIndex(TopicName::getPartitionIndex(topic_));
diff --git a/tests/InterceptorsTest.cc b/tests/InterceptorsTest.cc
index ef645c2..71fdf27 100644
--- a/tests/InterceptorsTest.cc
+++ b/tests/InterceptorsTest.cc
@@ -18,31 +18,41 @@
  */
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
+#include <pulsar/ConsumerInterceptor.h>
 #include <pulsar/ProducerInterceptor.h>
 
 #include <utility>
 
 #include "HttpHelper.h"
 #include "Latch.h"
+#include "lib/LogUtils.h"
+
+DECLARE_LOG_OBJECT()
 
 static const std::string serviceUrl = "pulsar://localhost:6650";
 static const std::string adminUrl = "http://localhost:8080/";
 
 using namespace pulsar;
 
-class TestInterceptor : public ProducerInterceptor {
+class ProducerTestInterceptor : public ProducerInterceptor {
    public:
-    TestInterceptor(Latch& latch, Latch& closeLatch) : latch_(latch), 
closeLatch_(closeLatch) {}
+    ProducerTestInterceptor(Latch& latch, Latch& closeLatch, std::string key)
+        : latch_(latch), closeLatch_(closeLatch), key_(std::move(key)) {}
 
     Message beforeSend(const Producer& producer, const Message& message) 
override {
-        return MessageBuilder().setProperty("key", 
"set").setContent(message.getDataAsString()).build();
+        return MessageBuilder()
+            .setProperties(message.getProperties())
+            .setProperty(key_, "set")
+            .setContent(message.getDataAsString())
+            .build();
     }
 
     void onSendAcknowledgement(const Producer& producer, Result result, const 
Message& message,
                                const MessageId& messageID) override {
         ASSERT_EQ(result, ResultOk);
         auto properties = message.getProperties();
-        ASSERT_TRUE(properties.find("key") != properties.end() && 
properties["key"] == "set");
+        ASSERT_TRUE(properties.find("key1") != properties.end() && 
properties["key1"] == "set");
+        ASSERT_TRUE(properties.find("key2") != properties.end() && 
properties["key2"] == "set");
         latch_.countdown();
     }
 
@@ -51,11 +61,12 @@ class TestInterceptor : public ProducerInterceptor {
    private:
     Latch latch_;
     Latch closeLatch_;
+    std::string key_;
 };
 
-class ExceptionInterceptor : public ProducerInterceptor {
+class ProducerExceptionInterceptor : public ProducerInterceptor {
    public:
-    explicit ExceptionInterceptor(Latch& latch) : latch_(latch) {}
+    explicit ProducerExceptionInterceptor(Latch& latch) : latch_(latch) {}
 
     Message beforeSend(const Producer& producer, const Message& message) 
override {
         latch_.countdown();
@@ -77,9 +88,9 @@ class ExceptionInterceptor : public ProducerInterceptor {
     Latch latch_;
 };
 
-class PartitionsChangeInterceptor : public ProducerInterceptor {
+class ProducerPartitionsChangeInterceptor : public ProducerInterceptor {
    public:
-    explicit PartitionsChangeInterceptor(Latch& latch) : latch_(latch) {}
+    explicit ProducerPartitionsChangeInterceptor(Latch& latch) : latch_(latch) 
{}
 
     Message beforeSend(const Producer& producer, const Message& message) 
override { return message; }
 
@@ -102,21 +113,22 @@ void createPartitionedTopic(std::string topic) {
     ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
 }
 
-class InterceptorsTest : public ::testing::TestWithParam<bool> {};
+class ProducerInterceptorsTest : public ::testing::TestWithParam<bool> {};
 
-TEST_P(InterceptorsTest, testProducerInterceptor) {
+TEST_P(ProducerInterceptorsTest, testProducerInterceptor) {
     const std::string topic = "InterceptorsTest-testProducerInterceptor-" + 
std::to_string(time(nullptr));
 
     if (GetParam()) {
         createPartitionedTopic(topic);
     }
 
-    Latch latch(1);
-    Latch closeLatch(1);
+    Latch latch(2);
+    Latch closeLatch(2);
 
     Client client(serviceUrl);
     ProducerConfiguration conf;
-    conf.intercept({std::make_shared<TestInterceptor>(latch, closeLatch)});
+    conf.intercept({std::make_shared<ProducerTestInterceptor>(latch, 
closeLatch, "key1"),
+                    std::make_shared<ProducerTestInterceptor>(latch, 
closeLatch, "key2")});
     Producer producer;
     client.createProducer(topic, conf, producer);
 
@@ -131,7 +143,7 @@ TEST_P(InterceptorsTest, testProducerInterceptor) {
     client.close();
 }
 
-TEST_P(InterceptorsTest, testProducerInterceptorWithException) {
+TEST_P(ProducerInterceptorsTest, testProducerInterceptorWithException) {
     const std::string topic =
         "InterceptorsTest-testProducerInterceptorWithException-" + 
std::to_string(time(nullptr));
 
@@ -143,7 +155,7 @@ TEST_P(InterceptorsTest, 
testProducerInterceptorWithException) {
 
     Client client(serviceUrl);
     ProducerConfiguration conf;
-    conf.intercept({std::make_shared<ExceptionInterceptor>(latch)});
+    conf.intercept({std::make_shared<ProducerExceptionInterceptor>(latch)});
     Producer producer;
     client.createProducer(topic, conf, producer);
 
@@ -156,7 +168,7 @@ TEST_P(InterceptorsTest, 
testProducerInterceptorWithException) {
     client.close();
 }
 
-TEST(InterceptorsTest, testProducerInterceptorOnPartitionsChange) {
+TEST(ProducerInterceptorsTest, testProducerInterceptorOnPartitionsChange) {
     const std::string topic = 
"public/default/InterceptorsTest-testProducerInterceptorOnPartitionsChange-" +
                               std::to_string(time(nullptr));
     std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" + topic + 
"/partitions";
@@ -170,7 +182,7 @@ TEST(InterceptorsTest, 
testProducerInterceptorOnPartitionsChange) {
     clientConf.setPartititionsUpdateInterval(1);
     Client client(serviceUrl, clientConf);
     ProducerConfiguration conf;
-    conf.intercept({std::make_shared<PartitionsChangeInterceptor>(latch)});
+    
conf.intercept({std::make_shared<ProducerPartitionsChangeInterceptor>(latch)});
     Producer producer;
     client.createProducer(topic, conf, producer);
 
@@ -183,4 +195,194 @@ TEST(InterceptorsTest, 
testProducerInterceptorOnPartitionsChange) {
     client.close();
 }
 
-INSTANTIATE_TEST_CASE_P(Pulsar, InterceptorsTest, ::testing::Values(true, 
false));
+class ConsumerExceptionInterceptor : public ConsumerInterceptor {
+   public:
+    explicit ConsumerExceptionInterceptor(Latch& latch) : latch_(latch) {}
+
+    void close() override {
+        latch_.countdown();
+        throw std::runtime_error("expected exception");
+    }
+
+    Message beforeConsume(const Consumer& consumer, const Message& message) 
override {
+        latch_.countdown();
+        throw std::runtime_error("expected exception");
+    }
+
+    void onAcknowledge(const Consumer& consumer, Result result, const 
MessageId& messageID) override {
+        latch_.countdown();
+        throw std::runtime_error("expected exception");
+    }
+
+    void onAcknowledgeCumulative(const Consumer& consumer, Result result,
+                                 const MessageId& messageID) override {
+        latch_.countdown();
+        throw std::runtime_error("expected exception");
+    }
+
+   private:
+    Latch latch_;
+};
+
+enum TopicType
+{
+    Single,
+    Partitioned,
+    Pattern
+};
+
+class ConsumerTestInterceptor : public ConsumerInterceptor {
+   public:
+    ConsumerTestInterceptor(Latch& latch, std::string key) : latch_(latch), 
key_(std::move(key)) {}
+
+    void close() override { latch_.countdown(); }
+
+    Message beforeConsume(const Consumer& consumer, const Message& message) 
override {
+        latch_.countdown();
+        LOG_INFO("Received msg from: " << consumer.getTopic());
+        return MessageBuilder()
+            .setProperties(message.getProperties())
+            .setProperty(key_, "set")
+            .setContent(message.getDataAsString())
+            .build();
+    }
+
+    void onAcknowledge(const Consumer& consumer, Result result, const 
MessageId& messageID) override {
+        LOG_INFO("Ack msg from: " << consumer.getTopic());
+        ASSERT_EQ(result, ResultOk);
+        latch_.countdown();
+    }
+
+    void onAcknowledgeCumulative(const Consumer& consumer, Result result,
+                                 const MessageId& messageID) override {
+        LOG_INFO("Ack cumulative msg from: " << consumer.getTopic());
+        ASSERT_EQ(result, ResultOk);
+        latch_.countdown();
+    }
+
+   private:
+    Latch latch_;
+    std::string key_;
+};
+
+class ConsumerInterceptorsTest : public 
::testing::TestWithParam<std::tuple<TopicType, int>> {
+   public:
+    void SetUp() override {
+        topic_ = 
"persistent://public/default/InterceptorsTest-ConsumerInterceptors-" +
+                 std::to_string(time(nullptr));
+
+        switch (std::get<0>(GetParam())) {
+            case Partitioned:
+                this->createPartitionedTopic(topic_);
+            case Single:
+                client_.createProducer(topic_, producer1_);
+                client_.createProducer(topic_, producer2_);
+                break;
+            case Pattern:
+                client_.createProducer(topic_ + "-p1", producer1_);
+                client_.createProducer(topic_ + "-p2", producer2_);
+                topic_ += "-.*";
+                break;
+        }
+
+        consumerConf_.setReceiverQueueSize(std::get<1>(GetParam()));
+    }
+
+    void createPartitionedTopic(std::string topic) {
+        std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" +
+                                      
topic.substr(std::string("persistent://").length()) + "/partitions";
+
+        int res = makePutRequest(topicOperateUrl, "2");
+        ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+    }
+
+    void TearDown() override {
+        producer1_.close();
+        producer2_.close();
+        client_.close();
+    }
+
+   protected:
+    Client client_{serviceUrl};
+    std::string topic_;
+    ConsumerConfiguration consumerConf_;
+    Producer producer1_;
+    Producer producer2_;
+};
+
+TEST_P(ConsumerInterceptorsTest, testConsumerInterceptor) {
+    Latch latch(
+        10);  // (2 beforeConsume + 1 onAcknowledge + 1 
onAcknowledgeCumulative + 1 close) * 2 interceptors
+
+    Consumer consumer;
+    consumerConf_.intercept({std::make_shared<ConsumerTestInterceptor>(latch, 
"key1"),
+                             std::make_shared<ConsumerTestInterceptor>(latch, 
"key2")});
+    Result result;
+
+    if (std::get<0>(GetParam()) == Pattern) {
+        result = client_.subscribeWithRegex(topic_, "sub", consumerConf_, 
consumer);
+    } else {
+        result = client_.subscribe(topic_, "sub", consumerConf_, consumer);
+    }
+
+    ASSERT_EQ(result, ResultOk);
+
+    Message msg = MessageBuilder().setContent("content").build();
+    result = producer1_.send(msg);
+    ASSERT_EQ(result, ResultOk);
+
+    Message recvMsg;
+    result = consumer.receive(recvMsg);
+    ASSERT_EQ(result, ResultOk);
+    auto properties = recvMsg.getProperties();
+    ASSERT_TRUE(properties.find("key1") != properties.end() && 
properties["key1"] == "set");
+    ASSERT_TRUE(properties.find("key2") != properties.end() && 
properties["key2"] == "set");
+    consumer.acknowledge(recvMsg);
+
+    msg = MessageBuilder().setContent("content").build();
+    result = producer2_.send(msg);
+    ASSERT_EQ(result, ResultOk);
+
+    consumer.receive(recvMsg);
+    consumer.acknowledgeCumulative(recvMsg);
+
+    consumer.close();
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
+}
+
+TEST_P(ConsumerInterceptorsTest, testConsumerInterceptorWithExceptions) {
+    Latch latch(5);  // 2 beforeConsume + 1 onAcknowledge + 1 
onAcknowledgeCumulative + 1 close
+
+    Consumer consumer;
+    
consumerConf_.intercept({std::make_shared<ConsumerExceptionInterceptor>(latch)});
+    client_.subscribe(topic_, "sub", consumerConf_, consumer);
+
+    Producer producer;
+    client_.createProducer(topic_, producer);
+
+    Message msg = MessageBuilder().setContent("content").build();
+    Result result = producer.send(msg);
+    ASSERT_EQ(result, ResultOk);
+
+    Message recvMsg;
+    consumer.receive(recvMsg);
+    consumer.acknowledge(recvMsg);
+
+    msg = MessageBuilder().setContent("content").build();
+    result = producer.send(msg);
+    ASSERT_EQ(result, ResultOk);
+
+    consumer.receive(recvMsg);
+    consumer.acknowledgeCumulative(recvMsg);
+
+    producer.close();
+    consumer.close();
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
+}
+
+INSTANTIATE_TEST_CASE_P(Pulsar, ProducerInterceptorsTest, 
::testing::Values(true, false));
+INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerInterceptorsTest,
+                        testing::Values(
+                            // Can't use zero queue on multi topics consumer
+                            std::make_tuple(Single, 0), 
std::make_tuple(Single, 1000),
+                            std::make_tuple(Partitioned, 1000), 
std::make_tuple(Pattern, 1000)));

Reply via email to