IdanFridman opened a new issue #2221: Getting UnAckedMessageTracker messages 
have timed-out without a reason
URL: https://github.com/apache/incubator-pulsar/issues/2221
 
 
   #### Expected behavior
   
   No warning at all
   
   Tell us what should happen
   
   No warning at all. The consumer's ack should be accepted by the broker without an ack-timeout warning.
   
   
   Tell us what happens instead
   
   We produce a message to Pulsar. The consumer receives the message and acks it, but for some reason we get this warning:
   
   WARN  o.a.p.c.i.UnAckedMessageTracker:91 - [ConsumerBase{subscription='subscr-wallet', consumerName='9e188', topic='WithdrawEventQueue'}] 1 messages have timed-out
   
   No redelivery occurs, but we still see this warning on the consumer side claiming that an ack timed out.
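
To make the warning concrete, here is a minimal sketch of how a client-side ack-timeout tracker can behave: a message is tracked when it is handed to the application, dropped from tracking when it is acked, and reported as timed out if no ack was registered within the timeout. This is a hypothetical, simplified model written only for illustration. The class `AckTimeoutSketch` and its methods are assumptions and are not Pulsar's `UnAckedMessageTracker`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of client-side ack-timeout tracking.
// NOT Pulsar's UnAckedMessageTracker; names and logic are illustrative only.
class AckTimeoutSketch {
    private final long ackTimeoutMillis;
    // messageId -> time (ms) at which the message was handed to the application
    private final Map<String, Long> pending = new HashMap<>();

    AckTimeoutSketch(long ackTimeoutMillis) {
        this.ackTimeoutMillis = ackTimeoutMillis;
    }

    // Called when a message is handed to the application.
    void onReceive(String messageId, long nowMillis) {
        pending.put(messageId, nowMillis);
    }

    // Called when the application acks; returns false if the id was not tracked.
    boolean onAck(String messageId) {
        return pending.remove(messageId) != null;
    }

    // Periodic check: removes and returns the ids whose ack did not arrive in time.
    List<String> collectTimedOut(long nowMillis) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= ackTimeoutMillis) {
                timedOut.add(entry.getKey());
                it.remove();
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        AckTimeoutSketch tracker = new AckTimeoutSketch(1000);
        tracker.onReceive("fast", 0);
        tracker.onReceive("slow", 0);
        tracker.onAck("fast"); // acked before the timeout check runs
        System.out.println(tracker.collectTimedOut(1500)); // prints [slow]
    }
}
```

In this model a "timed-out" report only appears for an id that was never removed by an ack before the check ran, which is why the warning is unexpected when every message is acked promptly.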
   
   How can we reproduce the issue
   
   Create the consumer this way:
   ```
   ExecutorService executor = Executors.newFixedThreadPool(numOfConsumerThreads);
   for (int i = 0; i < numOfConsumerThreads; i++) {
       executor.submit(() -> {
           try {
               while (!Thread.currentThread().isInterrupted() && !isShutdown) {
                   try {
                       Message<String> msg = consumer.receive();
                       // processing msg..
                   }
                   ...
   }
   ```
   
   And ack the message this way:
   
   ```
   public void messageAckAsync(String topic, MessageId messageId, String eventId,
                               java.util.function.Consumer<MessageId> successHandler,
                               java.util.function.Consumer<Exception> failHandler) {
       try {
           log.trace("acking msg. topic {} messageId {} eventId {}", topic, messageId, eventId);
           Consumer consumer = consumerMap.get(topic).getPulsarConsumer();
           consumer.acknowledgeAsync(messageId)
                   .handle((result, ex) -> {
                       if (ex == null) {
                           log.info("Message acked: {} topic {} eventId {}", messageId, topic, eventId);
                           successHandler.accept(messageId);
                       } else {
                           log.error("Error acking message to pulsar broker. errorMsg {} topic {} eventId {}", ex, topic, eventId);
                           failHandler.accept((Exception) ex);
                       }
                       return null;
                   });
       } catch (Exception e) {
           // Pass the original exception as the cause so its stack trace is not lost.
           throw new RuntimeException("Ack failed. topic= " + topic + ", messageId= " + messageId + ", eventId= " + eventId, e);
       }
   }
   
   ```
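
One detail of the ack callback above: `CompletableFuture.handle` passes a `Throwable`, which may be a `CompletionException` wrapper or an `Error`, so a direct cast to `Exception` can itself fail. The following is a minimal sketch of a safer conversion using only the JDK. `AckCallbackSketch`, `toException`, and `onAckResult` are hypothetical names, and `ackFuture` merely stands in for the future returned by an async acknowledge call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

// Hypothetical helper; not part of the Pulsar client API.
class AckCallbackSketch {
    // Unwraps CompletionException and converts any Throwable to an Exception
    // without an unchecked cast.
    static Exception toException(Throwable t) {
        Throwable cause = (t instanceof CompletionException && t.getCause() != null)
                ? t.getCause() : t;
        return (cause instanceof Exception) ? (Exception) cause : new RuntimeException(cause);
    }

    // Routes the outcome of ackFuture to exactly one of the two handlers.
    static <T> CompletableFuture<Void> onAckResult(CompletableFuture<Void> ackFuture,
                                                   T messageId,
                                                   Consumer<T> successHandler,
                                                   Consumer<Exception> failHandler) {
        return ackFuture.handle((result, ex) -> {
            if (ex == null) {
                successHandler.accept(messageId);
            } else {
                failHandler.accept(toException(ex));
            }
            return null;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        onAckResult(failed, "id-1",
                id -> System.out.println("acked: " + id),
                e -> System.out.println("failed: " + e.getMessage())).join(); // prints failed: boom
    }
}
```

The returned future completes only after one of the handlers has run, so a caller can wait on it or chain further steps.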
   #### System configuration
   org.apache.pulsar:pulsar-client:2.0.0-rc1-incubating
