This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 735c91ab64b [improve] [broker] PIP-299-part-3: Add dynamic config 
support: dispatcherPauseOnAckStatePersistentEnabled (#21837)
735c91ab64b is described below

commit 735c91ab64b1fd008d51417b71022df4567538a5
Author: fengyubiao <[email protected]>
AuthorDate: Fri Jan 5 10:53:34 2024 +0800

    [improve] [broker] PIP-299-part-3: Add dynamic config support: 
dispatcherPauseOnAckStatePersistentEnabled (#21837)
---
 .../pulsar/broker/service/AbstractTopic.java       |  8 ++-
 .../pulsar/broker/service/BrokerService.java       | 16 +++++
 .../apache/pulsar/broker/service/Dispatcher.java   | 10 +++
 .../PersistentDispatcherMultipleConsumers.java     | 47 ++++++++-----
 .../broker/service/persistent/PersistentTopic.java | 14 ++++
 .../api/SubscriptionPauseOnAckStatPersistTest.java | 78 ++++++++++++++++++++++
 6 files changed, 156 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index fcadab228e5..2d32aac7d44 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -397,7 +397,8 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         
topicPolicies.getSchemaValidationEnforced().updateBrokerValue(config.isSchemaValidationEnforced());
         topicPolicies.getEntryFilters().updateBrokerValue(new 
EntryFilters(String.join(",",
                 config.getEntryFilterNames())));
-
+        topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled()
+                
.updateBrokerValue(config.isDispatcherPauseOnAckStatePersistentEnabled());
         updateEntryFilters();
     }
 
@@ -1267,6 +1268,11 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
             dispatchRateInBroker(brokerService.pulsar().getConfiguration()));
     }
 
+    public void updateBrokerDispatchPauseOnAckStatePersistentEnabled() {
+        
topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().updateBrokerValue(
+                
brokerService.pulsar().getConfiguration().isDispatcherPauseOnAckStatePersistentEnabled());
+    }
+
     public void addFilteredEntriesCount(int filtered) {
         this.filteredEntriesCounter.add(filtered);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b3b6e778596..fae81c1f644 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2605,6 +2605,10 @@ public class BrokerService implements Closeable {
         
registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInByte", 
(dispatchRatePerTopicInByte) -> {
             updateSubscriptionMessageDispatchRate();
         });
+        // add listener to update "dispatcherPauseOnAckStatePersistentEnabled" 
for subscription
+        
registerConfigurationListener("dispatcherPauseOnAckStatePersistentEnabled", 
(dispatchRatePerTopicInByte) -> {
+            updateDispatchPauseOnAckStatePersistentEnabled();
+        });
 
         // add listener to update message-dispatch-rate in msg for replicator
         
registerConfigurationListener("dispatchThrottlingRatePerReplicatorInMsg",
@@ -2743,6 +2747,18 @@ public class BrokerService implements Closeable {
         });
     }
 
