This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 90c3804ac5781e670e61353b02097d117cebacb3 Author: Jiwei Guo <[email protected]> AuthorDate: Fri Sep 24 21:13:42 2021 +0800 Fix deadLetterPolicy is not working with key shared subscription under partitioned topic (#12148) Fixes #11652 . This is a bug fix, no need to update doc. (cherry picked from commit 7d4d8cc417105664eae66521e41e8f27cbbd5c87) --- .../pulsar/client/api/DeadLetterTopicTest.java | 96 ++++++++++++++++++++++ .../client/impl/MultiTopicsConsumerImpl.java | 3 +- 2 files changed, 98 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index 7a65c16..645c767 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -519,4 +519,100 @@ public class DeadLetterTopicTest extends ProducerConsumerBase { Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS); assertNotNull(msg); } + + @Test + public void testDeadLetterTopicUnderPartitionedTopicWithKeyShareType() throws Exception { + final String topic = "persistent://my-property/my-ns/dead-letter-topic-with-partitioned-topic"; + + final int maxRedeliveryCount = 2; + + final int sendMessages = 1; + + int partitionCount = 2; + + admin.topics().createPartitionedTopic(topic, partitionCount); + + Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Key_Shared) + .keySharedPolicy(KeySharedPolicy.autoSplitHashRange()) + .ackTimeout(1, TimeUnit.SECONDS) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Consumer<byte[]> deadLetterConsumer0 = pulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-topic-with-partitioned-topic-partition-0-my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Consumer<byte[]> deadLetterConsumer1 = pulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-topic-with-partitioned-topic-partition-1-my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + for (int i = 0; i < sendMessages; i++) { + producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + } + + producer.close(); + + int totalReceived = 0; + do { + Message<byte[]> message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer0.receive(3, TimeUnit.SECONDS); + if (message != null) { + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer0.acknowledge(message); + totalInDeadLetter++; + } else { + break; + } + } while (totalInDeadLetter < sendMessages); + + do { + Message message = deadLetterConsumer1.receive(3, TimeUnit.SECONDS); + if (message != null) { + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer1.acknowledge(message); + totalInDeadLetter++; + } else { + break; + } + } while (totalInDeadLetter < sendMessages); + + assertEquals(totalInDeadLetter, sendMessages); + deadLetterConsumer0.close(); + deadLetterConsumer1.close(); + consumer.close(); + + Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Key_Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS); + if (checkMessage != null) { + log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData())); + } + assertNull(checkMessage); + + checkConsumer.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 3b06ffa..f31e6a5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -649,7 +649,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { checkArgument(messageIds.stream().findFirst().get() instanceof TopicMessageIdImpl); - if (conf.getSubscriptionType() != SubscriptionType.Shared) { + if (conf.getSubscriptionType() != SubscriptionType.Shared + && conf.getSubscriptionType() != SubscriptionType.Key_Shared) { // We cannot redeliver single messages if subscription type is not Shared redeliverUnacknowledgedMessages(); return;
