eolivelli opened a new issue, #16421:
URL: https://github.com/apache/pulsar/issues/16421

   I am doing some testing on Shared subscription and Batch messages with the 
current Pulsar master.
   
   The behaviour I am observing is that, with batched messages, the 
Consumer sends flow control messages for more messages than it can handle.
   
   This is how to reproduce the problem:
   - write 100,000 messages using batching
   - start a Consumer with a Shared subscription (from the beginning of the 
topic)
   - you will see that the PersistentDispatcherMultipleConsumers `consumerFlow` 
triggers the read of many messages
   
   This happens because `consumerFlow` calls `readMoreEntries()`, and 
`readMoreEntries()` sees that there are messages to be re-delivered, because 
the consumer still hasn't acknowledged them.
   
   This in turn requests the ManagedCursor to read the data from storage.
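
The sequence described above can be sketched with a toy model. This is not the broker code: `ToyDispatcher` and all of its methods are hypothetical names, and the model assumes that every flow command arriving while unacknowledged entries are pending results in one more read request to storage.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Toy model only. ToyDispatcher and its methods are hypothetical names;
// this is not the code of PersistentDispatcherMultipleConsumers.
class ToyDispatcher {
    // Entries already sent to the consumer but not acknowledged yet.
    private final Set<Long> pendingRedelivery = new LinkedHashSet<>();
    private int storageReads = 0;      // read requests issued to storage
    private long entriesRequested = 0; // total entries asked for across all reads

    void dispatched(long entryId) {
        pendingRedelivery.add(entryId);
    }

    void acknowledged(long entryId) {
        pendingRedelivery.remove(entryId);
    }

    // Assumption: every flow command with permits re-checks the pending set
    // and, if it is not empty, asks storage for those entries again.
    void consumerFlow(int permits) {
        if (permits <= 0 || pendingRedelivery.isEmpty()) {
            return;
        }
        storageReads++;
        entriesRequested += Math.min(permits, pendingRedelivery.size());
    }

    int storageReads() {
        return storageReads;
    }

    long entriesRequested() {
        return entriesRequested;
    }

    public static void main(String[] args) {
        ToyDispatcher dispatcher = new ToyDispatcher();
        for (long id = 0; id < 5; id++) {
            dispatcher.dispatched(id);
        }
        // Three flow commands arrive before any acknowledgement.
        for (int i = 0; i < 3; i++) {
            dispatcher.consumerFlow(100);
        }
        System.out.println("reads=" + dispatcher.storageReads()
                + " entriesRequested=" + dispatcher.entriesRequested());
    }
}
```

In this model, three flow commands against five unacknowledged entries produce three read requests. Once the entries are acknowledged, further flow commands issue none.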
   
   I observed this behaviour while working on offloader performance, but 
it also happens with regular BK-based ledgers.
   
   This simple test case reproduces the problem. I appended it to this test class: 
https://github.com/apache/pulsar/blob/1ba180cbc7490eff6ac6d3a78d61ce7919236c95/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java#L66
 
   
   
   ```
      @Test
       public void testConsumerFlowOnSharedSubscription() throws Exception {
           String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
           admin.topics().createNonPartitionedTopic(topic);
           String subName = "my-sub";
           int numMessages = 20_000;
           final CountDownLatch count = new CountDownLatch(numMessages);
           try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                   .subscriptionMode(SubscriptionMode.Durable)
                   .subscriptionType(SubscriptionType.Shared)
                   .topic(topic)
                   .subscriptionName(subName)
                   .messageListener(new MessageListener<byte[]>() {
                       @Override
                       public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
                           //log.info("received {} - {}", msg, count.getCount());
                           consumer.acknowledgeAsync(msg);
                           count.countDown();
                       }
                   })
                   .subscribe();
                Producer<byte[]> producer = pulsarClient
                   .newProducer()
                   .blockIfQueueFull(true)
                   .enableBatching(true)
                   .topic(topic)
                   .create()) {
               consumer.pause();
               byte[] message = "foo".getBytes(StandardCharsets.UTF_8);
               List<CompletableFuture<?>> futures = new ArrayList<>();
               for (int i = 0; i < numMessages; i++) {
                   futures.add(producer.sendAsync(message).whenComplete((id, e) -> {
                       if (e != null) {
                           log.error("error", e);
                       }
                   }));
                   if (futures.size() == 1000) {
                       FutureUtil.waitForAll(futures).get();
                       futures.clear();
                   }
               }
               producer.flush();
               consumer.resume();
               assertTrue(count.await(20, TimeUnit.SECONDS));
           }
   
       }
   ```