+    private void updateDispatchPauseOnAckStatePersistentEnabled() {
+        this.pulsar().getExecutor().execute(() -> {
+            forEachTopic(topic -> {
+                if (topic instanceof PersistentTopic) {
+                    // Update policies.
+                    PersistentTopic persistentTopic = (PersistentTopic) topic;
+                    
persistentTopic.updateBrokerDispatchPauseOnAckStatePersistentEnabled();
+                }
+            });
+        });
+    }
+
     private void updateBrokerSubscriptionTypesEnabled(Object 
subscriptionTypesEnabled) {
         this.pulsar().getExecutor().execute(() -> {
             // update subscriptionTypesEnabled
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index aebafacd584..fcd4c52ee37 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -151,6 +151,16 @@ public interface Dispatcher {
      */
     default void afterAckMessages(Throwable exOfDeletion, Object 
ctxOfDeletion){}
 
+    /**
+     * Trigger a new "readMoreEntries" if the dispatching has been paused 
before. This method is only implemented in
+     * {@link 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers}
 right now, other
+     * implementations do not need to implement this method.
+     * @return true if the dispatching does not need to stay paused (it was resumed, or it was not paused).
+     */
+    default boolean checkAndResumeIfPaused(){
+        return false;
+    }
+
     default long getFilterProcessedMsgCount() {
         return 0;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 0d1f198a7ca..be82b190ffb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -1039,25 +1039,40 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
 
     @Override
     public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) 
{
-        if (blockedDispatcherOnCursorDataCanNotFullyPersist == TRUE) {
-            if (cursor.isCursorDataFullyPersistable()) {
-                // If there was no previous pause due to cursor data is too 
large to persist, we don't need to manually
-                // trigger a new read. This can avoid too many CPU circles.
-                if 
(BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER.compareAndSet(this,
 TRUE, FALSE)) {
-                    readMoreEntriesAsync();
-                } else {
-                    // Retry due to conflict update.
-                    afterAckMessages(exOfDeletion, ctxOfDeletion);
-                }
+        boolean unPaused = blockedDispatcherOnCursorDataCanNotFullyPersist == 
FALSE;
+        // Trigger a new read if needed.
+        boolean shouldPauseNow = !checkAndResumeIfPaused();
+        // Switch state to "paused" if needed.
+        if (unPaused && shouldPauseNow) {
+            if 
(!BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER
+                    .compareAndSet(this, FALSE, TRUE)) {
+                // Retry due to conflict update.
+                afterAckMessages(exOfDeletion, ctxOfDeletion);
             }
-        } else {
-            if (!cursor.isCursorDataFullyPersistable()) {
-                if 
(BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER.compareAndSet(this,
 FALSE, TRUE)) {
-                    // Retry due to conflict update.
-                    afterAckMessages(exOfDeletion, ctxOfDeletion);
-                }
+        }
+    }
+
+    @Override
+    public boolean checkAndResumeIfPaused() {
+        boolean paused = blockedDispatcherOnCursorDataCanNotFullyPersist == 
TRUE;
+        boolean shouldPauseNow = !cursor.isCursorDataFullyPersistable()
+                && topic.isDispatcherPauseOnAckStatePersistentEnabled();
+        // No need to change.
+        if (paused == shouldPauseNow) {
+            return !shouldPauseNow;
+        }
+        // Should change to "un-pause".
+        if (paused && !shouldPauseNow) {
+            // If there was no previous pause due to the cursor data being too 
large to persist, we don't need to manually
+            // trigger a new read. This can avoid wasting too many CPU cycles.
+            if 
(BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER.compareAndSet(this,
 TRUE, FALSE)) {
+                readMoreEntriesAsync();
+            } else {
+                // Retry due to conflict update.
+                checkAndResumeIfPaused();
             }
         }
+        return !shouldPauseNow;
     }
 
     public boolean isBlockedDispatcherOnUnackedMsgs() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e3cfafff57f..34893fed4d3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3791,6 +3791,20 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return b == null ? false : b.booleanValue();
     }
 
+    @Override
+    public void updateBrokerDispatchPauseOnAckStatePersistentEnabled() {
+        super.updateBrokerDispatchPauseOnAckStatePersistentEnabled();
+        // Trigger a new read if subscriptions have been paused before.
+        if 
(!topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().get()) {
+            getSubscriptions().forEach((sName, subscription) -> {
+                if (subscription.getDispatcher() == null) {
+                    return;
+                }
+                subscription.getDispatcher().afterAckMessages(null, 0);
+            });
+        }
+    }
+
     @Override
     public void onUpdate(TopicPolicies policies) {
         if (log.isDebugEnabled()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
index 0029e61df4c..06298e2fdd2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
@@ -37,7 +37,9 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.GetStatsOptions;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -211,6 +213,77 @@ public class SubscriptionPauseOnAckStatPersistTest extends 
ProducerConsumerBase
         }
     }
 
+    @Test
+    public void testBrokerDynamicConfig() throws Exception {
+        final String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subscription = "s1";
+        final int msgSendCount = MAX_UNACKED_RANGES_TO_PERSIST * 4;
+        final int incomingQueueSize = MAX_UNACKED_RANGES_TO_PERSIST * 10;
+
+        // Enable "dispatcherPauseOnAckStatePersistentEnabled".
+        
admin.brokers().updateDynamicConfiguration("dispatcherPauseOnAckStatePersistentEnabled",
 "true");
+        admin.topics().createNonPartitionedTopic(tpName);
+        admin.topics().createSubscription(tpName, subscription, 
MessageId.earliest);
+
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(tpName, 
false).join().get();
+        Awaitility.await().untilAsserted(() -> {
+            
Assert.assertTrue(pulsar.getConfig().isDispatcherPauseOnAckStatePersistentEnabled());
+            HierarchyTopicPolicies policies = 
WhiteboxImpl.getInternalState(persistentTopic, "topicPolicies");
+            Boolean v = 
policies.getDispatcherPauseOnAckStatePersistentEnabled().get();
+            Assert.assertNotNull(v);
+            Assert.assertTrue(v.booleanValue());
+        });
+
+        // Send four times MAX_UNACKED_RANGES_TO_PERSIST messages.
+        Producer<String> p1 = 
pulsarClient.newProducer(Schema.STRING).topic(tpName).enableBatching(false).create();
+        ArrayList<MessageId> messageIdsSent = new ArrayList<>();
+        for (int i = 0; i < msgSendCount; i++) {
+            MessageIdImpl messageId = (MessageIdImpl) 
p1.send(Integer.valueOf(i).toString());
+            messageIdsSent.add(messageId);
+        }
+        // Make ack holes.
+        Consumer<String> c1 = 
pulsarClient.newConsumer(Schema.STRING).topic(tpName).subscriptionName(subscription)
+                .receiverQueueSize(incomingQueueSize).isAckReceiptEnabled(true)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+        ackOddMessagesOnly(c1);
+        verifyAckHolesIsMuchThanLimit(tpName, subscription);
+
+        cancelPendingRead(tpName, subscription);
+        triggerNewReadMoreEntries(tpName, subscription);
+
+        // Verify: the dispatcher has been paused.
+        final String specifiedMessage = "9876543210";
+        p1.send(specifiedMessage);
+        Message<String> msg1 = c1.receive(2, TimeUnit.SECONDS);
+        Assert.assertNull(msg1, msg1 == null ? "null" : msg1.getValue());
+
+        // Disable "dispatcherPauseOnAckStatePersistentEnabled".
+        
admin.brokers().updateDynamicConfiguration("dispatcherPauseOnAckStatePersistentEnabled",
 "false");
+        Awaitility.await().untilAsserted(() -> {
+            
Assert.assertFalse(pulsar.getConfig().isDispatcherPauseOnAckStatePersistentEnabled());
+            HierarchyTopicPolicies policies = 
WhiteboxImpl.getInternalState(persistentTopic, "topicPolicies");
+            Boolean v = 
policies.getDispatcherPauseOnAckStatePersistentEnabled().get();
+            Assert.assertTrue(v == null || !v.booleanValue());
+        });
+
+        // Verify the new message can be received.
+        Message<String> msg2 = c1.receive(2, TimeUnit.SECONDS);
+        Assert.assertNotNull(msg2);
+        Assert.assertEquals(msg2.getValue(), specifiedMessage);
+        // cleanup.
+        p1.close();
+        c1.close();
+        admin.topics().delete(tpName, false);
+    }
+
+    private void verifyAckHolesIsMuchThanLimit(String tpName, String 
subscription) {
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertTrue(MAX_UNACKED_RANGES_TO_PERSIST < admin.topics()
+                    
.getInternalStats(tpName).cursors.get(subscription).totalNonContiguousDeletedMessagesRange);
+        });
+    }
+
     @Test(dataProvider = "multiConsumerSubscriptionTypes")
     public void testPauseOnAckStatPersist(SubscriptionType subscriptionType) 
throws Exception {
         final String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
@@ -234,6 +307,7 @@ public class SubscriptionPauseOnAckStatPersistTest extends 
ProducerConsumerBase
                 
.receiverQueueSize(incomingQueueSize).isAckReceiptEnabled(true).subscriptionType(subscriptionType)
                 .subscribe();
         ackOddMessagesOnly(c1);
+        verifyAckHolesIsMuchThanLimit(tpName, subscription);
 
         cancelPendingRead(tpName, subscription);
         triggerNewReadMoreEntries(tpName, subscription);
@@ -279,6 +353,7 @@ public class SubscriptionPauseOnAckStatPersistTest extends 
ProducerConsumerBase
                 .receiverQueueSize(incomingQueueSize).isAckReceiptEnabled(true)
                 .subscriptionType(SubscriptionType.Shared).subscribe();
         ackOddMessagesOnly(c1);
+        verifyAckHolesIsMuchThanLimit(tpName, subscription);
 
         cancelPendingRead(tpName, subscription);
         triggerNewReadMoreEntries(tpName, subscription);
@@ -358,6 +433,7 @@ public class SubscriptionPauseOnAckStatPersistTest extends 
ProducerConsumerBase
                 .subscriptionType(subscriptionType)
                 .subscribe();
         ackOddMessagesOnly(c1);
+        verifyAckHolesIsMuchThanLimit(tpName, subscription);
 
         cancelPendingRead(tpName, subscription);
         triggerNewReadMoreEntries(tpName, subscription);
@@ -398,6 +474,7 @@ public class SubscriptionPauseOnAckStatPersistTest extends 
ProducerConsumerBase
         }
         // Make ack holes.
         ReceivedMessages receivedMessagesC1 = ackOddMessagesOnly(c1);
+        verifyAckHolesIsMuchThanLimit(tpName, subscription);
 
         cancelPendingRead(tpName, subscription);
         triggerNewReadMoreEntries(tpName, subscription);
@@ -454,6 +531,7 @@ public class SubscriptionPauseOnAckStatPersistTest extends 
ProducerConsumerBase
         }
         // Make ack holes.
         ReceivedMessages receivedMessagesC1AndC2 = ackOddMessagesOnly(c1, c2);
+        verifyAckHolesIsMuchThanLimit(tpName, subscription);
 
         cancelPendingRead(tpName, subscription);
         triggerNewReadMoreEntries(tpName, subscription);

Reply via email to