This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b0f8e4a9b0b29f278014a79e0f15e2364312c2ed Author: feynmanlin <[email protected]> AuthorDate: Mon Feb 22 16:23:12 2021 +0800 Add original info when publishing message to dead letter topic (#9655) Fixes #9543 When message is produced to dead letter topic, its origin related information like original message-id and topic is lost. This info is useful when debugging failures and correlating failure logs with messages in dead letter topic. (cherry picked from commit b607bba285499c07e56a782fe954eded32526724) --- .../pulsar/client/api/DeadLetterTopicTest.java | 58 +++++++++++++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 75 +++++++++++++--------- 2 files changed, 104 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index 1ecab42..80e3c0d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -19,12 +19,15 @@ package org.apache.pulsar.client.api; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.pulsar.client.util.RetryMessageUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -34,6 +37,7 @@ import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; public class DeadLetterTopicTest extends ProducerConsumerBase { @@ -123,6 +127,60 @@ public class DeadLetterTopicTest extends ProducerConsumerBase { newPulsarClient.close(); } + @Test(timeOut = 20000) + public void testDeadLetterTopicHasOriginalInfo() throws Exception { + final String topic = "persistent://my-property/my-ns/dead-letter-topic"; + + final int maxRedeliveryCount = 1; + final int sendMessages = 10; + + Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + Set<String> messageIds = new HashSet<>(); + for (int i = 0; i < sendMessages; i++) { + MessageId messageId = producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + messageIds.add(messageId.toString()); + } + producer.close(); + + int totalReceived = 0; + do { + consumer.receive(); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message<byte[]> message = deadLetterConsumer.receive(); + //Original info should exists + assertEquals(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic); + assertTrue(messageIds.contains(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID))); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + assertEquals(totalInDeadLetter, sendMessages); + deadLetterConsumer.close(); + consumer.close(); + newPulsarClient.close(); + } + @Test(timeOut = 30000) public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception { final String topic = "persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 391f45d..2b63673 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -674,47 +674,25 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle CompletableFuture<Void> result = new CompletableFuture<>(); if (retryLetterProducer != null) { try { - MessageImpl<T> retryMessage = null; - String originMessageIdStr = null; - String originTopicNameStr = null; - if (message instanceof TopicMessageImpl) { - retryMessage = (MessageImpl<T>) ((TopicMessageImpl<T>) message).getMessage(); - originMessageIdStr = ((TopicMessageIdImpl) message.getMessageId()).getInnerMessageId().toString(); - originTopicNameStr = ((TopicMessageIdImpl) message.getMessageId()).getTopicName(); - } else if (message instanceof MessageImpl) { - retryMessage = (MessageImpl<T>) message; - originMessageIdStr = ((MessageImpl<T>) message).getMessageId().toString(); - originTopicNameStr = ((MessageImpl<T>) message).getTopicName(); - } - SortedMap<String, String> propertiesMap = new TreeMap<>(); + MessageImpl<T> retryMessage = (MessageImpl<T>) getMessageImpl(message); + String originMessageIdStr = getOriginMessageIdStr(message); + String originTopicNameStr = getOriginTopicNameStr(message); + SortedMap<String, String> propertiesMap + = getPropertiesMap(message, originMessageIdStr, originTopicNameStr); int reconsumetimes = 1; - if (message.getProperties() != null) { - propertiesMap.putAll(message.getProperties()); - } - if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) { reconsumetimes = Integer.valueOf(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)); reconsumetimes = reconsumetimes + 1; - - } else { - propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr); - propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr); } - propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumetimes)); propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime))); if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount() && StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) { initDeadLetterProducerIfNeeded(); MessageId finalMessageId = messageId; - String finalOriginTopicNameStr = originTopicNameStr; - String finalOriginMessageIdStr = originMessageIdStr; - MessageImpl<T> finalRetryMessage = retryMessage; deadLetterProducer.thenAccept(dlqProducer -> { - propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, finalOriginTopicNameStr); - propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, finalOriginMessageIdStr); TypedMessageBuilder<T> typedMessageBuilderNew = dlqProducer.newMessage() - .value(finalRetryMessage.getValue()) + .value(retryMessage.getValue()) .properties(propertiesMap); typedMessageBuilderNew.sendAsync().thenAccept(msgId -> { doAcknowledge(finalMessageId, ackType, properties, null).thenAccept(v -> { @@ -800,6 +778,43 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return CompletableFuture.completedFuture(null); } + private SortedMap<String, String> getPropertiesMap(Message<?> message, String originMessageIdStr, String originTopicNameStr) { + SortedMap<String, String> propertiesMap = new TreeMap<>(); + if (message.getProperties() != null) { + propertiesMap.putAll(message.getProperties()); + } + propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr); + propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr); + return propertiesMap; + } + + private String getOriginMessageIdStr(Message<?> message) { + if (message instanceof TopicMessageImpl) { + return ((TopicMessageIdImpl) message.getMessageId()).getInnerMessageId().toString(); + } else if (message instanceof MessageImpl) { + return message.getMessageId().toString(); + } + return null; + } + + private String getOriginTopicNameStr(Message<?> message) { + if (message instanceof TopicMessageImpl) { + return ((TopicMessageIdImpl) message.getMessageId()).getTopicName(); + } else if (message instanceof MessageImpl) { + return message.getTopicName(); + } + return null; + } + + private MessageImpl<?> getMessageImpl(Message<?> message) { + if (message instanceof TopicMessageImpl) { + return (MessageImpl<?>) ((TopicMessageImpl<?>) message).getMessage(); + } else if (message instanceof MessageImpl) { + return (MessageImpl<?>) message; + } + return null; + } + @Override public void negativeAcknowledge(MessageId messageId) { negativeAcksTracker.add(messageId); @@ -1831,9 +1846,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle MessageIdImpl finalMessageId = messageId; deadLetterProducer.thenAccept(producerDLQ -> { for (MessageImpl<T> message : finalDeadLetterMessages) { + String originMessageIdStr = getOriginMessageIdStr(message); + String originTopicNameStr = getOriginTopicNameStr(message); producerDLQ.newMessage() .value(message.getValue()) - .properties(message.getProperties()) + .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr)) .sendAsync() .thenAccept(messageIdInDLQ -> { possibleSendToDeadLetterTopicMessages.remove(finalMessageId);
