poorbarcode commented on code in PR #21187:
URL: https://github.com/apache/pulsar/pull/21187#discussion_r1338461297


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -349,7 +350,11 @@ protected void readMoreEntries(Consumer consumer) {
                 }
                 havePendingRead = true;
                 if (consumer.readCompacted()) {
-                    boolean readFromEarliest = isFirstRead && 
MessageId.earliest.equals(consumer.getStartMessageId());
+                    boolean readFromEarliest = false;
+                    if (!cursor.isDurable() || ((ManagedCursorImpl) 
cursor).isCompactionCursor()
+                            || cursor.getPersistentMarkDeletedPosition() == 
null) {
+                        readFromEarliest =  isFirstRead && 
MessageId.earliest.equals(consumer.getStartMessageId());

Review Comment:
   Got it.  I wrote a test below(it works as expected). Could you add it into 
the current PR to guarantee this feature will not be broken?
   
   ```java
   @Test
   public void testReaderReconnectedFromNextEntry() throws Exception {
       final String topic = newTopicName();
       Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
               .startMessageId(MessageId.earliest).create();
       Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
   
       // Send 3 and consume 1.
       producer.send("1");
       producer.send("2");
       producer.send("3");
       Message<String> msg1 = reader.readNext(2, TimeUnit.SECONDS);
       assertEquals(msg1.getValue(), "1");
   
       // Trigger reader reconnect.
       admin.topics().unload(topic);
   
       // For non-durable we are going to restart from the next entry.
       Message<String> msg2 = reader.readNext(2, TimeUnit.SECONDS);
       assertEquals(msg2.getValue(), "2");
       Message<String> msg3 = reader.readNext(2, TimeUnit.SECONDS);
       assertEquals(msg3.getValue(), "3");
   
       // cleanup.
       reader.close();
       producer.close();
       admin.topics().delete(topic, false);
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to