This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 7bd26fc78b4 [fix][client] Fix DLQ producer name conflicts when 
multiple consumers send messages to DLQ (#21890)
7bd26fc78b4 is described below

commit 7bd26fc78b4affaa645263568d719fad8bbad1ad
Author: Zike Yang <[email protected]>
AuthorDate: Mon Jan 15 09:58:25 2024 +0800

    [fix][client] Fix DLQ producer name conflicts when multiple consumers send 
messages to DLQ (#21890)
---
 .../java/org/apache/pulsar/client/api/DeadLetterTopicTest.java     | 7 ++++++-
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java  | 3 ++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index ea93ece549e..4433670c7a5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -140,7 +140,8 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
     public void testDeadLetterTopicWithProducerName() throws Exception {
         final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
         final String subscription = "my-subscription";
-        String deadLetterProducerName = String.format("%s-%s-DLQ", topic, 
subscription);
+        final String consumerName = "my-consumer";
+        String deadLetterProducerName = String.format("%s-%s-%s-DLQ", topic, 
subscription, consumerName);
 
         final int maxRedeliveryCount = 1;
 
@@ -149,6 +150,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
         Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                 .topic(topic)
                 .subscriptionName(subscription)
+                .consumerName(consumerName)
                 .subscriptionType(SubscriptionType.Shared)
                 .ackTimeout(1, TimeUnit.SECONDS)
                 
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
@@ -929,6 +931,9 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
             
assertTrue(admin.topics().getSubscriptions(deadLetterTopic).contains(dlqInitialSub));
         });
 
+        // We should assert that all consumers are able to produce messages to 
DLQ
+        
assertEquals(admin.topics().getStats(deadLetterTopic).getPublishers().size(), 
2);
+
         Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
                 .topic(deadLetterTopic)
                 .subscriptionName(dlqInitialSub)
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b43cd79959c..d2aaafdd09d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2172,7 +2172,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                             ((ProducerBuilderImpl<byte[]>) 
client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))
                                     
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
                                     
.topic(this.deadLetterPolicy.getDeadLetterTopic())
-                                    .producerName(String.format("%s-%s-DLQ", 
this.topicName, this.subscription))
+                                    
.producerName(String.format("%s-%s-%s-DLQ", this.topicName, this.subscription,
+                                            this.consumerName))
                                     .blockIfQueueFull(false)
                                     .enableBatching(false)
                                     .enableChunking(true)

Reply via email to