shibd commented on code in PR #54:
URL: https://github.com/apache/pulsar-client-cpp/pull/54#discussion_r1000347337
##########
tests/ClientTest.cc:
##########
@@ -295,3 +292,90 @@ TEST(ClientTest, testMultiBrokerUrl) {
ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {},
reader));
client.close();
}
+
+enum class EndToEndType : uint8_t
+{
+ SINGLE_TOPIC,
+ MULTI_TOPICS,
+ REGEX_TOPICS
+};
+
+class ClientCloseTest : public ::testing::TestWithParam<EndToEndType> {
+ public:
+ void SetUp() override {
+ topic_ = topic_ + std::to_string(id_++) + "-" +
std::to_string(time(nullptr));
+ if (GetParam() != EndToEndType::SINGLE_TOPIC) {
+ int res = makePutRequest(
+ "http://localhost:8080/admin/v2/persistent/public/default/" +
topic_ + "/partitions", "2");
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+ }
+ }
+
+ protected:
+ std::string topic_ = "client-close-test-";
+ static std::atomic_int id_;
+
+ Result subscribe(Client &client, Consumer &consumer) {
+ if (GetParam() == EndToEndType::REGEX_TOPICS) {
+ // NOTE: Currently the regex subscription requires the complete
namespace prefix
+ return client.subscribeWithRegex("persistent://public/default/" +
topic_ + ".*", "sub", consumer);
+ } else {
+ return client.subscribe(topic_, "sub", consumer);
+ }
+ }
+};
+
+std::atomic_int ClientCloseTest::id_{0};
+
+TEST_P(ClientCloseTest, testCloseHandlers) {
+ Client client(lookupUrl);
+ auto &producers = PulsarFriend::getProducers(client);
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic_, producer));
+ ASSERT_EQ(producers.size(), 1);
+ ASSERT_EQ(ResultOk, producer.close());
+ ASSERT_EQ(producers.size(), 0);
+
+ auto &consumers = PulsarFriend::getConsumers(client);
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, subscribe(client, consumer));
+ ASSERT_EQ(consumers.size(), 1);
+ ASSERT_EQ(ResultOk, consumer.close());
+ ASSERT_EQ(consumers.size(), 0);
+
+ ASSERT_EQ(ResultOk, subscribe(client, consumer));
+ ASSERT_EQ(consumers.size(), 1);
+ ASSERT_EQ(ResultOk, consumer.unsubscribe());
+ ASSERT_EQ(consumers.size(), 0);
+
+ auto connections = PulsarFriend::getConnections(client);
+ for (const auto &cnx : PulsarFriend::getConnections(client)) {
+ ASSERT_TRUE(PulsarFriend::getProducers(*cnx).empty());
+ ASSERT_TRUE(PulsarFriend::getConsumers(*cnx).empty());
+ }
+
+ ASSERT_EQ(ResultOk, client.close());
+}
+
+TEST_P(ClientCloseTest, testShutdown) {
+ Producer producer;
+ Consumer consumer;
+ {
+ Client client(lookupUrl);
+ ASSERT_EQ(ResultOk, client.createProducer(topic_, producer));
+ ASSERT_EQ(ResultOk, subscribe(client, consumer));
+ ASSERT_EQ(PulsarFriend::getProducers(client).size(), 1);
+ ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 1);
+ } // client is destructed here and `shutdown()` method will be called
+
+ EXPECT_EQ(ResultAlreadyClosed,
producer.send(MessageBuilder().setContent("msg").build()));
Review Comment:
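It feels like this test is wrong. The user does not call any `close` or
`shutdown` methods and still holds references to `producer` and `consumer`.
However, those handles no longer work properly once `client` goes out of scope
(e.g. `producer.send` returns `ResultAlreadyClosed`).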
Maybe we still need `weak_ptr` to prevent the client from being destructed
in this scenario.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]