This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b0f8e4a9b0b29f278014a79e0f15e2364312c2ed
Author: feynmanlin <[email protected]>
AuthorDate: Mon Feb 22 16:23:12 2021 +0800

    Add original info when publishing message to dead letter topic (#9655)
    
    Fixes #9543
    
    When a message is produced to the dead letter topic, its origin-related
    information, such as the original message ID and topic, is lost. This
    info is useful when debugging failures and correlating failure logs with
    messages in the dead letter topic.
    
    (cherry picked from commit b607bba285499c07e56a782fe954eded32526724)
---
 .../pulsar/client/api/DeadLetterTopicTest.java     | 58 +++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 75 +++++++++++++---------
 2 files changed, 104 insertions(+), 29 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 1ecab42..80e3c0d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -19,12 +19,15 @@
 package org.apache.pulsar.client.api;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -34,6 +37,7 @@ import org.testng.annotations.Test;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 public class DeadLetterTopicTest extends ProducerConsumerBase {
@@ -123,6 +127,60 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
         newPulsarClient.close();
     }
 
+    @Test(timeOut = 20000)
+    public void testDeadLetterTopicHasOriginalInfo() throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+
+        final int maxRedeliveryCount = 1;
+        final int sendMessages = 10;
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
+                
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+        Set<String> messageIds = new HashSet<>();
+        for (int i = 0; i < sendMessages; i++) {
+            MessageId messageId = producer.send(String.format("Hello Pulsar 
[%d]", i).getBytes());
+            messageIds.add(messageId.toString());
+        }
+        producer.close();
+
+        int totalReceived = 0;
+        do {
+            consumer.receive();
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter = 0;
+        do {
+            Message<byte[]> message = deadLetterConsumer.receive();
+            //Original info should exist
+            
assertEquals(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC),
 topic);
+            
assertTrue(messageIds.contains(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)));
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        } while (totalInDeadLetter < sendMessages);
+        assertEquals(totalInDeadLetter, sendMessages);
+        deadLetterConsumer.close();
+        consumer.close();
+        newPulsarClient.close();
+    }
+
     @Test(timeOut = 30000)
     public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception {
         final String topic = 
"persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage";
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 391f45d..2b63673 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -674,47 +674,25 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         CompletableFuture<Void> result = new CompletableFuture<>();
         if (retryLetterProducer != null) {
             try {
-                MessageImpl<T> retryMessage = null;
-                String originMessageIdStr = null;
-                String originTopicNameStr = null;
-                if (message instanceof TopicMessageImpl) {
-                    retryMessage = (MessageImpl<T>) ((TopicMessageImpl<T>) 
message).getMessage();
-                    originMessageIdStr = ((TopicMessageIdImpl) 
message.getMessageId()).getInnerMessageId().toString();
-                    originTopicNameStr = ((TopicMessageIdImpl) 
message.getMessageId()).getTopicName();
-                } else if (message instanceof MessageImpl) {
-                    retryMessage = (MessageImpl<T>) message;
-                    originMessageIdStr = ((MessageImpl<T>) 
message).getMessageId().toString();
-                    originTopicNameStr =  ((MessageImpl<T>) 
message).getTopicName();
-                }
-                SortedMap<String, String> propertiesMap = new TreeMap<>();
+                MessageImpl<T> retryMessage = (MessageImpl<T>) 
getMessageImpl(message);
+                String originMessageIdStr = getOriginMessageIdStr(message);
+                String originTopicNameStr = getOriginTopicNameStr(message);
+                SortedMap<String, String> propertiesMap
+                        = getPropertiesMap(message, originMessageIdStr, 
originTopicNameStr);
                 int reconsumetimes = 1;
-                if (message.getProperties() != null) {
-                    propertiesMap.putAll(message.getProperties());
-                }
-
                 if 
(propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
                     reconsumetimes = 
Integer.valueOf(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
                     reconsumetimes = reconsumetimes + 1;
-
-                } else {
-                    
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, 
originTopicNameStr);
-                    
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, 
originMessageIdStr);
                 }
-
                 
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, 
String.valueOf(reconsumetimes));
                 propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, 
String.valueOf(unit.toMillis(delayTime)));
 
                 if (reconsumetimes > 
this.deadLetterPolicy.getMaxRedeliverCount() && 
StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
                     initDeadLetterProducerIfNeeded();
                     MessageId finalMessageId = messageId;
-                    String finalOriginTopicNameStr = originTopicNameStr;
-                    String finalOriginMessageIdStr = originMessageIdStr;
-                    MessageImpl<T> finalRetryMessage = retryMessage;
                     deadLetterProducer.thenAccept(dlqProducer -> {
-                        
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, 
finalOriginTopicNameStr);
-                        
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, 
finalOriginMessageIdStr);
                         TypedMessageBuilder<T> typedMessageBuilderNew = 
dlqProducer.newMessage()
-                                .value(finalRetryMessage.getValue())
+                                .value(retryMessage.getValue())
                                 .properties(propertiesMap);
                         typedMessageBuilderNew.sendAsync().thenAccept(msgId -> 
{
                             doAcknowledge(finalMessageId, ackType, properties, 
null).thenAccept(v -> {
@@ -800,6 +778,43 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return CompletableFuture.completedFuture(null);
     }
 
+    private SortedMap<String, String> getPropertiesMap(Message<?> message, 
String originMessageIdStr, String originTopicNameStr) {
+        SortedMap<String, String> propertiesMap = new TreeMap<>();
+        if (message.getProperties() != null) {
+            propertiesMap.putAll(message.getProperties());
+        }
+        propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, 
originTopicNameStr);
+        propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, 
originMessageIdStr);
+        return propertiesMap;
+    }
+
+    private String getOriginMessageIdStr(Message<?> message) {
+        if (message instanceof TopicMessageImpl) {
+            return  ((TopicMessageIdImpl) 
message.getMessageId()).getInnerMessageId().toString();
+        } else if (message instanceof MessageImpl) {
+            return message.getMessageId().toString();
+        }
+        return null;
+    }
+
+    private String getOriginTopicNameStr(Message<?> message) {
+        if (message instanceof TopicMessageImpl) {
+            return  ((TopicMessageIdImpl) 
message.getMessageId()).getTopicName();
+        } else if (message instanceof MessageImpl) {
+            return message.getTopicName();
+        }
+        return null;
+    }
+
+    private MessageImpl<?> getMessageImpl(Message<?> message) {
+        if (message instanceof TopicMessageImpl) {
+            return (MessageImpl<?>) ((TopicMessageImpl<?>) 
message).getMessage();
+        } else if (message instanceof MessageImpl) {
+            return (MessageImpl<?>) message;
+        }
+        return null;
+    }
+
     @Override
     public void negativeAcknowledge(MessageId messageId) {
         negativeAcksTracker.add(messageId);
@@ -1831,9 +1846,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             MessageIdImpl finalMessageId = messageId;
             deadLetterProducer.thenAccept(producerDLQ -> {
                 for (MessageImpl<T> message : finalDeadLetterMessages) {
+                    String originMessageIdStr = getOriginMessageIdStr(message);
+                    String originTopicNameStr = getOriginTopicNameStr(message);
                     producerDLQ.newMessage()
                             .value(message.getValue())
-                            .properties(message.getProperties())
+                            .properties(getPropertiesMap(message, 
originMessageIdStr, originTopicNameStr))
                             .sendAsync()
                             .thenAccept(messageIdInDLQ -> {
                                 
possibleSendToDeadLetterTopicMessages.remove(finalMessageId);

Reply via email to