zebehringer opened a new issue, #21794:
URL: https://github.com/apache/pulsar/issues/21794

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Version
   
   M1 macOS Sonoma + Docker Desktop 4.22.1, image: apachepulsar/pulsar:3.1.1, Java pulsar-client:3.1.1.
   Also reproduced on Linux/Kubernetes.
   
   ### Minimal reproduce step
   
   Using the following Java code against a local Docker container running the apachepulsar/pulsar:3.1.1 image:
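
   For reference, the container can be started in standalone mode roughly like this (a sketch; default ports and the documented quickstart invocation assumed):

   ```shell
   docker run -it --rm \
     -p 6650:6650 -p 8080:8080 \
     apachepulsar/pulsar:3.1.1 \
     bin/pulsar standalone
   ```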
   
   Calling `runTest(50000, 5000, 1000000)` times out frequently (returning partial batches), interspersed with a few full batches that are received quickly.
   Calling `runTest(20000, 5000, 1000000)` never times out; every batch is received quickly.
   
   ```java
   import java.util.concurrent.TimeUnit;

   import org.apache.pulsar.client.api.*;

   public class BatchReceiveRepro {

       public static void main(String[] args) throws Exception {
           runTest(50000, 5000, 1000000); // the failing case from the report
       }

       public static void runTest(
               final int batchSize,
               final long batchTimeoutMs,
               final int totalMessages
       ) throws Exception {
           final PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://localhost:6650")
                   .ioThreads(8)
                   .connectionsPerBroker(8)
                   .build();

           final Producer<byte[]> producer = client.newProducer()
                   .topic("persistent://public/default/test")
                   .compressionType(CompressionType.LZ4)
                   .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                   .enableBatching(true)
                   .maxPendingMessages(2000)
                   .batchingMaxMessages(1000)
                   .blockIfQueueFull(true)
                   .create();

           final Consumer<byte[]> consumer = client.newConsumer()
                   .topic("persistent://public/default/test")
                   .subscriptionName("test2")
                   .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                   .subscriptionType(SubscriptionType.Shared)
                   .ackTimeout(batchTimeoutMs + 60000, TimeUnit.MILLISECONDS)
                   .deadLetterPolicy(
                           DeadLetterPolicy.builder()
                                   .maxRedeliverCount(10)
                                   .build()
                   )
                   .batchReceivePolicy(
                           BatchReceivePolicy.builder()
                                   .maxNumMessages(batchSize)
                                   .timeout((int) batchTimeoutMs, TimeUnit.MILLISECONDS)
                                   .build()
                   )
                   .receiverQueueSize(50000)
                   .subscribe();

           for (int i = 0; i < totalMessages; i++) {
               producer.sendAsync(("item " + i).getBytes());
           }

           int remainder = totalMessages;
           int iteration = 1;
           // Compare to batchSize instead of zero: the last batch may not be
           // full and would time out, throwing off the metric being observed.
           while (remainder > batchSize) {
               long start = System.currentTimeMillis();
               final Messages<byte[]> batch = consumer.batchReceive();
               System.out.println("batch " + iteration + " size = " + batch.size()
                       + ", time = " + (System.currentTimeMillis() - start));
               try { Thread.sleep(100); } catch (InterruptedException ignored) {}
               consumer.acknowledge(batch);
               remainder -= batch.size();
               iteration += 1;
           }

           // clean out the remaining messages
           while (remainder > 0) {
               final Messages<byte[]> batch = consumer.batchReceive();
               consumer.acknowledge(batch);
               remainder -= batch.size();
           }

           client.close();
       }
   }
   ```
   
   ### What did you expect to see?
   
   I expect to see no timeouts while the topic's backlog holds more messages than _batchSize_.
   
   I assume this has something to do with some combination of `blockedSubscriptionOnUnackedMsgs`, `unackedMessages`, and `availablePermits`. I read somewhere that if `unackedMessages` exceeds 50000, flow to the consumer stops. But the messages are acked very soon after being received, so I would expect flow to resume and the receive queue to fill up again before the timeout. Instead, it appears I have to wait for a full timeout, or sometimes two, before the receive queue fills up.
   
   ### What did you see instead?
   
   Timeouts occur intermittently, and more frequently for larger _batchSize_ values.
   
   ### Anything else?
   
   I want to use a large _batchSize_ together with a long timeout because this process writes message batches to files for long-term storage, and I do not want a large number of small files.
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!

