This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d174db8729 [fix][pulsar-broker] Fix RawReader hasMessageAvailable 
returns true when no messages (#16443)
1d174db8729 is described below

commit 1d174db872914afbafb03607a29d7a2396d87a33
Author: Yong Zhang <[email protected]>
AuthorDate: Thu Jul 14 16:21:28 2022 +0800

    [fix][pulsar-broker] Fix RawReader hasMessageAvailable returns true when no 
messages (#16443)
    
    * [fix][pulsar-broker] Fix RawReader hasMessageAvailable returns true when 
no messages
    ---
    
    *Motivation*
    
    The RawReader hasMessageAvailable returns true even after all the
    messages have been consumed. This causes readNextAsync to block
    forever, and the process never recovers.
    In the ConsumerImpl, we update the lastDequeuedMessageId in the
    messageProcess. Because the messageReceived method is overridden by the
    RawReader, we should update the lastDequeuedMessageId in the RawReader
    as well.
    
    * Fix hasMessageAvailable for batch messages
---
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  3 ++
 .../apache/pulsar/client/impl/RawReaderTest.java   | 52 ++++++++++++++++++++++
 2 files changed, 55 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 92cbbe4a399..331a353b9bd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -155,6 +155,9 @@ public class RawReaderImpl implements RawReader {
                     messageAndCnx.msg.close();
                     closeAsync();
                 }
+                MessageIdData messageId = messageAndCnx.msg.getMessageIdData();
+                lastDequeuedMessageId = new 
BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
+                    messageId.getPartition(), numMsg - 1);
 
                 ClientCnx currentCnx = cnx();
                 if (currentCnx == messageAndCnx.cnx) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index 5fb3a187f0b..40f36cf4609 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -106,6 +106,58 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
         return msgMetadata.getPartitionKey();
     }
 
+    @Test
+    public void testHasMessageAvailableWithoutBatch() throws Exception {
+        int numKeys = 10;
+        String topic = "persistent://my-property/my-ns/my-raw-topic";
+        Set<String> keys = publishMessages(topic, numKeys);
+        RawReader reader = RawReader.create(pulsarClient, topic, 
subscription).get();
+        while (true) {
+            boolean hasMsg = reader.hasMessageAvailableAsync().get();
+            if (hasMsg && keys.isEmpty()) {
+                Assert.fail("HasMessageAvailable shows still has message when 
there is no message");
+            }
+            if (hasMsg) {
+                try (RawMessage m = reader.readNextAsync().get()) {
+                    Assert.assertTrue(keys.remove(extractKey(m)));
+                }
+            } else {
+                break;
+            }
+        }
+        Assert.assertTrue(keys.isEmpty());
+    }
+
+    @Test
+    public void testHasMessageAvailableWithBatch() throws Exception {
+        int numKeys = 20;
+        String topic = "persistent://my-property/my-ns/my-raw-topic";
+        Set<String> keys = publishMessages(topic, numKeys, true);
+        RawReader reader = RawReader.create(pulsarClient, topic, 
subscription).get();
+        int messageCount = 0;
+        while (true) {
+            boolean hasMsg = reader.hasMessageAvailableAsync().get();
+            if (hasMsg && (messageCount == numKeys)) {
+                Assert.fail("HasMessageAvailable shows still has message when 
there is no message");
+            }
+            if (hasMsg) {
+                try (RawMessage m = reader.readNextAsync().get()) {
+                    MessageMetadata meta = 
Commands.parseMessageMetadata(m.getHeadersAndPayload());
+                    messageCount += meta.getNumMessagesInBatch();
+                    
RawBatchConverter.extractIdsAndKeysAndSize(m).forEach(batchInfo -> {
+                        String key = batchInfo.getMiddle();
+                        Assert.assertTrue(keys.remove(key));
+                    });
+
+                }
+            } else {
+                break;
+            }
+        }
+        Assert.assertEquals(messageCount, numKeys);
+        Assert.assertTrue(keys.isEmpty());
+    }
+
     @Test
     public void testRawReader() throws Exception {
         int numKeys = 10;

Reply via email to