This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b2a9915c46 [improve][client] Add `REAL_SUBSCRIPTION` when produces 
msg to DLQ (#21369)
1b2a9915c46 is described below

commit 1b2a9915c461c4357eb34d14c509963fb6cb47cd
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Oct 18 09:33:42 2023 +0800

    [improve][client] Add `REAL_SUBSCRIPTION` when produces msg to DLQ (#21369)
---
 .../test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java   | 4 +++-
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java     | 1 +
 .../src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java | 1 +
 3 files changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 2a0cb3187d2..7be292a6026 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -234,10 +234,11 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
         final int maxRedeliveryCount = 1;
         final int sendMessages = 10;
+        final String subscriptionName = "my-subscription";
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                 .topic(topic)
-                .subscriptionName("my-subscription")
+                .subscriptionName(subscriptionName)
                 .subscriptionType(SubscriptionType.Shared)
                 .ackTimeout(1, TimeUnit.SECONDS)
                 
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
@@ -273,6 +274,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
             Message<byte[]> message = deadLetterConsumer.receive();
             //Original info should exists
             
assertEquals(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC),
 topic);
+            
assertEquals(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_SUBSCRIPTION),
 subscriptionName);
             
assertTrue(messageIds.contains(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)));
             deadLetterConsumer.acknowledge(message);
             totalInDeadLetter++;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ded6a546c24..f390b80a7f0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -719,6 +719,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         //Compatible with the old version, will be deleted in the future
         
propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, 
originMessageIdStr);
         propertiesMap.putIfAbsent(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID, 
originMessageIdStr);
+        
propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_REAL_SUBSCRIPTION, 
subscription);
         return propertiesMap;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
index f73c2668779..e9071f171a2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
@@ -23,6 +23,7 @@ public class RetryMessageUtil {
     public static final String SYSTEM_PROPERTY_RECONSUMETIMES = 
"RECONSUMETIMES";
     public static final String SYSTEM_PROPERTY_DELAY_TIME = "DELAY_TIME";
     public static final String SYSTEM_PROPERTY_REAL_TOPIC = "REAL_TOPIC";
+    public static final String SYSTEM_PROPERTY_REAL_SUBSCRIPTION = 
"REAL_SUBSCRIPTION";
     public static final String SYSTEM_PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
     @Deprecated
     public static final String SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID = 
"ORIGIN_MESSAGE_IDY_TIME";

Reply via email to