This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4d8974d Fix reader always has message available (#8735)
4d8974d is described below
commit 4d8974d8581af53b386dd1d58edca42250e377fe
Author: feynmanlin <[email protected]>
AuthorDate: Tue Dec 1 10:55:06 2020 +0800
Fix reader always has message available (#8735)
Fixes #8721
### Motivation
Whenever you create a new Reader with startMessageId set to latest and
startMessageIdInclusive, hasMessageAvailable will be always true even if the
topic is freshly created without any messages inside. If you remove message
inclusiveness, then the issue is not appearing.
### Modifications
If the returned entryId is illegal, it means there is no message
### Verifying this change
ReaderTest#testReaderHasMessageAvailable
---
.../test/java/org/apache/pulsar/client/impl/ReaderTest.java | 12 ++++++++++++
.../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 9 +++++++--
2 files changed, 19 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index ab00a84..8400102 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -303,6 +303,18 @@ public class ReaderTest extends
MockedPulsarServiceBaseTest {
}
@Test
+ public void testReaderHasMessageAvailable() throws Exception {
+ final String topic =
"persistent://my-property/my-ns/testReaderHasMessageAvailable" +
System.currentTimeMillis();
+ @Cleanup
+ Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+ .topic(topic)
+ .startMessageId(MessageId.latest)
+ .startMessageIdInclusive()
+ .create();
+ assertFalse(reader.hasMessageAvailable());
+ }
+
+ @Test
public void testKeyHashRangeReader() throws IOException {
final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5",
"6", "7", "8", "9");
final String topic =
"persistent://my-property/my-ns/testKeyHashRangeReader";
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c87bdc7..1f2613c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2010,11 +2010,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
startMessageId.partitionIndex == -1) {
getLastMessageIdAsync()
- .thenCompose(this::seekAsync)
- .whenComplete((ignore, e) -> {
+ .thenCompose((msgId) ->
seekAsync(msgId).thenApply((ignore) -> msgId))
+ .whenComplete((msgId, e) -> {
if (e != null) {
log.error("[{}][{}] Failed getLastMessageId
command", topic, subscription);
booleanFuture.completeExceptionally(e.getCause());
+ return;
+ }
+ MessageIdImpl messageId =
MessageIdImpl.convertToMessageIdImpl(msgId);
+ if (messageId == null || messageId.getEntryId() <
0) {
+ booleanFuture.complete(false);
} else {
booleanFuture.complete(resetIncludeHead);
}