nodece commented on issue #483:
URL:
https://github.com/apache/pulsar-client-cpp/issues/483#issuecomment-3424531681
Hi @BewareMyPower, I think there might be a race condition in the
client/consumer.
In our case, we close the consumer immediately after performing a seek
operation. It seems there's a timing issue: when we try to create a new
consumer right after closing the previous one, we occasionally hit a "consumer
busy" error.
```cpp
#include <iostream>
#include <thread>
#include <pulsar/Client.h>
using namespace std;
using namespace pulsar;
// Reproducer for the "consumer busy" race described in this comment:
//   1. create a producer and a consumer on the same topic,
//   2. publish one message, then seek the consumer back to a timestamp taken
//      before the publish,
//   3. close the consumer immediately after the seek,
//   4. subscribe again with the same subscription name.
// Step 4 occasionally fails with ConsumerBusy.
//
// Returns 0 when every step succeeds and 1 as soon as any checked step fails.
int main() {
    Client client("pulsar://localhost:6650");
    const auto topic = "persistent://public/default/my-topic-1";

    // Create producer
    Producer producer;
    const auto producerRes = client.createProducer(topic, producer);
    if (producerRes != ResultOk) {
        cout << "Failed to create producer: " << producerRes << endl;
        return 1;
    }

    // Create consumer; the listener only prints the id of each received message.
    Consumer consumer;
    ConsumerConfiguration consConfig;
    consConfig.setMessageListener([](Consumer &, const Message &msg) {
        cout << msg.getMessageId() << endl;
    });
    std::string subName = "sub";
    const auto subscribeRes = client.subscribe(topic, subName, consConfig, consumer);
    if (subscribeRes != ResultOk) {
        cout << "Failed to subscribe: " << subscribeRes << endl;
        return 1;
    }

    // Capture the seek timestamp (milliseconds since the epoch) BEFORE
    // publishing, so that the later seek rewinds to a point ahead of the message.
    const auto seekTS =
        chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch())
            .count();

    const Message msg = MessageBuilder().setContent("content").setProperty("x", "1").build();
    producer.send(msg);
    std::this_thread::sleep_for(std::chrono::seconds(3));

    const auto consumer1SeekRes = consumer.seek(seekTS);
    cout << "Consumer1 seek result: " << consumer1SeekRes << endl;
    if (consumer1SeekRes != ResultOk) {
        return 1;
    }

    // When the client performs the seek operation, the broker disconnects the
    // consumer, and then the consumer re-subscribes in the background.
    // If we close the consumer immediately after the seek operation, there may
    // be a race condition.
    // Uncomment the sleep below to make the race less likely:
    // cout << "sleep_for 5s" << endl;
    // std::this_thread::sleep_for(std::chrono::seconds(5));
    const auto closeRes = consumer.close();
    cout << "Consumer1 close result: " << closeRes << endl;
    if (closeRes != ResultOk) {
        return 1;
    }

    cout << "Starting consumer2" << endl;
    Consumer consumer2;
    const auto consumer2Res = client.subscribe(topic, subName, consConfig, consumer2);
    // This is where ConsumerBusy is occasionally reported.
    cout << "Consumer2 subscribe result: " << consumer2Res << endl;
    if (consumer2Res != ResultOk) {
        return 1;
    }

    const auto consumer2SeekRes = consumer2.seek(seekTS);
    cout << "Consumer2 seek result: " << consumer2SeekRes << endl;
    if (consumer2SeekRes != ResultOk) {
        return 1;
    }

    consumer2.close();
    client.close();
    return 0;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]