AnonHxy opened a new issue, #18910:
URL: https://github.com/apache/pulsar/issues/18910

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Version
   
   latest master
   
   ### Minimal reproduce step
   
   1. Start a standalone broker.
   2. Create a partitioned topic with 1 partition, named `pulsar_master`.
   3. Start a consumer with a Shared subscription and ackTimeout = 1 second. The listener sleeps for 5 seconds on the first message it receives, so that message times out.
   ```
       public static void main(String[] args) throws Exception {
           Logger log = LoggerFactory.getLogger(ConsumerTest.class);
           String topic = "pulsar_master";
           String subscription = "sub_pulsar_master";
   
           PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://127.0.0.1:6650/")
                   .build();
           AtomicInteger count = new AtomicInteger(0);
           MessageListener<byte[]> myMessageListener = (consumer, msg) -> {
               try {
                   log.info("Message received: " + new String(msg.getData()) + 
" topic:" + msg.getTopicName() + " "  + msg.getRedeliveryCount());
                   int i = count.incrementAndGet();
                   if (i == 1) { // only the first message should time out
                       Thread.sleep(5000);
                   }
                   consumer.acknowledge(msg);
               } catch (Exception e) {
                   consumer.negativeAcknowledge(msg);
               }
           };
   
           client.newConsumer()
                   .topic(topic)
                   .subscriptionName(subscription)
                   .subscriptionType(SubscriptionType.Shared)
                   .ackTimeout(1, TimeUnit.SECONDS)
                   .messageListener(myMessageListener)
                   .subscribe();
       }
   ```
   4. Start a producer to publish messages.
   ```
           String topic = "pulsar_master";
           PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://127.0.0.1:6650")
                   .build();
           Producer<byte[]> producer = client.newProducer()
                   .topic(topic)
                   .enableBatching(false)
                   .create();
           for (int i = 0; i < 10; ++i) {
               Thread.sleep(1000);
               producer.newMessage().orderingKey("1".getBytes()).value(("msg" + 
i).getBytes()).send();
           }
           producer.close();
   ```
   
   ### What did you expect to see?
   
   Only the first message (msg0) should be redelivered. The result should be:
   ```
   Message received: msg0 RedeliveryCount: 0
   ConsumerBase{subscription='sub_pulsar_master', 1 messages will be 
re-delivered
   Message received: msg1 RedeliveryCount: 0
   Message received: msg0 RedeliveryCount: 1
   Message received: msg2 RedeliveryCount: 0
   Message received: msg3 RedeliveryCount: 0
   Message received: msg4 RedeliveryCount: 0
   Message received: msg5 RedeliveryCount: 0
   Message received: msg6 RedeliveryCount: 0
   Message received: msg7 RedeliveryCount: 0
   Message received: msg8 RedeliveryCount: 0
   Message received: msg9 RedeliveryCount: 0
   ```
   
   ### What did you see instead?
   
   msg1, msg2, and msg3 were also redelivered, and msg0 was redelivered twice:
   ```
   Message received: msg0 RedeliveryCount: 0
   [ConsumerBase{subscription='sub_pulsar_master', consumerName='7600f', 
topic='pulsar_master'}] 1 messages will be re-delivered
   [ConsumerBase{subscription='sub_pulsar_master', consumerName='7600f', 
topic='pulsar_master'}] 1 messages will be re-delivered
   [ConsumerBase{subscription='sub_pulsar_master', consumerName='7600f', 
topic='pulsar_master'}] 2 messages will be re-delivered
   [ConsumerBase{subscription='sub_pulsar_master', consumerName='7600f', 
topic='pulsar_master'}] 2 messages will be re-delivered
   Message received: msg1 RedeliveryCount: 0
   Message received: msg0 RedeliveryCount: 1
   Message received: msg2 RedeliveryCount: 0
   Message received: msg1 RedeliveryCount: 1
   Message received: msg3 RedeliveryCount: 0
   Message received: msg0 RedeliveryCount: 2
   Message received: msg2 RedeliveryCount: 1
   Message received: msg4 RedeliveryCount: 0
   Message received: msg1 RedeliveryCount: 2
   Message received: msg3 RedeliveryCount: 1
   Message received: msg5 RedeliveryCount: 0
   Message received: msg6 RedeliveryCount: 0
   Message received: msg7 RedeliveryCount: 0
   Message received: msg8 RedeliveryCount: 0
   Message received: msg9 RedeliveryCount: 0
   ```
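
To compare a run against the expectation, the `Message received` lines can be tallied per message. The following is only a minimal sketch, assuming log lines in the abbreviated `Message received: <msg> RedeliveryCount: <n>` form shown in this report. `RedeliveryTally` and its methods are hypothetical helpers for illustration and are not part of Pulsar.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Pulsar API): tallies redeliveries from consumer log lines. */
final class RedeliveryTally {
    // Assumed line format, taken from the abbreviated output in this report.
    private static final Pattern LINE =
            Pattern.compile("Message received: (\\S+) RedeliveryCount: (\\d+)");

    /** Returns the highest RedeliveryCount seen per message, in first-seen order. */
    static Map<String, Integer> maxRedeliveryCounts(List<String> logLines) {
        Map<String, Integer> max = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = LINE.matcher(line);
            if (m.find()) {
                max.merge(m.group(1), Integer.parseInt(m.group(2)), Math::max);
            }
        }
        return max;
    }

    /** Returns messages redelivered at least once, other than the one expected to time out. */
    static List<String> unexpectedRedeliveries(List<String> logLines, String expectedTimeout) {
        List<String> unexpected = new ArrayList<>();
        maxRedeliveryCounts(logLines).forEach((msg, count) -> {
            if (count > 0 && !msg.equals(expectedTimeout)) {
                unexpected.add(msg);
            }
        });
        return unexpected;
    }

    public static void main(String[] args) {
        // A subset of the "Message received" lines from the observed output in this report.
        List<String> observed = Arrays.asList(
                "Message received: msg0 RedeliveryCount: 0",
                "Message received: msg1 RedeliveryCount: 0",
                "Message received: msg0 RedeliveryCount: 1",
                "Message received: msg2 RedeliveryCount: 0",
                "Message received: msg1 RedeliveryCount: 1",
                "Message received: msg3 RedeliveryCount: 0",
                "Message received: msg0 RedeliveryCount: 2",
                "Message received: msg2 RedeliveryCount: 1",
                "Message received: msg4 RedeliveryCount: 0",
                "Message received: msg1 RedeliveryCount: 2",
                "Message received: msg3 RedeliveryCount: 1");
        System.out.println(unexpectedRedeliveries(observed, "msg0")); // prints [msg1, msg2, msg3]
    }
}
```

This check only flags messages other than `msg0`. It does not flag that `msg0` itself reached a RedeliveryCount of 2 where 1 was expected; `maxRedeliveryCounts` exposes that value if it is needed.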
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!

