massakam opened a new pull request, #17503:
URL: https://github.com/apache/pulsar/pull/17503

   ### Motivation
   When a consumer like the following is started and multiple messages are 
published to a partitioned topic, ack timeouts occur even though each received 
message is acknowledged immediately. This happens only with partitioned topics, 
not with non-partitioned ones.
   ```cpp
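   // Snippet assumes <pulsar/Client.h>, <iostream>, <thread>, <chrono>,
   // plus `using namespace pulsar;` and `using namespace std;`.
   ClientConfiguration clientConfig;
   clientConfig.setMessageListenerThreads(1);  // a single listener thread
   Client client("pulsar://localhost:6650", clientConfig);
   
   ConsumerConfiguration consumerConfig;
   consumerConfig.setConsumerType(ConsumerShared);
   consumerConfig.setUnAckedMessagesTimeoutMs(10000);  // 10 s ack timeout
   consumerConfig.setMessageListener([](Consumer con, const Message& msg) {
       cout << "Received: " << msg.getDataAsString() << endl;
       con.acknowledge(msg);  // acknowledged immediately
       // Block the listener thread for 20 s, longer than the ack timeout.
       this_thread::sleep_for(chrono::milliseconds(20000));
   });
   
   Consumer consumer;
   // "pt4" is a partitioned topic.
   client.subscribe("persistent://public/default/pt4", "sub1", consumerConfig, consumer);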
   ```
   
   The cause is that the task that runs the message listener function is 
queued to `listenerExecutor_` immediately after the message is added to 
`unAckedMessageTrackerPtr_`, but the task does not necessarily run right away. 
If each listener call takes a long time, as in the reproduction code above, 
queued tasks wait a long time before they run, and their messages stay in the 
tracker past the ack timeout. I don't think this is the expected behavior.
   
https://github.com/apache/pulsar/blob/0bbc4e1ee32a3b5f07296614a650367dbd99e607/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc#L462-L464
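
To make the timing concrete, here is a minimal sketch of the delay described above. It is a simplified model, not Pulsar code: `TrackedMessage`, `trackAtReceive`, and `countAckTimeouts` are hypothetical names, and it assumes one listener thread, all messages received at t = 0, and a listener that acknowledges at once and then blocks.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical record of one message's lifetime in the unacked tracker.
struct TrackedMessage {
    int64_t trackedAtMs;  // when the message was added to the tracker
    int64_t ackedAtMs;    // when the listener acknowledged it
};

// Models the behavior before this change: every message is tracked on
// receipt (t = 0), but its listener only starts once all earlier listener
// calls on the single listener thread have finished.
std::vector<TrackedMessage> trackAtReceive(int messageCount, int64_t listenerMs) {
    assert(messageCount >= 0 && listenerMs >= 0);
    std::vector<TrackedMessage> result;
    int64_t listenerStartMs = 0;
    for (int i = 0; i < messageCount; ++i) {
        // The listener acknowledges as soon as it starts.
        result.push_back({0, listenerStartMs});
        listenerStartMs += listenerMs;  // then it blocks the thread
    }
    return result;
}

// Counts messages that stayed in the tracker longer than the ack timeout.
int countAckTimeouts(const std::vector<TrackedMessage>& messages, int64_t ackTimeoutMs) {
    int timeouts = 0;
    for (const auto& m : messages) {
        if (m.ackedAtMs - m.trackedAtMs > ackTimeoutMs) ++timeouts;
    }
    return timeouts;
}
```

With the values from the reproduction code (20000 ms per listener call, 10000 ms ack timeout), every message after the first waits in the tracker for at least 20000 ms in this model, so it counts as timed out even though the listener acknowledges it immediately.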
   
   ### Modifications
   
   Add messages to `unAckedMessageTrackerPtr_` from within the task that 
`listenerExecutor_` runs. This way, the message listener function always runs 
immediately after its message is added to `unAckedMessageTrackerPtr_`, which 
prevents unexpected ack timeouts.
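
The reordering can be sketched as follows. This is an illustrative model rather than the actual patch: `FakeListenerExecutor` and `trackInsideTask` are hypothetical names, the executor is a plain FIFO queue drained on a virtual clock, and the listener is assumed to acknowledge at once and then block.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical stand-in for a single listener thread: tasks run one at a
// time, in FIFO order, against a virtual clock.
struct FakeListenerExecutor {
    int64_t nowMs = 0;
    std::queue<std::function<void()>> tasks;

    void post(std::function<void()> task) { tasks.push(std::move(task)); }

    void drain() {
        while (!tasks.empty()) {
            auto task = std::move(tasks.front());
            tasks.pop();
            task();
        }
    }
};

// Returns, per message, how long it sat in the tracker before being acked
// when the tracker add happens inside the queued task.
std::vector<int64_t> trackInsideTask(int messageCount, int64_t listenerMs) {
    assert(messageCount >= 0 && listenerMs >= 0);
    FakeListenerExecutor executor;
    std::vector<int64_t> trackedForMs;
    for (int i = 0; i < messageCount; ++i) {
        executor.post([&executor, &trackedForMs, listenerMs] {
            const int64_t trackedAtMs = executor.nowMs;  // tracker add runs in the task
            const int64_t ackedAtMs = executor.nowMs;    // listener acknowledges at once
            trackedForMs.push_back(ackedAtMs - trackedAtMs);
            executor.nowMs += listenerMs;                // listener then blocks
        });
    }
    executor.drain();
    return trackedForMs;
}
```

In this model the time a message spends queued no longer counts toward the ack timeout, because tracking starts only when the task actually runs.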
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   ### Documentation
   
   - [ ] `doc-not-needed`

