megfigura opened a new issue #10671:
URL: https://github.com/apache/pulsar/issues/10671


   **Describe the bug**
   Consuming a message with the Reader interface immediately after seeking can return the wrong message. This only seems to happen in a fairly specific situation (details below). Adding a short pause avoids it, which makes me wonder whether it _could_ happen in other situations as well.
   
   **To Reproduce**
   IntelliJ / Maven project is attached: [PulsarSeek.zip](https://github.com/apache/pulsar/files/6523034/PulsarSeek.zip)
   
   Adjust the `SERVICE_URL` as needed. I am using Pulsar 2.7.1 in Docker.
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.MessageId;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.Reader;
   
   public class SeekTest
   {
       private static final String SERVICE_URL = "pulsar://localhost:6650";
       private static final String TEST_TOPIC = "persistent://public/default/seek_test";
   
       public static void main(String[] args) throws Exception
       {
           PulsarClient client = PulsarClient.builder().serviceUrl(SERVICE_URL).build();
   
           Producer<byte[]> producer = client.newProducer()
                   .topic(TEST_TOPIC)
                   .producerName("producer")
                   .create();
   
           // Produce 100 messages and record the message IDs
           System.out.print("Producing 100 messages...  ");
           ArrayList<MessageId> messageIds = new ArrayList<>();
           for (int i = 0; i < 100; i++)
           {
               // Message is the loop counter
               byte[] bytes = Integer.toString(i).getBytes(StandardCharsets.UTF_8);
   
               MessageId id = producer.send(bytes);
               messageIds.add(id);
           }
           System.out.println("Done");
   
   
           // Reader starts at the latest position, inclusive
           Reader<byte[]> reader = client.newReader()
                   .topic(TEST_TOPIC)
                   .startMessageId(MessageId.latest)
                   .startMessageIdInclusive()
                   .create();
   
           System.out.println("Seeking to message 50...");
           reader.seek(messageIds.get(50));
           // Calling hasMessageAvailable() right after seek() appears necessary to trigger the issue
           reader.hasMessageAvailable();
           Message<byte[]> message = reader.readNext();
           System.out.println("Got MessageId: " + message.getMessageId());
           System.out.println("Got message data: " + new String(message.getData(), StandardCharsets.UTF_8));
   
           reader.closeAsync();
           producer.closeAsync();
           client.closeAsync();
       }
   }
   ```
   
   **Expected behavior**
   It should print "50", but it actually prints "99".
   
   Output:
   ```
   Producing 100 messages...  Done
   Seeking to message 50...
   Got MessageId: 21:99:-1:0
   Got message data: 99
   ```
   
   **Additional context**
   The following conditions appear to be necessary to trigger the failure:
   * Create the Reader with `startMessageIdInclusive()` and `startMessageId(MessageId.latest)`
   * Call `hasMessageAvailable()` after `seek()`
   
   If you add a short pause between `seek()` and `hasMessageAvailable()`, it produces the correct result ("50"). This part is somewhat worrying, since it suggests some kind of race condition. Could it cause other problems that are harder to reproduce?
   
   This can also be reproduced by seeking to a timestamp instead of a MessageId.
   
   In my application, I avoid this issue by creating the Reader at its initial position (`startMessageId()` / `startMessageFromRollbackDuration()`) instead of using `seek()`, and this works fine.
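   A minimal sketch of that workaround, assuming the same local broker and topic as the reproduction above. The class and helper names (`StartPositionWorkaround`, `readFirstAt`, `openFromRollback`, `runDemo`) are hypothetical, and the 5-second read timeout is an arbitrary choice; only the `ReaderBuilder` and `Reader` calls come from the Pulsar Java client API.
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.TimeUnit;
   
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.MessageId;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.Reader;
   
   public class StartPositionWorkaround
   {
       // Assumed to match the reproduction; adjust SERVICE_URL as needed.
       private static final String SERVICE_URL = "pulsar://localhost:6650";
       private static final String TEST_TOPIC = "persistent://public/default/seek_test";
   
       // Hypothetical helper: create the Reader already positioned at `id` (inclusive)
       // instead of creating it at MessageId.latest and calling seek() afterwards.
       static String readFirstAt(PulsarClient client, MessageId id) throws Exception
       {
           try (Reader<byte[]> reader = client.newReader()
                   .topic(TEST_TOPIC)
                   .startMessageId(id)
                   .startMessageIdInclusive()
                   .create())
           {
               // readNext(timeout, unit) returns null if no message arrives in time
               Message<byte[]> message = reader.readNext(5, TimeUnit.SECONDS);
               return message == null ? null : new String(message.getData(), StandardCharsets.UTF_8);
           }
       }
   
       // Hypothetical helper for the timestamp case: start `minutes` back from now
       // instead of calling reader.seek(timestamp). The caller must close the Reader.
       static Reader<byte[]> openFromRollback(PulsarClient client, long minutes) throws Exception
       {
           return client.newReader()
                   .topic(TEST_TOPIC)
                   .startMessageFromRollbackDuration(minutes, TimeUnit.MINUTES)
                   .create();
       }
   
       // Produces 100 messages like the reproduction, then reads message 50
       // through a Reader created at that position.
       static String runDemo() throws Exception
       {
           try (PulsarClient client = PulsarClient.builder().serviceUrl(SERVICE_URL).build();
                Producer<byte[]> producer = client.newProducer().topic(TEST_TOPIC).create())
           {
               List<MessageId> messageIds = new ArrayList<>();
               for (int i = 0; i < 100; i++)
               {
                   messageIds.add(producer.send(Integer.toString(i).getBytes(StandardCharsets.UTF_8)));
               }
               return readFirstAt(client, messageIds.get(50));
           }
       }
   
       public static void main(String[] args) throws Exception
       {
           System.out.println("Got message data: " + runDemo());
       }
   }
   ```
   
   This avoids the seek-then-read sequence entirely, which is presumably why it does not show the problem.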

