BewareMyPower commented on code in PR #332:
URL: https://github.com/apache/pulsar-client-cpp/pull/332#discussion_r1366703612
##########
tests/ConsumerTest.cc:
##########
@@ -1335,4 +1335,67 @@ TEST(ConsumerTest, testRetrySubscribe) {
// milliseconds
}
+TEST(ConsumerTest, testNoListenerThreadBlocking) {
+ Client client{lookupUrl};
+
+ const int numPartitions = 2;
+ const std::string partitionedTopic = "testNoListenerThreadBlocking-" +
std::to_string(time(nullptr));
+ int res =
+ makePutRequest(adminUrl + "admin/v2/persistent/public/default/" +
partitionedTopic + "/partitions",
+ std::to_string(numPartitions));
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+ const int receiverQueueSize = 1;
+ const int receiverQueueSizeAcrossPartitions = receiverQueueSize *
numPartitions;
+
+ Consumer consumer1, consumer2;
+ ConsumerConfiguration consumerConfig;
+ consumerConfig.setReceiverQueueSize(receiverQueueSize);
+
consumerConfig.setMaxTotalReceiverQueueSizeAcrossPartitions(receiverQueueSizeAcrossPartitions);
+ Result consumerResult;
+ consumerResult = client.subscribe(partitionedTopic, "sub1",
consumerConfig, consumer1);
+ ASSERT_EQ(consumerResult, ResultOk);
+ consumerResult = client.subscribe(partitionedTopic, "sub2",
consumerConfig, consumer2);
+ ASSERT_EQ(consumerResult, ResultOk);
+
+ Producer producer;
+ ProducerConfiguration producerConfig;
+ producerConfig.setBatchingEnabled(false);
+
producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+ Result producerResult = client.createProducer(partitionedTopic,
producerConfig, producer);
+ ASSERT_EQ(producerResult, ResultOk);
+
+ const int msgCount = receiverQueueSizeAcrossPartitions * 100;
+
+ for (int i = 0; i < msgCount; ++i) {
+ auto msg = MessageBuilder().setContent("test").build();
+ producer.sendAsync(msg, [](Result code, const MessageId& messageId)
{});
+ }
+ producer.flush();
+ producer.close();
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(200));
+
+ // check consumer1 prefetch num
+ auto multiConsumerImpl =
PulsarFriend::getMultiTopicsConsumerImplPtr(consumer1);
+ int prefetchNum = multiConsumerImpl->getNumOfPrefetchedMessages();
+ ASSERT_LE(prefetchNum, receiverQueueSizeAcrossPartitions);
Review Comment:
You can use `waitUntil` instead of the trivial sleep.
```c++
waitUntil(std::chrono::seconds(1), [consumer1] {
auto multiConsumerImpl =
PulsarFriend::getMultiTopicsConsumerImplPtr(consumer1);
return multiConsumerImpl->getNumOfPrefetchedMessages() ==
receiverQueueSizeAcrossPartitions;
});
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]