This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 6580d14 [Issue 6043] Support force deleting subscription
6580d14 is described below
commit 6580d146b0bb85f37ef73956f2dac3229e874aeb
Author: Fangbin Sun <[email protected]>
AuthorDate: Wed Mar 25 09:42:19 2020 +0800
[Issue 6043] Support force deleting subscription
Signed-off-by: xiaolong.ran <[email protected]>
---
.../broker/admin/impl/PersistentTopicsBase.java | 91 +++++++++++++++++++++-
.../pulsar/broker/admin/v1/PersistentTopics.java | 10 ++-
.../pulsar/broker/admin/v2/PersistentTopics.java | 7 +-
.../apache/pulsar/broker/service/Subscription.java | 2 +
.../nonpersistent/NonPersistentSubscription.java | 43 +++++++++-
.../service/persistent/PersistentSubscription.java | 43 +++++++++-
.../apache/pulsar/broker/admin/AdminApiTest.java | 50 ++++++++++++
.../pulsar/broker/admin/PersistentTopicsTest.java | 2 +-
.../org/apache/pulsar/client/admin/Topics.java | 43 ++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 20 +++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 4 +-
.../pulsar/admin/cli/CmdPersistentTopics.java | 6 +-
.../org/apache/pulsar/admin/cli/CmdTopics.java | 6 +-
site2/docs/reference-pulsar-admin.md | 1 +
14 files changed, 315 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0322820..273901b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1045,6 +1045,14 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+ protected void internalDeleteSubscription(AsyncResponse asyncResponse,
String subName, boolean authoritative, boolean force) {
+ if (force) {
+ internalDeleteSubscriptionForcefully(asyncResponse, subName,
authoritative);
+ } else {
+ internalDeleteSubscription(asyncResponse, subName, authoritative);
+ }
+ }
+
protected void internalDeleteSubscription(AsyncResponse asyncResponse,
String subName, boolean authoritative) {
if (topicName.isGlobal()) {
try {
@@ -1067,7 +1075,7 @@ public class PersistentTopicsBase extends AdminResource {
TopicName topicNamePartition =
topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics()
-
.deleteSubscriptionAsync(topicNamePartition.toString(), subName));
+
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
} catch (Exception e) {
log.error("[{}] Failed to delete subscription {}
{}", clientAppId(), topicNamePartition, subName,
e);
@@ -1133,6 +1141,87 @@ public class PersistentTopicsBase extends AdminResource {
}
}
+ protected void internalDeleteSubscriptionForcefully(AsyncResponse
asyncResponse, String subName, boolean authoritative) {
+ if (topicName.isGlobal()) {
+ try {
+ validateGlobalNamespaceOwnership(namespaceName);
+ } catch (Exception e) {
+ log.error("[{}] Failed to delete subscription forcefully {}
from topic {}", clientAppId(), subName, topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
+ }
+ // If the topic name is a partition name, no need to get partition
topic metadata again
+ if (topicName.isPartitioned()) {
+
internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse,
subName, authoritative);
+ } else {
+ getPartitionedTopicMetadataAsync(topicName, authoritative,
false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+ futures.add(pulsar().getAdminClient().topics()
+
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, true));
+ } catch (Exception e) {
+ log.error("[{}] Failed to delete subscription
forcefully {} {}", clientAppId(), topicNamePartition, subName,
+ e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ }
+
+ FutureUtil.waitForAll(futures).handle((result, exception)
-> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ if (t instanceof NotFoundException) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND, "Subscription not found"));
+ return null;
+ } else {
+ log.error("[{}] Failed to delete subscription
forcefully {} {}", clientAppId(), topicName, subName, t);
+ asyncResponse.resume(new RestException(t));
+ return null;
+ }
+ }
+
+ asyncResponse.resume(Response.noContent().build());
+ return null;
+ });
+ } else {
+
internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse,
subName, authoritative);
+ }
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to delete subscription forcefully {}
from topic {}", clientAppId(), subName, topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+ }
+
+ private void
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse
asyncResponse, String subName, boolean authoritative) {
+ try {
+ validateAdminAccessForSubscriber(subName, authoritative);
+ Topic topic = getTopicReference(topicName);
+ Subscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Subscription not found"));
+ return;
+ }
+ sub.deleteForcefully().get();
+ log.info("[{}][{}] Deleted subscription forcefully {}",
clientAppId(), topicName, subName);
+ asyncResponse.resume(Response.noContent().build());
+ } catch (Exception e) {
+ log.error("[{}] Failed to delete subscription forcefully {} from
topic {}", clientAppId(), subName, topicName, e);
+ if (e instanceof WebApplicationException) {
+ asyncResponse.resume(e);
+ } else {
+ log.error("[{}] Failed to delete subscription forcefully {}
{}", clientAppId(), topicName, subName, e);
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+ }
+
protected void internalSkipAllMessages(AsyncResponse asyncResponse, String
subName, boolean authoritative) {
if (topicName.isGlobal()) {
try {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 027e7bb..9c2cde1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -343,17 +343,21 @@ public class PersistentTopics extends
PersistentTopicsBase {
@DELETE
@Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}")
- @ApiOperation(hidden = true, value = "Delete a subscription.", notes =
"There should not be any active consumers on the subscription.")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiOperation(hidden = true, value = "Delete a subscription.", notes =
"The subscription cannot be deleted if delete is not forcefully and there are
any active consumers attached to it. "
+ + "Force delete ignores connected consumers and deletes
subscription by explicitly closing them.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace of this topic"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 412, message = "Subscription has active
consumers") })
public void deleteSubscription(@Suspended final AsyncResponse
asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace")
String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@PathParam("subName") String encodedSubName,
+ @QueryParam("force") @DefaultValue("false") boolean force,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalDeleteSubscription(asyncResponse, decode(encodedSubName),
authoritative);
+ internalDeleteSubscription(asyncResponse, decode(encodedSubName),
authoritative, force);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 6f187ff..ab05c06 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -589,7 +589,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@DELETE
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}")
- @ApiOperation(value = "Delete a subscription.", notes = "There should not
be any active consumers on the subscription.")
+ @ApiOperation(value = "Delete a subscription.", notes = "The subscription
cannot be deleted if delete is not forcefully and there are any active
consumers attached to it. "
+ + "Force delete ignores connected consumers and deletes
subscription by explicitly closing them.")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have permission to
administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@@ -607,11 +608,13 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscription to be deleted")
@PathParam("subName") String encodedSubName,
+ @ApiParam(value = "Disconnect and close all consumers and delete
subscription forcefully", defaultValue = "false", type = "boolean")
+ @QueryParam("force") @DefaultValue("false") boolean force,
@ApiParam(value = "Is authentication required to perform this
operation")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
- internalDeleteSubscription(asyncResponse, decode(encodedSubName),
authoritative);
+ internalDeleteSubscription(asyncResponse, decode(encodedSubName),
authoritative, force);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index f7d9687..85b5415 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -65,6 +65,8 @@ public interface Subscription {
CompletableFuture<Void> delete();
+ CompletableFuture<Void> deleteForcefully();
+
CompletableFuture<Void> disconnect();
CompletableFuture<Void> doUnsubscribe(Consumer consumer);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 316024a..d3a6197 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -304,12 +304,53 @@ public class NonPersistentSubscription implements
Subscription {
*/
@Override
public CompletableFuture<Void> delete() {
+ return delete(false);
+ }
+
+ /**
+ * Forcefully closes all consumers and deletes the subscription.
+ * @return
+ */
+ @Override
+ public CompletableFuture<Void> deleteForcefully() {
+ return delete(true);
+ }
+
+ /**
+ * Delete the subscription by closing and deleting its managed cursor.
Handle unsubscribe call from admin layer.
+ *
+ * @param closeIfConsumersConnected
+ * Flag indicating whether to explicitly close connected consumers
before trying to delete the subscription. If
+ * any consumer is connected to it and this flag is disabled,
then this operation fails.
+ * @return CompletableFuture indicating the completion of delete operation
+ */
+ private CompletableFuture<Void> delete(boolean closeIfConsumersConnected) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
log.info("[{}][{}] Unsubscribing", topicName, subName);
+ CompletableFuture<Void> closeSubscriptionFuture = new
CompletableFuture<>();
+
+ if (closeIfConsumersConnected) {
+ this.disconnect().thenRun(() -> {
+ closeSubscriptionFuture.complete(null);
+ }).exceptionally(ex -> {
+ log.error("[{}][{}] Error disconnecting and closing
subscription", topicName, subName, ex);
+ closeSubscriptionFuture.completeExceptionally(ex);
+ return null;
+ });
+ } else {
+ this.close().thenRun(() -> {
+ closeSubscriptionFuture.complete(null);
+ }).exceptionally(exception -> {
+ log.error("[{}][{}] Error closing subscription", topicName,
subName, exception);
+ closeSubscriptionFuture.completeExceptionally(exception);
+ return null;
+ });
+ }
+
// cursor close handles pending delete (ack) operations
- this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v
-> {
+ closeSubscriptionFuture.thenCompose(v ->
topic.unsubscribe(subName)).thenAccept(v -> {
synchronized (this) {
(dispatcher != null ? dispatcher.close() :
CompletableFuture.completedFuture(null)).thenRun(() -> {
log.info("[{}][{}] Successfully deleted subscription",
topicName, subName);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 25633f9..ea4883b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -850,12 +850,53 @@ public class PersistentSubscription implements
Subscription {
*/
@Override
public CompletableFuture<Void> delete() {
+ return delete(false);
+ }
+
+ /**
+ * Forcefully closes all consumers and deletes the subscription.
+ * @return
+ */
+ @Override
+ public CompletableFuture<Void> deleteForcefully() {
+ return delete(true);
+ }
+
+ /**
+ * Delete the subscription by closing and deleting its managed cursor.
Handle unsubscribe call from admin layer.
+ *
+ * @param closeIfConsumersConnected
+ * Flag indicating whether to explicitly close connected consumers
before trying to delete the subscription. If
+ * any consumer is connected to it and this flag is disabled,
then this operation fails.
+ * @return CompletableFuture indicating the completion of delete operation
+ */
+ private CompletableFuture<Void> delete(boolean closeIfConsumersConnected) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
log.info("[{}][{}] Unsubscribing", topicName, subName);
+ CompletableFuture<Void> closeSubscriptionFuture = new
CompletableFuture<>();
+
+ if (closeIfConsumersConnected) {
+ this.disconnect().thenRun(() -> {
+ closeSubscriptionFuture.complete(null);
+ }).exceptionally(ex -> {
+ log.error("[{}][{}] Error disconnecting and closing
subscription", topicName, subName, ex);
+ closeSubscriptionFuture.completeExceptionally(ex);
+ return null;
+ });
+ } else {
+ this.close().thenRun(() -> {
+ closeSubscriptionFuture.complete(null);
+ }).exceptionally(exception -> {
+ log.error("[{}][{}] Error closing subscription", topicName,
subName, exception);
+ closeSubscriptionFuture.completeExceptionally(exception);
+ return null;
+ });
+ }
+
// cursor close handles pending delete (ack) operations
- this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v
-> {
+ closeSubscriptionFuture.thenCompose(v ->
topic.unsubscribe(subName)).thenAccept(v -> {
synchronized (this) {
(dispatcher != null ? dispatcher.close() :
CompletableFuture.completedFuture(null)).thenRun(() -> {
log.info("[{}][{}] Successfully deleted subscription",
topicName, subName);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 82808b1..d1292fe 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -84,6 +84,7 @@ import org.apache.pulsar.client.admin.internal.TenantsImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -1302,6 +1303,55 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
admin.topics().delete("persistent://prop-xyz/ns1-bundles/ds2");
}
+ @Test
+ public void testDeleteSubscription() throws Exception {
+ final String subName = "test-sub";
+ final String persistentTopicName =
"persistent://prop-xyz/ns1/test-sub-topic";
+
+ // disable auto subscription creation
+ pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+
+ // create a topic and produce some messages
+ publishMessagesOnPersistentTopic(persistentTopicName, 5);
+ assertEquals(admin.topics().getList("prop-xyz/ns1"),
+ Lists.newArrayList(persistentTopicName));
+
+ // create the subscription by PulsarAdmin
+ admin.topics().createSubscription(persistentTopicName, subName,
MessageId.earliest);
+
+ assertEquals(admin.topics().getSubscriptions(persistentTopicName),
Lists.newArrayList(subName));
+
+ // create consumer and subscription
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getWebServiceAddress())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ Consumer<byte[]> consumer =
client.newConsumer().topic(persistentTopicName).subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Exclusive).subscribe();
+
+ // try to delete the subscription with a connected consumer
+ try {
+ admin.topics().deleteSubscription(persistentTopicName, subName);
+ fail("should have failed");
+ } catch (PulsarAdminException.PreconditionFailedException e) {
+ assertEquals(e.getStatusCode(),
Status.PRECONDITION_FAILED.getStatusCode());
+ }
+
+ // failed to delete the subscription
+ assertEquals(admin.topics().getSubscriptions(persistentTopicName),
Lists.newArrayList(subName));
+
+ // try to delete the subscription with a connected consumer forcefully
+ admin.topics().deleteSubscription(persistentTopicName, subName, true);
+
+ // delete the subscription successfully
+
assertEquals(admin.topics().getSubscriptions(persistentTopicName).size(), 0);
+
+ // reset to default
+ pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
+
+ client.close();
+ }
+
@Test(dataProvider = "bundling")
public void testClearBacklogOnNamespace(Integer numBundles) throws
Exception {
admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index bec3101..d2ef142 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -183,7 +183,7 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
// 6) Delete the subscription
response = mock(AsyncResponse.class);
- persistentTopics.deleteSubscription(response, testTenant,
testNamespace, testLocalTopicName, "test", true);
+ persistentTopics.deleteSubscription(response, testTenant,
testNamespace, testLocalTopicName, "test", false,true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 0a00462..4df3c31 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -792,6 +792,31 @@ public interface Topics {
void deleteSubscription(String topic, String subName) throws
PulsarAdminException;
/**
+ * Delete a subscription.
+ * <p>
+ * Delete a persistent subscription from a topic. Unless force is set, there should not be any
active consumers on the subscription.
+ * The force flag deletes the subscription forcefully by closing all active
consumers.
+ * <p>
+ *
+ * @param topic
+ * topic name
+ * @param subName
+ * Subscription name
+ * @param force
+ * Delete subscription forcefully
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Topic or subscription does not exist
+ * @throws PreconditionFailedException
+ * Subscription has active consumers
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void deleteSubscription(String topic, String subName, boolean force)
throws PulsarAdminException;
+
+ /**
* Delete a subscription asynchronously.
* <p>
* Delete a persistent subscription from a topic. There should not be any
active consumers on the subscription.
@@ -807,6 +832,24 @@ public interface Topics {
CompletableFuture<Void> deleteSubscriptionAsync(String topic, String
subName);
/**
+ * Delete a subscription asynchronously.
+ * <p>
+ * Delete a persistent subscription from a topic. Unless force is set, there should not be any
active consumers on the subscription.
+ * The force flag deletes the subscription forcefully by closing all active
consumers.
+ * <p>
+ *
+ * @param topic
+ * topic name
+ * @param subName
+ * Subscription name
+ * @param force
+ * Delete subscription forcefully
+ *
+ * @return a future that can be used to track when the subscription is
deleted
+ */
+ CompletableFuture<Void> deleteSubscriptionAsync(String topic, String
subName, boolean force);
+
+ /**
* Skip all messages on a topic subscription.
* <p>
* Completely clears the backlog on the subscription.
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 0b9b3d6..01491ac 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -634,10 +634,30 @@ public class TopicsImpl extends BaseResource implements
Topics {
}
@Override
+ public void deleteSubscription(String topic, String subName, boolean
force) throws PulsarAdminException {
+ try {
+ deleteSubscriptionAsync(topic, subName,
force).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
public CompletableFuture<Void> deleteSubscriptionAsync(String topic,
String subName) {
+ return deleteSubscriptionAsync(topic, subName, false);
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteSubscriptionAsync(String topic,
String subName, boolean force) {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subName);
WebTarget path = topicPath(tn, "subscription", encodedSubName);
+ path = path.queryParam("force", force);
return asyncDeleteRequest(path);
}
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 0a9d119..10a62e9 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -637,7 +637,7 @@ public class PulsarAdminToolTest {
verify(mockTopics).getSubscriptions("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("unsubscribe persistent://myprop/clust/ns1/ds1 -s
sub1"));
-
verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1",
"sub1");
+
verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1",
"sub1", false);
cmdTopics.run(split("stats persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1",
false);
@@ -723,7 +723,7 @@ public class PulsarAdminToolTest {
verify(mockTopics).getSubscriptions("persistent://myprop/clust/ns1/ds1");
topics.run(split("unsubscribe persistent://myprop/clust/ns1/ds1 -s
sub1"));
-
verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1",
"sub1");
+
verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1",
"sub1", false);
topics.run(split("stats persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1");
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
index 9df331f..64d63e0 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
@@ -297,13 +297,17 @@ public class CmdPersistentTopics extends CmdBase {
@Parameter(description =
"persistent://property/cluster/namespace/topic", required = true)
private java.util.List<String> params;
+ @Parameter(names = { "-f",
+ "--force" }, description = "Disconnect and close all consumers and
delete subscription forcefully")
+ private boolean force = false;
+
@Parameter(names = { "-s", "--subscription" }, description =
"Subscription to be deleted", required = true)
private String subName;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
- persistentTopics.deleteSubscription(persistentTopic, subName);
+ persistentTopics.deleteSubscription(persistentTopic, subName,
force);
}
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index f5954cf..079405c 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -354,13 +354,17 @@ public class CmdTopics extends CmdBase {
@Parameter(description = "persistent://tenant/namespace/topic",
required = true)
private java.util.List<String> params;
+ @Parameter(names = { "-f",
+ "--force" }, description = "Disconnect and close all consumers and
delete subscription forcefully")
+ private boolean force = false;
+
@Parameter(names = { "-s", "--subscription" }, description =
"Subscription to be deleted", required = true)
private String subName;
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
- topics.deleteSubscription(topic, subName);
+ topics.deleteSubscription(topic, subName, force);
}
}
diff --git a/site2/docs/reference-pulsar-admin.md
b/site2/docs/reference-pulsar-admin.md
index ef0dfa7..fbd8927 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1910,6 +1910,7 @@ Options
|Flag|Description|Default|
|---|---|---|
|`-s`, `--subscription`|The subscription to delete||
+|`-f`, `--force`|Disconnect and close all consumers and delete subscription
forcefully|false|
### `stats`