This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 68bfd13f5c3 [broker] Fixed delayed delivery after read operation error (#18098)
68bfd13f5c3 is described below

commit 68bfd13f5c3e7171146386558f3a52dd2d51c12c
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Oct 18 17:38:17 2022 -0700

    [broker] Fixed delayed delivery after read operation error (#18098)
---
 .../PersistentDispatcherMultipleConsumers.java     | 17 ++++++--
 .../service/persistent/DelayedDeliveryTest.java    | 45 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 0b8e7e18338..2fdb03cb194 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -93,8 +93,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                     "totalAvailablePermits");
     protected volatile int totalAvailablePermits = 0;
     protected volatile int readBatchSize;
-    protected final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS,
-            1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+    protected final Backoff readFailureBackoff;
     private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
             TOTAL_UNACKED_MESSAGES_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -129,6 +128,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
+        this.readFailureBackoff = new Backoff(
+                topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
+                TimeUnit.MILLISECONDS,
+                1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -659,7 +662,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
         topic.getBrokerService().executor().schedule(() -> {
             synchronized (PersistentDispatcherMultipleConsumers.this) {
-                if (!havePendingRead) {
+                // If it's a replay read we need to retry even if there's already
+                // another scheduled read, otherwise we'd be stuck until
+                // more messages are published.
+                if (!havePendingRead || readType == ReadType.Replay) {
                     log.info("[{}] Retrying read operation", name);
                     readMoreEntries();
                 } else {
@@ -869,7 +875,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
         } else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
             delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
-            return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
+            Set<PositionImpl> messagesAvailableNow =
+                    delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
+            messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
+            return messagesAvailableNow;
         } else {
             return Collections.emptySet();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 480da2f5b94..041818850ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
 
+import org.apache.bookkeeper.client.BKException;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -61,6 +62,7 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
         conf.setSystemTopicEnabled(true);
         conf.setTopicLevelPoliciesEnabled(true);
         conf.setDelayedDeliveryTickTimeMillis(1024);
+        conf.setDispatcherReadFailureBackoffInitialTimeInMs(1000);
         super.internalSetup();
         super.producerBaseSetup();
     }
@@ -539,4 +541,47 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
 
         Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 0));
     }
+
+    @Test
+    public void testDispatcherReadFailure() throws Exception {
+        String topic = BrokerTestUtil.newUniqueName("testDispatcherReadFailure");
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("shared-sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage()
+                    .value("msg-" + i)
+                    .deliverAfter(5, TimeUnit.SECONDS)
+                    .sendAsync();
+        }
+
+        producer.flush();
+
+        Message<String> msg = consumer.receive(100, TimeUnit.MILLISECONDS);
+        assertNull(msg);
+
+        // Inject failure in BK read
+        this.mockBookKeeper.failNow(BKException.Code.ReadException);
+
+        Set<String> receivedMsgs = new TreeSet<>();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(10, TimeUnit.SECONDS);
+            receivedMsgs.add(msg.getValue());
+        }
+
+        assertEquals(receivedMsgs.size(), 10);
+        for (int i = 0; i < 10; i++) {
+            assertTrue(receivedMsgs.contains("msg-" + i));
+        }
+    }
 }

Reply via email to