This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c3f1ee01a37 [improve] [broker] PIP-299-part-4: Add topic-level policy: 
dispatcherPauseOnAckStatePersistent (#21874)
c3f1ee01a37 is described below

commit c3f1ee01a376541dde8a5deda7ee99c0beb65dc0
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jan 15 10:52:01 2024 +0800

    [improve] [broker] PIP-299-part-4: Add topic-level policy: 
dispatcherPauseOnAckStatePersistent (#21874)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 28 ++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 81 +++++++++++++++++++++
 .../pulsar/broker/service/AbstractTopic.java       |  2 +
 .../broker/service/persistent/PersistentTopic.java |  4 +-
 .../api/SubscriptionPauseOnAckStatPersistTest.java | 83 ++++++++++++++++------
 .../apache/pulsar/client/admin/TopicPolicies.java  | 16 +++++
 .../client/admin/internal/TopicPoliciesImpl.java   | 21 ++++++
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  | 58 +++++++++++++++
 8 files changed, 271 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 1a94e7f86fe..5408557207d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3518,6 +3518,34 @@ public class PersistentTopicsBase extends AdminResource {
                 });
     }
 
+    protected CompletableFuture<Void> 
internalSetDispatcherPauseOnAckStatePersistent(boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+            .thenCompose(op -> {
+                TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                
topicPolicies.setDispatcherPauseOnAckStatePersistentEnabled(true);
+                topicPolicies.setIsGlobal(isGlobal);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+            });
+    }
+
+    protected CompletableFuture<Void> 
internalRemoveDispatcherPauseOnAckStatePersistent(boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+            .thenCompose(op -> {
+                if (!op.isPresent()) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                op.get().setDispatcherPauseOnAckStatePersistentEnabled(false);
+                return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
+            });
+    }
+
+    protected CompletableFuture<Boolean> 
internalGetDispatcherPauseOnAckStatePersistent(boolean applied,
+                                                                               
         boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+            .thenApply(op -> 
op.map(TopicPolicies::getDispatcherPauseOnAckStatePersistentEnabled)
+                .orElse(false));
+}
+
     protected CompletableFuture<PersistencePolicies> 
internalGetPersistence(boolean applied, boolean isGlobal) {
         return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenApply(op -> op.map(TopicPolicies::getPersistence)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9ccbc0ecba1..97531cf8ab0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2513,6 +2513,87 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             });
     }
 
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/dispatcherPauseOnAckStatePersistent")
+    @ApiOperation(value = "Set dispatcher pause on ack state persistent 
configuration for specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Namespace or topic doesn't 
exist"),
+            @ApiResponse(code = 405, message =
+                    "Topic level policy is disabled, to enable the topic level 
policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void setDispatcherPauseOnAckStatePersistent(@Suspended final 
AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+            .thenCompose(__ -> 
internalSetDispatcherPauseOnAckStatePersistent(isGlobal))
+            .thenRun(() -> {
+                log.info("[{}] Successfully enabled 
dispatcherPauseOnAckStatePersistent: namespace={}, topic={}",
+                    clientAppId(), namespaceName, topicName.getLocalName());
+                asyncResponse.resume(Response.noContent().build());
+            })
+            .exceptionally(ex -> {
+                
handleTopicPolicyException("setDispatcherPauseOnAckStatePersistent", ex, 
asyncResponse);
+                return null;
+            });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/dispatcherPauseOnAckStatePersistent")
+    @ApiOperation(value = "Remove dispatcher pause on ack state persistent 
configuration for specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Namespace or topic doesn't 
exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, to enable the 
topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeDispatcherPauseOnAckStatePersistent(@Suspended final 
AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+            .thenCompose(__ -> 
internalRemoveDispatcherPauseOnAckStatePersistent(isGlobal))
+            .thenRun(() -> {
+                log.info("[{}] Successfully remove 
dispatcherPauseOnAckStatePersistent: namespace={}, topic={}",
+                        clientAppId(), namespaceName, 
topicName.getLocalName());
+                asyncResponse.resume(Response.noContent().build());
+            })
+            .exceptionally(ex -> {
+                
handleTopicPolicyException("removeDispatcherPauseOnAckStatePersistent", ex, 
asyncResponse);
+                return null;
+            });
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/dispatcherPauseOnAckStatePersistent")
+    @ApiOperation(value = "Get dispatcher pause on ack state persistent config 
on a topic.", response = Integer.class)
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"), })
+    public void getDispatcherPauseOnAckStatePersistent(@Suspended final 
AsyncResponse asyncResponse,
+                    @PathParam("tenant") String tenant,
+                    @PathParam("namespace") String namespace,
+                    @PathParam("topic") @Encoded String encodedTopic,
+                    @QueryParam("applied") @DefaultValue("false") boolean 
applied,
+                    @QueryParam("isGlobal") @DefaultValue("false") boolean 
isGlobal,
+                    @ApiParam(value = "Whether leader broker redirected this 
call to this broker. For internal use.")
+                    @QueryParam("authoritative") @DefaultValue("false") 
boolean authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+                .thenCompose(__ -> 
internalGetDispatcherPauseOnAckStatePersistent(applied, isGlobal))
+                .thenApply(asyncResponse::resume).exceptionally(ex -> {
+                    
handleTopicPolicyException("getDispatcherPauseOnAckStatePersistent", ex, 
asyncResponse);
+                    return null;
+                });
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/{topic}/persistence")
     @ApiOperation(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 598480191d0..e75ddb4f71c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -218,6 +218,8 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
                     
.updateTopicValue(formatSchemaCompatibilityStrategy(data.getSchemaCompatibilityStrategy()));
         }
         
topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies());
+        topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled()
+                
.updateTopicValue(data.getDispatcherPauseOnAckStatePersistentEnabled());
         
topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
         
topicPolicies.getMaxUnackedMessagesOnConsumer().updateTopicValue(data.getMaxUnackedMessagesOnConsumer());
         topicPolicies.getMaxUnackedMessagesOnSubscription()
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e139cf583d2..8ae0546f051 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3138,6 +3138,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> 
checkPersistencePolicies()));
         applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(
                 () -> preCreateSubscriptionForCompactionIfNeeded()));
+        applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(
+                () -> updateBrokerDispatchPauseOnAckStatePersistentEnabled()));
 
         return applyPoliciesFutureList;
     }
@@ -3800,7 +3802,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 if (subscription.getDispatcher() == null) {
                     return;
                 }
-                subscription.getDispatcher().afterAckMessages(null, 0);
+                subscription.getDispatcher().checkAndResumeIfPaused();
             });
         }
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
index 06298e2fdd2..06f66585f11 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
@@ -213,28 +213,75 @@ public class SubscriptionPauseOnAckStatPersistTest 
extends ProducerConsumerBase
         }
     }
 
