This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push: new 998c73d [feat] Add consumer interceptor (#210) 998c73d is described below commit 998c73df164f7e08a5e4e4b33f9d979db4518e07 Author: Zike Yang <z...@apache.org> AuthorDate: Thu Mar 16 17:10:54 2023 +0800 [feat] Add consumer interceptor (#210) --- include/pulsar/ConsumerConfiguration.h | 11 ++ include/pulsar/ConsumerInterceptor.h | 109 +++++++++++++++ include/pulsar/ProducerInterceptor.h | 8 +- lib/ClientImpl.cc | 22 ++- lib/ConsumerConfiguration.cc | 10 ++ lib/ConsumerConfigurationImpl.h | 1 + lib/ConsumerImpl.cc | 27 +++- lib/ConsumerImpl.h | 5 +- lib/ConsumerInterceptors.cc | 82 ++++++++++++ lib/ConsumerInterceptors.h | 54 ++++++++ lib/MultiTopicsConsumerImpl.cc | 29 ++-- lib/MultiTopicsConsumerImpl.h | 5 +- lib/PatternMultiTopicsConsumerImpl.cc | 12 +- lib/PatternMultiTopicsConsumerImpl.h | 3 +- lib/ReaderImpl.cc | 5 +- tests/InterceptorsTest.cc | 238 ++++++++++++++++++++++++++++++--- 16 files changed, 566 insertions(+), 55 deletions(-) diff --git a/include/pulsar/ConsumerConfiguration.h b/include/pulsar/ConsumerConfiguration.h index 1977415..ee0c634 100644 --- a/include/pulsar/ConsumerConfiguration.h +++ b/include/pulsar/ConsumerConfiguration.h @@ -21,6 +21,7 @@ #include <pulsar/ConsumerCryptoFailureAction.h> #include <pulsar/ConsumerEventListener.h> +#include <pulsar/ConsumerInterceptor.h> #include <pulsar/ConsumerType.h> #include <pulsar/CryptoKeyReader.h> #include <pulsar/InitialPosition.h> @@ -618,6 +619,16 @@ class PULSAR_PUBLIC ConsumerConfiguration { */ bool isBatchIndexAckEnabled() const; + /** + * Intercept the consumer + * + * @param interceptors the list of interceptors to intercept the consumer + * @return Consumer Configuration + */ + ConsumerConfiguration& intercept(const std::vector<ConsumerInterceptorPtr>& interceptors); + + const std::vector<ConsumerInterceptorPtr>& getInterceptors() const; + friend class PulsarWrapper; friend class PulsarFriend; diff --git a/include/pulsar/ConsumerInterceptor.h b/include/pulsar/ConsumerInterceptor.h new file mode 100644 index 0000000..0eecf20 --- /dev/null +++ b/include/pulsar/ConsumerInterceptor.h @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef PULSAR_CPP_CONSUMER_INTERCEPTOR_H +#define PULSAR_CPP_CONSUMER_INTERCEPTOR_H + +#include <pulsar/Message.h> +#include <pulsar/Result.h> +#include <pulsar/defines.h> + +namespace pulsar { + +class Consumer; + +/** + * A plugin interface that allows you to intercept (and possibly mutate) + * messages received by the consumer. + * + * <p>A primary use case is to hook into consumer applications for custom + * monitoring, logging, etc. + * + * <p>Exceptions thrown by interceptor methods will be caught, logged, but + * not propagated further. + */ +class PULSAR_PUBLIC ConsumerInterceptor { + public: + virtual ~ConsumerInterceptor() {} + /** + * Close the interceptor + */ + virtual void close() {} + + /** + * This is called just before the message is consumed. + * + * <p>This method is allowed to modify message, in which case the new message + * will be returned. + * + * <p>Any exception thrown by this method will be caught by the caller, logged, + * but not propagated to client. + * + * <p>Since the consumer may run multiple interceptors, a particular + * interceptor's <tt>beforeConsume</tt> callback will be called in the order. + * The first interceptor in the list gets the consumed message, the following interceptor will be passed + * the message returned by the previous interceptor, and so on. Since + * interceptors are allowed to modify message, interceptors may potentially + * get the messages already modified by other interceptors. However building a + * pipeline of mutable interceptors that depend on the output of the previous interceptor is + * discouraged, because of potential side-effects caused by interceptors + * potentially failing to modify the message and throwing an exception. + * if one of interceptors in the list throws an exception from + * <tt>beforeConsume</tt>, the exception is caught, logged, + * and the next interceptor is called with the message returned by the last + * successful interceptor in the list, or otherwise the original consumed + * message. + * + * @param consumer the consumer which contains the interceptor + * @param message the message to be consumed by the client + * @return message that is either modified by the interceptor or same message + * passed into the method. + */ + virtual Message beforeConsume(const Consumer& consumer, const Message& message) = 0; + + /** + * + * This is called before consumer sends the acknowledgment to the broker. + * + * <p>Any exception thrown by this method will be ignored by the caller. + * + * @param consumer the consumer which contains the interceptor + * @param result the result of the acknowledgement + * @param messageID the message id to be acknowledged + */ + virtual void onAcknowledge(const Consumer& consumer, Result result, const MessageId& messageID) = 0; + + /** + * + * This is called before consumer sends the cumulative acknowledgment to the broker. + * + * <p>Any exception thrown by this method will be ignored by the caller. + * + * @param consumer the consumer which contains the interceptor + * @param result the result of the cumulative acknowledgement + * @param messageID the message id to be acknowledged cumulatively + */ + virtual void onAcknowledgeCumulative(const Consumer& consumer, Result result, + const MessageId& messageID) = 0; +}; + +typedef std::shared_ptr<ConsumerInterceptor> ConsumerInterceptorPtr; +} // namespace pulsar + +#endif // PULSAR_CPP_CONSUMER_INTERCEPTOR_H diff --git a/include/pulsar/ProducerInterceptor.h b/include/pulsar/ProducerInterceptor.h index 45f55b5..0f40ce4 100644 --- a/include/pulsar/ProducerInterceptor.h +++ b/include/pulsar/ProducerInterceptor.h @@ -24,6 +24,10 @@ #include <pulsar/Result.h> #include <pulsar/defines.h> +namespace pulsar { + +class Producer; + /** * An interface that allows you to intercept (and possibly mutate) the * messages received by the producer before they are published to the Pulsar @@ -35,10 +39,6 @@ * <p>ProducerInterceptor callbacks may be called from multiple threads. Interceptor * implementation must ensure thread-safety, if needed. */ -namespace pulsar { - -class Producer; - class PULSAR_PUBLIC ProducerInterceptor { public: virtual ~ProducerInterceptor() {} diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index a9bca11..08e4a10 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -26,6 +26,7 @@ #include "ClientConfigurationImpl.h" #include "Commands.h" #include "ConsumerImpl.h" +#include "ConsumerInterceptors.h" #include "ExecutorService.h" #include "HTTPLookupService.h" #include "LogUtils.h" @@ -358,8 +359,11 @@ void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const Nam NamespaceTopicsPtr matchTopics = PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, pattern); - consumer = std::make_shared<PatternMultiTopicsConsumerImpl>( - shared_from_this(), regexPattern, *matchTopics, subscriptionName, conf, lookupServicePtr_); + auto interceptors = std::make_shared<ConsumerInterceptors>(conf.getInterceptors()); + + consumer = std::make_shared<PatternMultiTopicsConsumerImpl>(shared_from_this(), regexPattern, + *matchTopics, subscriptionName, conf, + lookupServicePtr_, interceptors); consumer->getConsumerCreatedFuture().addListener( std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, @@ -396,8 +400,10 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const st topicNamePtr = TopicName::get(consumerTopicNameStream.str()); } + auto interceptors = std::make_shared<ConsumerInterceptors>(conf.getInterceptors()); + ConsumerImplBasePtr consumer = std::make_shared<MultiTopicsConsumerImpl>( - shared_from_this(), topics, subscriptionName, topicNamePtr, conf, lookupServicePtr_); + shared_from_this(), topics, subscriptionName, topicNamePtr, conf, lookupServicePtr_, interceptors); consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, @@ -441,6 +447,8 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr conf.setConsumerName(generateRandomName()); } ConsumerImplBasePtr consumer; + auto interceptors = std::make_shared<ConsumerInterceptors>(conf.getInterceptors()); + try { if (partitionMetadata->getPartitions() > 0) { if (conf.getReceiverQueueSize() == 0) { @@ -450,11 +458,11 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr } consumer = std::make_shared<MultiTopicsConsumerImpl>( shared_from_this(), topicName, partitionMetadata->getPartitions(), subscriptionName, conf, - lookupServicePtr_); + lookupServicePtr_, interceptors); } else { - auto consumerImpl = - std::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(), - subscriptionName, conf, topicName->isPersistent()); + auto consumerImpl = std::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(), + subscriptionName, conf, + topicName->isPersistent(), interceptors); consumerImpl->setPartitionIndex(topicName->getPartitionIndex()); consumer = consumerImpl; } diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc index 800be57..1497a2d 100644 --- a/lib/ConsumerConfiguration.cc +++ b/lib/ConsumerConfiguration.cc @@ -300,4 +300,14 @@ void ConsumerConfiguration::setDeadLetterPolicy(const DeadLetterPolicy& deadLett const DeadLetterPolicy& ConsumerConfiguration::getDeadLetterPolicy() const { return impl_->deadLetterPolicy; } +ConsumerConfiguration& ConsumerConfiguration::intercept( + const std::vector<ConsumerInterceptorPtr>& interceptors) { + impl_->interceptors.insert(impl_->interceptors.end(), interceptors.begin(), interceptors.end()); + return *this; +} + +const std::vector<ConsumerInterceptorPtr>& ConsumerConfiguration::getInterceptors() const { + return impl_->interceptors; +} + } // namespace pulsar diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h index 8b64b0d..4ada424 100644 --- a/lib/ConsumerConfigurationImpl.h +++ b/lib/ConsumerConfigurationImpl.h @@ -58,6 +58,7 @@ struct ConsumerConfigurationImpl { bool startMessageIdInclusive{false}; long expireTimeOfIncompleteChunkedMessageMs{60000}; bool batchIndexAckEnabled{false}; + std::vector<ConsumerInterceptorPtr> interceptors; }; } // namespace pulsar #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */ diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 996ffbf..583cbcc 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -56,7 +56,7 @@ DECLARE_LOG_OBJECT() ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, - bool isPersistent, + bool isPersistent, const ConsumerInterceptorsPtr& interceptors, const ExecutorServicePtr listenerExecutor /* = NULL by default */, bool hasParent /* = false by default */, const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */, @@ -90,7 +90,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, startMessageId_(startMessageId), maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()), autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()), - expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()) { + expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()), + interceptors_(interceptors) { std::stringstream consumerStrStream; consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] "; consumerStr_ = consumerStrStream.str(); @@ -646,7 +647,8 @@ void ConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback Message peekMsg; while (incomingMessages_.pop(peekMsg, std::chrono::milliseconds(0)) && messages->canAdd(peekMsg)) { messageProcessed(peekMsg); - messages->add(peekMsg); + Message interceptMsg = interceptors_->beforeConsume(Consumer(shared_from_this()), peekMsg); + messages->add(interceptMsg); } auto self = get_shared_this_ptr(); listenerExecutor_->postWork( @@ -657,6 +659,7 @@ void ConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg, const ReceiveCallback& callback) { if (result == ResultOk && config_.getReceiverQueueSize() != 0) { messageProcessed(msg); + msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg); unAckedMessageTrackerPtr_->add(msg.getMessageId()); } callback(result, msg); @@ -838,7 +841,8 @@ void ConsumerImpl::internalListener() { consumerStatsBasePtr_->receivedMessage(msg, ResultOk); lastDequedMessageId_ = msg.getMessageId(); Consumer consumer{get_shared_this_ptr()}; - messageListener_(consumer, msg); + Message interceptMsg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg); + messageListener_(consumer, interceptMsg); } catch (const std::exception& e) { LOG_ERROR(getName() << "Exception thrown from listener" << e.what()); } @@ -879,6 +883,9 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) { if (msg.impl_->cnx_ == currentCnx.get()) { waitingForZeroQueueSizeMessage = false; // Can't use break here else it may trigger a race with connection opened. + + localLock.unlock(); + msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg); return ResultOk; } } @@ -904,6 +911,7 @@ void ConsumerImpl::receiveAsync(ReceiveCallback callback) { if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) { lock.unlock(); messageProcessed(msg); + msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg); callback(ResultOk, msg); } else { pendingReceives_.push(callback); @@ -934,6 +942,7 @@ Result ConsumerImpl::receiveHelper(Message& msg) { } messageProcessed(msg); + msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg); return ResultOk; } @@ -960,6 +969,7 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) { if (incomingMessages_.pop(msg, std::chrono::milliseconds(timeout))) { messageProcessed(msg); + msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg); return ResultOk; } else { if (state_ != Ready) { @@ -1076,6 +1086,7 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callb if (readyToAck) { ackGroupingTrackerPtr_->addAcknowledge(msgIdToAck); } + interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultOk, msgId); if (callback) { callback(ResultOk); } @@ -1083,6 +1094,7 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callb void ConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) { MessageIdList messageIdListToAck; + // TODO: Need to check if the consumer is ready. Same to all other public methods for (auto&& msgId : messageIdList) { auto pair = prepareIndividualAck(msgId); const auto& msgIdToAck = pair.first; @@ -1090,6 +1102,9 @@ void ConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCa if (readyToAck) { messageIdListToAck.emplace_back(msgIdToAck); } + // Invoking `onAcknowledge` for all message ids no matter if it's ready to ack. This is consistent + // with the Java client. + interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultOk, msgId); } this->ackGroupingTrackerPtr_->addAcknowledgeList(messageIdListToAck); if (callback) { @@ -1099,6 +1114,8 @@ void ConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCa void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) { if (!isCumulativeAcknowledgementAllowed(config_.getConsumerType())) { + interceptors_->onAcknowledgeCumulative(Consumer(shared_from_this()), + ResultCumulativeAcknowledgementNotAllowedError, msgId); if (callback) { callback(ResultCumulativeAcknowledgementNotAllowedError); } @@ -1112,6 +1129,7 @@ void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCall unAckedMessageTrackerPtr_->removeMessagesTill(msgIdToAck); ackGroupingTrackerPtr_->addAcknowledgeCumulative(msgIdToAck); } + interceptors_->onAcknowledgeCumulative(Consumer(shared_from_this()), ResultOk, msgId); if (callback) { callback(ResultOk); } @@ -1226,6 +1244,7 @@ void ConsumerImpl::shutdown() { incomingMessages_.clear(); possibleSendToDeadLetterTopicMessages_.clear(); resetCnx(); + interceptors_->close(); auto client = client_.lock(); if (client) { client->cleanupConsumer(this); diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 8e09bb6..ed8d0df 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -30,6 +30,7 @@ #include "Commands.h" #include "CompressionCodec.h" #include "ConsumerImplBase.h" +#include "ConsumerInterceptors.h" #include "MapCache.h" #include "MessageIdImpl.h" #include "NegativeAcksTracker.h" @@ -75,7 +76,7 @@ const static std::string DLQ_GROUP_TOPIC_SUFFIX = "-DLQ"; class ConsumerImpl : public ConsumerImplBase { public: ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscriptionName, - const ConsumerConfiguration&, bool isPersistent, + const ConsumerConfiguration&, bool isPersistent, const ConsumerInterceptorsPtr& interceptors, const ExecutorServicePtr listenerExecutor = ExecutorServicePtr(), bool hasParent = false, const ConsumerTopicType consumerTopicType = NonPartitioned, Commands::SubscriptionMode = Commands::SubscriptionModeDurable, @@ -303,6 +304,8 @@ class ConsumerImpl : public ConsumerImplBase { DeadlineTimerPtr checkExpiredChunkedTimer_; std::atomic_bool expireChunkMessageTaskScheduled_{false}; + ConsumerInterceptorsPtr interceptors_; + void triggerCheckExpiredChunkedTimer(); void discardChunkMessages(std::string uuid, MessageId messageId, bool autoAck); diff --git a/lib/ConsumerInterceptors.cc b/lib/ConsumerInterceptors.cc new file mode 100644 index 0000000..3b26175 --- /dev/null +++ b/lib/ConsumerInterceptors.cc @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "ConsumerInterceptors.h" + +#include <pulsar/Consumer.h> + +#include "LogUtils.h" + +DECLARE_LOG_OBJECT() + +namespace pulsar { + +Message ConsumerInterceptors::beforeConsume(const Consumer &consumer, const Message &message) const { + Message interceptorMessage = message; + for (const ConsumerInterceptorPtr &interceptor : interceptors_) { + try { + interceptorMessage = interceptor->beforeConsume(consumer, interceptorMessage); + } catch (const std::exception &e) { + LOG_WARN("Error executing interceptor beforeConsume callback for topic: " + << consumer.getTopic() << ", exception: " << e.what()); + } + } + return interceptorMessage; +} + +void ConsumerInterceptors::onAcknowledge(const Consumer &consumer, Result result, + const MessageId &messageID) const { + for (const ConsumerInterceptorPtr &interceptor : interceptors_) { + try { + interceptor->onAcknowledge(consumer, result, messageID); + } catch (const std::exception &e) { + LOG_WARN("Error executing interceptor onAcknowledge callback for topic: " + << consumer.getTopic() << ", exception: " << e.what()); + } + } +} + +void ConsumerInterceptors::onAcknowledgeCumulative(const Consumer &consumer, Result result, + const MessageId &messageID) const { + for (const ConsumerInterceptorPtr &interceptor : interceptors_) { + try { + interceptor->onAcknowledgeCumulative(consumer, result, messageID); + } catch (const std::exception &e) { + LOG_WARN("Error executing interceptor onAcknowledge callback for topic: " + << consumer.getTopic() << ", exception: " << e.what()); + } + } +} + +void ConsumerInterceptors::close() { + State state = Ready; + if (!state_.compare_exchange_strong(state, Closing)) { + return; + } + for (const ConsumerInterceptorPtr &interceptor : interceptors_) { + try { + interceptor->close(); + } catch (const std::exception &e) { + LOG_WARN("Failed to close consumer interceptor: " << e.what()); + } + } + state_ = Closed; +} + +} // namespace pulsar diff --git a/lib/ConsumerInterceptors.h b/lib/ConsumerInterceptors.h new file mode 100644 index 0000000..d3a768d --- /dev/null +++ b/lib/ConsumerInterceptors.h @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <pulsar/ConsumerInterceptor.h> + +#include <atomic> +#include <utility> +#include <vector> + +namespace pulsar { +class ConsumerInterceptors { + public: + explicit ConsumerInterceptors(std::vector<ConsumerInterceptorPtr> interceptors) + : interceptors_(std::move(interceptors)) {} + + void close(); + + Message beforeConsume(const Consumer& consumer, const Message& message) const; + + void onAcknowledge(const Consumer& consumer, Result result, const MessageId& messageID) const; + + void onAcknowledgeCumulative(const Consumer& consumer, Result result, const MessageId& messageID) const; + + private: + enum State + { + Ready, + Closing, + Closed + }; + std::vector<ConsumerInterceptorPtr> interceptors_; + std::atomic<State> state_{Ready}; +}; + +typedef std::shared_ptr<ConsumerInterceptors> ConsumerInterceptorsPtr; +} // namespace pulsar diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 52af40f..e443d9a 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -41,10 +41,11 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, TopicName int numPartitions, const std::string& subscriptionName, const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr, + const ConsumerInterceptorsPtr& interceptors, const Commands::SubscriptionMode subscriptionMode, boost::optional<MessageId> startMessageId) : MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf, - lookupServicePtr, subscriptionMode, startMessageId) { + lookupServicePtr, interceptors, subscriptionMode, startMessageId) { topicsPartitions_[topicName->toString()] = numPartitions; } @@ -52,6 +53,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std const std::string& subscriptionName, TopicNamePtr topicName, const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr, + const ConsumerInterceptorsPtr& interceptors, const Commands::SubscriptionMode subscriptionMode, boost::optional<MessageId> startMessageId) : ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics", @@ -66,7 +68,8 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)), topics_(topics), subscriptionMode_(subscriptionMode), - startMessageId_(startMessageId) { + startMessageId_(startMessageId), + interceptors_(interceptors) { std::stringstream consumerStrStream; consumerStrStream << "[Muti Topics Consumer: " << "TopicName - " << topic_ << " - Subscription - " << subscriptionName << "]"; @@ -231,9 +234,10 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN if (numPartitions == 0) { // We don't have to add partition-n suffix try { - consumer = std::make_shared<ConsumerImpl>( - client, topicName->toString(), subscriptionName_, config, topicName->isPersistent(), - internalListenerExecutor, true, NonPartitioned, subscriptionMode_, startMessageId_); + consumer = std::make_shared<ConsumerImpl>(client, topicName->toString(), subscriptionName_, + config, topicName->isPersistent(), interceptors_, + internalListenerExecutor, true, NonPartitioned, + subscriptionMode_, startMessageId_); } catch (const std::runtime_error& e) { LOG_ERROR("Failed to create ConsumerImpl for " << topicName->toString() << ": " << e.what()); topicSubResultPromise->setFailed(ResultConnectError); @@ -251,9 +255,10 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN for (int i = 0; i < numPartitions; i++) { std::string topicPartitionName = topicName->getTopicPartitionName(i); try { - consumer = std::make_shared<ConsumerImpl>( - client, topicPartitionName, subscriptionName_, config, topicName->isPersistent(), - internalListenerExecutor, true, Partitioned, subscriptionMode_, startMessageId_); + consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, subscriptionName_, + config, topicName->isPersistent(), interceptors_, + internalListenerExecutor, true, Partitioned, + subscriptionMode_, startMessageId_); } catch (const std::runtime_error& e) { LOG_ERROR("Failed to create ConsumerImpl for " << topicPartitionName << ": " << e.what()); topicSubResultPromise->setFailed(ResultConnectError); @@ -650,6 +655,7 @@ void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, const void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) { if (state_ != Ready) { + interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultAlreadyClosed, msgId); callback(ResultAlreadyClosed); return; } @@ -737,6 +743,7 @@ void MultiTopicsConsumerImpl::shutdown() { incomingMessages_.clear(); topicsPartitions_.clear(); unAckedMessageTrackerPtr_->clear(); + interceptors_->close(); auto client = client_.lock(); if (client) { client->cleanupConsumer(this); @@ -1005,9 +1012,9 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer( std::string topicPartitionName = topicName->getTopicPartitionName(partitionIndex); - auto consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, subscriptionName_, config, - topicName->isPersistent(), internalListenerExecutor, true, - Partitioned, subscriptionMode_, startMessageId_); + auto consumer = std::make_shared<ConsumerImpl>( + client, topicPartitionName, subscriptionName_, config, topicName->isPersistent(), interceptors_, + internalListenerExecutor, true, Partitioned, subscriptionMode_, startMessageId_); consumer->getConsumerCreatedFuture().addListener( [this, weakSelf, partitionsNeedCreate, topicSubResultPromise]( Result result, const ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr) { diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index a9ee160..35e1504 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -27,6 +27,7 @@ #include "BlockingQueue.h" #include "Commands.h" #include "ConsumerImplBase.h" +#include "ConsumerInterceptors.h" #include "Future.h" #include "Latch.h" #include "LookupDataResult.h" @@ -54,13 +55,14 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { public: MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions, const std::string& subscriptionName, const ConsumerConfiguration& conf, - LookupServicePtr lookupServicePtr, + LookupServicePtr lookupServicePtr, const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode = Commands::SubscriptionModeDurable, boost::optional<MessageId> startMessageId = boost::none); MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics, const std::string& subscriptionName, TopicNamePtr topicName, const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_, + const ConsumerInterceptorsPtr& interceptors, Commands::SubscriptionMode = Commands::SubscriptionModeDurable, boost::optional<MessageId> startMessageId = boost::none); @@ -127,6 +129,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { std::queue<ReceiveCallback> pendingReceives_; const Commands::SubscriptionMode subscriptionMode_; boost::optional<MessageId> startMessageId_; + ConsumerInterceptorsPtr interceptors_; /* methods */ void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc index 657f869..02a7703 100644 --- a/lib/PatternMultiTopicsConsumerImpl.cc +++ b/lib/PatternMultiTopicsConsumerImpl.cc @@ -27,14 +27,12 @@ DECLARE_LOG_OBJECT() using namespace pulsar; -PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(ClientImplPtr client, - const std::string pattern, - const std::vector<std::string>& topics, - const std::string& subscriptionName, - const ConsumerConfiguration& conf, - const LookupServicePtr lookupServicePtr_) +PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl( + ClientImplPtr client, const std::string pattern, const std::vector<std::string>& topics, + const std::string& subscriptionName, const ConsumerConfiguration& conf, + const LookupServicePtr lookupServicePtr_, const ConsumerInterceptorsPtr interceptors) : MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf, - lookupServicePtr_), + lookupServicePtr_, interceptors), patternString_(pattern), pattern_(PULSAR_REGEX_NAMESPACE::regex(pattern)), autoDiscoveryTimer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()), diff --git a/lib/PatternMultiTopicsConsumerImpl.h b/lib/PatternMultiTopicsConsumerImpl.h index 28ad23e..87e301e 100644 --- a/lib/PatternMultiTopicsConsumerImpl.h +++ b/lib/PatternMultiTopicsConsumerImpl.h @@ -50,7 +50,8 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl { PatternMultiTopicsConsumerImpl(ClientImplPtr client, const std::string patternString, const std::vector<std::string>& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, - const LookupServicePtr lookupServicePtr_); + const LookupServicePtr lookupServicePtr_, + const ConsumerInterceptorsPtr interceptors); const PULSAR_REGEX_NAMESPACE::regex getPattern(); diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc index a06c0ec..d6a8f1d 100644 --- a/lib/ReaderImpl.cc +++ b/lib/ReaderImpl.cc @@ -89,11 +89,14 @@ void ReaderImpl::start(const MessageId& startMessageId, if (partitions_ > 0) { auto consumerImpl = std::make_shared<MultiTopicsConsumerImpl>( client_.lock(), TopicName::get(topic_), partitions_, subscription, consumerConf, - client_.lock()->getLookup(), Commands::SubscriptionModeNonDurable, startMessageId); + client_.lock()->getLookup(), + std::make_shared<ConsumerInterceptors>(std::vector<ConsumerInterceptorPtr>()), + Commands::SubscriptionModeNonDurable, startMessageId); consumer_ = consumerImpl; } else { auto consumerImpl = std::make_shared<ConsumerImpl>( client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(), + std::make_shared<ConsumerInterceptors>(std::vector<ConsumerInterceptorPtr>()), ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable, startMessageId); consumerImpl->setPartitionIndex(TopicName::getPartitionIndex(topic_)); diff --git a/tests/InterceptorsTest.cc b/tests/InterceptorsTest.cc index ef645c2..71fdf27 100644 --- a/tests/InterceptorsTest.cc +++ b/tests/InterceptorsTest.cc @@ -18,31 +18,41 @@ */ #include <gtest/gtest.h> #include <pulsar/Client.h> +#include <pulsar/ConsumerInterceptor.h> #include <pulsar/ProducerInterceptor.h> #include <utility> #include "HttpHelper.h" #include "Latch.h" +#include "lib/LogUtils.h" + +DECLARE_LOG_OBJECT() static const std::string serviceUrl = "pulsar://localhost:6650"; static const std::string adminUrl = "http://localhost:8080/"; using namespace pulsar; -class TestInterceptor : public ProducerInterceptor { +class ProducerTestInterceptor : public ProducerInterceptor { public: - TestInterceptor(Latch& latch, Latch& closeLatch) : latch_(latch), closeLatch_(closeLatch) {} + ProducerTestInterceptor(Latch& latch, Latch& closeLatch, std::string key) + : latch_(latch), closeLatch_(closeLatch), key_(std::move(key)) {} Message beforeSend(const Producer& producer, const Message& message) override { - return MessageBuilder().setProperty("key", "set").setContent(message.getDataAsString()).build(); + return MessageBuilder() + .setProperties(message.getProperties()) + .setProperty(key_, "set") + .setContent(message.getDataAsString()) + .build(); } void onSendAcknowledgement(const Producer& producer, Result result, const Message& message, const MessageId& messageID) override { ASSERT_EQ(result, ResultOk); auto properties = message.getProperties(); - ASSERT_TRUE(properties.find("key") != properties.end() && properties["key"] == "set"); + ASSERT_TRUE(properties.find("key1") != properties.end() && properties["key1"] == "set"); + ASSERT_TRUE(properties.find("key2") != properties.end() && properties["key2"] == "set"); latch_.countdown(); } @@ -51,11 +61,12 @@ class TestInterceptor : public ProducerInterceptor { private: Latch latch_; Latch closeLatch_; + std::string key_; }; -class ExceptionInterceptor : public ProducerInterceptor { +class ProducerExceptionInterceptor : public ProducerInterceptor { public: - explicit ExceptionInterceptor(Latch& latch) : latch_(latch) {} + explicit ProducerExceptionInterceptor(Latch& latch) : latch_(latch) {} Message beforeSend(const Producer& producer, const Message& message) override { latch_.countdown(); @@ -77,9 +88,9 @@ class ExceptionInterceptor : public ProducerInterceptor { Latch latch_; }; -class PartitionsChangeInterceptor : public ProducerInterceptor { +class ProducerPartitionsChangeInterceptor : public ProducerInterceptor { public: - explicit PartitionsChangeInterceptor(Latch& latch) : latch_(latch) {} + explicit ProducerPartitionsChangeInterceptor(Latch& latch) : latch_(latch) {} Message beforeSend(const Producer& producer, const Message& message) override { return message; } @@ -102,21 +113,22 @@ void createPartitionedTopic(std::string topic) { ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; } -class InterceptorsTest : public ::testing::TestWithParam<bool> {}; +class ProducerInterceptorsTest : public ::testing::TestWithParam<bool> {}; -TEST_P(InterceptorsTest, testProducerInterceptor) { +TEST_P(ProducerInterceptorsTest, testProducerInterceptor) { const std::string topic = "InterceptorsTest-testProducerInterceptor-" + std::to_string(time(nullptr)); if (GetParam()) { createPartitionedTopic(topic); } - Latch latch(1); - Latch closeLatch(1); + Latch latch(2); + Latch closeLatch(2); Client client(serviceUrl); ProducerConfiguration conf; - conf.intercept({std::make_shared<TestInterceptor>(latch, closeLatch)}); + conf.intercept({std::make_shared<ProducerTestInterceptor>(latch, closeLatch, "key1"), + std::make_shared<ProducerTestInterceptor>(latch, closeLatch, "key2")}); Producer producer; client.createProducer(topic, conf, producer); @@ -131,7 +143,7 @@ TEST_P(InterceptorsTest, testProducerInterceptor) { client.close(); } -TEST_P(InterceptorsTest, testProducerInterceptorWithException) { +TEST_P(ProducerInterceptorsTest, testProducerInterceptorWithException) { const std::string topic = "InterceptorsTest-testProducerInterceptorWithException-" + std::to_string(time(nullptr)); @@ -143,7 +155,7 @@ TEST_P(InterceptorsTest, testProducerInterceptorWithException) { Client client(serviceUrl); ProducerConfiguration conf; - conf.intercept({std::make_shared<ExceptionInterceptor>(latch)}); + conf.intercept({std::make_shared<ProducerExceptionInterceptor>(latch)}); Producer producer; client.createProducer(topic, conf, producer); @@ -156,7 +168,7 @@ TEST_P(InterceptorsTest, testProducerInterceptorWithException) { client.close(); } -TEST(InterceptorsTest, testProducerInterceptorOnPartitionsChange) { +TEST(ProducerInterceptorsTest, testProducerInterceptorOnPartitionsChange) { const std::string topic = "public/default/InterceptorsTest-testProducerInterceptorOnPartitionsChange-" + std::to_string(time(nullptr)); std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" + topic + "/partitions"; @@ -170,7 +182,7 @@ TEST(InterceptorsTest, testProducerInterceptorOnPartitionsChange) { clientConf.setPartititionsUpdateInterval(1); Client client(serviceUrl, clientConf); ProducerConfiguration conf; - conf.intercept({std::make_shared<PartitionsChangeInterceptor>(latch)}); + conf.intercept({std::make_shared<ProducerPartitionsChangeInterceptor>(latch)}); Producer producer; client.createProducer(topic, conf, producer); @@ -183,4 +195,194 @@ TEST(InterceptorsTest, testProducerInterceptorOnPartitionsChange) { client.close(); } -INSTANTIATE_TEST_CASE_P(Pulsar, InterceptorsTest, ::testing::Values(true, false)); +class ConsumerExceptionInterceptor : public ConsumerInterceptor { + public: + explicit ConsumerExceptionInterceptor(Latch& latch) : latch_(latch) {} + + void close() override { + latch_.countdown(); + throw std::runtime_error("expected exception"); + } + + Message beforeConsume(const Consumer& consumer, const Message& message) override { + latch_.countdown(); + throw std::runtime_error("expected exception"); + } + + void onAcknowledge(const Consumer& consumer, Result result, const MessageId& messageID) override { + latch_.countdown(); + throw std::runtime_error("expected exception"); + } + + void onAcknowledgeCumulative(const Consumer& consumer, Result result, + const MessageId& messageID) override { + latch_.countdown(); + throw std::runtime_error("expected exception"); + } + + private: + Latch latch_; +}; + +enum TopicType +{ + Single, + Partitioned, + Pattern +}; + +class ConsumerTestInterceptor : public ConsumerInterceptor { + public: + ConsumerTestInterceptor(Latch& latch, std::string key) : latch_(latch), key_(std::move(key)) {} + + void close() override { latch_.countdown(); } + + Message beforeConsume(const Consumer& consumer, const Message& message) override { + latch_.countdown(); + LOG_INFO("Received msg from: " << consumer.getTopic()); + return MessageBuilder() + .setProperties(message.getProperties()) + .setProperty(key_, "set") + .setContent(message.getDataAsString()) + .build(); + } + + void onAcknowledge(const Consumer& consumer, Result result, const MessageId& messageID) override { + LOG_INFO("Ack msg from: " << consumer.getTopic()); + ASSERT_EQ(result, ResultOk); + latch_.countdown(); + } + + void onAcknowledgeCumulative(const Consumer& consumer, Result result, + const MessageId& messageID) override { + LOG_INFO("Ack cumulative msg from: " << consumer.getTopic()); + ASSERT_EQ(result, ResultOk); + latch_.countdown(); + } + + private: + Latch latch_; + std::string key_; +}; + +class ConsumerInterceptorsTest : public ::testing::TestWithParam<std::tuple<TopicType, int>> { + public: + void SetUp() override { + topic_ = "persistent://public/default/InterceptorsTest-ConsumerInterceptors-" + + std::to_string(time(nullptr)); + + switch (std::get<0>(GetParam())) { + case Partitioned: + this->createPartitionedTopic(topic_); + case Single: + client_.createProducer(topic_, producer1_); + client_.createProducer(topic_, producer2_); + break; + case Pattern: + client_.createProducer(topic_ + "-p1", producer1_); + client_.createProducer(topic_ + "-p2", producer2_); + topic_ += "-.*"; + break; + } + + consumerConf_.setReceiverQueueSize(std::get<1>(GetParam())); + } + + void createPartitionedTopic(std::string topic) { + std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" + + topic.substr(std::string("persistent://").length()) + "/partitions"; + + int res = makePutRequest(topicOperateUrl, "2"); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + } + + void TearDown() override { + producer1_.close(); + producer2_.close(); + client_.close(); + } + + protected: + Client client_{serviceUrl}; + std::string topic_; + ConsumerConfiguration consumerConf_; + Producer producer1_; + Producer producer2_; +}; + +TEST_P(ConsumerInterceptorsTest, testConsumerInterceptor) { + Latch latch( + 10); // (2 beforeConsume + 1 onAcknowledge + 1 onAcknowledgeCumulative + 1 close) * 2 interceptors + + Consumer consumer; + consumerConf_.intercept({std::make_shared<ConsumerTestInterceptor>(latch, "key1"), + std::make_shared<ConsumerTestInterceptor>(latch, "key2")}); + Result result; + + if (std::get<0>(GetParam()) == Pattern) { + result = client_.subscribeWithRegex(topic_, "sub", consumerConf_, consumer); + } else { + result = client_.subscribe(topic_, "sub", consumerConf_, consumer); + } + + ASSERT_EQ(result, ResultOk); + + Message msg = MessageBuilder().setContent("content").build(); + result = producer1_.send(msg); + ASSERT_EQ(result, ResultOk); + + Message recvMsg; + result = consumer.receive(recvMsg); + ASSERT_EQ(result, ResultOk); + auto properties = recvMsg.getProperties(); + ASSERT_TRUE(properties.find("key1") != properties.end() && properties["key1"] == "set"); + ASSERT_TRUE(properties.find("key2") != properties.end() && properties["key2"] == "set"); + consumer.acknowledge(recvMsg); + + msg = MessageBuilder().setContent("content").build(); + result = producer2_.send(msg); + ASSERT_EQ(result, ResultOk); + + consumer.receive(recvMsg); + consumer.acknowledgeCumulative(recvMsg); + + consumer.close(); + ASSERT_TRUE(latch.wait(std::chrono::seconds(5))); +} + +TEST_P(ConsumerInterceptorsTest, testConsumerInterceptorWithExceptions) { + Latch latch(5); // 2 beforeConsume + 1 onAcknowledge + 1 onAcknowledgeCumulative + 1 close + + Consumer consumer; + consumerConf_.intercept({std::make_shared<ConsumerExceptionInterceptor>(latch)}); + client_.subscribe(topic_, "sub", consumerConf_, consumer); + + Producer producer; + client_.createProducer(topic_, producer); + + Message msg = MessageBuilder().setContent("content").build(); + Result result = producer.send(msg); + ASSERT_EQ(result, ResultOk); + + Message recvMsg; + consumer.receive(recvMsg); + consumer.acknowledge(recvMsg); + + msg = MessageBuilder().setContent("content").build(); + result = producer.send(msg); + ASSERT_EQ(result, ResultOk); + + consumer.receive(recvMsg); + consumer.acknowledgeCumulative(recvMsg); + + producer.close(); + consumer.close(); + ASSERT_TRUE(latch.wait(std::chrono::seconds(5))); +} + +INSTANTIATE_TEST_CASE_P(Pulsar, ProducerInterceptorsTest, ::testing::Values(true, false)); +INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerInterceptorsTest, + testing::Values( + // Can't use zero queue on multi topics consumer + std::make_tuple(Single, 0), std::make_tuple(Single, 1000), + std::make_tuple(Partitioned, 1000), std::make_tuple(Pattern, 1000)));