This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b76784  [Issue 1240][C++] Support setting ConsumerEventListener in 
pulsar cpp client (#12118)
3b76784 is described below

commit 3b76784b73b7c50a66482552db7845241333b563
Author: metahys <[email protected]>
AuthorDate: Fri Sep 24 23:19:27 2021 +0800

    [Issue 1240][C++] Support setting ConsumerEventListener in pulsar cpp 
client (#12118)
    
    * [C++] Support setting ConsumerEventListener in pulsar cpp client
    
    * Rename "become" to "became"
    
    * Register consumer before sending subscribe request
    
    * Add unit tests in ConsumerTest
---
 .../include/pulsar/ConsumerConfiguration.h         |  19 ++
 .../include/pulsar/ConsumerEventListener.h         |  49 +++++
 pulsar-client-cpp/lib/ClientConnection.cc          |  29 ++-
 pulsar-client-cpp/lib/ClientConnection.h           |   1 +
 pulsar-client-cpp/lib/ConsumerConfiguration.cc     |  13 ++
 pulsar-client-cpp/lib/ConsumerConfigurationImpl.h  |   2 +
 pulsar-client-cpp/lib/ConsumerImpl.cc              |  25 ++-
 pulsar-client-cpp/lib/ConsumerImpl.h               |   5 +
 .../tests/ConsumerConfigurationTest.cc             |  11 ++
 pulsar-client-cpp/tests/ConsumerTest.cc            | 215 +++++++++++++++++++++
 10 files changed, 365 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h 
b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 85b1f0a..201eba3 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -30,6 +30,7 @@
 #include <pulsar/CryptoKeyReader.h>
 #include <pulsar/InitialPosition.h>
 #include <pulsar/KeySharedPolicy.h>
+#include <pulsar/ConsumerEventListener.h>
 
 namespace pulsar {
 
@@ -43,6 +44,8 @@ typedef std::function<void(Result, const Message& msg)> 
ReceiveCallback;
 /// Callback definition for MessageListener
 typedef std::function<void(Consumer consumer, const Message& msg)> 
MessageListener;
 
+typedef std::shared_ptr<ConsumerEventListener> ConsumerEventListenerPtr;
+
 struct ConsumerConfigurationImpl;
 
 /**
@@ -128,6 +131,22 @@ class PULSAR_PUBLIC ConsumerConfiguration {
     bool hasMessageListener() const;
 
     /**
+     * An event listener enables your application to react to consumer state
+     * change events (active or inactive).
+     */
+    ConsumerConfiguration& setConsumerEventListener(ConsumerEventListenerPtr 
eventListener);
+
+    /**
+     * @return the consumer event listener
+     */
+    ConsumerEventListenerPtr getConsumerEventListener() const;
+
+    /**
+     * @return true if the consumer event listener has been set
+     */
+    bool hasConsumerEventListener() const;
+
+    /**
      * Sets the size of the consumer receive queue.
      *
      * The consumer receive queue controls how many messages can be 
accumulated by the consumer before the
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerEventListener.h 
b/pulsar-client-cpp/include/pulsar/ConsumerEventListener.h
new file mode 100644
index 0000000..467dce1
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/ConsumerEventListener.h
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_CONSUMEREVENTLISTENER_H_
+#define PULSAR_CONSUMEREVENTLISTENER_H_
+
+#include <pulsar/defines.h>
+
+namespace pulsar {
+
+class Consumer;
+
+/**
+ * Listener interface for consumer state change events.
+ *
+ * An implementation is attached to a consumer through
+ * ConsumerConfiguration::setConsumerEventListener() and is notified when the consumer becomes
+ * active or inactive.
+ */
+class PULSAR_PUBLIC ConsumerEventListener {
+   public:
+    virtual ~ConsumerEventListener() = default;
+
+    /**
+     * @brief Notified when the consumer group is changed, and the consumer becomes active.
+     *
+     * @param consumer the consumer that originated the event
+     * @param partitionId the id of the partition that becomes active.
+     */
+    virtual void becameActive(Consumer consumer, int partitionId) = 0;
+
+    /**
+     * @brief Notified when the consumer group is changed, and the consumer is still inactive or becomes
+     * inactive.
+     *
+     * @param consumer the consumer that originated the event
+     * @param partitionId the id of the partition that is still inactive or becomes inactive.
+     */
+    virtual void becameInactive(Consumer consumer, int partitionId) = 0;
+};
+}  // namespace pulsar
+#endif /* PULSAR_CONSUMEREVENTLISTENER_H_ */
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc 
b/pulsar-client-cpp/lib/ClientConnection.cc
index 9f10fea..8781979 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -717,6 +717,26 @@ bool ClientConnection::verifyChecksum(SharedBuffer& 
incomingBuffer_, uint32_t& r
     return isChecksumValid;
 }
 
+/**
+ * Dispatch an incoming ACTIVE_CONSUMER_CHANGE command to the consumer it targets.
+ *
+ * The consumer is looked up by id while holding the connection mutex. If it is still alive, the
+ * mutex is released before the consumer is notified. If the consumer has already been destroyed,
+ * its stale entry is removed from the consumers map. Unknown consumer ids are only logged.
+ *
+ * @param change the received command, carrying the consumer id and the new active flag
+ */
+void ClientConnection::handleActiveConsumerChange(const proto::CommandActiveConsumerChange& change) {
+    Lock lock(mutex_);
+    ConsumersMap::iterator it = consumers_.find(change.consumer_id());
+    if (it == consumers_.end()) {
+        LOG_DEBUG(cnxString_ << "Got active consumer change for unknown consumer id " << change.consumer_id()
+                             << " -- isActive: " << change.is_active());
+        return;
+    }
+
+    ConsumerImplPtr consumer = it->second.lock();
+    if (!consumer) {
+        // Erase through the iterator already in hand instead of looking the key up a second time.
+        consumers_.erase(it);
+        LOG_DEBUG(cnxString_ << "Ignoring active consumer change for already destroyed consumer "
+                             << change.consumer_id());
+        return;
+    }
+
+    // Release the connection mutex before calling into the consumer.
+    lock.unlock();
+    consumer->activeConsumerChanged(change.is_active());
+}
+
 void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg, 
bool isChecksumValid,
                                              proto::MessageMetadata& 
msgMetadata, SharedBuffer& payload) {
     LOG_DEBUG(cnxString_ << "Received a message from the server for consumer: 
" << msg.consumer_id());
@@ -1128,12 +1148,15 @@ void ClientConnection::handleIncomingCommand() {
                     asyncWrite(buffer.const_asio_buffer(),
                                
std::bind(&ClientConnection::handleSentAuthResponse, shared_from_this(),
                                          std::placeholders::_1, buffer));
+                    break;
                 }
 
                 case BaseCommand::ACTIVE_CONSUMER_CHANGE: {
-                    LOG_DEBUG(cnxString_ << "Received notification about 
active consumer changes");
-                    // ignore this message for now.
-                    // TODO: 
@link{https://github.com/apache/pulsar/issues/1240}
+                    const CommandActiveConsumerChange& change = 
incomingCmd_.active_consumer_change();
+                    LOG_DEBUG(cnxString_
+                              << "Received notification about active consumer 
change, consumer_id: "
+                              << change.consumer_id() << " isActive: " << 
change.is_active());
+                    handleActiveConsumerChange(change);
                     break;
                 }
 
diff --git a/pulsar-client-cpp/lib/ClientConnection.h 
b/pulsar-client-cpp/lib/ClientConnection.h
index f2f01ee..25fb6a1 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -193,6 +193,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     bool verifyChecksum(SharedBuffer& incomingBuffer_, uint32_t& 
remainingBytes,
                         proto::BaseCommand& incomingCmd_);
 
+    void handleActiveConsumerChange(const proto::CommandActiveConsumerChange& 
change);
     void handleIncomingCommand();
     void handleIncomingMessage(const proto::CommandMessage& msg, bool 
isChecksumValid,
                                proto::MessageMetadata& msgMetadata, 
SharedBuffer& payload);
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc 
b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 748af71..b01e4e5 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -73,6 +73,19 @@ MessageListener ConsumerConfiguration::getMessageListener() 
const { return impl_
 
 bool ConsumerConfiguration::hasMessageListener() const { return 
impl_->hasMessageListener; }
 
+/**
+ * Store the consumer event listener and mark it as configured.
+ *
+ * The "has listener" flag is set unconditionally, even when an empty pointer is passed.
+ *
+ * @param eventListener listener to store in this configuration
+ * @return this configuration object, so that calls can be chained
+ */
+ConsumerConfiguration& ConsumerConfiguration::setConsumerEventListener(
+    ConsumerEventListenerPtr eventListener) {
+    impl_->hasConsumerEventListener = true;
+    impl_->eventListener = eventListener;
+    return *this;
+}
+
+/// @return the stored consumer event listener (an empty pointer if none was stored)
+ConsumerEventListenerPtr ConsumerConfiguration::getConsumerEventListener() const {
+    return impl_->eventListener;
+}
+
+/// @return true once setConsumerEventListener() has been called on this configuration
+bool ConsumerConfiguration::hasConsumerEventListener() const {
+    return impl_->hasConsumerEventListener;
+}
+
 void ConsumerConfiguration::setReceiverQueueSize(int size) { 
impl_->receiverQueueSize = size; }
 
 int ConsumerConfiguration::getReceiverQueueSize() const { return 
impl_->receiverQueueSize; }
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h 
b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 14ef613..75f65a7 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -35,6 +35,8 @@ struct ConsumerConfigurationImpl {
     ConsumerType consumerType{ConsumerExclusive};
     MessageListener messageListener;
     bool hasMessageListener{false};
+    ConsumerEventListenerPtr eventListener;
+    bool hasConsumerEventListener{false};
     int receiverQueueSize{1000};
     int maxTotalReceiverQueueSizeAcrossPartitions{50000};
     std::string consumerName;
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc 
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 560ca6d..77c0fa9 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -47,6 +47,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const 
std::string& topic,
       subscription_(subscriptionName),
       originalSubscriptionName_(subscriptionName),
       messageListener_(config_.getMessageListener()),
+      eventListener_(config_.getConsumerEventListener()),
       hasParent_(hasParent),
       consumerTopicType_(consumerTopicType),
       subscriptionMode_(subscriptionMode),
@@ -164,6 +165,10 @@ void ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& cnx) {
         return;
     }
 
+    // Register consumer so that we can handle other incoming commands (e.g. 
ACTIVE_CONSUMER_CHANGE) after
+    // sending the subscribe request.
+    cnx->registerConsumer(consumerId_, shared_from_this());
+
     Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
     unAckedMessageTrackerPtr_->clear();
     batchAcknowledgementTracker_.clear();
@@ -217,7 +222,6 @@ void ConsumerImpl::handleCreateConsumer(const 
ClientConnectionPtr& cnx, Result r
             Lock lock(mutex_);
             connection_ = cnx;
             incomingMessages_.clear();
-            cnx->registerConsumer(consumerId_, shared_from_this());
             state_ = Ready;
             backoff_.reset();
             // Complicated logic since we don't have a isLocked() function for 
mutex
@@ -389,6 +393,25 @@ void ConsumerImpl::messageReceived(const 
ClientConnectionPtr& cnx, const proto::
     }
 }
 
+/**
+ * Schedule a notification of the configured event listener about an active-state change.
+ *
+ * Does nothing when no event listener is set. Otherwise the call to
+ * internalConsumerChangeListener() is posted as work to listenerExecutor_.
+ *
+ * @param isActive the new active state reported for this consumer
+ */
+void ConsumerImpl::activeConsumerChanged(bool isActive) {
+    if (!eventListener_) {
+        return;
+    }
+    listenerExecutor_->postWork(
+        std::bind(&ConsumerImpl::internalConsumerChangeListener, shared_from_this(), isActive));
+}
+
+/**
+ * Invoke the event listener callback that matches the new active state.
+ *
+ * The listener receives a Consumer handle wrapping this object together with partitionIndex_.
+ * A std::exception thrown while doing so is caught and logged instead of propagating.
+ *
+ * @param isActive true to call becameActive(), false to call becameInactive()
+ */
+void ConsumerImpl::internalConsumerChangeListener(bool isActive) {
+    try {
+        // Built inside the try block so that a failure of shared_from_this() is logged as well.
+        Consumer self(shared_from_this());
+        if (!isActive) {
+            eventListener_->becameInactive(self, partitionIndex_);
+            return;
+        }
+        eventListener_->becameActive(self, partitionIndex_);
+    } catch (const std::exception& e) {
+        LOG_ERROR(getName() << "Exception thrown from event listener " << e.what());
+    }
+}
+
 void ConsumerImpl::failPendingReceiveCallback() {
     Message msg;
     Lock lock(pendingReceiveMutex_);
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h 
b/pulsar-client-cpp/lib/ConsumerImpl.h
index b176209..0754a89 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -79,6 +79,7 @@ class ConsumerImpl : public ConsumerImplBase,
     void messageReceived(const ClientConnectionPtr& cnx, const 
proto::CommandMessage& msg,
                          bool& isChecksumValid, proto::MessageMetadata& 
msgMetadata, SharedBuffer& payload);
     void messageProcessed(Message& msg, bool track = true);
+    void activeConsumerChanged(bool isActive);
     inline proto::CommandSubscribe_SubType getSubType();
     inline proto::CommandSubscribe_InitialPosition getInitialPosition();
     void handleUnsubscribe(Result result, ResultCallback callback);
@@ -148,6 +149,9 @@ class ConsumerImpl : public ConsumerImplBase,
     void handleCreateConsumer(const ClientConnectionPtr& cnx, Result result);
 
     void internalListener();
+
+    void internalConsumerChangeListener(bool isActive);
+
     void handleClose(Result result, ResultCallback callback, ConsumerImplPtr 
consumer);
     ConsumerStatsBasePtr consumerStatsBasePtr_;
 
@@ -182,6 +186,7 @@ class ConsumerImpl : public ConsumerImplBase,
     const std::string subscription_;
     std::string originalSubscriptionName_;
     MessageListener messageListener_;
+    ConsumerEventListenerPtr eventListener_;
     ExecutorServicePtr listenerExecutor_;
     bool hasParent_;
     ConsumerTopicType consumerTopicType_;
diff --git a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc 
b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
index 88746dd..199b50c 100644
--- a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
@@ -28,11 +28,19 @@ DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
 
+/// Event listener whose callbacks do nothing; it only gives the configuration test a listener to set.
+class DummyEventListener : public ConsumerEventListener {
+   public:
+    void becameActive(Consumer, int) override {}
+
+    void becameInactive(Consumer, int) override {}
+};
+
 TEST(ConsumerConfigurationTest, testDefaultConfig) {
     ConsumerConfiguration conf;
     ASSERT_EQ(conf.getSchema().getSchemaType(), SchemaType::BYTES);
     ASSERT_EQ(conf.getConsumerType(), ConsumerExclusive);
     ASSERT_EQ(conf.hasMessageListener(), false);
+    ASSERT_EQ(conf.hasConsumerEventListener(), false);
     ASSERT_EQ(conf.getReceiverQueueSize(), 1000);
     ASSERT_EQ(conf.getMaxTotalReceiverQueueSizeAcrossPartitions(), 50000);
     ASSERT_EQ(conf.getConsumerName(), "");
@@ -73,6 +81,9 @@ TEST(ConsumerConfigurationTest, testCustomConfig) {
     conf.setMessageListener([](Consumer consumer, const Message& msg) {});
     ASSERT_EQ(conf.hasMessageListener(), true);
 
+    conf.setConsumerEventListener(std::make_shared<DummyEventListener>());
+    ASSERT_EQ(conf.hasConsumerEventListener(), true);
+
     conf.setReceiverQueueSize(2000);
     ASSERT_EQ(conf.getReceiverQueueSize(), 2000);
 
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc 
b/pulsar-client-cpp/tests/ConsumerTest.cc
index b5a49c1..2747b2d 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -41,6 +41,221 @@ DECLARE_LOG_OBJECT()
 
 namespace pulsar {
 
+/**
+ * Test listener that records, in arrival order, the partition id of every active and inactive
+ * notification it receives.
+ *
+ * NOTE(review): the queues are not protected by a lock; the tests below appear to rely on
+ * sleeping before they read them -- confirm this is sufficient.
+ */
+class ConsumerStateEventListener : public ConsumerEventListener {
+   public:
+    ConsumerStateEventListener(std::string name) : name_(name) {}
+
+    // Log the event and remember the partition id in activeQueue_.
+    void becameActive(Consumer consumer, int partitionId) override {
+        LOG_INFO("Received consumer active event, partitionId:" << partitionId << ", name: " << name_);
+        activeQueue_.push(partitionId);
+    }
+
+    // Log the event and remember the partition id in inActiveQueue_.
+    void becameInactive(Consumer consumer, int partitionId) override {
+        LOG_INFO("Received consumer inactive event, partitionId:" << partitionId << ", name: " << name_);
+        inActiveQueue_.push(partitionId);
+    }
+
+    std::queue<int> activeQueue_;    // partition ids passed to becameActive()
+    std::queue<int> inActiveQueue_;  // partition ids passed to becameInactive()
+    std::string name_;               // label included in the log lines
+};
+
+typedef std::shared_ptr<ConsumerStateEventListener> 
ConsumerStateEventListenerPtr;
+
+/// Assert that the listener currently holds no recorded active or inactive notification.
+void verifyConsumerNotReceiveAnyStateChanges(ConsumerStateEventListenerPtr listener) {
+    const std::queue<int>& activeEvents = listener->activeQueue_;
+    const std::queue<int>& inactiveEvents = listener->inActiveQueue_;
+    ASSERT_EQ(0, activeEvents.size());
+    ASSERT_EQ(0, inactiveEvents.size());
+}
+
+/**
+ * Consume the oldest recorded active notification and assert that it is for `partitionId`.
+ * Also asserts that no inactive notification is recorded.
+ */
+void verifyConsumerActive(ConsumerStateEventListenerPtr listener, int partitionId) {
+    std::queue<int>& activeEvents = listener->activeQueue_;
+    ASSERT_NE(0, activeEvents.size());
+    const int oldestPartitionId = activeEvents.front();
+    activeEvents.pop();
+    ASSERT_EQ(partitionId, oldestPartitionId);
+    ASSERT_EQ(0, listener->inActiveQueue_.size());
+}
+
+/**
+ * Consume the oldest recorded inactive notification and assert that it is for `partitionId`.
+ * Also asserts that no active notification is recorded.
+ */
+void verifyConsumerInactive(ConsumerStateEventListenerPtr listener, int partitionId) {
+    std::queue<int>& inactiveEvents = listener->inActiveQueue_;
+    ASSERT_NE(0, inactiveEvents.size());
+    const int oldestPartitionId = inactiveEvents.front();
+    inactiveEvents.pop();
+    ASSERT_EQ(partitionId, oldestPartitionId);
+    ASSERT_EQ(0, listener->activeQueue_.size());
+}
+
+/**
+ * Test listener that tracks, per partition id, the most recent state it was notified about.
+ *
+ * A partition id is kept in exactly one of the two sets after a notification: the callback adds
+ * it to the set matching the event and removes it from the other one. Both callbacks take
+ * mutex_ before touching the sets.
+ */
+class ActiveInactiveListenerEvent : public ConsumerEventListener {
+   public:
+    typedef std::unique_lock<std::mutex> Lock;
+
+    void becameActive(Consumer consumer, int partitionId) override {
+        Lock guard(mutex_);
+        inactivePartitionIds_.erase(partitionId);
+        activePartitonIds_.insert(partitionId);
+    }
+
+    void becameInactive(Consumer consumer, int partitionId) override {
+        Lock guard(mutex_);
+        inactivePartitionIds_.insert(partitionId);
+        activePartitonIds_.erase(partitionId);
+    }
+
+    std::set<int> activePartitonIds_;     // partitions last reported through becameActive()
+    std::set<int> inactivePartitionIds_;  // partitions last reported through becameInactive()
+    std::mutex mutex_;
+};
+
+typedef std::shared_ptr<ActiveInactiveListenerEvent> 
ActiveInactiveListenerEventPtr;
+
+// Two failover consumers share one subscription on a non-partitioned topic. The first consumer is
+// expected to be notified as active and the second as inactive; once the first is closed, the
+// second is expected to be notified as active. The partition id reported in all cases is -1.
+TEST(ConsumerTest, testConsumerEventWithoutPartition) {
+    Client client(lookupUrl);
+
+    const std::string topicName = "testConsumerEventWithoutPartition-topic-" + std::to_string(time(nullptr));
+    const std::string subName = "sub";
+    const int waitTimeInMs = 1000;
+    // Pause applied after each step before the listeners are inspected.
+    const std::chrono::milliseconds settleTime(waitTimeInMs * 2);
+
+    // 1. two consumers on the same subscription
+    auto listener1 = std::make_shared<ConsumerStateEventListener>("listener-1");
+    ConsumerConfiguration config1;
+    config1.setConsumerEventListener(listener1);
+    config1.setConsumerName("consumer-1");
+    config1.setConsumerType(ConsumerType::ConsumerFailover);
+
+    Consumer consumer1;
+    ASSERT_EQ(pulsar::ResultOk, client.subscribe(topicName, subName, config1, consumer1));
+    std::this_thread::sleep_for(settleTime);
+
+    auto listener2 = std::make_shared<ConsumerStateEventListener>("listener-2");
+    ConsumerConfiguration config2;
+    config2.setConsumerEventListener(listener2);
+    config2.setConsumerName("consumer-2");
+    config2.setConsumerType(ConsumerType::ConsumerFailover);
+
+    Consumer consumer2;
+    ASSERT_EQ(pulsar::ResultOk, client.subscribe(topicName, subName, config2, consumer2));
+    std::this_thread::sleep_for(settleTime);
+
+    verifyConsumerActive(listener1, -1);
+    verifyConsumerInactive(listener2, -1);
+
+    // Drop whatever is still recorded as inactive for the second listener, so that the
+    // verification after the close starts from an empty inactive queue.
+    std::queue<int>().swap(listener2->inActiveQueue_);
+
+    // 2. closing the first consumer makes the second one active
+    consumer1.close();
+    std::this_thread::sleep_for(settleTime);
+    verifyConsumerActive(listener2, -1);
+    verifyConsumerNotReceiveAnyStateChanges(listener1);
+}
+
+// Two failover consumers share one subscription on a 4-partition topic. After messages are
+// published round-robin, each consumer drains what it can receive, and the test checks that:
+//   - every partition its own listener holds as active, and
+//   - every partition the other consumer's listener holds as inactive
+// is a partition that this consumer actually received messages from.
+TEST(ConsumerTest, testConsumerEventWithPartition) {
+    Client client(lookupUrl);
+
+    const int numPartitions = 4;
+    const std::string partitionedTopic =
+        "testConsumerEventWithPartition-topic-" + std::to_string(time(nullptr));
+    const std::string subName = "sub";
+    const int numOfMessages = 100;
+    constexpr int unAckedMessagesTimeoutMs = 10000;
+    constexpr int tickDurationInMs = 1000;
+
+    int res =
+        makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
+                       std::to_string(numPartitions));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    // two consumers on the same subscription
+    Consumer consumer1;
+    ConsumerConfiguration config1;
+    ActiveInactiveListenerEventPtr listener1 = std::make_shared<ActiveInactiveListenerEvent>();
+    config1.setConsumerEventListener(listener1);
+    config1.setConsumerName("consumer-1");
+    config1.setConsumerType(ConsumerType::ConsumerFailover);
+    config1.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+    config1.setTickDurationInMs(tickDurationInMs);
+    // A failed subscribe would leave the checks below vacuously true, so fail fast here.
+    ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, subName, config1, consumer1));
+
+    Consumer consumer2;
+    ConsumerConfiguration config2;
+    ActiveInactiveListenerEventPtr listener2 = std::make_shared<ActiveInactiveListenerEvent>();
+    config2.setConsumerEventListener(listener2);
+    config2.setConsumerName("consumer-2");
+    config2.setConsumerType(ConsumerType::ConsumerFailover);
+    // These two settings belong to the second consumer's configuration.
+    config2.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+    config2.setTickDurationInMs(tickDurationInMs);
+    ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, subName, config2, consumer2));
+
+    // send messages
+    ProducerConfiguration producerConfig;
+    producerConfig.setBatchingEnabled(false);
+    producerConfig.setBlockIfQueueFull(true);
+    producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfig, producer));
+    std::string prefix = "message-";
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+    producer.flush();
+    producer.close();
+
+    std::set<int> receivedPartitionIds;
+
+    // Returns the ids in `partitionIds` that are NOT in the current `receivedPartitionIds`.
+    auto notReceived = [&receivedPartitionIds](const std::set<int>& partitionIds) {
+        std::set<int> missing;
+        std::set_difference(partitionIds.begin(), partitionIds.end(), receivedPartitionIds.begin(),
+                            receivedPartitionIds.end(), std::inserter(missing, missing.end()));
+        return missing;
+    };
+
+    // receive message and check partitionIds on consumer1
+    while (true) {
+        Message msg;
+        Result rc = consumer1.receive(msg, 1000);
+        if (pulsar::ResultOk != rc) {
+            break;
+        }
+
+        MessageId msgId = msg.getMessageId();
+        int32_t partitionIndex = msgId.partition();
+        ASSERT_TRUE(partitionIndex < numPartitions);
+        consumer1.acknowledge(msgId);
+        receivedPartitionIds.insert(partitionIndex);
+    }
+
+    ASSERT_EQ(0, notReceived(listener1->activePartitonIds_).size());
+    ASSERT_EQ(0, notReceived(listener2->inactivePartitionIds_).size());
+
+    // receive message and check partitionIds on consumer2
+    std::set<int>().swap(receivedPartitionIds);
+    while (true) {
+        Message msg;
+        Result rc = consumer2.receive(msg, 1000);
+        if (pulsar::ResultOk != rc) {
+            break;
+        }
+        MessageId msgId = msg.getMessageId();
+        int32_t partitionIndex = msgId.partition();
+        ASSERT_TRUE(partitionIndex < numPartitions);
+        consumer2.acknowledge(msgId);
+        receivedPartitionIds.insert(partitionIndex);
+    }
+
+    ASSERT_EQ(0, notReceived(listener2->activePartitonIds_).size());
+    ASSERT_EQ(0, notReceived(listener1->inactivePartitionIds_).size());
+}
+
 TEST(ConsumerTest, consumerNotInitialized) {
     Consumer consumer;
 

Reply via email to