This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new d80a4d3153c [broker] Fixed delayed delivery after read operation error
(#18098)
d80a4d3153c is described below
commit d80a4d3153c25d4add5a1ac2a3d2461dc82c1e90
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Oct 18 17:38:17 2022 -0700
[broker] Fixed delayed delivery after read operation error (#18098)
---
.../PersistentDispatcherMultipleConsumers.java | 17 ++++++--
.../service/persistent/DelayedDeliveryTest.java | 45 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 15c8654fe36..b4397635c41 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -92,8 +92,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
"totalAvailablePermits");
protected volatile int totalAvailablePermits = 0;
protected volatile int readBatchSize;
- protected final Backoff readFailureBackoff = new Backoff(15,
TimeUnit.SECONDS,
- 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+ protected final Backoff readFailureBackoff;
private static final
AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
TOTAL_UNACKED_MESSAGES_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -130,6 +129,10 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded();
+ this.readFailureBackoff = new Backoff(
+
topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
+ TimeUnit.MILLISECONDS,
+ 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
}
@Override
@@ -695,7 +698,10 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
topic.getBrokerService().executor().schedule(() -> {
synchronized (PersistentDispatcherMultipleConsumers.this) {
- if (!havePendingRead) {
+ // If it's a replay read we need to retry even if there's
already
+ // another scheduled read, otherwise we'd be stuck until
+ // more messages are published.
+ if (!havePendingRead || readType == ReadType.Replay) {
log.info("[{}] Retrying read operation", name);
readMoreEntries();
} else {
@@ -900,7 +906,10 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
return
redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
} else if (delayedDeliveryTracker.isPresent() &&
delayedDeliveryTracker.get().hasMessageAvailable()) {
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
- return
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
+ Set<PositionImpl> messagesAvailableNow =
+
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
+ messagesAvailableNow.forEach(p ->
redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
+ return messagesAvailableNow;
} else {
return Collections.emptySet();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 45131761824..1fa62806e26 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -61,6 +62,7 @@ public class DelayedDeliveryTest extends ProducerConsumerBase
{
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
conf.setDelayedDeliveryTickTimeMillis(1024);
+ conf.setDispatcherReadFailureBackoffInitialTimeInMs(1000);
super.internalSetup();
super.producerBaseSetup();
}
@@ -582,4 +584,47 @@ public class DelayedDeliveryTest extends
ProducerConsumerBase {
}
}
+ @Test
+ public void testDispatcherReadFailure() throws Exception {
+ String topic =
BrokerTestUtil.newUniqueName("testDispatcherReadFailure");
+
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("shared-sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage()
+ .value("msg-" + i)
+ .deliverAfter(5, TimeUnit.SECONDS)
+ .sendAsync();
+ }
+
+ producer.flush();
+
+ Message<String> msg = consumer.receive(100, TimeUnit.MILLISECONDS);
+ assertNull(msg);
+
+ // Inject failure in BK read
+ this.mockBookKeeper.failNow(BKException.Code.ReadException);
+
+ Set<String> receivedMsgs = new TreeSet<>();
+ for (int i = 0; i < 10; i++) {
+ msg = consumer.receive(10, TimeUnit.SECONDS);
+ receivedMsgs.add(msg.getValue());
+ }
+
+ assertEquals(receivedMsgs.size(), 10);
+ for (int i = 0; i < 10; i++) {
+ assertTrue(receivedMsgs.contains("msg-" + i));
+ }
+ }
+
}