This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fab85bdaf406441f40c8bd5349296566599be445 Author: Zike Yang <[email protected]> AuthorDate: Thu Aug 31 07:44:34 2023 +0800 [fix][client] Fix cannot retry chunk messages and send to DLQ (#21048) (cherry picked from commit 99e3fea2e3adecb3346d1fc8e5422a3fde4236e3) --- .../pulsar/client/api/DeadLetterTopicTest.java | 48 +++++++++++++++++---- .../apache/pulsar/client/impl/ConsumerImpl.java | 6 ++- .../pulsar/client/impl/MessageIdAdvUtils.java | 3 ++ .../apache/pulsar/client/impl/ProducerImpl.java | 6 +++ .../pulsar/client/impl/UnAckedMessageTracker.java | 7 +++- .../client/impl/UnAckedMessageTrackerTest.java | 49 +++++++++++++++++++++- 6 files changed, 107 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index 5e3731fbf24..2a0cb3187d2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -46,9 +47,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -@Test(groups = "flaky") +@Test(groups = "broker-impl") public class DeadLetterTopicTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(DeadLetterTopicTest.class); @@ -56,6 +58,7 @@ public class DeadLetterTopicTest extends ProducerConsumerBase { @BeforeMethod(alwaysRun = true) @Override protected void setup() throws Exception { + this.conf.setMaxMessageSize(5 * 1024); super.internalSetup(); super.producerBaseSetup(); } @@ -66,6 +69,15 @@ public class DeadLetterTopicTest extends ProducerConsumerBase { super.internalCleanup(); } + private String createMessagePayload(int size) { + StringBuilder str = new StringBuilder(); + Random rand = new Random(); + for (int i = 0; i < size; i++) { + str.append(rand.nextInt(10)); + } + return str.toString(); + } + @Test public void testDeadLetterTopicWithMessageKey() throws Exception { final String topic = "persistent://my-property/my-ns/dead-letter-topic"; @@ -125,9 +137,13 @@ public class DeadLetterTopicTest extends ProducerConsumerBase { consumer.close(); } + @DataProvider(name = "produceLargeMessages") + public Object[][] produceLargeMessages() { + return new Object[][] { { false }, { true } }; + } - @Test(groups = "quarantine") - public void testDeadLetterTopic() throws Exception { + @Test(dataProvider = "produceLargeMessages") + public void testDeadLetterTopic(boolean produceLargeMessages) throws Exception { final String topic = "persistent://my-property/my-ns/dead-letter-topic"; final int maxRedeliveryCount = 2; @@ -154,28 +170,44 @@ public class DeadLetterTopicTest extends ProducerConsumerBase { Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) .topic(topic) + .enableChunking(produceLargeMessages) + .enableBatching(!produceLargeMessages) .create(); + Map<Integer, String> messageContent = new HashMap<>(); + for (int i = 0; i < sendMessages; i++) { - producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + String data; + if (!produceLargeMessages) { + data = String.format("Hello Pulsar [%d]", i); + } else { + data = createMessagePayload(1024 * 10); + } + producer.newMessage().key(String.valueOf(i)).value(data.getBytes()).send(); + messageContent.put(i, data); } producer.close(); int totalReceived = 0; do { - Message<byte[]> message = consumer.receive(); - log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS); + assertNotNull(message, "The consumer should be able to receive messages."); + log.info("consumer received message : {}", message.getMessageId()); totalReceived++; } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); int totalInDeadLetter = 0; do { - Message message = deadLetterConsumer.receive(); - log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + Message message = deadLetterConsumer.receive(5, TimeUnit.SECONDS); + assertNotNull(message, "the deadLetterConsumer should receive messages."); + assertEquals(new String(message.getData()), messageContent.get(Integer.parseInt(message.getKey()))); + messageContent.remove(Integer.parseInt(message.getKey())); + log.info("dead letter consumer received message : {}", message.getMessageId()); deadLetterConsumer.acknowledge(message); totalInDeadLetter++; } while (totalInDeadLetter < sendMessages); + assertTrue(messageContent.isEmpty()); deadLetterConsumer.close(); consumer.close(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index ac910c4d1c9..47aecd8052c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -605,6 +605,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle retryLetterProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) .topic(this.deadLetterPolicy.getRetryLetterTopic()) .enableBatching(false) + .enableChunking(true) .blockIfQueueFull(false) .create(); } @@ -1452,7 +1453,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) { // means we lost the first chunk: should never happen - log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId, + log.info("[{}] [{}] Received unexpected chunk messageId {}, last-chunk-id = {}, chunkId = {}", topic, + subscription, msgId, (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId()); if (chunkedMsgCtx != null) { if (chunkedMsgCtx.chunkedMsgBuffer != null) { @@ -2096,6 +2098,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle .initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName()) .topic(this.deadLetterPolicy.getDeadLetterTopic()) .blockIfQueueFull(false) + .enableBatching(false) + .enableChunking(true) .createAsync(); } } finally { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java index c8b18524ec0..a0d1446ba3d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java @@ -64,6 +64,9 @@ public class MessageIdAdvUtils { } static MessageIdAdv discardBatch(MessageId messageId) { + if (messageId instanceof ChunkMessageIdImpl) { + return (MessageIdAdv) messageId; + } MessageIdAdv msgId = (MessageIdAdv) messageId; return new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 267b06649d7..8a30190b3a2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -695,6 +695,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne op = OpSendMsg.create(msg, null, sequenceId, callback); final MessageMetadata finalMsgMetadata = msgMetadata; op.rePopulate = () -> { + if (msgMetadata.hasChunkId()) { + // The message metadata is shared between all chunks in a large message + // We need to reset the chunk id for each call of this method + // It's safe to do that because there is only 1 thread to manipulate this message metadata + finalMsgMetadata.setChunkId(chunkId); + } op.cmd = sendMessage(producerId, sequenceId, numMessages, messageId, finalMsgMetadata, encryptedPayload); }; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java index 534f3335026..220771e426f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java @@ -138,8 +138,11 @@ public class UnAckedMessageTracker implements Closeable { if (!headPartition.isEmpty()) { log.info("[{}] {} messages will be re-delivered", consumerBase, headPartition.size()); headPartition.forEach(messageId -> { - addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase); - messageIds.add(messageId); + if (messageId instanceof ChunkMessageIdImpl) { + addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase); + } else { + messageIds.add(messageId); + } messageIdPartitionMap.remove(messageId); }); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java index 4ccc514e8e7..f6c668703d9 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java @@ -29,12 +29,15 @@ import static org.mockito.Mockito.when; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; - +import java.time.Duration; import java.util.HashSet; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.awaitility.Awaitility; import org.testng.annotations.Test; public class UnAckedMessageTrackerTest { @@ -77,4 +80,48 @@ public class UnAckedMessageTrackerTest { timer.stop(); } + @Test + public void testTrackChunkedMessageId() { + PulsarClientImpl client = mock(PulsarClientImpl.class); + Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), + 1, TimeUnit.MILLISECONDS); + when(client.timer()).thenReturn(timer); + + ConsumerBase<byte[]> consumer = mock(ConsumerBase.class); + doNothing().when(consumer).onAckTimeoutSend(any()); + doNothing().when(consumer).redeliverUnacknowledgedMessages(any()); + ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>(); + conf.setAckTimeoutMillis(1000); + conf.setTickDurationMillis(1000); + UnAckedMessageTracker tracker = new UnAckedMessageTracker(client, consumer, conf); + + assertTrue(tracker.isEmpty()); + assertEquals(tracker.size(), 0); + + // Build chunked message ID + MessageIdImpl[] chunkMsgIds = new MessageIdImpl[5]; + for (int i = 0; i < 5; i++) { + chunkMsgIds[i] = new MessageIdImpl(1L, i, -1); + } + ChunkMessageIdImpl chunkedMessageId = + new ChunkMessageIdImpl(chunkMsgIds[0], chunkMsgIds[chunkMsgIds.length - 1]); + + consumer.unAckedChunkedMessageIdSequenceMap = + ConcurrentOpenHashMap.<MessageIdAdv, MessageIdImpl[]>newBuilder().build(); + consumer.unAckedChunkedMessageIdSequenceMap.put(chunkedMessageId, chunkMsgIds); + + // Redeliver chunked message + tracker.add(chunkedMessageId); + + Awaitility.await() + .pollInterval(Duration.ofMillis(200)) + .atMost(Duration.ofSeconds(3)) + .untilAsserted(() -> assertEquals(tracker.size(), 0)); + + // Assert that all chunk message ID are removed from unAckedChunkedMessageIdSequenceMap + assertEquals(consumer.unAckedChunkedMessageIdSequenceMap.size(), 0); + + timer.stop(); + } + }
