This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 2456729  [issue 438] reader.HasNext() returns true on empty topic (#441)
2456729 is described below

commit 2456729a54cad11865005d177698a617db864c72
Author: Ming <[email protected]>
AuthorDate: Thu Jan 14 20:21:26 2021 -0500

    [issue 438] reader.HasNext() returns true on empty topic (#441)
---
 pulsar/impl_message.go |  4 ++++
 pulsar/reader_impl.go  |  6 +++---
 pulsar/reader_test.go  | 20 ++++++++++++++++++++
 3 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 6fc9cad..c358c22 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -71,6 +71,10 @@ func (id trackingMessageID) ack() bool {
        return true
 }
 
+func (id messageID) isEntryIDValid() bool {
+       return id.entryID >= 0
+}
+
 func (id messageID) greater(other messageID) bool {
        if id.ledgerID != other.ledgerID {
                return id.ledgerID > other.ledgerID
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 8caaff4..d76865e 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -161,15 +161,15 @@ func (r *reader) HasNext() bool {
 
 func (r *reader) hasMoreMessages() bool {
        if !r.pc.lastDequeuedMsg.Undefined() {
-               return r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID)
+               return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID)
        }
 
        if r.pc.options.startMessageIDInclusive {
-               return r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID)
+               return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID)
        }
 
        // Non-inclusive
-       return r.lastMessageInBroker.greater(r.pc.startMessageID.messageID)
+       return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.startMessageID.messageID)
 }
 
 func (r *reader) Close() {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 793dc8d..36204cf 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -312,6 +312,26 @@ func TestReaderOnLatestWithBatching(t *testing.T) {
        cancel()
 }
 
+func TestReaderHasNextAgainstEmptyTopic(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       // create reader at the earliest position of a topic that has no messages
+       reader, err := client.CreateReader(ReaderOptions{
+               Topic:          "an-empty-topic",
+               StartMessageID: EarliestMessageID(),
+       })
+
+       assert.Nil(t, err)
+       defer reader.Close()
+
+       assert.Equal(t, reader.HasNext(), false)
+}
+
 func TestReaderHasNext(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,

Reply via email to