This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b07ae00fb15a284cc3e523f6d1dfe9717bfdaa0c
Author: Penghui Li <[email protected]>
AuthorDate: Tue May 9 14:02:51 2023 +0800

    [fix][broker] Fix the behavior of delayed message in Key_Shared mode (#20233)
---
 .../PersistentDispatcherMultipleConsumers.java     |  9 +--
 .../client/api/KeySharedSubscriptionTest.java      | 76 ++++++++++++++++++++++
 2 files changed, 81 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index ef684d72397..7d626fa6a61 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -906,14 +906,15 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     }
 
     protected synchronized NavigableSet<PositionImpl> 
getMessagesToReplayNow(int maxMessagesToRead) {
-        if (!redeliveryMessages.isEmpty()) {
-            return 
redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
-        } else if (delayedDeliveryTracker.isPresent() && 
delayedDeliveryTracker.get().hasMessageAvailable()) {
+        if (delayedDeliveryTracker.isPresent() && 
delayedDeliveryTracker.get().hasMessageAvailable()) {
             
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
             NavigableSet<PositionImpl> messagesAvailableNow =
                     
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
             messagesAvailableNow.forEach(p -> 
redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
-            return messagesAvailableNow;
+        }
+
+        if (!redeliveryMessages.isEmpty()) {
+            return 
redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
         } else {
             return Collections.emptyNavigableSet();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 4b899f2ee0c..0d65b9a7654 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -112,6 +112,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
+        this.conf.setUnblockStuckSubscriptionEnabled(true);
         super.internalSetup();
         super.producerBaseSetup();
         this.conf.setSubscriptionKeySharedUseConsistentHashing(true);
@@ -1538,4 +1539,79 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         count3.await();
         assertTrue(sentMessages.isEmpty(), "didn't receive " + sentMessages);
     }
+
+    @Test
+    public void testContinueDispatchMessagesWhenMessageDelayed() throws 
Exception {
+        int delayedMessages = 40;
+        int messages = 40;
+        int sum = 0;
+        final String topic = "persistent://public/default/key_shared-" + 
UUID.randomUUID();
+        final String subName = "my-sub";
+
+        @Cleanup
+        Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < delayedMessages; i++) {
+            MessageId messageId = producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(100 + i)
+                    .deliverAfter(10, TimeUnit.SECONDS)
+                    .send();
+            log.info("Published delayed message :{}", messageId);
+        }
+
+        for (int i = 0; i < messages; i++) {
+            MessageId messageId = producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(i)
+                    .send();
+            log.info("Published message :{}", messageId);
+        }
+
+        @Cleanup
+        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .receiverQueueSize(30)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        for (int i = 0; i < delayedMessages + messages; i++) {
+            Message<Integer> msg = consumer1.receive(30, TimeUnit.SECONDS);
+            if (msg != null) {
+                log.info("c1 message: {}, {}", msg.getValue(), 
msg.getMessageId());
+                consumer1.acknowledge(msg);
+            } else {
+                break;
+            }
+            sum++;
+        }
+
+        log.info("Got {} messages...", sum);
+
+        int remaining = delayedMessages + messages - sum;
+        for (int i = 0; i < remaining; i++) {
+            Message<Integer> msg = consumer2.receive(30, TimeUnit.SECONDS);
+            if (msg != null) {
+                log.info("c2 message: {}, {}", msg.getValue(), 
msg.getMessageId());
+                consumer2.acknowledge(msg);
+            } else {
+                break;
+            }
+            sum++;
+        }
+
+        log.info("Got {} other messages...", sum);
+        Assert.assertEquals(sum, delayedMessages + messages);
+    }
 }

Reply via email to