This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c3f1ee01a37 [improve] [broker] PIP-299-part-4: Add topic-level policy:
dispatcherPauseOnAckStatePersistent (#21874)
c3f1ee01a37 is described below
commit c3f1ee01a376541dde8a5deda7ee99c0beb65dc0
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jan 15 10:52:01 2024 +0800
[improve] [broker] PIP-299-part-4: Add topic-level policy:
dispatcherPauseOnAckStatePersistent (#21874)
---
.../broker/admin/impl/PersistentTopicsBase.java | 28 ++++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 81 +++++++++++++++++++++
.../pulsar/broker/service/AbstractTopic.java | 2 +
.../broker/service/persistent/PersistentTopic.java | 4 +-
.../api/SubscriptionPauseOnAckStatPersistTest.java | 83 ++++++++++++++++------
.../apache/pulsar/client/admin/TopicPolicies.java | 16 +++++
.../client/admin/internal/TopicPoliciesImpl.java | 21 ++++++
.../apache/pulsar/admin/cli/CmdTopicPolicies.java | 58 +++++++++++++++
8 files changed, 271 insertions(+), 22 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 1a94e7f86fe..5408557207d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3518,6 +3518,34 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+ protected CompletableFuture<Void>
internalSetDispatcherPauseOnAckStatePersistent(boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+
topicPolicies.setDispatcherPauseOnAckStatePersistentEnabled(true);
+ topicPolicies.setIsGlobal(isGlobal);
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies);
+ });
+ }
+
+ protected CompletableFuture<Void>
internalRemoveDispatcherPauseOnAckStatePersistent(boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+ .thenCompose(op -> {
+ if (!op.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ op.get().setDispatcherPauseOnAckStatePersistentEnabled(false);
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
op.get());
+ });
+ }
+
+ protected CompletableFuture<Boolean>
internalGetDispatcherPauseOnAckStatePersistent(boolean applied,
+
boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+ .thenApply(op ->
op.map(TopicPolicies::getDispatcherPauseOnAckStatePersistentEnabled)
+ .orElse(false));
+}
+
protected CompletableFuture<PersistencePolicies>
internalGetPersistence(boolean applied, boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getPersistence)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9ccbc0ecba1..97531cf8ab0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2513,6 +2513,87 @@ public class PersistentTopics extends
PersistentTopicsBase {
});
}
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/dispatcherPauseOnAckStatePersistent")
+ @ApiOperation(value = "Set dispatcher pause on ack state persistent
configuration for specified topic.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Namespace or topic doesn't
exist"),
+ @ApiResponse(code = 405, message =
+ "Topic level policy is disabled, to enable the topic level
policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void setDispatcherPauseOnAckStatePersistent(@Suspended final
AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ preValidation(authoritative)
+ .thenCompose(__ ->
internalSetDispatcherPauseOnAckStatePersistent(isGlobal))
+ .thenRun(() -> {
+ log.info("[{}] Successfully enabled
dispatcherPauseOnAckStatePersistent: namespace={}, topic={}",
+ clientAppId(), namespaceName, topicName.getLocalName());
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+
handleTopicPolicyException("setDispatcherPauseOnAckStatePersistent", ex,
asyncResponse);
+ return null;
+ });
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/dispatcherPauseOnAckStatePersistent")
+ @ApiOperation(value = "Remove dispatcher pause on ack state persistent
configuration for specified topic.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Namespace or topic doesn't
exist"),
+ @ApiResponse(code = 405,
+ message = "Topic level policy is disabled, to enable the
topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void removeDispatcherPauseOnAckStatePersistent(@Suspended final
AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+ @ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ preValidation(authoritative)
+ .thenCompose(__ ->
internalRemoveDispatcherPauseOnAckStatePersistent(isGlobal))
+ .thenRun(() -> {
+ log.info("[{}] Successfully remove
dispatcherPauseOnAckStatePersistent: namespace={}, topic={}",
+ clientAppId(), namespaceName,
topicName.getLocalName());
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+
handleTopicPolicyException("removeDispatcherPauseOnAckStatePersistent", ex,
asyncResponse);
+ return null;
+ });
+ }
+
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/dispatcherPauseOnAckStatePersistent")
+ @ApiOperation(value = "Get dispatcher pause on ack state persistent config
on a topic.", response = Integer.class)
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
or topic doesn't exist"),
+ @ApiResponse(code = 500, message = "Internal server error"), })
+ public void getDispatcherPauseOnAckStatePersistent(@Suspended final
AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("applied") @DefaultValue("false") boolean
applied,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean
isGlobal,
+ @ApiParam(value = "Whether leader broker redirected this
call to this broker. For internal use.")
+ @QueryParam("authoritative") @DefaultValue("false")
boolean authoritative) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ preValidation(authoritative)
+ .thenCompose(__ ->
internalGetDispatcherPauseOnAckStatePersistent(applied, isGlobal))
+ .thenApply(asyncResponse::resume).exceptionally(ex -> {
+
handleTopicPolicyException("getDispatcherPauseOnAckStatePersistent", ex,
asyncResponse);
+ return null;
+ });
+ }
+
@GET
@Path("/{tenant}/{namespace}/{topic}/persistence")
@ApiOperation(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 598480191d0..e75ddb4f71c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -218,6 +218,8 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
.updateTopicValue(formatSchemaCompatibilityStrategy(data.getSchemaCompatibilityStrategy()));
}
topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies());
+ topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled()
+
.updateTopicValue(data.getDispatcherPauseOnAckStatePersistentEnabled());
topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxUnackedMessagesOnConsumer().updateTopicValue(data.getMaxUnackedMessagesOnConsumer());
topicPolicies.getMaxUnackedMessagesOnSubscription()
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e139cf583d2..8ae0546f051 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3138,6 +3138,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() ->
checkPersistencePolicies()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(
() -> preCreateSubscriptionForCompactionIfNeeded()));
+ applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(
+ () -> updateBrokerDispatchPauseOnAckStatePersistentEnabled()));
return applyPoliciesFutureList;
}
@@ -3800,7 +3802,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
if (subscription.getDispatcher() == null) {
return;
}
- subscription.getDispatcher().afterAckMessages(null, 0);
+ subscription.getDispatcher().checkAndResumeIfPaused();
});
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
index 06298e2fdd2..06f66585f11 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
@@ -213,28 +213,75 @@ public class SubscriptionPauseOnAckStatPersistTest
extends ProducerConsumerBase
}
}
- @Test
- public void testBrokerDynamicConfig() throws Exception {
+ @DataProvider(name = "typesOfSetDispatcherPauseOnAckStatePersistent")
+ public Object[][] typesOfSetDispatcherPauseOnAckStatePersistent() {
+ return new Object[][]{
+ {TypeOfUpdateTopicConfig.BROKER_CONF},
+ //{TypeOfUpdateTopicConfig.NAMESPACE_LEVEL_POLICY},
+ {TypeOfUpdateTopicConfig.TOPIC_LEVEL_POLICY}
+ };
+ }
+
+ public enum TypeOfUpdateTopicConfig {
+ BROKER_CONF,
+ TOPIC_LEVEL_POLICY;
+ }
+
+ private void
enableDispatcherPauseOnAckStatePersistentAndCreateTopic(String tpName,
TypeOfUpdateTopicConfig type)
+ throws Exception {
+ if (type == TypeOfUpdateTopicConfig.BROKER_CONF) {
+
admin.brokers().updateDynamicConfiguration("dispatcherPauseOnAckStatePersistentEnabled",
"true");
+ admin.topics().createNonPartitionedTopic(tpName);
+ } else if (type == TypeOfUpdateTopicConfig.TOPIC_LEVEL_POLICY) {
+ admin.topics().createNonPartitionedTopic(tpName);
+
admin.topicPolicies().setDispatcherPauseOnAckStatePersistent(tpName).join();
+ }
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(tpName, false).join().get();
+ HierarchyTopicPolicies policies =
WhiteboxImpl.getInternalState(persistentTopic, "topicPolicies");
+
Assert.assertTrue(persistentTopic.isDispatcherPauseOnAckStatePersistentEnabled());
+ if (type == TypeOfUpdateTopicConfig.BROKER_CONF) {
+
Assert.assertTrue(pulsar.getConfig().isDispatcherPauseOnAckStatePersistentEnabled());
+ } else if (type == TypeOfUpdateTopicConfig.TOPIC_LEVEL_POLICY){
+
Assert.assertTrue(policies.getDispatcherPauseOnAckStatePersistentEnabled().getTopicValue());
+
Assert.assertTrue(admin.topicPolicies().getDispatcherPauseOnAckStatePersistent(tpName,
false).join());
+ }
+ });
+ }
+
+ private void disableDispatcherPauseOnAckStatePersistent(String tpName,
TypeOfUpdateTopicConfig type)
+ throws Exception {
+ if (type == TypeOfUpdateTopicConfig.BROKER_CONF) {
+
admin.brokers().updateDynamicConfiguration("dispatcherPauseOnAckStatePersistentEnabled",
"false");
+ } else if (type == TypeOfUpdateTopicConfig.TOPIC_LEVEL_POLICY) {
+
admin.topicPolicies().removeDispatcherPauseOnAckStatePersistent(tpName).join();
+ }
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(tpName, false).join().get();
+ HierarchyTopicPolicies policies =
WhiteboxImpl.getInternalState(persistentTopic, "topicPolicies");
+
Assert.assertFalse(persistentTopic.isDispatcherPauseOnAckStatePersistentEnabled());
+ if (type == TypeOfUpdateTopicConfig.BROKER_CONF) {
+
Assert.assertFalse(pulsar.getConfig().isDispatcherPauseOnAckStatePersistentEnabled());
+ } else if (type == TypeOfUpdateTopicConfig.TOPIC_LEVEL_POLICY){
+
Assert.assertFalse(policies.getDispatcherPauseOnAckStatePersistentEnabled().getTopicValue());
+
Assert.assertFalse(admin.topicPolicies().getDispatcherPauseOnAckStatePersistent(tpName,
false).join());
+ }
+ });
+ }
+
+ @Test(dataProvider = "typesOfSetDispatcherPauseOnAckStatePersistent")
+ public void testBrokerDynamicConfig(TypeOfUpdateTopicConfig type) throws
Exception {
final String tpName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final String subscription = "s1";
final int msgSendCount = MAX_UNACKED_RANGES_TO_PERSIST * 4;
final int incomingQueueSize = MAX_UNACKED_RANGES_TO_PERSIST * 10;
// Enable "dispatcherPauseOnAckStatePersistentEnabled".
-
admin.brokers().updateDynamicConfiguration("dispatcherPauseOnAckStatePersistentEnabled",
"true");
- admin.topics().createNonPartitionedTopic(tpName);
+ enableDispatcherPauseOnAckStatePersistentAndCreateTopic(tpName, type);
admin.topics().createSubscription(tpName, subscription,
MessageId.earliest);
- PersistentTopic persistentTopic =
- (PersistentTopic) pulsar.getBrokerService().getTopic(tpName,
false).join().get();
- Awaitility.await().untilAsserted(() -> {
-
Assert.assertTrue(pulsar.getConfig().isDispatcherPauseOnAckStatePersistentEnabled());
- HierarchyTopicPolicies policies =
WhiteboxImpl.getInternalState(persistentTopic, "topicPolicies");
- Boolean v =
policies.getDispatcherPauseOnAckStatePersistentEnabled().get();
- Assert.assertNotNull(v);
- Assert.assertTrue(v.booleanValue());
- });
-
// Send double MAX_UNACKED_RANGES_TO_PERSIST messages.
Producer<String> p1 =
pulsarClient.newProducer(Schema.STRING).topic(tpName).enableBatching(false).create();
ArrayList<MessageId> messageIdsSent = new ArrayList<>();
@@ -259,13 +306,7 @@ public class SubscriptionPauseOnAckStatPersistTest extends
ProducerConsumerBase
Assert.assertNull(msg1, msg1 == null ? "null" : msg1.getValue());
// Disable "dispatcherPauseOnAckStatePersistentEnabled".
-
admin.brokers().updateDynamicConfiguration("dispatcherPauseOnAckStatePersistentEnabled",
"false");
- Awaitility.await().untilAsserted(() -> {
-
Assert.assertFalse(pulsar.getConfig().isDispatcherPauseOnAckStatePersistentEnabled());
- HierarchyTopicPolicies policies =
WhiteboxImpl.getInternalState(persistentTopic, "topicPolicies");
- Boolean v =
policies.getDispatcherPauseOnAckStatePersistentEnabled().get();
- Assert.assertTrue(v == null || !v.booleanValue());
- });
+ disableDispatcherPauseOnAckStatePersistent(tpName, type);
// Verify the new message can be received.
Message<String> msg2 = c1.receive(2, TimeUnit.SECONDS);
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
index f6cd2a5a0ef..4238842bcfa 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
@@ -1913,4 +1913,20 @@ public interface TopicPolicies {
* Topic name
*/
CompletableFuture<Void> removeAutoSubscriptionCreationAsync(String topic);
+
+ /**
+ * After enabling this feature, Pulsar will stop delivery messages to
clients if the cursor metadata is too large to
+ * # persist, it will help to reduce the duplicates caused by the ack
state that can not be fully persistent.
+ */
+ CompletableFuture<Void> setDispatcherPauseOnAckStatePersistent(String
topic);
+
+ /**
+ * Removes the dispatcherPauseOnAckStatePersistentEnabled policy for a
given topic asynchronously.
+ */
+ CompletableFuture<Void> removeDispatcherPauseOnAckStatePersistent(String
topic);
+
+ /**
+ * Get the dispatcherPauseOnAckStatePersistentEnabled policy for a given
topic asynchronously.
+ */
+ CompletableFuture<Boolean> getDispatcherPauseOnAckStatePersistent(String
topic, boolean applied);
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
index 915b22a2589..f58fd865428 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
@@ -1260,6 +1260,27 @@ public class TopicPoliciesImpl extends BaseResource
implements TopicPolicies {
return asyncDeleteRequest(path);
}
+ @Override
+ public CompletableFuture<Void>
setDispatcherPauseOnAckStatePersistent(String topic) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "dispatcherPauseOnAckStatePersistent");
+ return asyncPostRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public CompletableFuture<Void>
removeDispatcherPauseOnAckStatePersistent(String topic) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "dispatcherPauseOnAckStatePersistent");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
+ public CompletableFuture<Boolean>
getDispatcherPauseOnAckStatePersistent(String topic, boolean applied) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn,
"dispatcherPauseOnAckStatePersistent").queryParam("applied", applied);
+ return asyncGetRequest(path, new FutureCallback<Boolean>(){});
+ }
+
/*
* returns topic name with encoded Local Name
*/
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 6120f412edf..421ccec1403 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -165,6 +165,13 @@ public class CmdTopicPolicies extends CmdBase {
jcommander.addCommand("set-auto-subscription-creation", new
SetAutoSubscriptionCreation());
jcommander.addCommand("get-auto-subscription-creation", new
GetAutoSubscriptionCreation());
jcommander.addCommand("remove-auto-subscription-creation", new
RemoveAutoSubscriptionCreation());
+
+ jcommander.addCommand("set-dispatcher-pause-on-ack-state-persistent",
+ new SetDispatcherPauseOnAckStatePersistent());
+ jcommander.addCommand("get-dispatcher-pause-on-ack-state-persistent",
+ new GetDispatcherPauseOnAckStatePersistent());
+
jcommander.addCommand("remove-dispatcher-pause-on-ack-state-persistent",
+ new RemoveDispatcherPauseOnAckStatePersistent());
}
@Parameters(commandDescription = "Get entry filters for a topic")
@@ -1931,6 +1938,57 @@ public class CmdTopicPolicies extends CmdBase {
}
}
+ @Parameters(commandDescription = "Enable
dispatcherPauseOnAckStatePersistent for a topic")
+ private class SetDispatcherPauseOnAckStatePersistent extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--global", "-g" }, description = "Whether to set
this policy globally. "
+ + "If set to true, the policy will be replicate to other
clusters asynchronously")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+
getTopicPolicies(isGlobal).setDispatcherPauseOnAckStatePersistent(persistentTopic);
+ }
+ }
+
+ @Parameters(commandDescription = "Get the
dispatcherPauseOnAckStatePersistent for a topic")
+ private class GetDispatcherPauseOnAckStatePersistent extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = {"--applied", "-a"}, description = "Get the applied
policy of the topic")
+ private boolean applied = false;
+
+ @Parameter(names = {"--global", "-g"}, description = "Whether to get
this policy globally. "
+ + "If set to true, broker returned global topic policies")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+
print(getTopicPolicies(isGlobal).getDispatcherPauseOnAckStatePersistent(persistentTopic,
applied));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove
dispatcherPauseOnAckStatePersistent for a topic")
+ private class RemoveDispatcherPauseOnAckStatePersistent extends CliCommand
{
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = {"--global", "-g"}, description = "Whether to
remove this policy globally. "
+ + "If set to true, the policy will be replicate to other
clusters asynchronously")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+
getTopicPolicies(isGlobal).removeDispatcherPauseOnAckStatePersistent(persistentTopic);
+ }
+ }
+
private TopicPolicies getTopicPolicies(boolean isGlobal) {
return getAdmin().topicPolicies(isGlobal);
}