This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cb79264c44 [fix][test]Flaky test testMaxPendingChunkMessages (#21103)
3cb79264c44 is described below

commit 3cb79264c4476a2cd34b0807133afc29750c047d
Author: Xiangying Meng <[email protected]>
AuthorDate: Thu Sep 7 11:23:51 2023 +0800

    [fix][test]Flaky test testMaxPendingChunkMessages (#21103)
---
 .../pulsar/client/impl/MessageChunkingTest.java     | 21 +++++++++++++++++++--
 .../org/apache/pulsar/client/impl/ConsumerImpl.java |  7 ++++---
 2 files changed, 23 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index f266afd8a2e..6686edd2b67 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -62,6 +62,7 @@ import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Commands.ChecksumType;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -319,15 +320,29 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
         msg.send();
     }
 
+    /**
+     * This test is used to test the consumer configuration of maxPendingChunkedMessage.
+     * If maxPendingChunkedMessage is set to 1, only one incomplete chunked message can be stored in this
+     * consumer.
+     * For example:
+     * ChunkMessage1 chunk-1: uuid = 0, chunkId = 0, totalChunk = 2;
+     * ChunkMessage2 chunk-1: uuid = 1, chunkId = 0, totalChunk = 2;
+     * ChunkMessage2 chunk-2: uuid = 1, chunkId = 1, totalChunk = 2;
+     * ChunkMessage1 chunk-2: uuid = 0, chunkId = 1, totalChunk = 2;
+     * Both ChunkMessage1 and ChunkMessage2 are still incomplete after their chunk-1 has been received.
+     * chunk-1 of ChunkMessage1 will be discarded and acked when the consumer receives chunk-1 of ChunkMessage2.
+     * After ChunkMessage2 is acked and unacknowledged messages are redelivered, the consumer can not receive any message again.
+     * @throws Exception
+     */
     @Test
     public void testMaxPendingChunkMessages() throws Exception {
         log.info("-- Starting {} test --", methodName);
         final String topicName = "persistent://my-property/my-ns/maxPending";
-
+        final String subName = "my-subscriber-name";
         @Cleanup
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topicName)
-                .subscriptionName("my-subscriber-name")
+                .subscriptionName(subName)
                 .maxPendingChunkedMessage(1)
                 .autoAckOldestChunkedMessageOnQueueFull(true)
                 .subscribe();
@@ -348,6 +363,8 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
         assertEquals(receivedMsg.getValue(), "chunk-1-0|chunk-1-1|");
 
         consumer.acknowledge(receivedMsg);
+        Awaitility.await().untilAsserted(() -> 
assertEquals(admin.topics().getStats(topicName)
+                
.getSubscriptions().get(subName).getNonContiguousDeletedMessagesRanges(), 0));
         consumer.redeliverUnacknowledgedMessages();
 
         sendSingleChunk(producer, "0", 1, 2);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 964573d8d4f..aa3340c6078 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1520,9 +1520,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 return null;
             }
             // means we lost the first chunk: should never happen
-            log.info("[{}] [{}] Received unexpected chunk messageId {}, 
last-chunk-id = {}, chunkId = {}", topic,
-                    subscription, msgId,
-                    (chunkedMsgCtx != null ? 
chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
+            log.info("[{}] [{}] Received unexpected chunk messageId {}, 
last-chunk-id = {}, chunkId = {}, uuid = {}",
+                    topic, subscription, msgId,
+                    (chunkedMsgCtx != null ? 
chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId(),
+                    msgMetadata.getUuid());
             if (chunkedMsgCtx != null) {
                 if (chunkedMsgCtx.chunkedMsgBuffer != null) {
                     
ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);

Reply via email to