michalcukierman opened a new issue, #23239: URL: https://github.com/apache/pulsar/issues/23239
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### Version

3.3.1
3.0.5

### Minimal reproduce step

1. Execute the following Java program:

```java
package org.example;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;

public class Main {

    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        AtomicBoolean running = new AtomicBoolean(true);
        String topic = "tbce0";

        PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:61723")
            .build();

        // Publish 1000 messages, then close the producer.
        Producer<byte[]> producer = client.newProducer()
            .topic(topic)
            .create();
        String message = "Hello, Pulsar!";
        for (int i = 0; i < 1000; i++) {
            producer.send(message.getBytes());
        }
        System.out.println("Sent messages: " + message);
        producer.close();

        // Consume and acknowledge everything on a non-durable subscription.
        Thread.ofVirtual().start(() -> {
            try {
                Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topic)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionName("bce-subscription-0")
                    .subscriptionMode(SubscriptionMode.NonDurable)
                    //.readCompacted(true)
                    .subscribe();
                int counter = 1;
                while (running.get()) {
                    Message<byte[]> receivedMessage = consumer.receive();
                    System.out.println(
                        "Received " + counter++ + " message: " + new String(receivedMessage.getData()));
                    consumer.acknowledge(receivedMessage);
                }
                consumer.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Exiting...");
            running.set(false);
            try {
                client.close();
            } catch (PulsarClientException e) {
                throw new RuntimeException(e);
            }
        }));

        // Keep the consumer connected while the topic is unloaded in step 3.
        Thread.sleep(600_000);
    }
}
```

2. Wait for the results:

```log
...
Received 993 message: Hello, Pulsar!
Received 994 message: Hello, Pulsar!
Received 995 message: Hello, Pulsar!
Received 996 message: Hello, Pulsar!
Received 997 message: Hello, Pulsar!
Received 998 message: Hello, Pulsar!
Received 999 message: Hello, Pulsar!
Received 1000 message: Hello, Pulsar!
```

3. Run the following script (if on Kubernetes):

```bash
TOPIC=tbce0
kubectl exec -n kaap pulsar-broker-0 -- ./bin/pulsar-admin topics stats $TOPIC | grep msgBacklog
kubectl exec -n kaap pulsar-broker-0 -- ./bin/pulsar-admin topics unload $TOPIC
kubectl exec -n kaap pulsar-broker-0 -- ./bin/pulsar-admin topics stats $TOPIC | grep msgBacklog
```

### What did you expect to see?

The cursor should be at the end of the topic and the backlog should be 0.

```
Before unloading
"msgBacklog" : 0,
"msgBacklogNoDelayed" : 0,
After unloading
"msgBacklog" : 0,
"msgBacklogNoDelayed" : 0,
```

### What did you see instead?

The backlog is not empty after unloading:

Before unloading
"msgBacklog" : 0,
"msgBacklogNoDelayed" : 0,

After unloading
"msgBacklog" : 1,
"msgBacklogNoDelayed" : 1,

### Anything else?

1. You may want to increase the message retention so that messages are not lost between checks.
2. Unloading is just the easiest way to reproduce; a broker restart may cause the same issue.

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
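
For setups without Kubernetes, the stats / unload / stats check from step 3 could also be sketched in plain Java against the broker's admin REST API. This is only a rough sketch under stated assumptions: the admin endpoint `http://localhost:8080`, the `public/default` namespace for `tbce0`, and the class name `BacklogCheck` are not from the report above, and the `msgBacklog` values are pulled out with a regex (mirroring `grep msgBacklog`) rather than a JSON parser.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the original report.
class BacklogCheck {

    // Matches "msgBacklog" : <n> but not "msgBacklogNoDelayed" (the closing quote must follow directly).
    private static final Pattern BACKLOG = Pattern.compile("\"msgBacklog\"\\s*:\\s*(-?\\d+)");

    // Returns every msgBacklog value found in a topic stats JSON document, in order of appearance.
    static List<Long> msgBacklogs(String statsJson) {
        List<Long> values = new ArrayList<>();
        Matcher m = BACKLOG.matcher(statsJson);
        while (m.find()) {
            values.add(Long.parseLong(m.group(1)));
        }
        return values;
    }

    public static void main(String[] args) throws Exception {
        // Assumptions: admin HTTP endpoint and tenant/namespace are guesses, adjust to your cluster.
        String adminUrl = "http://localhost:8080";
        String topicPath = "/admin/v2/persistent/public/default/tbce0";

        // Follow redirects in case another broker owns the topic.
        HttpClient http = HttpClient.newBuilder()
            .followRedirects(HttpClient.Redirect.NORMAL)
            .build();

        HttpRequest stats = HttpRequest.newBuilder(URI.create(adminUrl + topicPath + "/stats"))
            .GET()
            .build();
        HttpRequest unload = HttpRequest.newBuilder(URI.create(adminUrl + topicPath + "/unload"))
            .PUT(HttpRequest.BodyPublishers.noBody())
            .build();

        String before = http.send(stats, HttpResponse.BodyHandlers.ofString()).body();
        System.out.println("Before unloading msgBacklog: " + msgBacklogs(before));

        http.send(unload, HttpResponse.BodyHandlers.discarding());

        String after = http.send(stats, HttpResponse.BodyHandlers.ofString()).body();
        System.out.println("After unloading msgBacklog: " + msgBacklogs(after));
    }
}
```

Run it while the reproducer from step 1 is still connected, after all 1000 messages have been received and acknowledged.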
