This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b27902a110dfa88bed78016d2079225ea0e126e7
Author: feynmanlin <[email protected]>
AuthorDate: Thu Dec 3 14:31:59 2020 +0800

    Fix the problem that batchMessageId is converted to messageIdImpl (#8779)
    
    Fixes #8712
    
    ### Motivation
    TopicReaderTest.testHasMessageAvailable is flaky
    Cause Analysis:
    When there is only one message in the batch, calling Broker's 
`GetLastMessageId` will determine that this is not a batch message, and then 
return a `MessageIdImpl`. (See ServerCnx line 1553)
    In the same scenario, the client will return a `BatchMessageId` after 
sending a message. (See ProducerImpl line 1151)
    
    This will happen:
    MessageIdImpl: `3:31:-1`
    BatchMessageId: `3:31:-1:0`
    
    When calling `reader.hasMessageAvailable()`, the two ids will be compared, 
like this: `lastMessageIdInBroker.compareTo(messageId)`
    Although it is the same messageId, the results will not be equal.
    
    ### Modifications
    When we call `admin.topics().getLastMessageId`, even if there is only one 
message in the batch, it is considered to be BatchMessageId. Client also acts 
like this.
    Therefore, we keep consistent everywhere.
    Even if there is only one message in the batch, we think it is 
`BatchMessageId`
    
    
    (cherry picked from commit 85f3ff4edbaa10c7894af8ad823cbce37b13829c)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  4 +-
 .../apache/pulsar/client/api/TopicReaderTest.java  | 72 +++++++++++++++++++++-
 2 files changed, 72 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 06d5b75..e931c94 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1550,7 +1550,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             MessageMetadata metadata = 
Commands.parseMessageMetadata(entry.getDataBuffer());
             int batchSize = metadata.getNumMessagesInBatch();
             entry.release();
-            return batchSize;
+            return metadata.hasNumMessagesInBatch() ? batchSize : -1;
         });
 
         batchSizeFuture.whenComplete((batchSize, e) -> {
@@ -1558,7 +1558,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 ctx.writeAndFlush(Commands.newError(
                         requestId, ServerError.MetadataError, "Failed to get 
batch size for entry " + e.getMessage()));
             } else {
-                int largestBatchIndex = batchSize > 1 ? batchSize - 1 : -1;
+                int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;
 
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}][{}] Get LastMessageId {} 
partitionIndex {}", remoteAddress,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index e16125f..ebc2889 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -31,9 +31,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -982,6 +980,76 @@ public class TopicReaderTest extends ProducerConsumerBase {
         producer.close();
     }
 
+    @Test(timeOut = 20000)
+    public void testHasMessageAvailableWithBatch() throws Exception {
+        final String topicName = 
"persistent://my-property/my-ns/testHasMessageAvailableWithBatch";
+        final int numOfMessage = 10;
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .enableBatching(true)
+                .batchingMaxMessages(10)
+                .batchingMaxPublishDelay(2,TimeUnit.SECONDS)
+                .topic(topicName).create();
+
+        //For batch-messages with single message, the type of client messageId 
should be the same as that of broker
+        MessageId messageId = producer.send("msg".getBytes());
+        assertTrue(messageId instanceof MessageIdImpl);
+        ReaderImpl<byte[]> reader = 
(ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName)
+                .startMessageId(messageId).startMessageIdInclusive().create();
+        MessageId lastMsgId = reader.getConsumer().getLastMessageId();
+        assertTrue(lastMsgId instanceof BatchMessageIdImpl);
+        assertTrue(messageId instanceof BatchMessageIdImpl);
+        assertEquals(lastMsgId, messageId);
+        reader.close();
+
+        CountDownLatch latch = new CountDownLatch(numOfMessage);
+        List<MessageId> allIds = Collections.synchronizedList(new 
ArrayList<>());
+        for (int i = 0; i < numOfMessage; i++) {
+            producer.sendAsync(String.format("msg num %d", 
i).getBytes()).whenComplete((mid, e) -> {
+                if (e != null) {
+                    Assert.fail();
+                } else {
+                    allIds.add(mid);
+                }
+                latch.countDown();
+            });
+        }
+        producer.flush();
+        latch.await();
+        producer.close();
+
+        //For batch-message with multi messages, the type of client messageId 
should be the same as that of broker
+        for (MessageId id : allIds) {
+            reader = (ReaderImpl<byte[]>) 
pulsarClient.newReader().topic(topicName)
+                    .startMessageId(id).startMessageIdInclusive().create();
+            if (id instanceof BatchMessageIdImpl) {
+                MessageId lastMessageId = 
reader.getConsumer().getLastMessageId();
+                assertTrue(lastMessageId instanceof BatchMessageIdImpl);
+                log.info("id {} instance of BatchMessageIdImpl",id);
+            } else {
+                assertTrue(id instanceof MessageIdImpl);
+                MessageId lastMessageId = 
reader.getConsumer().getLastMessageId();
+                assertTrue(lastMessageId instanceof MessageIdImpl);
+                log.info("id {} instance of MessageIdImpl",id);
+            }
+            reader.close();
+        }
+        //For non-batch message, the type of client messageId should be the 
same as that of broker
+        producer = pulsarClient.newProducer()
+                .enableBatching(false).topic(topicName).create();
+        messageId = producer.send("non-batch".getBytes());
+        assertFalse(messageId instanceof BatchMessageIdImpl);
+        assertTrue(messageId instanceof MessageIdImpl);
+        reader = (ReaderImpl<byte[]>) pulsarClient.newReader().topic(topicName)
+                .startMessageId(messageId).create();
+        MessageId lastMessageId = reader.getConsumer().getLastMessageId();
+        assertFalse(lastMessageId instanceof BatchMessageIdImpl);
+        assertTrue(lastMessageId instanceof MessageIdImpl);
+        assertEquals(lastMessageId, messageId);
+        producer.close();
+        reader.close();
+    }
+
     @Test
     public void testReaderNonDurableIsAbleToSeekRelativeTime() throws 
Exception {
         final int numOfMessage = 10;

Reply via email to