-    @Test
-    public void testBrokerDynamicConfig() throws Exception {
+    @DataProvider(name = "typesOfSetDispatcherPauseOnAckStatePersistent")
+    public Object[][] typesOfSetDispatcherPauseOnAckStatePersistent() {
+        return new Object[][]{
+          {TypeOfUpdateTopicConfig.BROKER_CONF},
+          //{TypeOfUpdateTopicConfig.NAMESPACE_LEVEL_POLICY},
+          {TypeOfUpdateTopicConfig.TOPIC_LEVEL_POLICY}
+        };
+    }
+
+    public enum TypeOfUpdateTopicConfig {
+        BROKER_CONF,
+        TOPIC_LEVEL_POLICY;
+    }
+
+    private void 
enableDispatcherPauseOnAckStatePersistentAndCreateTopic(String tpName, 
TypeOfUpdateTopicConfig type)
+            throws Exception {
+        if (type == TypeOfUpdateTopicConfig.BROKER_CONF) {
+            
admin.brokers().updateDynamicConfiguration("dispatcherPauseOnAckStatePersistentEnabled",
 "true");
+            admin.topics().createNonPartitionedTopic(tpName);
+        } else if (type == TypeOfUpdateTopicConfig.TOPIC_LEVEL_POLICY) {
+            admin.topics().createNonPartitionedTopic(tpName);
+            
admin.topicPolicies().setDispatcherPauseOnAckStatePersistent(tpName).join();
+        }
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
pulsar.getBrokerService().getTopic(tpName, false).join().get();
+            HierarchyTopicPolicies policies = 
WhiteboxImpl.getInternalState(persistentTopic, "topicPolicies");
+            
Assert.assertTrue(persistentTopic.isDispatcherPauseOnAckStatePersistentEnabled());
+            if (type == TypeOfUpdateTopicConfig.BROKER_CONF) {
+                
Assert.assertTrue(pulsar.getConfig().isDispatcherPauseOnAckStatePersistentEnabled());
+            } else if (type == TypeOfUpdateTopicConfig.TOPIC_LEVEL_POLICY){
+                
Assert.assertTrue(policies.getDispatcherPauseOnAckStatePersistentEnabled().getTopicValue());
+                
Assert.assertTrue(admin.topicPolicies().getDispatcherPauseOnAckStatePersistent(tpName,
 false).join());
+            }
+        });
+    }
+
+    private void disableDispatcherPauseOnAckStatePersistent(String tpName, 
TypeOfUpdateTopicConfig type)
+            throws Exception {
+        if (type == TypeOfUpdateTopicConfig.BROKER_CONF) {
+            
admin.brokers().updateDynamicConfiguration("dispatcherPauseOnAckStatePersistentEnabled",
 "false");
+        } else if (type == TypeOfUpdateTopicConfig.TOPIC_LEVEL_POLICY) {
+            
admin.topicPolicies().removeDispatcherPauseOnAckStatePersistent(tpName).join();
+        }
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
pulsar.getBrokerService().getTopic(tpName, false).join().get();
+            HierarchyTopicPolicies policies = 
WhiteboxImpl.getInternalState(persistentTopic, "topicPolicies");
+            
Assert.assertFalse(persistentTopic.isDispatcherPauseOnAckStatePersistentEnabled());
+            if (type == TypeOfUpdateTopicConfig.BROKER_CONF) {
+                
Assert.assertFalse(pulsar.getConfig().isDispatcherPauseOnAckStatePersistentEnabled());
+            } else if (type == TypeOfUpdateTopicConfig.TOPIC_LEVEL_POLICY){
+                
Assert.assertFalse(policies.getDispatcherPauseOnAckStatePersistentEnabled().getTopicValue());
+                
Assert.assertFalse(admin.topicPolicies().getDispatcherPauseOnAckStatePersistent(tpName,
 false).join());
+            }
+        });
+    }
+
+    @Test(dataProvider = "typesOfSetDispatcherPauseOnAckStatePersistent")
+    public void testBrokerDynamicConfig(TypeOfUpdateTopicConfig type) throws 
Exception {
         final String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
         final String subscription = "s1";
         final int msgSendCount = MAX_UNACKED_RANGES_TO_PERSIST * 4;
         final int incomingQueueSize = MAX_UNACKED_RANGES_TO_PERSIST * 10;
 
         // Enable "dispatcherPauseOnAckStatePersistentEnabled".
-        
admin.brokers().updateDynamicConfiguration("dispatcherPauseOnAckStatePersistentEnabled",
 "true");
-        admin.topics().createNonPartitionedTopic(tpName);
+        enableDispatcherPauseOnAckStatePersistentAndCreateTopic(tpName, type);
         admin.topics().createSubscription(tpName, subscription, 
MessageId.earliest);
 
-        PersistentTopic persistentTopic =
-                (PersistentTopic) pulsar.getBrokerService().getTopic(tpName, 
false).join().get();
-        Awaitility.await().untilAsserted(() -> {
-            
Assert.assertTrue(pulsar.getConfig().isDispatcherPauseOnAckStatePersistentEnabled());
-            HierarchyTopicPolicies policies = 
WhiteboxImpl.getInternalState(persistentTopic, "topicPolicies");
-            Boolean v = 
policies.getDispatcherPauseOnAckStatePersistentEnabled().get();
-            Assert.assertNotNull(v);
-            Assert.assertTrue(v.booleanValue());
-        });
-
         // Send double MAX_UNACKED_RANGES_TO_PERSIST messages.
         Producer<String> p1 = 
pulsarClient.newProducer(Schema.STRING).topic(tpName).enableBatching(false).create();
         ArrayList<MessageId> messageIdsSent = new ArrayList<>();
@@ -259,13 +306,7 @@ public class SubscriptionPauseOnAckStatPersistTest extends 
ProducerConsumerBase
         Assert.assertNull(msg1, msg1 == null ? "null" : msg1.getValue());
 
         // Disable "dispatcherPauseOnAckStatePersistentEnabled".
-        
admin.brokers().updateDynamicConfiguration("dispatcherPauseOnAckStatePersistentEnabled",
 "false");
-        Awaitility.await().untilAsserted(() -> {
-            
Assert.assertFalse(pulsar.getConfig().isDispatcherPauseOnAckStatePersistentEnabled());
-            HierarchyTopicPolicies policies = 
WhiteboxImpl.getInternalState(persistentTopic, "topicPolicies");
-            Boolean v = 
policies.getDispatcherPauseOnAckStatePersistentEnabled().get();
-            Assert.assertTrue(v == null || !v.booleanValue());
-        });
+        disableDispatcherPauseOnAckStatePersistent(tpName, type);
 
         // Verify the new message can be received.
         Message<String> msg2 = c1.receive(2, TimeUnit.SECONDS);
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
index f6cd2a5a0ef..4238842bcfa 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
@@ -1913,4 +1913,20 @@ public interface TopicPolicies {
      *            Topic name
      */
     CompletableFuture<Void> removeAutoSubscriptionCreationAsync(String topic);
+
+    /**
+     * After enabling this feature, Pulsar will stop delivery messages to 
clients if the cursor metadata is too large to
+     * # persist, it will help to reduce the duplicates caused by the ack 
state that can not be fully persistent.
+     */
+    CompletableFuture<Void> setDispatcherPauseOnAckStatePersistent(String 
topic);
+
+    /**
+     * Removes the dispatcherPauseOnAckStatePersistentEnabled policy for a 
given topic asynchronously.
+     */
+    CompletableFuture<Void> removeDispatcherPauseOnAckStatePersistent(String 
topic);
+
+    /**
+     * Get the dispatcherPauseOnAckStatePersistentEnabled policy for a given 
topic asynchronously.
+     */
+    CompletableFuture<Boolean> getDispatcherPauseOnAckStatePersistent(String 
topic, boolean applied);
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
index 915b22a2589..f58fd865428 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
@@ -1260,6 +1260,27 @@ public class TopicPoliciesImpl extends BaseResource 
implements TopicPolicies {
         return asyncDeleteRequest(path);
     }
 
+    @Override
+    public CompletableFuture<Void> 
setDispatcherPauseOnAckStatePersistent(String topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "dispatcherPauseOnAckStatePersistent");
+        return asyncPostRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public CompletableFuture<Void> 
removeDispatcherPauseOnAckStatePersistent(String topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "dispatcherPauseOnAckStatePersistent");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> 
getDispatcherPauseOnAckStatePersistent(String topic, boolean applied) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, 
"dispatcherPauseOnAckStatePersistent").queryParam("applied", applied);
+        return asyncGetRequest(path, new FutureCallback<Boolean>(){});
+    }
+
     /*
      * returns topic name with encoded Local Name
      */
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 6120f412edf..421ccec1403 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -165,6 +165,13 @@ public class CmdTopicPolicies extends CmdBase {
         jcommander.addCommand("set-auto-subscription-creation", new 
SetAutoSubscriptionCreation());
         jcommander.addCommand("get-auto-subscription-creation", new 
GetAutoSubscriptionCreation());
         jcommander.addCommand("remove-auto-subscription-creation", new 
RemoveAutoSubscriptionCreation());
+
+        jcommander.addCommand("set-dispatcher-pause-on-ack-state-persistent",
+                new SetDispatcherPauseOnAckStatePersistent());
+        jcommander.addCommand("get-dispatcher-pause-on-ack-state-persistent",
+                new GetDispatcherPauseOnAckStatePersistent());
+        
jcommander.addCommand("remove-dispatcher-pause-on-ack-state-persistent",
+                new RemoveDispatcherPauseOnAckStatePersistent());
     }
 
     @Parameters(commandDescription = "Get entry filters for a topic")
