This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fcb359299e9 [fix][client] Fix Reader.hasMessageAvailable return wrong
value after seeking by timestamp with startMessageIdInclusive (#23502)
fcb359299e9 is described below
commit fcb359299e9fd784afcebbc3967927a7f1f7c25c
Author: Jiawen Wang <[email protected]>
AuthorDate: Thu Oct 24 17:34:31 2024 +0800
[fix][client] Fix Reader.hasMessageAvailable return wrong value after
seeking by timestamp with startMessageIdInclusive (#23502)
---
.../org/apache/pulsar/client/impl/ReaderTest.java | 40 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 2 ++
2 files changed, 42 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 12228220b18..a6a3f83ebc3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -906,6 +906,46 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
}
}
+ @Test
+ public void
testHasMessageAvailableAfterSeekTimestampWithMessageIdInclusive() throws
Exception {
+ final String topic = "persistent://my-property/my-ns/" +
+
"testHasMessageAvailableAfterSeekTimestampWithMessageInclusive";
+
+ @Cleanup
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+ final long timestampBeforeSend = System.currentTimeMillis();
+ final MessageId sentMsgId = producer.send("msg");
+
+ final List<MessageId> messageIds = new ArrayList<>();
+ messageIds.add(MessageId.earliest);
+ messageIds.add(sentMsgId);
+ messageIds.add(MessageId.latest);
+
+ for (MessageId messageId : messageIds) {
+ @Cleanup
+ Reader<String> reader =
pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
+ .startMessageIdInclusive()
+ .startMessageId(messageId).create();
+ assertTrue(reader.hasMessageAvailable());
+
+ reader.seek(System.currentTimeMillis());
+ assertFalse(reader.hasMessageAvailable());
+ Message<String> message = reader.readNext(10, TimeUnit.SECONDS);
+ assertNull(message);
+ }
+
+ for (MessageId messageId : messageIds) {
+ @Cleanup
+ Reader<String> reader =
pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
+ .startMessageIdInclusive()
+ .startMessageId(messageId).create();
+ assertTrue(reader.hasMessageAvailable());
+
+ reader.seek(timestampBeforeSend);
+ assertTrue(reader.hasMessageAvailable());
+ }
+ }
+
@Test
public void testReaderBuilderStateOnRetryFailure() throws Exception {
String ns = "my-property/my-ns";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b7010a1ddc7..be01bd00eb3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2515,6 +2515,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
.result();
if (lastMessageId.getEntryId() < 0) {
completehasMessageAvailableWithValue(booleanFuture, false);
+ } else if (hasSoughtByTimestamp) {
+
completehasMessageAvailableWithValue(booleanFuture, result < 0);
} else {
completehasMessageAvailableWithValue(booleanFuture,
resetIncludeHead ? result <= 0 : result <
0);