michalcukierman opened a new issue, #23239:
URL: https://github.com/apache/pulsar/issues/23239

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.
   
   
   ### Read release policy
   
   - [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
   
   
   ### Version
   
   3.3.1
   3.0.5
   
   ### Minimal reproduce step
   
   1. Execute the following Java program (Java 21 or later, since it uses virtual threads):
   
   ```java
   package org.example;
   
   import java.util.concurrent.atomic.AtomicBoolean;
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.api.SubscriptionInitialPosition;
   import org.apache.pulsar.client.api.SubscriptionMode;
   
   public class Main {
   
     public static void main(String[] args) throws PulsarClientException, InterruptedException {
       AtomicBoolean running = new AtomicBoolean(true);
   
       String topic = "tbce0";
   
       PulsarClient client = PulsarClient.builder()
           .serviceUrl("pulsar://localhost:61723")
           .build();
   
       Producer<byte[]> producer = client.newProducer()
           .topic(topic)
           .create();
   
       String message = "Hello, Pulsar!";
       for (int i = 0; i < 1000; i++) {
         producer.send(message.getBytes());
       }
       System.out.println("Sent messages: " + message);
       producer.close();
   
       Thread.ofVirtual().start(() -> {
         try {
           Consumer<byte[]> consumer = client.newConsumer()
               .topic(topic)
               .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
               .subscriptionName("bce-subscription-0")
               .subscriptionMode(SubscriptionMode.NonDurable)
               //.readCompacted(true)
               .subscribe();
   
           int counter = 1;
           while (running.get()) {
             Message<byte[]> receivedMessage = consumer.receive();
             System.out.println(
                 "Received " + counter++ + " message: " + new String(receivedMessage.getData()));
             consumer.acknowledge(receivedMessage);
           }
           consumer.close();
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
       });
   
       Runtime.getRuntime().addShutdownHook(new Thread(() -> {
         System.out.println("Exiting...");
         running.set(false);
         try {
           client.close();
         } catch (PulsarClientException e) {
           throw new RuntimeException(e);
         }
       }));
   
       Thread.sleep(600_000);
     }
   }
   ```
   
   2. Wait for the results:
   
   ```log
   ...
   Received 993 message: Hello, Pulsar!
   Received 994 message: Hello, Pulsar!
   Received 995 message: Hello, Pulsar!
   Received 996 message: Hello, Pulsar!
   Received 997 message: Hello, Pulsar!
   Received 998 message: Hello, Pulsar!
   Received 999 message: Hello, Pulsar!
   Received 1000 message: Hello, Pulsar!
   
   ```
   
   3. Run the following script (if on Kubernetes):
   
   ```bash
   TOPIC=tbce0
   kubectl exec -n kaap pulsar-broker-0 -- ./bin/pulsar-admin topics stats "$TOPIC" | grep msgBacklog
   kubectl exec -n kaap pulsar-broker-0 -- ./bin/pulsar-admin topics unload "$TOPIC"
   kubectl exec -n kaap pulsar-broker-0 -- ./bin/pulsar-admin topics stats "$TOPIC" | grep msgBacklog
   ```
   
   
   ### What did you expect to see?
   
   The cursor should be at the end of the topic, and the backlog should be 0 both before and after unloading.
   
   ```
   Before unloading
         "msgBacklog" : 0,
         "msgBacklogNoDelayed" : 0,
   After unloading
         "msgBacklog" : 0,
         "msgBacklogNoDelayed" : 0,
   
   ```
   
   ### What did you see instead?
   
   The backlog is not empty after unloading:
   
   Before unloading
         "msgBacklog" : 0,
         "msgBacklogNoDelayed" : 0,
   After unloading
         "msgBacklog" : 1,
         "msgBacklogNoDelayed" : 1,
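
The before/after comparison can also be checked automatically. The following is a minimal sketch and not part of the original reproduction: `BacklogCheck` and its methods are hypothetical names introduced for illustration. It uses only the JDK and assumes the stats output has the `"msgBacklog" : N` form shown in this report.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Pulsar): compares msgBacklog values
// taken from `pulsar-admin topics stats` output before and after an unload.
final class BacklogCheck {

  // Matches entries such as:  "msgBacklog" : 1,
  // The closing quote right after msgBacklog excludes "msgBacklogNoDelayed".
  private static final Pattern MSG_BACKLOG =
      Pattern.compile("\"msgBacklog\"\\s*:\\s*(\\d+)");

  /** Returns every msgBacklog value found in the stats output, in order. */
  public static List<Long> parseBacklogs(String statsOutput) {
    List<Long> values = new ArrayList<>();
    Matcher matcher = MSG_BACKLOG.matcher(statsOutput);
    while (matcher.find()) {
      values.add(Long.parseLong(matcher.group(1)));
    }
    return values;
  }

  /** True when the summed backlog after the unload exceeds the sum before it. */
  public static boolean backlogGrew(String before, String after) {
    long sumBefore = parseBacklogs(before).stream().mapToLong(Long::longValue).sum();
    long sumAfter = parseBacklogs(after).stream().mapToLong(Long::longValue).sum();
    return sumAfter > sumBefore;
  }

  public static void main(String[] args) {
    // Sample values copied from the output in this report.
    String before = "\"msgBacklog\" : 0,\n\"msgBacklogNoDelayed\" : 0,\n";
    String after = "\"msgBacklog\" : 1,\n\"msgBacklogNoDelayed\" : 1,\n";
    System.out.println("Backlog grew after unload: " + backlogGrew(before, after));
  }
}
```

With the values from this report, `main` prints `Backlog grew after unload: true`, which is the behavior being reported as a bug.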
   
   
   ### Anything else?
   
   1. You may want to increase the message retention so that the messages are not lost between checks.
   2. Unloading is just the easiest way to reproduce; a broker restart may cause the same behavior.
   
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!

