BewareMyPower commented on code in PR #54:
URL: https://github.com/apache/pulsar-client-cpp/pull/54#discussion_r1000355173
##########
tests/ClientTest.cc:
##########
@@ -295,3 +292,90 @@ TEST(ClientTest, testMultiBrokerUrl) {
ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {},
reader));
client.close();
}
+
+enum class EndToEndType : uint8_t
+{
+ SINGLE_TOPIC,
+ MULTI_TOPICS,
+ REGEX_TOPICS
+};
+
+class ClientCloseTest : public ::testing::TestWithParam<EndToEndType> {
+ public:
+ void SetUp() override {
+ topic_ = topic_ + std::to_string(id_++) + "-" +
std::to_string(time(nullptr));
+ if (GetParam() != EndToEndType::SINGLE_TOPIC) {
+ int res = makePutRequest(
+ "http://localhost:8080/admin/v2/persistent/public/default/" +
topic_ + "/partitions", "2");
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+ }
+ }
+
+ protected:
+ std::string topic_ = "client-close-test-";
+ static std::atomic_int id_;
+
+ Result subscribe(Client &client, Consumer &consumer) {
+ if (GetParam() == EndToEndType::REGEX_TOPICS) {
+ // NOTE: Currently the regex subscription requires the complete
namespace prefix
+ return client.subscribeWithRegex("persistent://public/default/" +
topic_ + ".*", "sub", consumer);
+ } else {
+ return client.subscribe(topic_, "sub", consumer);
+ }
+ }
+};
+
+std::atomic_int ClientCloseTest::id_{0};
+
+TEST_P(ClientCloseTest, testCloseHandlers) {
+ Client client(lookupUrl);
+ auto &producers = PulsarFriend::getProducers(client);
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic_, producer));
+ ASSERT_EQ(producers.size(), 1);
+ ASSERT_EQ(ResultOk, producer.close());
+ ASSERT_EQ(producers.size(), 0);
+
+ auto &consumers = PulsarFriend::getConsumers(client);
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, subscribe(client, consumer));
+ ASSERT_EQ(consumers.size(), 1);
+ ASSERT_EQ(ResultOk, consumer.close());
+ ASSERT_EQ(consumers.size(), 0);
+
+ ASSERT_EQ(ResultOk, subscribe(client, consumer));
+ ASSERT_EQ(consumers.size(), 1);
+ ASSERT_EQ(ResultOk, consumer.unsubscribe());
+ ASSERT_EQ(consumers.size(), 0);
+
+ auto connections = PulsarFriend::getConnections(client);
+ for (const auto &cnx : PulsarFriend::getConnections(client)) {
+ ASSERT_TRUE(PulsarFriend::getProducers(*cnx).empty());
+ ASSERT_TRUE(PulsarFriend::getConsumers(*cnx).empty());
+ }
+
+ ASSERT_EQ(ResultOk, client.close());
+}
+
+TEST_P(ClientCloseTest, testShutdown) {
+ Producer producer;
+ Consumer consumer;
+ {
+ Client client(lookupUrl);
+ ASSERT_EQ(ResultOk, client.createProducer(topic_, producer));
+ ASSERT_EQ(ResultOk, subscribe(client, consumer));
+ ASSERT_EQ(PulsarFriend::getProducers(client).size(), 1);
+ ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 1);
+ } // client is destructed here and `shutdown()` method will be called
+
+ EXPECT_EQ(ResultAlreadyClosed,
producer.send(MessageBuilder().setContent("msg").build()));
Review Comment:
Let's use another PR to discuss this topic. I'm currently converting the
code that uses `weak_ptr` to use plain references, and will push a new
change soon.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]