This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 340e30bfb34 [fix][broker] Fix retry backoff for
PersistentDispatcherMultipleConsumers (#23284)
340e30bfb34 is described below
commit 340e30bfb3479fd059f9cb32f9cef85985e1bca8
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Sep 11 14:31:43 2024 +0300
[fix][broker] Fix retry backoff for PersistentDispatcherMultipleConsumers
(#23284)
(cherry picked from commit 8151639dccbbacee5ec23678cf7b5e0a768902dd)
---
.../PersistentDispatcherMultipleConsumers.java | 19 +--
...ntStickyKeyDispatcherMultipleConsumersTest.java | 139 +++++++++++++++++----
2 files changed, 128 insertions(+), 30 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 21441a13191..0f8404043c3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -451,12 +451,15 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Reschedule message read in {} ms",
topic.getName(), name, readAfterMs);
}
- topic.getBrokerService().executor().schedule(
- () -> {
- isRescheduleReadInProgress.set(false);
- readMoreEntries();
- },
- readAfterMs, TimeUnit.MILLISECONDS);
+ Runnable runnable = () -> {
+ isRescheduleReadInProgress.set(false);
+ readMoreEntries();
+ };
+ if (readAfterMs > 0) {
+ topic.getBrokerService().executor().schedule(runnable,
readAfterMs, TimeUnit.MILLISECONDS);
+ } else {
+ topic.getBrokerService().executor().execute(runnable);
+ }
}
}
@@ -836,6 +839,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
totalBytesSent += sendMessageInfo.getTotalBytes();
}
+ lastNumberOfEntriesDispatched = (int) totalEntries;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries,
totalMessagesSent, totalBytesSent);
if (entriesToDispatch > 0) {
@@ -848,9 +852,8 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(),
stickyKeyHash);
entry.release();
});
-
- lastNumberOfEntriesDispatched = entriesToDispatch;
}
+
return true;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index c41301ae352..e07df9aa269 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -101,6 +101,8 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
final String topicName = "persistent://public/default/testTopic";
final String subscriptionName = "testSubscription";
private AtomicInteger consumerMockAvailablePermits;
+ int retryBackoffInitialTimeInMs = 10;
+ int retryBackoffMaxTimeInMs = 50;
@BeforeMethod
public void setup() throws Exception {
@@ -111,8 +113,8 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
doReturn(false).when(configMock).isAllowOverrideEntryFilters();
-
doReturn(10).when(configMock).getDispatcherRetryBackoffInitialTimeInMs();
- doReturn(50).when(configMock).getDispatcherRetryBackoffMaxTimeInMs();
+ doAnswer(invocation ->
retryBackoffInitialTimeInMs).when(configMock).getDispatcherRetryBackoffInitialTimeInMs();
+ doAnswer(invocation ->
retryBackoffMaxTimeInMs).when(configMock).getDispatcherRetryBackoffMaxTimeInMs();
pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();
@@ -472,35 +474,45 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
allEntries.forEach(entry -> entry.release());
}
- @DataProvider(name = "dispatchMessagesInSubscriptionThread")
- private Object[][] dispatchMessagesInSubscriptionThread() {
- return new Object[][] { { false }, { true } };
+ @DataProvider(name = "testBackoffDelayWhenNoMessagesDispatched")
+ private Object[][] testBackoffDelayWhenNoMessagesDispatchedParams() {
+ return new Object[][] { { false, true }, { true, true }, { true, false
}, { false, false } };
}
- @Test(dataProvider = "dispatchMessagesInSubscriptionThread")
- public void testBackoffDelayWhenNoMessagesDispatched(boolean
dispatchMessagesInSubscriptionThread)
+ @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
+ public void testBackoffDelayWhenNoMessagesDispatched(boolean
dispatchMessagesInSubscriptionThread, boolean isKeyShared)
throws Exception {
persistentDispatcher.close();
List<Long> retryDelays = new CopyOnWriteArrayList<>();
doReturn(dispatchMessagesInSubscriptionThread).when(configMock)
.isDispatcherDispatchMessagesInSubscriptionThread();
- persistentDispatcher = new
PersistentStickyKeyDispatcherMultipleConsumers(
- topicMock, cursorMock, subscriptionMock, configMock,
- new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
- @Override
- protected void reScheduleReadInMs(long readAfterMs) {
- retryDelays.add(readAfterMs);
- }
- };
+ PersistentDispatcherMultipleConsumers dispatcher;
+ if (isKeyShared) {
+ dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
+ topicMock, cursorMock, subscriptionMock, configMock,
+ new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
+ @Override
+ protected void reScheduleReadInMs(long readAfterMs) {
+ retryDelays.add(readAfterMs);
+ }
+ };
+ } else {
+ dispatcher = new PersistentDispatcherMultipleConsumers(topicMock,
cursorMock, subscriptionMock) {
+ @Override
+ protected void reScheduleReadInMs(long readAfterMs) {
+ retryDelays.add(readAfterMs);
+ }
+ };
+ }
// add a consumer without permits to trigger the retry behavior
consumerMockAvailablePermits.set(0);
- persistentDispatcher.addConsumer(consumerMock);
+ dispatcher.addConsumer(consumerMock);
// call "readEntriesComplete" directly to test the retry behavior
List<Entry> entries = List.of(EntryImpl.create(1, 1,
createMessage("message1", 1)));
- persistentDispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ dispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 1);
assertEquals(retryDelays.get(0), 10, "Initial retry delay
should be 10ms");
@@ -508,7 +520,7 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
);
// test the second retry delay
entries = List.of(EntryImpl.create(1, 1, createMessage("message1",
1)));
- persistentDispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ dispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 2);
double delay = retryDelays.get(1);
@@ -518,7 +530,7 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
// verify the max retry delay
for (int i = 0; i < 100; i++) {
entries = List.of(EntryImpl.create(1, 1, createMessage("message1",
1)));
- persistentDispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ dispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
}
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 102);
@@ -529,14 +541,14 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
// unblock to check that the retry delay is reset
consumerMockAvailablePermits.set(1000);
entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1,
"key2")));
- persistentDispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ dispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
// wait that the possibly async handling has completed
- Awaitility.await().untilAsserted(() ->
assertFalse(persistentDispatcher.isSendInProgress()));
+ Awaitility.await().untilAsserted(() ->
assertFalse(dispatcher.isSendInProgress()));
// now block again to check the next retry delay so verify it was reset
consumerMockAvailablePermits.set(0);
entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1,
"key3")));
- persistentDispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ dispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 103);
assertEquals(retryDelays.get(0), 10, "Resetted retry delay
should be 10ms");
@@ -544,6 +556,89 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
);
}
+ @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
+ public void testBackoffDelayWhenRetryDelayDisabled(boolean
dispatchMessagesInSubscriptionThread, boolean isKeyShared)
+ throws Exception {
+ persistentDispatcher.close();
+
+ // it should be possible to disable the retry delay
+ // by setting retryBackoffInitialTimeInMs and retryBackoffMaxTimeInMs
to 0
+ retryBackoffInitialTimeInMs=0;
+ retryBackoffMaxTimeInMs=0;
+
+ List<Long> retryDelays = new CopyOnWriteArrayList<>();
+ doReturn(dispatchMessagesInSubscriptionThread).when(configMock)
+ .isDispatcherDispatchMessagesInSubscriptionThread();
+
+ PersistentDispatcherMultipleConsumers dispatcher;
+ if (isKeyShared) {
+ dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
+ topicMock, cursorMock, subscriptionMock, configMock,
+ new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
+ @Override
+ protected void reScheduleReadInMs(long readAfterMs) {
+ retryDelays.add(readAfterMs);
+ }
+ };
+ } else {
+ dispatcher = new PersistentDispatcherMultipleConsumers(topicMock,
cursorMock, subscriptionMock) {
+ @Override
+ protected void reScheduleReadInMs(long readAfterMs) {
+ retryDelays.add(readAfterMs);
+ }
+ };
+ }
+
+ // add a consumer without permits to trigger the retry behavior
+ consumerMockAvailablePermits.set(0);
+ dispatcher.addConsumer(consumerMock);
+
+ // call "readEntriesComplete" directly to test the retry behavior
+ List<Entry> entries = List.of(EntryImpl.create(1, 1,
createMessage("message1", 1)));
+ dispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(retryDelays.size(), 1);
+ assertEquals(retryDelays.get(0), 0, "Initial retry delay
should be 0ms");
+ }
+ );
+ // test the second retry delay
+ entries = List.of(EntryImpl.create(1, 1, createMessage("message1",
1)));
+ dispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(retryDelays.size(), 2);
+ double delay = retryDelays.get(1);
+ assertEquals(delay, 0, 0, "Second retry delay should be
0ms");
+ }
+ );
+ // verify the max retry delay
+ for (int i = 0; i < 100; i++) {
+ entries = List.of(EntryImpl.create(1, 1, createMessage("message1",
1)));
+ dispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ }
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(retryDelays.size(), 102);
+ double delay = retryDelays.get(101);
+ assertEquals(delay, 0, 0, "Max delay should be 0ms");
+ }
+ );
+ // unblock to check that the retry delay is reset
+ consumerMockAvailablePermits.set(1000);
+ entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1,
"key2")));
+ dispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ // wait until the possibly async handling has completed
+ Awaitility.await().untilAsserted(() ->
assertFalse(dispatcher.isSendInProgress()));
+
+ // now block again and check the next retry delay to verify it was reset
+ consumerMockAvailablePermits.set(0);
+ entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1,
"key3")));
+ dispatcher.readEntriesComplete(entries,
PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(retryDelays.size(), 103);
+ assertEquals(retryDelays.get(0), 0, "Resetted retry delay
should be 0ms");
+ }
+ );
+ }
+
private ByteBuf createMessage(String message, int sequenceId) {
return createMessage(message, sequenceId, "testKey");
}