This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 706b588860c [cleanup][admin] Remove unused methods in 
PersistentTopicsBase (#22424)
706b588860c is described below

commit 706b588860c93d2a8a5f54bd3db0d10c004699db
Author: 道君 <[email protected]>
AuthorDate: Thu Apr 4 23:01:00 2024 +0800

    [cleanup][admin] Remove unused methods in PersistentTopicsBase (#22424)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 101 ---------------------
 1 file changed, 101 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 424c081d987..c5e280c5577 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1711,107 +1711,6 @@ public class PersistentTopicsBase extends AdminResource 
{
                 });
     }
 
-    protected void internalDeleteSubscriptionForcefully(AsyncResponse 
asyncResponse,
-                                                        String subName, 
boolean authoritative) {
-        CompletableFuture<Void> future;
-        if (topicName.isGlobal()) {
-            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
-        } else {
-            future = CompletableFuture.completedFuture(null);
-        }
-
-        future.thenAccept(__ -> {
-            if (topicName.isPartitioned()) {
-                
internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, 
subName, authoritative);
-            } else {
-                getPartitionedTopicMetadataAsync(topicName,
-                        authoritative, false).thenAccept(partitionMetadata -> {
-                    if (partitionMetadata.partitions > 0) {
-                        final List<CompletableFuture<Void>> futures = new 
ArrayList<>();
-
-                        for (int i = 0; i < partitionMetadata.partitions; i++) 
{
-                            TopicName topicNamePartition = 
topicName.getPartition(i);
-                            try {
-                                futures.add(pulsar().getAdminClient().topics()
-                                        
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, true));
-                            } catch (Exception e) {
-                                log.error("[{}] Failed to delete subscription 
forcefully {} {}",
-                                        clientAppId(), topicNamePartition, 
subName,
-                                        e);
-                                asyncResponse.resume(new RestException(e));
-                                return;
-                            }
-                        }
-
-                        FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
-                            if (exception != null) {
-                                Throwable t = exception.getCause();
-                                if (t instanceof NotFoundException) {
-                                    asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
-                                            
getSubNotFoundErrorMessage(topicName.toString(), subName)));
-                                    return null;
-                                } else {
-                                    log.error("[{}] Failed to delete 
subscription forcefully {} {}",
-                                            clientAppId(), topicName, subName, 
t);
-                                    asyncResponse.resume(new RestException(t));
-                                    return null;
-                                }
-                            }
-
-                            asyncResponse.resume(Response.noContent().build());
-                            return null;
-                        });
-                    } else {
-                        
internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, 
subName,
-                                authoritative);
-                    }
-                }).exceptionally(ex -> {
-                    // If the exception is not redirect exception we need to 
log it.
-                    if (!isRedirectException(ex)) {
-                        log.error("[{}] Failed to delete subscription 
forcefully {} from topic {}",
-                                clientAppId(), subName, topicName, ex);
-                    }
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
-                });
-            }
-        }).exceptionally(ex -> {
-            // If the exception is not redirect exception we need to log it.
-            if (!isRedirectException(ex)) {
-                log.error("[{}] Failed to delete subscription {} from topic 
{}",
-                        clientAppId(), subName, topicName, ex);
-            }
-            resumeAsyncResponseExceptionally(asyncResponse, ex);
-            return null;
-        });
-    }
-
-    private void 
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse 
asyncResponse,
-                                                                            
String subName, boolean authoritative) {
-        validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.UNSUBSCRIBE, subName))
-                .thenCompose(__ -> getTopicReferenceAsync(topicName))
-                .thenCompose(topic -> {
-                    Subscription sub = topic.getSubscription(subName);
-                    if (sub == null) {
-                        throw new RestException(Status.NOT_FOUND,
-                                
getSubNotFoundErrorMessage(topicName.toString(), subName));
-                    }
-                    return sub.deleteForcefully();
-                }).thenRun(() -> {
-                    log.info("[{}][{}] Deleted subscription forcefully {}", 
clientAppId(), topicName, subName);
-                    asyncResponse.resume(Response.noContent().build());
-                }).exceptionally(ex -> {
-                    // If the exception is not redirect exception we need to 
log it.
-                    if (!isRedirectException(ex)) {
-                        log.error("[{}] Failed to delete subscription 
forcefully {} {}",
-                                clientAppId(), topicName, subName, ex);
-                    }
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
-                });
-    }
-
     protected void internalSkipAllMessages(AsyncResponse asyncResponse, String 
subName, boolean authoritative) {
         CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName);
         future.thenCompose(__ -> {

Reply via email to