This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 735c91ab64b [improve] [broker] PIP-299-part-3: Add dynamic config
support: dispatcherPauseOnAckStatePersistentEnabled (#21837)
735c91ab64b is described below
commit 735c91ab64b1fd008d51417b71022df4567538a5
Author: fengyubiao <[email protected]>
AuthorDate: Fri Jan 5 10:53:34 2024 +0800
[improve] [broker] PIP-299-part-3: Add dynamic config support:
dispatcherPauseOnAckStatePersistentEnabled (#21837)
---
.../pulsar/broker/service/AbstractTopic.java | 8 ++-
.../pulsar/broker/service/BrokerService.java | 16 +++++
.../apache/pulsar/broker/service/Dispatcher.java | 10 +++
.../PersistentDispatcherMultipleConsumers.java | 47 ++++++++-----
.../broker/service/persistent/PersistentTopic.java | 14 ++++
.../api/SubscriptionPauseOnAckStatPersistTest.java | 78 ++++++++++++++++++++++
6 files changed, 156 insertions(+), 17 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index fcadab228e5..2d32aac7d44 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -397,7 +397,8 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
topicPolicies.getSchemaValidationEnforced().updateBrokerValue(config.isSchemaValidationEnforced());
topicPolicies.getEntryFilters().updateBrokerValue(new
EntryFilters(String.join(",",
config.getEntryFilterNames())));
-
+ topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled()
+
.updateBrokerValue(config.isDispatcherPauseOnAckStatePersistentEnabled());
updateEntryFilters();
}
@@ -1267,6 +1268,11 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
dispatchRateInBroker(brokerService.pulsar().getConfiguration()));
}
+ public void updateBrokerDispatchPauseOnAckStatePersistentEnabled() {
+
topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().updateBrokerValue(
+
brokerService.pulsar().getConfiguration().isDispatcherPauseOnAckStatePersistentEnabled());
+ }
+
public void addFilteredEntriesCount(int filtered) {
this.filteredEntriesCounter.add(filtered);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b3b6e778596..fae81c1f644 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2605,6 +2605,10 @@ public class BrokerService implements Closeable {
registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInByte",
(dispatchRatePerTopicInByte) -> {
updateSubscriptionMessageDispatchRate();
});
+ // add listener to propagate "dispatcherPauseOnAckStatePersistentEnabled"
changes to all persistent topics
+
registerConfigurationListener("dispatcherPauseOnAckStatePersistentEnabled",
(dispatchRatePerTopicInByte) -> {
+ updateDispatchPauseOnAckStatePersistentEnabled();
+ });
// add listener to update message-dispatch-rate in msg for replicator
registerConfigurationListener("dispatchThrottlingRatePerReplicatorInMsg",
@@ -2743,6 +2747,18 @@ public class BrokerService implements Closeable {
});
}
+ private void updateDispatchPauseOnAckStatePersistentEnabled() {
+ this.pulsar().getExecutor().execute(() -> {
+ forEachTopic(topic -> {
+ if (topic instanceof PersistentTopic) {
+ // Update policies.
+ PersistentTopic persistentTopic = (PersistentTopic) topic;
+
persistentTopic.updateBrokerDispatchPauseOnAckStatePersistentEnabled();
+ }
+ });
+ });
+ }
+
private void updateBrokerSubscriptionTypesEnabled(Object
subscriptionTypesEnabled) {
this.pulsar().getExecutor().execute(() -> {
// update subscriptionTypesEnabled
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index aebafacd584..fcd4c52ee37 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -151,6 +151,16 @@ public interface Dispatcher {
*/
default void afterAckMessages(Throwable exOfDeletion, Object
ctxOfDeletion){}
+ /**
+ * Trigger a new "readMoreEntries" if dispatching has been paused
before. This method is currently only implemented in
+ * {@link
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers};
other
+ * implementations do not need to override it.
+ * @return true if the dispatcher does not need to be paused; the default implementation returns false.
+ */
+ default boolean checkAndResumeIfPaused(){
+ return false;
+ }
+
default long getFilterProcessedMsgCount() {
return 0;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 0d1f198a7ca..be82b190ffb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -1039,25 +1039,40 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
@Override
public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion)
{
- if (blockedDispatcherOnCursorDataCanNotFullyPersist == TRUE) {
- if (cursor.isCursorDataFullyPersistable()) {
- // If there was no previous pause due to cursor data is too
large to persist, we don't need to manually
- // trigger a new read. This can avoid too many CPU circles.
- if
(BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER.compareAndSet(this,
TRUE, FALSE)) {
- readMoreEntriesAsync();
- } else {
- // Retry due to conflict update.
- afterAckMessages(exOfDeletion, ctxOfDeletion);
- }
+ boolean unPaused = blockedDispatcherOnCursorDataCanNotFullyPersist ==
FALSE;
+ // Trigger a new read if needed.
+ boolean shouldPauseNow = !checkAndResumeIfPaused();
+ // Switch state to "paused" if needed.
+ if (unPaused && shouldPauseNow) {
+ if
(!BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER
+ .compareAndSet(this, FALSE, TRUE)) {
+ // Retry due to conflict update.
+ afterAckMessages(exOfDeletion, ctxOfDeletion);
}
- } else {
- if (!cursor.isCursorDataFullyPersistable()) {
- if
(BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER.compareAndSet(this,
FALSE, TRUE)) {
- // Retry due to conflict update.
- afterAckMessages(exOfDeletion, ctxOfDeletion);
- }
+ }
+ }
+
+ @Override
+ public boolean checkAndResumeIfPaused() {
+ boolean paused = blockedDispatcherOnCursorDataCanNotFullyPersist ==
TRUE;
+ boolean shouldPauseNow = !cursor.isCursorDataFullyPersistable()
+ && topic.isDispatcherPauseOnAckStatePersistentEnabled();
+ // No need to change.
+ if (paused == shouldPauseNow) {
+ return !shouldPauseNow;
+ }
+ // Should change to "un-pause".
+ if (paused && !shouldPauseNow) {
+ // If there was no previous pause caused by cursor data being too large
to persist, we don't need to manually
+ // trigger a new read. This avoids wasting CPU cycles.
+ if
(BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER.compareAndSet(this,
TRUE, FALSE)) {
+ readMoreEntriesAsync();
+ } else {
+ // Retry due to conflict update.
+ checkAndResumeIfPaused();
}
}
+ return !shouldPauseNow;
}
public boolean isBlockedDispatcherOnUnackedMsgs() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e3cfafff57f..34893fed4d3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3791,6 +3791,20 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return b == null ? false : b.booleanValue();
}
+ @Override
+ public void updateBrokerDispatchPauseOnAckStatePersistentEnabled() {
+ super.updateBrokerDispatchPauseOnAckStatePersistentEnabled();
+ // Trigger a new read if subscriptions have been paused before.
+ if
(!topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().get()) {
+ getSubscriptions().forEach((sName, subscription) -> {
+ if (subscription.getDispatcher() == null) {
+ return;
+ }
+ subscription.getDispatcher().afterAckMessages(null, 0);
+ });
+ }
+ }
+
@Override
public void onUpdate(TopicPolicies policies) {
if (log.isDebugEnabled()) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
index 0029e61df4c..06298e2fdd2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
@@ -37,7 +37,9 @@ import
org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.GetStatsOptions;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -211,6 +213,77 @@ public class SubscriptionPauseOnAckStatPersistTest extends
ProducerConsumerBase
}
}
+ @Test
+ public void testBrokerDynamicConfig() throws Exception {
+ final String tpName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ final String subscription = "s1";
+ final int msgSendCount = MAX_UNACKED_RANGES_TO_PERSIST * 4;
+ final int incomingQueueSize = MAX_UNACKED_RANGES_TO_PERSIST * 10;
+
+ // Enable "dispatcherPauseOnAckStatePersistentEnabled".
+
admin.brokers().updateDynamicConfiguration("dispatcherPauseOnAckStatePersistentEnabled",
"true");
+ admin.topics().createNonPartitionedTopic(tpName);
+ admin.topics().createSubscription(tpName, subscription,
MessageId.earliest);
+
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(tpName,
false).join().get();
+ Awaitility.await().untilAsserted(() -> {
+
Assert.assertTrue(pulsar.getConfig().isDispatcherPauseOnAckStatePersistentEnabled());
+ HierarchyTopicPolicies policies =
WhiteboxImpl.getInternalState(persistentTopic, "topicPolicies");
+ Boolean v =
policies.getDispatcherPauseOnAckStatePersistentEnabled().get();
+ Assert.assertNotNull(v);
+ Assert.assertTrue(v.booleanValue());
+ });
+
+ // Send msgSendCount (4 x MAX_UNACKED_RANGES_TO_PERSIST) messages.
+ Producer<String> p1 =
pulsarClient.newProducer(Schema.STRING).topic(tpName).enableBatching(false).create();
+ ArrayList<MessageId> messageIdsSent = new ArrayList<>();
+ for (int i = 0; i < msgSendCount; i++) {
+ MessageIdImpl messageId = (MessageIdImpl)
p1.send(Integer.valueOf(i).toString());
+ messageIdsSent.add(messageId);
+ }
+ // Make ack holes.
+ Consumer<String> c1 =
pulsarClient.newConsumer(Schema.STRING).topic(tpName).subscriptionName(subscription)
+ .receiverQueueSize(incomingQueueSize).isAckReceiptEnabled(true)
+ .subscriptionType(SubscriptionType.Shared).subscribe();
+ ackOddMessagesOnly(c1);
+ verifyAckHolesIsMuchThanLimit(tpName, subscription);
+
+ cancelPendingRead(tpName, subscription);
+ triggerNewReadMoreEntries(tpName, subscription);
+
+ // Verify: the dispatcher has been paused.
+ final String specifiedMessage = "9876543210";
+ p1.send(specifiedMessage);
+ Message<String> msg1 = c1.receive(2, TimeUnit.SECONDS);
+ Assert.assertNull(msg1, msg1 == null ? "null" : msg1.getValue());
+
+ // Disable "dispatcherPauseOnAckStatePersistentEnabled".
+
admin.brokers().updateDynamicConfiguration("dispatcherPauseOnAckStatePersistentEnabled",
"false");
+ Awaitility.await().untilAsserted(() -> {
+
Assert.assertFalse(pulsar.getConfig().isDispatcherPauseOnAckStatePersistentEnabled());
+ HierarchyTopicPolicies policies =
WhiteboxImpl.getInternalState(persistentTopic, "topicPolicies");
+ Boolean v =
policies.getDispatcherPauseOnAckStatePersistentEnabled().get();
+ Assert.assertTrue(v == null || !v.booleanValue());
+ });
+
+ // Verify the new message can be received.
+ Message<String> msg2 = c1.receive(2, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg2);
+ Assert.assertEquals(msg2.getValue(), specifiedMessage);
+ // cleanup.
+ p1.close();
+ c1.close();
+ admin.topics().delete(tpName, false);
+ }
+
+ private void verifyAckHolesIsMuchThanLimit(String tpName, String
subscription) {
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertTrue(MAX_UNACKED_RANGES_TO_PERSIST < admin.topics()
+
.getInternalStats(tpName).cursors.get(subscription).totalNonContiguousDeletedMessagesRange);
+ });
+ }
+
@Test(dataProvider = "multiConsumerSubscriptionTypes")
public void testPauseOnAckStatPersist(SubscriptionType subscriptionType)
throws Exception {
final String tpName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
@@ -234,6 +307,7 @@ public class SubscriptionPauseOnAckStatPersistTest extends
ProducerConsumerBase
.receiverQueueSize(incomingQueueSize).isAckReceiptEnabled(true).subscriptionType(subscriptionType)
.subscribe();
ackOddMessagesOnly(c1);
+ verifyAckHolesIsMuchThanLimit(tpName, subscription);
cancelPendingRead(tpName, subscription);
triggerNewReadMoreEntries(tpName, subscription);
@@ -279,6 +353,7 @@ public class SubscriptionPauseOnAckStatPersistTest extends
ProducerConsumerBase
.receiverQueueSize(incomingQueueSize).isAckReceiptEnabled(true)
.subscriptionType(SubscriptionType.Shared).subscribe();
ackOddMessagesOnly(c1);
+ verifyAckHolesIsMuchThanLimit(tpName, subscription);
cancelPendingRead(tpName, subscription);
triggerNewReadMoreEntries(tpName, subscription);
@@ -358,6 +433,7 @@ public class SubscriptionPauseOnAckStatPersistTest extends
ProducerConsumerBase
.subscriptionType(subscriptionType)
.subscribe();
ackOddMessagesOnly(c1);
+ verifyAckHolesIsMuchThanLimit(tpName, subscription);
cancelPendingRead(tpName, subscription);
triggerNewReadMoreEntries(tpName, subscription);
@@ -398,6 +474,7 @@ public class SubscriptionPauseOnAckStatPersistTest extends
ProducerConsumerBase
}
// Make ack holes.
ReceivedMessages receivedMessagesC1 = ackOddMessagesOnly(c1);
+ verifyAckHolesIsMuchThanLimit(tpName, subscription);
cancelPendingRead(tpName, subscription);
triggerNewReadMoreEntries(tpName, subscription);
@@ -454,6 +531,7 @@ public class SubscriptionPauseOnAckStatPersistTest extends
ProducerConsumerBase
}
// Make ack holes.
ReceivedMessages receivedMessagesC1AndC2 = ackOddMessagesOnly(c1, c2);
+ verifyAckHolesIsMuchThanLimit(tpName, subscription);
cancelPendingRead(tpName, subscription);
triggerNewReadMoreEntries(tpName, subscription);