@@ -1931,6 +1938,57 @@ public class CmdTopicPolicies extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Enable 
dispatcherPauseOnAckStatePersistent for a topic")
+    private class SetDispatcherPauseOnAckStatePersistent extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to set 
this policy globally. "
+                + "If set to true, the policy will be replicate to other 
clusters asynchronously")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            
getTopicPolicies(isGlobal).setDispatcherPauseOnAckStatePersistent(persistentTopic);
+        }
+    }
+
+    @Parameters(commandDescription = "Get the 
dispatcherPauseOnAckStatePersistent for a topic")
+    private class GetDispatcherPauseOnAckStatePersistent extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--applied", "-a"}, description = "Get the applied 
policy of the topic")
+        private boolean applied = false;
+
+        @Parameter(names = {"--global", "-g"}, description = "Whether to get 
this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            
print(getTopicPolicies(isGlobal).getDispatcherPauseOnAckStatePersistent(persistentTopic,
 applied));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove 
dispatcherPauseOnAckStatePersistent for a topic")
+    private class RemoveDispatcherPauseOnAckStatePersistent extends CliCommand 
{
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--global", "-g"}, description = "Whether to 
remove this policy globally. "
+                + "If set to true, the policy will be replicate to other 
clusters asynchronously")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            
getTopicPolicies(isGlobal).removeDispatcherPauseOnAckStatePersistent(persistentTopic);
+        }
+    }
+
     private TopicPolicies getTopicPolicies(boolean isGlobal) {
         return getAdmin().topicPolicies(isGlobal);
     }

Reply via email to