nodece commented on issue #483:
URL: 
https://github.com/apache/pulsar-client-cpp/issues/483#issuecomment-3424531681

   Hi @BewareMyPower, I think there might be a race condition in the 
client/consumer.
   In our case, we close the consumer immediately after performing a seek 
operation. It seems there's a timing issue: when we try to create a new 
consumer right after closing the previous one, we occasionally hit a "consumer 
busy" error.
   
   ```cpp
   #include <iostream>
   #include <thread>
   #include <pulsar/Client.h>
   using namespace std;
   using namespace pulsar;
   
   // Reproduction for a suspected race between Consumer::seek() and
   // Consumer::close(): consumer 1 seeks and is closed immediately, then a second
   // consumer subscribes with the same subscription name and is reported to
   // occasionally fail with ConsumerBusy.
   //
   // Returns 0 when every step succeeds and 1 as soon as any checked step fails.
   int main() {
       Client client("pulsar://localhost:6650");

       const auto topic = "persistent://public/default/my-topic-1";

       // Create producer
       Producer producer;
       const auto producerRes = client.createProducer(topic, producer);
       if (producerRes != ResultOk) {
           std::cout << "Failed to create producer: " << producerRes << std::endl;
           return 1;
       }

       // Create consumer. The listener only prints the ID of each received
       // message; its Consumer argument is unused, so it is left unnamed to
       // avoid shadowing the local `consumer` below.
       Consumer consumer;
       ConsumerConfiguration consConfig;
       consConfig.setMessageListener([](Consumer &, const Message &msg) {
           std::cout << msg.getMessageId() << std::endl;
       });
       std::string subName = "sub";
       const auto subscribeRes = client.subscribe(topic, subName, consConfig, consumer);
       if (subscribeRes != ResultOk) {
           std::cout << "Failed to subscribe: " << subscribeRes << std::endl;
           return 1;
       }

       // Timestamp (milliseconds since the epoch) taken before the message is
       // published; both consumers later seek to it.
       const auto seekTS = std::chrono::duration_cast<std::chrono::milliseconds>(
                               std::chrono::system_clock::now().time_since_epoch())
                               .count();

       const Message msg = MessageBuilder().setContent("content").setProperty("x", "1").build();
       const auto sendRes = producer.send(msg);
       if (sendRes != ResultOk) {
           std::cout << "Failed to send message: " << sendRes << std::endl;
           return 1;
       }
       std::this_thread::sleep_for(std::chrono::seconds(3));

       const auto consumer1SeekRes = consumer.seek(seekTS);
       std::cout << "Consumer1 seek result: " << consumer1SeekRes << std::endl;
       if (consumer1SeekRes != ResultOk) {
           return 1;
       }

       // When the client performs the seek operation, the broker disconnects
       // the consumer, and then the consumer re-subscribes in the background.
       // If we close the consumer immediately after the seek operation, there
       // may be a race condition.
       // Uncomment the sleep below to make the race less likely:
       // std::cout << "sleep_for 5s" << std::endl;
       // std::this_thread::sleep_for(std::chrono::seconds(5));
       const auto closeRes = consumer.close();
       std::cout << "Consumer1 close result: " << closeRes << std::endl;
       if (closeRes != ResultOk) {
           return 1;
       }

       std::cout << "Starting consumer2" << std::endl;
       Consumer consumer2;
       const auto consumer2Res = client.subscribe(topic, subName, consConfig, consumer2);
       // This is the step that occasionally reports ConsumerBusy.
       std::cout << "Consumer2 subscribe result: " << consumer2Res << std::endl;
       if (consumer2Res != ResultOk) {
           return 1;
       }

       const auto consumer2SeekRes = consumer2.seek(seekTS);
       std::cout << "Consumer2 seek result: " << consumer2SeekRes << std::endl;
       if (consumer2SeekRes != ResultOk) {
           return 1;
       }

       // Best-effort shutdown; results are intentionally not checked here.
       consumer2.close();
       client.close();

       return 0;
   }
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to