This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fcb359299e9 [fix][client] Fix Reader.hasMessageAvailable return wrong 
value after seeking by timestamp with startMessageIdInclusive (#23502)
fcb359299e9 is described below

commit fcb359299e9fd784afcebbc3967927a7f1f7c25c
Author: Jiawen Wang <[email protected]>
AuthorDate: Thu Oct 24 17:34:31 2024 +0800

    [fix][client] Fix Reader.hasMessageAvailable return wrong value after 
seeking by timestamp with startMessageIdInclusive (#23502)
---
 .../org/apache/pulsar/client/impl/ReaderTest.java  | 40 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  2 ++
 2 files changed, 42 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 12228220b18..a6a3f83ebc3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -906,6 +906,46 @@ public class ReaderTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void 
testHasMessageAvailableAfterSeekTimestampWithMessageIdInclusive() throws 
Exception {
+        final String topic = "persistent://my-property/my-ns/" +
+                
"testHasMessageAvailableAfterSeekTimestampWithMessageInclusive";
+
+        @Cleanup
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        final long timestampBeforeSend = System.currentTimeMillis();
+        final MessageId sentMsgId = producer.send("msg");
+
+        final List<MessageId> messageIds = new ArrayList<>();
+        messageIds.add(MessageId.earliest);
+        messageIds.add(sentMsgId);
+        messageIds.add(MessageId.latest);
+
+        for (MessageId messageId : messageIds) {
+            @Cleanup
+            Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
+                    .startMessageIdInclusive()
+                    .startMessageId(messageId).create();
+            assertTrue(reader.hasMessageAvailable());
+
+            reader.seek(System.currentTimeMillis());
+            assertFalse(reader.hasMessageAvailable());
+            Message<String> message = reader.readNext(10, TimeUnit.SECONDS);
+            assertNull(message);
+        }
+
+        for (MessageId messageId : messageIds) {
+            @Cleanup
+            Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
+                    .startMessageIdInclusive()
+                    .startMessageId(messageId).create();
+            assertTrue(reader.hasMessageAvailable());
+
+            reader.seek(timestampBeforeSend);
+            assertTrue(reader.hasMessageAvailable());
+        }
+    }
+
     @Test
     public void testReaderBuilderStateOnRetryFailure() throws Exception {
         String ns = "my-property/my-ns";
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b7010a1ddc7..be01bd00eb3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2515,6 +2515,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                 .result();
                         if (lastMessageId.getEntryId() < 0) {
                             
completehasMessageAvailableWithValue(booleanFuture, false);
+                        } else if (hasSoughtByTimestamp) {
+                            
completehasMessageAvailableWithValue(booleanFuture, result < 0);
                         } else {
                             completehasMessageAvailableWithValue(booleanFuture,
                                     resetIncludeHead ? result <= 0 : result < 
0);

Reply via email to