This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 67d9c056f3b [fix][client] Fix cannot retry chunk messages and send to
DLQ (#21048)
67d9c056f3b is described below
commit 67d9c056f3b681df542f23ee2f6c602885e7c989
Author: Zike Yang <[email protected]>
AuthorDate: Thu Aug 31 07:44:34 2023 +0800
[fix][client] Fix cannot retry chunk messages and send to DLQ (#21048)
(cherry picked from commit 99e3fea2e3adecb3346d1fc8e5422a3fde4236e3)
---
.../pulsar/client/api/DeadLetterTopicTest.java | 48 ++++++++++++++++++----
.../apache/pulsar/client/impl/ConsumerImpl.java | 3 ++
.../apache/pulsar/client/impl/ProducerImpl.java | 9 +++-
.../pulsar/client/impl/UnAckedMessageTracker.java | 7 +++-
.../client/impl/UnAckedMessageTrackerTest.java | 48 +++++++++++++++++++++-
5 files changed, 103 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index cb9a4c3c104..3731e6cd5b2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -46,9 +47,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-@Test(groups = "flaky")
+@Test(groups = "broker-impl")
public class DeadLetterTopicTest extends ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(DeadLetterTopicTest.class);
@@ -56,6 +58,7 @@ public class DeadLetterTopicTest extends ProducerConsumerBase
{
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
+ this.conf.setMaxMessageSize(5 * 1024);
super.internalSetup();
super.producerBaseSetup();
}
@@ -66,6 +69,15 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
super.internalCleanup();
}
+ private String createMessagePayload(int size) {
+ StringBuilder str = new StringBuilder();
+ Random rand = new Random();
+ for (int i = 0; i < size; i++) {
+ str.append(rand.nextInt(10));
+ }
+ return str.toString();
+ }
+
@Test
public void testDeadLetterTopicWithMessageKey() throws Exception {
final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
@@ -125,9 +137,13 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
consumer.close();
}
+ @DataProvider(name = "produceLargeMessages")
+ public Object[][] produceLargeMessages() {
+ return new Object[][] { { false }, { true } };
+ }
- @Test(groups = "quarantine")
- public void testDeadLetterTopic() throws Exception {
+ @Test(dataProvider = "produceLargeMessages")
+ public void testDeadLetterTopic(boolean produceLargeMessages) throws
Exception {
final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
final int maxRedeliveryCount = 2;
@@ -154,28 +170,44 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
+ .enableChunking(produceLargeMessages)
+ .enableBatching(!produceLargeMessages)
.create();
+ Map<Integer, String> messageContent = new HashMap<>();
+
for (int i = 0; i < sendMessages; i++) {
- producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+ String data;
+ if (!produceLargeMessages) {
+ data = String.format("Hello Pulsar [%d]", i);
+ } else {
+ data = createMessagePayload(1024 * 10);
+ }
+
producer.newMessage().key(String.valueOf(i)).value(data.getBytes()).send();
+ messageContent.put(i, data);
}
producer.close();
int totalReceived = 0;
do {
- Message<byte[]> message = consumer.receive();
- log.info("consumer received message : {} {}",
message.getMessageId(), new String(message.getData()));
+ Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message, "The consumer should be able to receive
messages.");
+ log.info("consumer received message : {}", message.getMessageId());
totalReceived++;
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
int totalInDeadLetter = 0;
do {
- Message message = deadLetterConsumer.receive();
- log.info("dead letter consumer received message : {} {}",
message.getMessageId(), new String(message.getData()));
+ Message message = deadLetterConsumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message, "the deadLetterConsumer should receive
messages.");
+ assertEquals(new String(message.getData()),
messageContent.get(Integer.parseInt(message.getKey())));
+ messageContent.remove(Integer.parseInt(message.getKey()));
+ log.info("dead letter consumer received message : {}",
message.getMessageId());
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
} while (totalInDeadLetter < sendMessages);
+ assertTrue(messageContent.isEmpty());
deadLetterConsumer.close();
consumer.close();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a5e32854434..ac453281d0d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -653,6 +653,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
retryLetterProducer = client.newProducer(schema)
.topic(this.deadLetterPolicy.getRetryLetterTopic())
.enableBatching(false)
+ .enableChunking(true)
.blockIfQueueFull(false)
.create();
}
@@ -2206,6 +2207,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
.topic(this.deadLetterPolicy.getDeadLetterTopic())
.blockIfQueueFull(false)
+ .enableBatching(false)
+ .enableChunking(true)
.createAsync();
}
} finally {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 869aad0fc01..3c481e641c5 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -687,7 +687,14 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
op = OpSendMsg.create(msg, null, sequenceId, callback);
final MessageMetadata finalMsgMetadata = msgMetadata;
op.rePopulate = () -> {
- op.cmd = sendMessage(producerId, sequenceId, numMessages,
finalMsgMetadata, encryptedPayload);
+ if (msgMetadata.hasChunkId()) {
+ // The message metadata is shared between all chunks
in a large message
+ // We need to reset the chunk id for each call of this
method
+ // It's safe to do that because there is only 1 thread
to manipulate this message metadata
+ finalMsgMetadata.setChunkId(chunkId);
+ }
+ op.cmd = sendMessage(producerId, sequenceId, numMessages,
finalMsgMetadata,
+ encryptedPayload);
};
}
op.setNumMessagesInBatch(numMessages);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 9ad30296c21..5a706720dc5 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -138,8 +138,11 @@ public class UnAckedMessageTracker implements Closeable {
if (!headPartition.isEmpty()) {
log.info("[{}] {} messages will be re-delivered",
consumerBase, headPartition.size());
headPartition.forEach(messageId -> {
-
addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds,
consumerBase);
- messageIds.add(messageId);
+ if (messageId instanceof ChunkMessageIdImpl) {
+
addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds,
consumerBase);
+ } else {
+ messageIds.add(messageId);
+ }
messageIdPartitionMap.remove(messageId);
});
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
index 323333afc74..029ad0fc34c 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
@@ -29,12 +29,14 @@ import static org.mockito.Mockito.when;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
-
+import java.time.Duration;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
import org.testng.annotations.Test;
public class UnAckedMessageTrackerTest {
@@ -77,4 +79,48 @@ public class UnAckedMessageTrackerTest {
timer.stop();
}
+ @Test
+ public void testTrackChunkedMessageId() {
+ PulsarClientImpl client = mock(PulsarClientImpl.class);
+ Timer timer = new HashedWheelTimer(new
DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()),
+ 1, TimeUnit.MILLISECONDS);
+ when(client.timer()).thenReturn(timer);
+
+ ConsumerBase<byte[]> consumer = mock(ConsumerBase.class);
+ doNothing().when(consumer).onAckTimeoutSend(any());
+ doNothing().when(consumer).redeliverUnacknowledgedMessages(any());
+ ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
+ conf.setAckTimeoutMillis(1000);
+ conf.setTickDurationMillis(1000);
+ UnAckedMessageTracker tracker = new UnAckedMessageTracker(client,
consumer, conf);
+
+ assertTrue(tracker.isEmpty());
+ assertEquals(tracker.size(), 0);
+
+ // Build chunked message ID
+ MessageIdImpl[] chunkMsgIds = new MessageIdImpl[5];
+ for (int i = 0; i < 5; i++) {
+ chunkMsgIds[i] = new MessageIdImpl(1L, i, -1);
+ }
+ ChunkMessageIdImpl chunkedMessageId =
+ new ChunkMessageIdImpl(chunkMsgIds[0],
chunkMsgIds[chunkMsgIds.length - 1]);
+
+ consumer.unAckedChunkedMessageIdSequenceMap =
+ ConcurrentOpenHashMap.<MessageIdImpl,
MessageIdImpl[]>newBuilder().build();
+ consumer.unAckedChunkedMessageIdSequenceMap.put(chunkedMessageId,
chunkMsgIds);
+
+ // Redeliver chunked message
+ tracker.add(chunkedMessageId);
+
+ Awaitility.await()
+ .pollInterval(Duration.ofMillis(200))
+ .atMost(Duration.ofSeconds(3))
+ .untilAsserted(() -> assertEquals(tracker.size(), 0));
+
+ // Assert that all chunk message ID are removed from
unAckedChunkedMessageIdSequenceMap
+ assertEquals(consumer.unAckedChunkedMessageIdSequenceMap.size(), 0);
+
+ timer.stop();
+ }
+
}