This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 4d85960f375 [broker] Fixed delayed delivery after read operation error (#18098)
4d85960f375 is described below

commit 4d85960f375bbb4ce592d103f3f1ed0b16bf2628
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Oct 18 17:38:17 2022 -0700

    [broker] Fixed delayed delivery after read operation error (#18098)
---
 .../PersistentDispatcherMultipleConsumers.java     | 17 ++++++--
 .../service/persistent/DelayedDeliveryTest.java    | 45 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 390cb0d719b..bf8540e7527 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -100,8 +100,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                     "totalAvailablePermits");
     protected volatile int totalAvailablePermits = 0;
     protected volatile int readBatchSize;
-    protected final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS,
-            1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+    protected final Backoff readFailureBackoff;
     private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
             TOTAL_UNACKED_MESSAGES_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -141,6 +140,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.initializeDispatchRateLimiterIfNeeded();
         this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay);
+        this.readFailureBackoff = new Backoff(
+                topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
+                TimeUnit.MILLISECONDS,
+                1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -830,7 +833,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
         topic.getBrokerService().executor().schedule(() -> {
             synchronized (PersistentDispatcherMultipleConsumers.this) {
-                if (!havePendingRead) {
+                // If it's a replay read we need to retry even if there's already
+                // another scheduled read, otherwise we'd be stuck until
+                // more messages are published.
+                if (!havePendingRead || readType == ReadType.Replay) {
                     log.info("[{}] Retrying read operation", name);
                     readMoreEntries();
                 } else {
@@ -1036,7 +1042,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
         } else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
             delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
-            return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
+            Set<PositionImpl> messagesAvailableNow =
+                    delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
+            messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
+            return messagesAvailableNow;
         } else {
             return Collections.emptySet();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 8b62845572d..7f8059e1f0b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
 
+import org.apache.bookkeeper.client.BKException;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -59,6 +60,7 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
     @BeforeClass
     public void setup() throws Exception {
         conf.setDelayedDeliveryTickTimeMillis(1024);
+        conf.setDispatcherReadFailureBackoffInitialTimeInMs(1000);
         super.internalSetup();
         super.producerBaseSetup();
     }
@@ -580,4 +582,47 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
         }
     }
 
+    @Test
+    public void testDispatcherReadFailure() throws Exception {
+        String topic = BrokerTestUtil.newUniqueName("testDispatcherReadFailure");
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("shared-sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage()
+                    .value("msg-" + i)
+                    .deliverAfter(5, TimeUnit.SECONDS)
+                    .sendAsync();
+        }
+
+        producer.flush();
+
+        Message<String> msg = consumer.receive(100, TimeUnit.MILLISECONDS);
+        assertNull(msg);
+
+        // Inject failure in BK read
+        this.mockBookKeeper.failNow(BKException.Code.ReadException);
+
+        Set<String> receivedMsgs = new TreeSet<>();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(10, TimeUnit.SECONDS);
+            receivedMsgs.add(msg.getValue());
+        }
+
+        assertEquals(receivedMsgs.size(), 10);
+        for (int i = 0; i < 10; i++) {
+            assertTrue(receivedMsgs.contains("msg-" + i));
+        }
+    }
+
 }

Reply via email to