This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new d80a4d3153c [broker] Fixed delayed delivery after read operation error (#18098)
d80a4d3153c is described below

commit d80a4d3153c25d4add5a1ac2a3d2461dc82c1e90
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Oct 18 17:38:17 2022 -0700

    [broker] Fixed delayed delivery after read operation error (#18098)
---
 .../PersistentDispatcherMultipleConsumers.java     | 17 ++++++--
 .../service/persistent/DelayedDeliveryTest.java    | 45 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 15c8654fe36..b4397635c41 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -92,8 +92,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                     "totalAvailablePermits");
     protected volatile int totalAvailablePermits = 0;
     protected volatile int readBatchSize;
-    protected final Backoff readFailureBackoff = new Backoff(15, 
TimeUnit.SECONDS,
-            1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+    protected final Backoff readFailureBackoff;
     private static final 
AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
             TOTAL_UNACKED_MESSAGES_UPDATER =
             
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -130,6 +129,10 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.initializeDispatchRateLimiterIfNeeded();
+        this.readFailureBackoff = new Backoff(
+                
topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
+                TimeUnit.MILLISECONDS,
+                1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -695,7 +698,10 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
 
         topic.getBrokerService().executor().schedule(() -> {
             synchronized (PersistentDispatcherMultipleConsumers.this) {
-                if (!havePendingRead) {
+                // If it's a replay read we need to retry even if there's 
already
+                // another scheduled read, otherwise we'd be stuck until
+                // more messages are published.
+                if (!havePendingRead || readType == ReadType.Replay) {
                     log.info("[{}] Retrying read operation", name);
                     readMoreEntries();
                 } else {
@@ -900,7 +906,10 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
             return 
redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
         } else if (delayedDeliveryTracker.isPresent() && 
delayedDeliveryTracker.get().hasMessageAvailable()) {
             
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
-            return 
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
+            Set<PositionImpl> messagesAvailableNow =
+                    
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
+            messagesAvailableNow.forEach(p -> 
redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
+            return messagesAvailableNow;
         } else {
             return Collections.emptySet();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 45131761824..1fa62806e26 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
 
+import org.apache.bookkeeper.client.BKException;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -61,6 +62,7 @@ public class DelayedDeliveryTest extends ProducerConsumerBase 
{
         conf.setSystemTopicEnabled(true);
         conf.setTopicLevelPoliciesEnabled(true);
         conf.setDelayedDeliveryTickTimeMillis(1024);
+        conf.setDispatcherReadFailureBackoffInitialTimeInMs(1000);
         super.internalSetup();
         super.producerBaseSetup();
     }
@@ -582,4 +584,47 @@ public class DelayedDeliveryTest extends 
ProducerConsumerBase {
         }
     }
 
+    @Test
+    public void testDispatcherReadFailure() throws Exception {
+        String topic = 
BrokerTestUtil.newUniqueName("testDispatcherReadFailure");
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("shared-sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage()
+                    .value("msg-" + i)
+                    .deliverAfter(5, TimeUnit.SECONDS)
+                    .sendAsync();
+        }
+
+        producer.flush();
+
+        Message<String> msg = consumer.receive(100, TimeUnit.MILLISECONDS);
+        assertNull(msg);
+
+        // Inject failure in BK read
+        this.mockBookKeeper.failNow(BKException.Code.ReadException);
+
+        Set<String> receivedMsgs = new TreeSet<>();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(10, TimeUnit.SECONDS);
+            receivedMsgs.add(msg.getValue());
+        }
+
+        assertEquals(receivedMsgs.size(), 10);
+        for (int i = 0; i < 10; i++) {
+            assertTrue(receivedMsgs.contains("msg-" + i));
+        }
+    }
+
 }

Reply via email to