This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 90c3804ac5781e670e61353b02097d117cebacb3
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Sep 24 21:13:42 2021 +0800

    Fix deadLetterPolicy is not working with key shared subscription under partitioned topic (#12148)
    
    Fixes #11652.
    
    This is a bug fix, no need to update doc.
    
    (cherry picked from commit 7d4d8cc417105664eae66521e41e8f27cbbd5c87)
---
 .../pulsar/client/api/DeadLetterTopicTest.java     | 96 ++++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  3 +-
 2 files changed, 98 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 7a65c16..645c767 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -519,4 +519,100 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
         Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
         assertNotNull(msg);
     }
+
+    @Test
+    public void testDeadLetterTopicUnderPartitionedTopicWithKeyShareType() 
throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic-with-partitioned-topic";
+
+        final int maxRedeliveryCount = 2;
+
+        final int sendMessages = 1;
+
+        int partitionCount = 2;
+
+        admin.topics().createPartitionedTopic(topic, partitionCount);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
+                .ackTimeout(1, TimeUnit.SECONDS)
+                
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .receiverQueueSize(100)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Consumer<byte[]> deadLetterConsumer0 = 
pulsarClient.newConsumer(Schema.BYTES)
+                
.topic("persistent://my-property/my-ns/dead-letter-topic-with-partitioned-topic-partition-0-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Consumer<byte[]> deadLetterConsumer1 = 
pulsarClient.newConsumer(Schema.BYTES)
+                
.topic("persistent://my-property/my-ns/dead-letter-topic-with-partitioned-topic-partition-1-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+
+        producer.close();
+
+        int totalReceived = 0;
+        do {
+            Message<byte[]> message = consumer.receive();
+            log.info("consumer received message : {} {}", 
message.getMessageId(), new String(message.getData()));
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter = 0;
+        do {
+            Message message = deadLetterConsumer0.receive(3, TimeUnit.SECONDS);
+            if (message != null) {
+                log.info("dead letter consumer received message : {} {}", 
message.getMessageId(), new String(message.getData()));
+                deadLetterConsumer0.acknowledge(message);
+                totalInDeadLetter++;
+            } else {
+                break;
+            }
+        } while (totalInDeadLetter < sendMessages);
+
+        do {
+            Message message = deadLetterConsumer1.receive(3, TimeUnit.SECONDS);
+            if (message != null) {
+                log.info("dead letter consumer received message : {} {}", 
message.getMessageId(), new String(message.getData()));
+                deadLetterConsumer1.acknowledge(message);
+                totalInDeadLetter++;
+            } else {
+                break;
+            }
+        } while (totalInDeadLetter < sendMessages);
+
+        assertEquals(totalInDeadLetter, sendMessages);
+        deadLetterConsumer0.close();
+        deadLetterConsumer1.close();
+        consumer.close();
+
+        Consumer<byte[]> checkConsumer = 
this.pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Message<byte[]> checkMessage = checkConsumer.receive(3, 
TimeUnit.SECONDS);
+        if (checkMessage != null) {
+            log.info("check consumer received message : {} {}", 
checkMessage.getMessageId(), new String(checkMessage.getData()));
+        }
+        assertNull(checkMessage);
+
+        checkConsumer.close();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 3b06ffa..f31e6a5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -649,7 +649,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
         checkArgument(messageIds.stream().findFirst().get() instanceof 
TopicMessageIdImpl);
 
-        if (conf.getSubscriptionType() != SubscriptionType.Shared) {
+        if (conf.getSubscriptionType() != SubscriptionType.Shared
+                && conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
             // We cannot redeliver single messages if subscription type is not 
Shared
             redeliverUnacknowledgedMessages();
             return;

Reply via email to