This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new c8abba8f1ea [fix][client] Fix RawReader hasMessageAvailable returns 
true when no messages (#21032)
c8abba8f1ea is described below

commit c8abba8f1ea3fefba8a0338b4eee70a08b1a4f26
Author: Jiwe Guo <[email protected]>
AuthorDate: Tue Oct 24 18:02:38 2023 +0800

    [fix][client] Fix RawReader hasMessageAvailable returns true when no 
messages (#21032)
---
 .../src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 331a353b9bd..8faf02c81b3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -151,14 +151,13 @@ public class RawReaderImpl implements RawReader {
                     // TODO message validation
                     numMsg = 1;
                 }
+                MessageIdData messageId = messageAndCnx.msg.getMessageIdData();
+                lastDequeuedMessageId = new 
BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
+                        messageId.getPartition(), numMsg - 1);
                 if (!future.complete(messageAndCnx.msg)) {
                     messageAndCnx.msg.close();
                     closeAsync();
                 }
-                MessageIdData messageId = messageAndCnx.msg.getMessageIdData();
-                lastDequeuedMessageId = new 
BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
-                    messageId.getPartition(), numMsg - 1);
-
                 ClientCnx currentCnx = cnx();
                 if (currentCnx == messageAndCnx.cnx) {
                     increaseAvailablePermits(currentCnx, numMsg);

Reply via email to