This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 706b588860c [cleanup][admin] Remove unused methods in PersistentTopicsBase (#22424)
706b588860c is described below
commit 706b588860c93d2a8a5f54bd3db0d10c004699db
Author: 道君 <[email protected]>
AuthorDate: Thu Apr 4 23:01:00 2024 +0800
[cleanup][admin] Remove unused methods in PersistentTopicsBase (#22424)
---
.../broker/admin/impl/PersistentTopicsBase.java | 101 ---------------------
1 file changed, 101 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 424c081d987..c5e280c5577 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1711,107 +1711,6 @@ public class PersistentTopicsBase extends AdminResource
{
});
}
- protected void internalDeleteSubscriptionForcefully(AsyncResponse
asyncResponse,
- String subName,
boolean authoritative) {
- CompletableFuture<Void> future;
- if (topicName.isGlobal()) {
- future = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- future = CompletableFuture.completedFuture(null);
- }
-
- future.thenAccept(__ -> {
- if (topicName.isPartitioned()) {
-
internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse,
subName, authoritative);
- } else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions > 0) {
- final List<CompletableFuture<Void>> futures = new
ArrayList<>();
-
- for (int i = 0; i < partitionMetadata.partitions; i++)
{
- TopicName topicNamePartition =
topicName.getPartition(i);
- try {
- futures.add(pulsar().getAdminClient().topics()
-
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, true));
- } catch (Exception e) {
- log.error("[{}] Failed to delete subscription
forcefully {} {}",
- clientAppId(), topicNamePartition,
subName,
- e);
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
-
- FutureUtil.waitForAll(futures).handle((result,
exception) -> {
- if (exception != null) {
- Throwable t = exception.getCause();
- if (t instanceof NotFoundException) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND,
-
getSubNotFoundErrorMessage(topicName.toString(), subName)));
- return null;
- } else {
- log.error("[{}] Failed to delete
subscription forcefully {} {}",
- clientAppId(), topicName, subName,
t);
- asyncResponse.resume(new RestException(t));
- return null;
- }
- }
-
- asyncResponse.resume(Response.noContent().build());
- return null;
- });
- } else {
-
internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse,
subName,
- authoritative);
- }
- }).exceptionally(ex -> {
- // If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to delete subscription
forcefully {} from topic {}",
- clientAppId(), subName, topicName, ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
- }
- }).exceptionally(ex -> {
- // If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to delete subscription {} from topic
{}",
- clientAppId(), subName, topicName, ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
- }
-
- private void
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse
asyncResponse,
-
String subName, boolean authoritative) {
- validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> validateTopicOperationAsync(topicName,
TopicOperation.UNSUBSCRIBE, subName))
- .thenCompose(__ -> getTopicReferenceAsync(topicName))
- .thenCompose(topic -> {
- Subscription sub = topic.getSubscription(subName);
- if (sub == null) {
- throw new RestException(Status.NOT_FOUND,
-
getSubNotFoundErrorMessage(topicName.toString(), subName));
- }
- return sub.deleteForcefully();
- }).thenRun(() -> {
- log.info("[{}][{}] Deleted subscription forcefully {}",
clientAppId(), topicName, subName);
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- // If the exception is not redirect exception we need to
log it.
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to delete subscription
forcefully {} {}",
- clientAppId(), topicName, subName, ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
- }
-
protected void internalSkipAllMessages(AsyncResponse asyncResponse, String
subName, boolean authoritative) {
CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName);
future.thenCompose(__ -> {