This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3cb79264c44 [fix][test]Flaky test testMaxPendingChunkMessages (#21103)
3cb79264c44 is described below
commit 3cb79264c4476a2cd34b0807133afc29750c047d
Author: Xiangying Meng <[email protected]>
AuthorDate: Thu Sep 7 11:23:51 2023 +0800
[fix][test]Flaky test testMaxPendingChunkMessages (#21103)
---
.../pulsar/client/impl/MessageChunkingTest.java | 21 +++++++++++++++++++--
.../org/apache/pulsar/client/impl/ConsumerImpl.java | 7 ++++---
2 files changed, 23 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index f266afd8a2e..6686edd2b67 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -62,6 +62,7 @@ import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -319,15 +320,29 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
msg.send();
}
+ /**
+ * This test used to test the consumer configuration of
maxPendingChunkedMessage.
+ * If we set maxPendingChunkedMessage is 1 that means only one incomplete
chunk message can be store in this
+ * consumer.
+ * For example:
+ * ChunkMessage1 chunk-1: uuid = 0, chunkId = 0, totalChunk = 2;
+ * ChunkMessage2 chunk-1: uuid = 1, chunkId = 0, totalChunk = 2;
+ * ChunkMessage2 chunk-2: uuid = 1, chunkId = 1, totalChunk = 2;
+ * ChunkMessage1 chunk-2: uuid = 0, chunkId = 1, totalChunk = 2;
+ * The chunk-1 in the ChunkMessage1 and ChunkMessage2 all is incomplete.
+ * chunk-1 in the ChunkMessage1 will be discarded and acked when receive
the chunk-1 in the ChunkMessage2.
+ * If ack ChunkMessage2 and redeliver unacknowledged messages, the
consumer can not receive any message again.
+ * @throws Exception
+ */
@Test
public void testMaxPendingChunkMessages() throws Exception {
log.info("-- Starting {} test --", methodName);
final String topicName = "persistent://my-property/my-ns/maxPending";
-
+ final String subName = "my-subscriber-name";
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
- .subscriptionName("my-subscriber-name")
+ .subscriptionName(subName)
.maxPendingChunkedMessage(1)
.autoAckOldestChunkedMessageOnQueueFull(true)
.subscribe();
@@ -348,6 +363,8 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
assertEquals(receivedMsg.getValue(), "chunk-1-0|chunk-1-1|");
consumer.acknowledge(receivedMsg);
+ Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getStats(topicName)
+
.getSubscriptions().get(subName).getNonContiguousDeletedMessagesRanges(), 0));
consumer.redeliverUnacknowledgedMessages();
sendSingleChunk(producer, "0", 1, 2);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 964573d8d4f..aa3340c6078 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1520,9 +1520,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return null;
}
// means we lost the first chunk: should never happen
- log.info("[{}] [{}] Received unexpected chunk messageId {},
last-chunk-id = {}, chunkId = {}", topic,
- subscription, msgId,
- (chunkedMsgCtx != null ?
chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
+ log.info("[{}] [{}] Received unexpected chunk messageId {},
last-chunk-id = {}, chunkId = {}, uuid = {}",
+ topic, subscription, msgId,
+ (chunkedMsgCtx != null ?
chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId(),
+ msgMetadata.getUuid());
if (chunkedMsgCtx != null) {
if (chunkedMsgCtx.chunkedMsgBuffer != null) {
ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);