This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ecc34a9 Fixed cpp multi-topic consumer when topics are not partitioned (#2453)
ecc34a9 is described below
commit ecc34a943bb005712ad2492cda6a1d3d8674255f
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Aug 27 23:28:55 2018 -0700
Fixed cpp multi-topic consumer when topics are not partitioned (#2453)
* Fixed cpp multi-topic consumer when topics are not partitioned
* Fixed formatting
---
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 27 ++++++++++++++++++------
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 25 +++++++++++++++++-----
2 files changed, 40 insertions(+), 12 deletions(-)
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 6750273..69c3cc0 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -175,17 +175,30 @@ void
MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
boost::shared_ptr<std::atomic<int>> partitionsNeedCreate =
boost::make_shared<std::atomic<int>>(numPartitions);
- for (int i = 0; i < numPartitions; i++) {
- std::string topicPartitionName = topicName->getTopicPartitionName(i);
- consumer = boost::make_shared<ConsumerImpl>(client_,
topicPartitionName, subscriptionName_, config,
- internalListenerExecutor,
Partitioned);
+ if (numPartitions == 1) {
+ // We don't have to add partition-n suffix
+ consumer = boost::make_shared<ConsumerImpl>(client_,
topicName->toString(), subscriptionName_, config,
+ internalListenerExecutor,
NonPartitioned);
consumer->getConsumerCreatedFuture().addListener(
boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated,
shared_from_this(), _1, _2,
partitionsNeedCreate, topicSubResultPromise));
- consumer->setPartitionIndex(i);
- consumers_.insert(std::make_pair(topicPartitionName, consumer));
- LOG_DEBUG("Create Consumer for - " << topicPartitionName << " - " <<
consumerStr_);
+ consumers_.insert(std::make_pair(topicName->toString(), consumer));
+ LOG_DEBUG("Creating Consumer for - " << topicName << " - " <<
consumerStr_);
consumer->start();
+
+ } else {
+ for (int i = 0; i < numPartitions; i++) {
+ std::string topicPartitionName =
topicName->getTopicPartitionName(i);
+ consumer = boost::make_shared<ConsumerImpl>(client_,
topicPartitionName, subscriptionName_,
+ config,
internalListenerExecutor, Partitioned);
+ consumer->getConsumerCreatedFuture().addListener(
+
boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated,
shared_from_this(), _1, _2,
+ partitionsNeedCreate, topicSubResultPromise));
+ consumer->setPartitionIndex(i);
+ consumers_.insert(std::make_pair(topicPartitionName, consumer));
+ LOG_DEBUG("Creating Consumer for - " << topicPartitionName << " -
" << consumerStr_);
+ consumer->start();
+ }
}
}
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index e87850c..73430fc 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1604,10 +1604,12 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
std::string topicName1 =
"persistent://prop/unit/ns/testMultiTopicsConsumer1";
std::string topicName2 =
"persistent://prop/unit/ns/testMultiTopicsConsumer2";
std::string topicName3 =
"persistent://prop/unit/ns/testMultiTopicsConsumer3";
+ std::string topicName4 =
"persistent://prop/unit/ns/testMultiTopicsConsumer4";
topicNames.push_back(topicName1);
topicNames.push_back(topicName2);
topicNames.push_back(topicName3);
+ topicNames.push_back(topicName4);
// call admin api to make topics partitioned
std::string url1 = adminUrl +
"admin/persistent/prop/unit/ns/testMultiTopicsConsumer1/partitions";
@@ -1631,7 +1633,11 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
result = client.createProducer(topicName3, producer3);
ASSERT_EQ(ResultOk, result);
- LOG_INFO("created 3 producers");
+ Producer producer4;
+ result = client.createProducer(topicName4, producer4);
+ ASSERT_EQ(ResultOk, result);
+
+ LOG_INFO("created 4 producers");
int messageNumber = 100;
ConsumerConfiguration consConfig;
@@ -1644,7 +1650,7 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
ASSERT_EQ(consumer.getSubscriptionName(), subName);
- LOG_INFO("created topics consumer on 3 topics");
+ LOG_INFO("created topics consumer on 4 topics");
std::string msgContent = "msg-content";
LOG_INFO("Publishing 100 messages by producer 1 synchronously");
@@ -1673,14 +1679,23 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
ASSERT_EQ(ResultOk, producer3.send(msg));
}
- LOG_INFO("Consuming and acking 300 messages by multiTopicsConsumer");
- for (int i = 0; i < 3 * messageNumber; i++) {
+ msgContent = "msg-content4";
+ LOG_INFO("Publishing 100 messages by producer 4 synchronously");
+ for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+ std::stringstream stream;
+ stream << msgContent << msgNum;
+ Message msg = MessageBuilder().setContent(stream.str()).build();
+ ASSERT_EQ(ResultOk, producer4.send(msg));
+ }
+
+ LOG_INFO("Consuming and acking 400 messages by multiTopicsConsumer");
+ for (int i = 0; i < 4 * messageNumber; i++) {
Message m;
ASSERT_EQ(ResultOk, consumer.receive(m, 10000));
ASSERT_EQ(ResultOk, consumer.acknowledge(m));
}
- LOG_INFO("Consumed and acked 300 messages by multiTopicsConsumer");
+ LOG_INFO("Consumed and acked 400 messages by multiTopicsConsumer");
ASSERT_EQ(ResultOk, consumer.unsubscribe());