zhuhai1221 opened a new issue, #23999:
URL: https://github.com/apache/pulsar/issues/23999

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [x] I understand that unsupported versions don't get bug fixes. I will 
attempt to reproduce the issue on a supported version of Pulsar client and 
Pulsar broker.
   
   
   ### Version
   
   3.0.9
   
   ### Minimal reproduce step
   
   Exact steps to reproduce aren't yet confirmed.
   
   
   send messages
           Producer<String> producer = client.newProducer(Schema.STRING)
                   .topic("test")
                   .batcherBuilder(BatcherBuilder.KEY_BASED)
                   .blockIfQueueFull(true)
                   .sendTimeout(3, TimeUnit.SECONDS)
                   .create();
           for (int j = 0; j < 2000000; j++) {
               producer.newMessage()
                       .key("test")
                       .value("test")
                       .sendAsync();
               if (j % 2 == 0) {
                   producer.flush();
               }
           }
          producer.close()
   
   consume messages
           consumer = client.newConsumer(Schema.STRING)
                   .topic("test")
                   .receiverQueueSize(1000)
                   .batchReceivePolicy(BatchReceivePolicy.builder()
                           .maxNumMessages(1000)
                           .timeout(100, TimeUnit.MILLISECONDS)
                           .build())
                   .subscriptionName("Key-Shared")
                   .subscriptionType(SubscriptionType.Key_Shared)
                   
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                   .subscribe();
           while (true) {
               Messages<String> messages = consumer.batchReceive();
               log.info("接收到消息数量:" + messages.size());
               Thread.sleep(3000);
               consumer.acknowledge(messages);
           }
   
   
   
   
   ### What did you expect to see?
   
   Consume all messages in the topic
   
   ### What did you see instead?
   
   Sometimes there is a backlog of messages, but you cannot continue to consume 
them.
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to