This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3b76784 [Issue 1240][C++] Support setting ConsumerEventListener in
pulsar cpp client (#12118)
3b76784 is described below
commit 3b76784b73b7c50a66482552db7845241333b563
Author: metahys <[email protected]>
AuthorDate: Fri Sep 24 23:19:27 2021 +0800
[Issue 1240][C++] Support setting ConsumerEventListener in pulsar cpp
client (#12118)
* [C++] Support setting ConsumerEventListener in pulsar cpp client
* Rename "become" to "became"
* Register consumer before sending subscribe request
* Add unit tests in ConsumerTest
---
.../include/pulsar/ConsumerConfiguration.h | 19 ++
.../include/pulsar/ConsumerEventListener.h | 49 +++++
pulsar-client-cpp/lib/ClientConnection.cc | 29 ++-
pulsar-client-cpp/lib/ClientConnection.h | 1 +
pulsar-client-cpp/lib/ConsumerConfiguration.cc | 13 ++
pulsar-client-cpp/lib/ConsumerConfigurationImpl.h | 2 +
pulsar-client-cpp/lib/ConsumerImpl.cc | 25 ++-
pulsar-client-cpp/lib/ConsumerImpl.h | 5 +
.../tests/ConsumerConfigurationTest.cc | 11 ++
pulsar-client-cpp/tests/ConsumerTest.cc | 215 +++++++++++++++++++++
10 files changed, 365 insertions(+), 4 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 85b1f0a..201eba3 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -30,6 +30,7 @@
#include <pulsar/CryptoKeyReader.h>
#include <pulsar/InitialPosition.h>
#include <pulsar/KeySharedPolicy.h>
+#include <pulsar/ConsumerEventListener.h>
namespace pulsar {
@@ -43,6 +44,8 @@ typedef std::function<void(Result, const Message& msg)>
ReceiveCallback;
/// Callback definition for MessageListener
typedef std::function<void(Consumer consumer, const Message& msg)>
MessageListener;
+typedef std::shared_ptr<ConsumerEventListener> ConsumerEventListenerPtr;
+
struct ConsumerConfigurationImpl;
/**
@@ -128,6 +131,22 @@ class PULSAR_PUBLIC ConsumerConfiguration {
bool hasMessageListener() const;
/**
+ * A event listener enables your application to react the consumer state
+ * change event (active or inactive).
+ */
+ ConsumerConfiguration& setConsumerEventListener(ConsumerEventListenerPtr
eventListener);
+
+ /**
+ * @return the consumer event listener
+ */
+ ConsumerEventListenerPtr getConsumerEventListener() const;
+
+ /**
+ * @return true if the consumer event listener has been set
+ */
+ bool hasConsumerEventListener() const;
+
+ /**
* Sets the size of the consumer receive queue.
*
* The consumer receive queue controls how many messages can be
accumulated by the consumer before the
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerEventListener.h
b/pulsar-client-cpp/include/pulsar/ConsumerEventListener.h
new file mode 100644
index 0000000..467dce1
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/ConsumerEventListener.h
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_CONSUMEREVENTLISTENER_H_
+#define PULSAR_CONSUMEREVENTLISTENER_H_
+
+#include <pulsar/defines.h>
+
+namespace pulsar {
+
+class Consumer;
+
+class PULSAR_PUBLIC ConsumerEventListener {
+ public:
+ virtual ~ConsumerEventListener(){};
+ /**
+ * @brief Notified when the consumer group is changed, and the consumer
becomes active.
+ *
+ * @param consumer the consumer that originated the event
+ * @param partitionId the id of the partition that beconmes active.
+ */
+ virtual void becameActive(Consumer consumer, int partitionId) = 0;
+
+ /**
+ * @brief Notified when the consumer group is changed, and the consumer is
still inactive or becomes
+ * inactive.
+ *
+ * @param consumer the consumer that originated the event
+ * @param partitionId the id of the partition that is still inactive or
becomes inactive.
+ */
+ virtual void becameInactive(Consumer consumer, int partitionId) = 0;
+};
+} // namespace pulsar
+#endif /* PULSAR_CONSUMEREVENTLISTENER_H_ */
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc
b/pulsar-client-cpp/lib/ClientConnection.cc
index 9f10fea..8781979 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -717,6 +717,26 @@ bool ClientConnection::verifyChecksum(SharedBuffer&
incomingBuffer_, uint32_t& r
return isChecksumValid;
}
+void ClientConnection::handleActiveConsumerChange(const
proto::CommandActiveConsumerChange& change) {
+ Lock lock(mutex_);
+ ConsumersMap::iterator it = consumers_.find(change.consumer_id());
+ if (it != consumers_.end()) {
+ ConsumerImplPtr consumer = it->second.lock();
+
+ if (consumer) {
+ lock.unlock();
+ consumer->activeConsumerChanged(change.is_active());
+ } else {
+ consumers_.erase(change.consumer_id());
+ LOG_DEBUG(cnxString_ << "Ignoring incoming message for already
destroyed consumer "
+ << change.consumer_id());
+ }
+ } else {
+ LOG_DEBUG(cnxString_ << "Got invalid consumer Id in " <<
change.consumer_id()
+ << " -- isActive: " << change.is_active());
+ }
+}
+
void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg,
bool isChecksumValid,
proto::MessageMetadata&
msgMetadata, SharedBuffer& payload) {
LOG_DEBUG(cnxString_ << "Received a message from the server for consumer:
" << msg.consumer_id());
@@ -1128,12 +1148,15 @@ void ClientConnection::handleIncomingCommand() {
asyncWrite(buffer.const_asio_buffer(),
std::bind(&ClientConnection::handleSentAuthResponse, shared_from_this(),
std::placeholders::_1, buffer));
+ break;
}
case BaseCommand::ACTIVE_CONSUMER_CHANGE: {
- LOG_DEBUG(cnxString_ << "Received notification about
active consumer changes");
- // ignore this message for now.
- // TODO:
@link{https://github.com/apache/pulsar/issues/1240}
+ const CommandActiveConsumerChange& change =
incomingCmd_.active_consumer_change();
+ LOG_DEBUG(cnxString_
+ << "Received notification about active consumer
change, consumer_id: "
+ << change.consumer_id() << " isActive: " <<
change.is_active());
+ handleActiveConsumerChange(change);
break;
}
diff --git a/pulsar-client-cpp/lib/ClientConnection.h
b/pulsar-client-cpp/lib/ClientConnection.h
index f2f01ee..25fb6a1 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -193,6 +193,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
bool verifyChecksum(SharedBuffer& incomingBuffer_, uint32_t&
remainingBytes,
proto::BaseCommand& incomingCmd_);
+ void handleActiveConsumerChange(const proto::CommandActiveConsumerChange&
change);
void handleIncomingCommand();
void handleIncomingMessage(const proto::CommandMessage& msg, bool
isChecksumValid,
proto::MessageMetadata& msgMetadata,
SharedBuffer& payload);
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 748af71..b01e4e5 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -73,6 +73,19 @@ MessageListener ConsumerConfiguration::getMessageListener()
const { return impl_
bool ConsumerConfiguration::hasMessageListener() const { return
impl_->hasMessageListener; }
+ConsumerConfiguration& ConsumerConfiguration::setConsumerEventListener(
+ ConsumerEventListenerPtr eventListener) {
+ impl_->eventListener = eventListener;
+ impl_->hasConsumerEventListener = true;
+ return *this;
+}
+
+ConsumerEventListenerPtr ConsumerConfiguration::getConsumerEventListener()
const {
+ return impl_->eventListener;
+}
+
+bool ConsumerConfiguration::hasConsumerEventListener() const { return
impl_->hasConsumerEventListener; }
+
void ConsumerConfiguration::setReceiverQueueSize(int size) {
impl_->receiverQueueSize = size; }
int ConsumerConfiguration::getReceiverQueueSize() const { return
impl_->receiverQueueSize; }
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 14ef613..75f65a7 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -35,6 +35,8 @@ struct ConsumerConfigurationImpl {
ConsumerType consumerType{ConsumerExclusive};
MessageListener messageListener;
bool hasMessageListener{false};
+ ConsumerEventListenerPtr eventListener;
+ bool hasConsumerEventListener{false};
int receiverQueueSize{1000};
int maxTotalReceiverQueueSizeAcrossPartitions{50000};
std::string consumerName;
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 560ca6d..77c0fa9 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -47,6 +47,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const
std::string& topic,
subscription_(subscriptionName),
originalSubscriptionName_(subscriptionName),
messageListener_(config_.getMessageListener()),
+ eventListener_(config_.getConsumerEventListener()),
hasParent_(hasParent),
consumerTopicType_(consumerTopicType),
subscriptionMode_(subscriptionMode),
@@ -164,6 +165,10 @@ void ConsumerImpl::connectionOpened(const
ClientConnectionPtr& cnx) {
return;
}
+ // Register consumer so that we can handle other incomming commands (e.g.
ACTIVE_CONSUMER_CHANGE) after
+ // sending the subscribe request.
+ cnx->registerConsumer(consumerId_, shared_from_this());
+
Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
unAckedMessageTrackerPtr_->clear();
batchAcknowledgementTracker_.clear();
@@ -217,7 +222,6 @@ void ConsumerImpl::handleCreateConsumer(const
ClientConnectionPtr& cnx, Result r
Lock lock(mutex_);
connection_ = cnx;
incomingMessages_.clear();
- cnx->registerConsumer(consumerId_, shared_from_this());
state_ = Ready;
backoff_.reset();
// Complicated logic since we don't have a isLocked() function for
mutex
@@ -389,6 +393,25 @@ void ConsumerImpl::messageReceived(const
ClientConnectionPtr& cnx, const proto::
}
}
+void ConsumerImpl::activeConsumerChanged(bool isActive) {
+ if (eventListener_) {
+ listenerExecutor_->postWork(
+ std::bind(&ConsumerImpl::internalConsumerChangeListener,
shared_from_this(), isActive));
+ }
+}
+
+void ConsumerImpl::internalConsumerChangeListener(bool isActive) {
+ try {
+ if (isActive) {
+ eventListener_->becameActive(Consumer(shared_from_this()),
partitionIndex_);
+ } else {
+ eventListener_->becameInactive(Consumer(shared_from_this()),
partitionIndex_);
+ }
+ } catch (const std::exception& e) {
+ LOG_ERROR(getName() << "Exception thrown from event listener " <<
e.what());
+ }
+}
+
void ConsumerImpl::failPendingReceiveCallback() {
Message msg;
Lock lock(pendingReceiveMutex_);
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h
b/pulsar-client-cpp/lib/ConsumerImpl.h
index b176209..0754a89 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -79,6 +79,7 @@ class ConsumerImpl : public ConsumerImplBase,
void messageReceived(const ClientConnectionPtr& cnx, const
proto::CommandMessage& msg,
bool& isChecksumValid, proto::MessageMetadata&
msgMetadata, SharedBuffer& payload);
void messageProcessed(Message& msg, bool track = true);
+ void activeConsumerChanged(bool isActive);
inline proto::CommandSubscribe_SubType getSubType();
inline proto::CommandSubscribe_InitialPosition getInitialPosition();
void handleUnsubscribe(Result result, ResultCallback callback);
@@ -148,6 +149,9 @@ class ConsumerImpl : public ConsumerImplBase,
void handleCreateConsumer(const ClientConnectionPtr& cnx, Result result);
void internalListener();
+
+ void internalConsumerChangeListener(bool isActive);
+
void handleClose(Result result, ResultCallback callback, ConsumerImplPtr
consumer);
ConsumerStatsBasePtr consumerStatsBasePtr_;
@@ -182,6 +186,7 @@ class ConsumerImpl : public ConsumerImplBase,
const std::string subscription_;
std::string originalSubscriptionName_;
MessageListener messageListener_;
+ ConsumerEventListenerPtr eventListener_;
ExecutorServicePtr listenerExecutor_;
bool hasParent_;
ConsumerTopicType consumerTopicType_;
diff --git a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
index 88746dd..199b50c 100644
--- a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
@@ -28,11 +28,19 @@ DECLARE_LOG_OBJECT()
using namespace pulsar;
+class DummyEventListener : public ConsumerEventListener {
+ public:
+ virtual void becameActive(Consumer consumer, int partitionId) override {}
+
+ virtual void becameInactive(Consumer consumer, int partitionId) override {}
+};
+
TEST(ConsumerConfigurationTest, testDefaultConfig) {
ConsumerConfiguration conf;
ASSERT_EQ(conf.getSchema().getSchemaType(), SchemaType::BYTES);
ASSERT_EQ(conf.getConsumerType(), ConsumerExclusive);
ASSERT_EQ(conf.hasMessageListener(), false);
+ ASSERT_EQ(conf.hasConsumerEventListener(), false);
ASSERT_EQ(conf.getReceiverQueueSize(), 1000);
ASSERT_EQ(conf.getMaxTotalReceiverQueueSizeAcrossPartitions(), 50000);
ASSERT_EQ(conf.getConsumerName(), "");
@@ -73,6 +81,9 @@ TEST(ConsumerConfigurationTest, testCustomConfig) {
conf.setMessageListener([](Consumer consumer, const Message& msg) {});
ASSERT_EQ(conf.hasMessageListener(), true);
+ conf.setConsumerEventListener(std::make_shared<DummyEventListener>());
+ ASSERT_EQ(conf.hasConsumerEventListener(), true);
+
conf.setReceiverQueueSize(2000);
ASSERT_EQ(conf.getReceiverQueueSize(), 2000);
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc
b/pulsar-client-cpp/tests/ConsumerTest.cc
index b5a49c1..2747b2d 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -41,6 +41,221 @@ DECLARE_LOG_OBJECT()
namespace pulsar {
+class ConsumerStateEventListener : public ConsumerEventListener {
+ public:
+ ConsumerStateEventListener(std::string name) { name_ = name; }
+
+ void becameActive(Consumer consumer, int partitionId) override {
+ LOG_INFO("Received consumer active event, partitionId:" << partitionId
<< ", name: " << name_);
+ activeQueue_.push(partitionId);
+ }
+
+ void becameInactive(Consumer consumer, int partitionId) override {
+ LOG_INFO("Received consumer inactive event, partitionId:" <<
partitionId << ", name: " << name_);
+ inActiveQueue_.push(partitionId);
+ }
+
+ std::queue<int> activeQueue_;
+ std::queue<int> inActiveQueue_;
+ std::string name_;
+};
+
+typedef std::shared_ptr<ConsumerStateEventListener>
ConsumerStateEventListenerPtr;
+
+void verifyConsumerNotReceiveAnyStateChanges(ConsumerStateEventListenerPtr
listener) {
+ ASSERT_EQ(0, listener->activeQueue_.size());
+ ASSERT_EQ(0, listener->inActiveQueue_.size());
+}
+
+void verifyConsumerActive(ConsumerStateEventListenerPtr listener, int
partitionId) {
+ ASSERT_NE(0, listener->activeQueue_.size());
+ int pid = listener->activeQueue_.front();
+ listener->activeQueue_.pop();
+ ASSERT_EQ(partitionId, pid);
+ ASSERT_EQ(0, listener->inActiveQueue_.size());
+}
+
+void verifyConsumerInactive(ConsumerStateEventListenerPtr listener, int
partitionId) {
+ ASSERT_NE(0, listener->inActiveQueue_.size());
+ int pid = listener->inActiveQueue_.front();
+ listener->inActiveQueue_.pop();
+ ASSERT_EQ(partitionId, pid);
+ ASSERT_EQ(0, listener->activeQueue_.size());
+}
+
+class ActiveInactiveListenerEvent : public ConsumerEventListener {
+ public:
+ void becameActive(Consumer consumer, int partitionId) override {
+ Lock lock(mutex_);
+ activePartitonIds_.emplace(partitionId);
+ inactivePartitionIds_.erase(partitionId);
+ }
+
+ void becameInactive(Consumer consumer, int partitionId) override {
+ Lock lock(mutex_);
+ activePartitonIds_.erase(partitionId);
+ inactivePartitionIds_.emplace(partitionId);
+ }
+
+ typedef std::unique_lock<std::mutex> Lock;
+ std::set<int> activePartitonIds_;
+ std::set<int> inactivePartitionIds_;
+ std::mutex mutex_;
+};
+
+typedef std::shared_ptr<ActiveInactiveListenerEvent>
ActiveInactiveListenerEventPtr;
+
+TEST(ConsumerTest, testConsumerEventWithoutPartition) {
+ Client client(lookupUrl);
+
+ const std::string topicName = "testConsumerEventWithoutPartition-topic-" +
std::to_string(time(nullptr));
+ const std::string subName = "sub";
+ const int waitTimeInMs = 1000;
+ // constexpr int unAckedMessagesTimeoutMs = 10000;
+ // constexpr int tickDurationInMs = 1000;
+
+ // 1. two consumers on the same subscription
+ Consumer consumer1;
+ ConsumerConfiguration config1;
+ ConsumerStateEventListenerPtr listener1 =
std::make_shared<ConsumerStateEventListener>("listener-1");
+ config1.setConsumerEventListener(listener1);
+ config1.setConsumerName("consumer-1");
+ config1.setConsumerType(ConsumerType::ConsumerFailover);
+
+ ASSERT_EQ(pulsar::ResultOk, client.subscribe(topicName, subName, config1,
consumer1));
+ std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeInMs * 2));
+
+ Consumer consumer2;
+ ConsumerConfiguration config2;
+ ConsumerStateEventListenerPtr listener2 =
std::make_shared<ConsumerStateEventListener>("listener-2");
+ config2.setConsumerEventListener(listener2);
+ config2.setConsumerName("consumer-2");
+ config2.setConsumerType(ConsumerType::ConsumerFailover);
+
+ ASSERT_EQ(pulsar::ResultOk, client.subscribe(topicName, subName, config2,
consumer2));
+ std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeInMs * 2));
+
+ verifyConsumerActive(listener1, -1);
+ verifyConsumerInactive(listener2, -1);
+
+ // clear inActiveQueue_
+ std::queue<int>().swap(listener2->inActiveQueue_);
+
+ consumer1.close();
+ std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeInMs * 2));
+ verifyConsumerActive(listener2, -1);
+ verifyConsumerNotReceiveAnyStateChanges(listener1);
+}
+
+TEST(ConsumerTest, testConsumerEventWithPartition) {
+ Client client(lookupUrl);
+
+ const int numPartitions = 4;
+ const std::string partitionedTopic =
+ "testConsumerEventWithPartition-topic-" +
std::to_string(time(nullptr));
+ const std::string subName = "sub";
+ const int numOfMessages = 100;
+ constexpr int unAckedMessagesTimeoutMs = 10000;
+ constexpr int tickDurationInMs = 1000;
+
+ int res =
+ makePutRequest(adminUrl + "admin/v2/persistent/public/default/" +
partitionedTopic + "/partitions",
+ std::to_string(numPartitions));
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+ // two consumers on the same subscription
+ Consumer consumer1;
+ ConsumerConfiguration config1;
+ ActiveInactiveListenerEventPtr listener1 =
std::make_shared<ActiveInactiveListenerEvent>();
+ config1.setConsumerEventListener(listener1);
+ config1.setConsumerName("consumer-1");
+ config1.setConsumerType(ConsumerType::ConsumerFailover);
+ config1.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+ config1.setTickDurationInMs(tickDurationInMs);
+ client.subscribe(partitionedTopic, subName, config1, consumer1);
+
+ Consumer consumer2;
+ ConsumerConfiguration config2;
+ ActiveInactiveListenerEventPtr listener2 =
std::make_shared<ActiveInactiveListenerEvent>();
+ config2.setConsumerEventListener(listener2);
+ config2.setConsumerName("consumer-2");
+ config2.setConsumerType(ConsumerType::ConsumerFailover);
+ config1.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+ config1.setTickDurationInMs(tickDurationInMs);
+ client.subscribe(partitionedTopic, subName, config2, consumer2);
+
+ // send messages
+ ProducerConfiguration producerConfig;
+ producerConfig.setBatchingEnabled(false);
+ producerConfig.setBlockIfQueueFull(true);
+
producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic,
producerConfig, producer));
+ std::string prefix = "message-";
+ for (int i = 0; i < numOfMessages; i++) {
+ std::string messageContent = prefix + std::to_string(i);
+ Message msg = MessageBuilder().setContent(messageContent).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+ producer.flush();
+ producer.close();
+
+ // receive message and check partitionIds on consumer1
+ std::set<int> receivedPartitionIds;
+ while (true) {
+ Message msg;
+ Result rc = consumer1.receive(msg, 1000);
+ if (pulsar::ResultOk != rc) {
+ break;
+ }
+
+ MessageId msgId = msg.getMessageId();
+ int32_t partitionIndex = msgId.partition();
+ ASSERT_TRUE(partitionIndex < numPartitions);
+ consumer1.acknowledge(msgId);
+ receivedPartitionIds.insert(partitionIndex);
+ }
+
+ std::set<int> result;
+ std::set_difference(listener1->activePartitonIds_.begin(),
listener1->activePartitonIds_.end(),
+ receivedPartitionIds.begin(),
receivedPartitionIds.end(),
+ std::inserter(result, result.end()));
+ ASSERT_EQ(0, result.size());
+
+ std::set<int>().swap(result);
+ std::set_difference(listener2->inactivePartitionIds_.begin(),
listener2->inactivePartitionIds_.end(),
+ receivedPartitionIds.begin(),
receivedPartitionIds.end(),
+ std::inserter(result, result.end()));
+ ASSERT_EQ(0, result.size());
+
+ // receive message and check partitionIds on consumer2
+ std::set<int>().swap(receivedPartitionIds);
+ while (true) {
+ Message msg;
+ Result rc = consumer2.receive(msg, 1000);
+ if (pulsar::ResultOk != rc) {
+ break;
+ }
+ MessageId msgId = msg.getMessageId();
+ int32_t partitionIndex = msgId.partition();
+ ASSERT_TRUE(partitionIndex < numPartitions);
+ consumer2.acknowledge(msgId);
+ receivedPartitionIds.insert(partitionIndex);
+ }
+
+ std::set<int>().swap(result);
+ std::set_difference(listener2->activePartitonIds_.begin(),
listener2->activePartitonIds_.end(),
+ receivedPartitionIds.begin(),
receivedPartitionIds.end(),
+ std::inserter(result, result.end()));
+ ASSERT_EQ(0, result.size());
+
+ std::set<int>().swap(result);
+ std::set_difference(listener1->inactivePartitionIds_.begin(),
listener1->inactivePartitionIds_.end(),
+ receivedPartitionIds.begin(),
receivedPartitionIds.end(),
+ std::inserter(result, result.end()));
+ ASSERT_EQ(0, result.size());
+}
+
TEST(ConsumerTest, consumerNotInitialized) {
Consumer consumer;