merlimat closed pull request #2453: Fixed cpp multi-topic consumer when topics 
are not partitioned
URL: https://github.com/apache/incubator-pulsar/pull/2453
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 6750273649..69c3cc0257 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -175,17 +175,30 @@ void 
MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
     boost::shared_ptr<std::atomic<int>> partitionsNeedCreate =
         boost::make_shared<std::atomic<int>>(numPartitions);
 
-    for (int i = 0; i < numPartitions; i++) {
-        std::string topicPartitionName = topicName->getTopicPartitionName(i);
-        consumer = boost::make_shared<ConsumerImpl>(client_, 
topicPartitionName, subscriptionName_, config,
-                                                    internalListenerExecutor, 
Partitioned);
+    if (numPartitions == 1) {
+        // We don't have to add partition-n suffix
+        consumer = boost::make_shared<ConsumerImpl>(client_, 
topicName->toString(), subscriptionName_, config,
+                                                    internalListenerExecutor, 
NonPartitioned);
         consumer->getConsumerCreatedFuture().addListener(
             boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, 
shared_from_this(), _1, _2,
                         partitionsNeedCreate, topicSubResultPromise));
-        consumer->setPartitionIndex(i);
-        consumers_.insert(std::make_pair(topicPartitionName, consumer));
-        LOG_DEBUG("Create Consumer for - " << topicPartitionName << " - " << 
consumerStr_);
+        consumers_.insert(std::make_pair(topicName->toString(), consumer));
+        LOG_DEBUG("Creating Consumer for - " << topicName << " - " << 
consumerStr_);
         consumer->start();
+
+    } else {
+        for (int i = 0; i < numPartitions; i++) {
+            std::string topicPartitionName = 
topicName->getTopicPartitionName(i);
+            consumer = boost::make_shared<ConsumerImpl>(client_, 
topicPartitionName, subscriptionName_,
+                                                        config, 
internalListenerExecutor, Partitioned);
+            consumer->getConsumerCreatedFuture().addListener(
+                
boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, 
shared_from_this(), _1, _2,
+                            partitionsNeedCreate, topicSubResultPromise));
+            consumer->setPartitionIndex(i);
+            consumers_.insert(std::make_pair(topicPartitionName, consumer));
+            LOG_DEBUG("Creating Consumer for - " << topicPartitionName << " - 
" << consumerStr_);
+            consumer->start();
+        }
     }
 }
 
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index e87850c7c3..73430fc291 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1604,10 +1604,12 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
     std::string topicName1 = 
"persistent://prop/unit/ns/testMultiTopicsConsumer1";
     std::string topicName2 = 
"persistent://prop/unit/ns/testMultiTopicsConsumer2";
     std::string topicName3 = 
"persistent://prop/unit/ns/testMultiTopicsConsumer3";
+    std::string topicName4 = 
"persistent://prop/unit/ns/testMultiTopicsConsumer4";
 
     topicNames.push_back(topicName1);
     topicNames.push_back(topicName2);
     topicNames.push_back(topicName3);
+    topicNames.push_back(topicName4);
 
     // call admin api to make topics partitioned
     std::string url1 = adminUrl + 
"admin/persistent/prop/unit/ns/testMultiTopicsConsumer1/partitions";
@@ -1631,7 +1633,11 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
     result = client.createProducer(topicName3, producer3);
     ASSERT_EQ(ResultOk, result);
 
-    LOG_INFO("created 3 producers");
+    Producer producer4;
+    result = client.createProducer(topicName4, producer4);
+    ASSERT_EQ(ResultOk, result);
+
+    LOG_INFO("created 4 producers");
 
     int messageNumber = 100;
     ConsumerConfiguration consConfig;
@@ -1644,7 +1650,7 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
     result = consumerFuture.get(consumer);
     ASSERT_EQ(ResultOk, result);
     ASSERT_EQ(consumer.getSubscriptionName(), subName);
-    LOG_INFO("created topics consumer on 3 topics");
+    LOG_INFO("created topics consumer on 4 topics");
 
     std::string msgContent = "msg-content";
     LOG_INFO("Publishing 100 messages by producer 1 synchronously");
@@ -1673,14 +1679,23 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
         ASSERT_EQ(ResultOk, producer3.send(msg));
     }
 
-    LOG_INFO("Consuming and acking 300 messages by multiTopicsConsumer");
-    for (int i = 0; i < 3 * messageNumber; i++) {
+    msgContent = "msg-content4";
+    LOG_INFO("Publishing 100 messages by producer 4 synchronously");
+    for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+        std::stringstream stream;
+        stream << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(stream.str()).build();
+        ASSERT_EQ(ResultOk, producer4.send(msg));
+    }
+
+    LOG_INFO("Consuming and acking 400 messages by multiTopicsConsumer");
+    for (int i = 0; i < 4 * messageNumber; i++) {
         Message m;
         ASSERT_EQ(ResultOk, consumer.receive(m, 10000));
         ASSERT_EQ(ResultOk, consumer.acknowledge(m));
     }
 
-    LOG_INFO("Consumed and acked 300 messages by multiTopicsConsumer");
+    LOG_INFO("Consumed and acked 400 messages by multiTopicsConsumer");
 
     ASSERT_EQ(ResultOk, consumer.unsubscribe());
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to