This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b07ae00fb15a284cc3e523f6d1dfe9717bfdaa0c
Author: Penghui Li <[email protected]>
AuthorDate: Tue May 9 14:02:51 2023 +0800

    [fix][broker] Fix the behavior of delayed message in Key_Shared mode (#20233)
---
 .../PersistentDispatcherMultipleConsumers.java     |  9 +--
 .../client/api/KeySharedSubscriptionTest.java      | 76 ++++++++++++++++++++++
 2 files changed, 81 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index ef684d72397..7d626fa6a61 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -906,14 +906,15 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     }
 
     protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
-        if (!redeliveryMessages.isEmpty()) {
-            return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
-        } else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
+        if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
             delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
             NavigableSet<PositionImpl> messagesAvailableNow =
                     delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
             messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
-            return messagesAvailableNow;
+        }
+
+        if (!redeliveryMessages.isEmpty()) {
+            return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
         } else {
             return Collections.emptyNavigableSet();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 4b899f2ee0c..0d65b9a7654 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -112,6 +112,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
+        this.conf.setUnblockStuckSubscriptionEnabled(true);
         super.internalSetup();
         super.producerBaseSetup();
         this.conf.setSubscriptionKeySharedUseConsistentHashing(true);
@@ -1538,4 +1539,79 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         count3.await();
         assertTrue(sentMessages.isEmpty(), "didn't receive " + sentMessages);
     }
+
+    @Test
+    public void testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
+        int delayedMessages = 40;
+        int messages = 40;
+        int sum = 0;
+        final String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
+        final String subName = "my-sub";
+
+        @Cleanup
+        Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < delayedMessages; i++) {
+            MessageId messageId = producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(100 + i)
+                    .deliverAfter(10, TimeUnit.SECONDS)
+                    .send();
+            log.info("Published delayed message :{}", messageId);
+        }
+
+        for (int i = 0; i < messages; i++) {
+            MessageId messageId = producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(i)
+                    .send();
+            log.info("Published message :{}", messageId);
+        }
+
+        @Cleanup
+        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .receiverQueueSize(30)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        for (int i = 0; i < delayedMessages + messages; i++) {
+            Message<Integer> msg = consumer1.receive(30, TimeUnit.SECONDS);
+            if (msg != null) {
+                log.info("c1 message: {}, {}", msg.getValue(), msg.getMessageId());
+                consumer1.acknowledge(msg);
+            } else {
+                break;
+            }
+            sum++;
+        }
+
+        log.info("Got {} messages...", sum);
+
+        int remaining = delayedMessages + messages - sum;
+        for (int i = 0; i < remaining; i++) {
+            Message<Integer> msg = consumer2.receive(30, TimeUnit.SECONDS);
+            if (msg != null) {
+                log.info("c2 message: {}, {}", msg.getValue(), msg.getMessageId());
+                consumer2.acknowledge(msg);
+            } else {
+                break;
+            }
+            sum++;
+        }
+
+        log.info("Got {} other messages...", sum);
+        Assert.assertEquals(sum, delayedMessages + messages);
+    }
 }
