This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 7bd26fc78b4 [fix][client] Fix DLQ producer name conflicts when
multiple consumers send messages to DLQ (#21890)
7bd26fc78b4 is described below
commit 7bd26fc78b4affaa645263568d719fad8bbad1ad
Author: Zike Yang <[email protected]>
AuthorDate: Mon Jan 15 09:58:25 2024 +0800
[fix][client] Fix DLQ producer name conflicts when multiple consumers send
messages to DLQ (#21890)
---
.../java/org/apache/pulsar/client/api/DeadLetterTopicTest.java | 7 ++++++-
.../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 3 ++-
2 files changed, 8 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index ea93ece549e..4433670c7a5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -140,7 +140,8 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
public void testDeadLetterTopicWithProducerName() throws Exception {
final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
final String subscription = "my-subscription";
- String deadLetterProducerName = String.format("%s-%s-DLQ", topic,
subscription);
+ final String consumerName = "my-consumer";
+ String deadLetterProducerName = String.format("%s-%s-%s-DLQ", topic,
subscription, consumerName);
final int maxRedeliveryCount = 1;
@@ -149,6 +150,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName(subscription)
+ .consumerName(consumerName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
@@ -929,6 +931,9 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
assertTrue(admin.topics().getSubscriptions(deadLetterTopic).contains(dlqInitialSub));
});
+ // We should assert that all consumers are able to produce messages to
DLQ
+
assertEquals(admin.topics().getStats(deadLetterTopic).getPublishers().size(),
2);
+
Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
.topic(deadLetterTopic)
.subscriptionName(dlqInitialSub)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b43cd79959c..d2aaafdd09d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2172,7 +2172,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
((ProducerBuilderImpl<byte[]>)
client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
.topic(this.deadLetterPolicy.getDeadLetterTopic())
- .producerName(String.format("%s-%s-DLQ",
this.topicName, this.subscription))
+
.producerName(String.format("%s-%s-%s-DLQ", this.topicName, this.subscription,
+ this.consumerName))
.blockIfQueueFull(false)
.enableBatching(false)
.enableChunking(true)