This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1b2a9915c46 [improve][client] Add `REAL_SUBSCRIPTION` when produces
msg to DLQ (#21369)
1b2a9915c46 is described below
commit 1b2a9915c461c4357eb34d14c509963fb6cb47cd
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Oct 18 09:33:42 2023 +0800
[improve][client] Add `REAL_SUBSCRIPTION` when produces msg to DLQ (#21369)
---
.../test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java | 4 +++-
.../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 1 +
.../src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java | 1 +
3 files changed, 5 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 2a0cb3187d2..7be292a6026 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -234,10 +234,11 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
final int maxRedeliveryCount = 1;
final int sendMessages = 10;
+ final String subscriptionName = "my-subscription";
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
- .subscriptionName("my-subscription")
+ .subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
@@ -273,6 +274,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
Message<byte[]> message = deadLetterConsumer.receive();
//Original info should exists
assertEquals(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC),
topic);
+
assertEquals(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_SUBSCRIPTION),
subscriptionName);
assertTrue(messageIds.contains(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)));
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ded6a546c24..f390b80a7f0 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -719,6 +719,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
//Compatible with the old version, will be deleted in the future
propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID,
originMessageIdStr);
propertiesMap.putIfAbsent(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID,
originMessageIdStr);
+
propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_REAL_SUBSCRIPTION,
subscription);
return propertiesMap;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
index f73c2668779..e9071f171a2 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java
@@ -23,6 +23,7 @@ public class RetryMessageUtil {
public static final String SYSTEM_PROPERTY_RECONSUMETIMES =
"RECONSUMETIMES";
public static final String SYSTEM_PROPERTY_DELAY_TIME = "DELAY_TIME";
public static final String SYSTEM_PROPERTY_REAL_TOPIC = "REAL_TOPIC";
+ public static final String SYSTEM_PROPERTY_REAL_SUBSCRIPTION =
"REAL_SUBSCRIPTION";
public static final String SYSTEM_PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
@Deprecated
public static final String SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID =
"ORIGIN_MESSAGE_IDY_TIME";