This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b129924ec1 [refactor][admin] Refactor namespace bundle transfer admin api (#19525)
5b129924ec1 is described below

commit 5b129924ec13afa22ee342c7f9c2fa1bd80fe835
Author: Kai Wang <[email protected]>
AuthorDate: Fri Feb 17 18:43:51 2023 +0800

    [refactor][admin] Refactor namespace bundle transfer admin api (#19525)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 104 ++++++++++++---------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  36 ++-----
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  37 ++------
 3 files changed, 74 insertions(+), 103 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 5446060ac65..6746d29af73 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -38,9 +38,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -59,7 +57,7 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.loadbalance.LeaderBroker;
-import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
@@ -886,55 +884,73 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    private void validateLeaderBroker() {
-        if (!this.isLeaderBroker()) {
-            LeaderBroker leaderBroker = 
pulsar().getLeaderElectionService().getCurrentLeader().get();
-            String leaderBrokerUrl = leaderBroker.getServiceUrl();
-            CompletableFuture<LookupResult> result = 
pulsar().getNamespaceService()
-                    .createLookupResult(leaderBrokerUrl, false, null);
-            try {
-                LookupResult lookupResult = result.get(2L, TimeUnit.SECONDS);
-                String redirectUrl = isRequestHttps() ? 
lookupResult.getLookupData().getHttpUrlTls()
-                        : lookupResult.getLookupData().getHttpUrl();
-                if (redirectUrl == null) {
-                    log.error("Redirected broker's service url is not 
configured");
-                    throw new 
RestException(Response.Status.PRECONDITION_FAILED,
-                            "Redirected broker's service url is not 
configured.");
-                }
-                URL url = new URL(redirectUrl);
-                URI redirect = 
UriBuilder.fromUri(uri.getRequestUri()).host(url.getHost())
-                        .port(url.getPort())
-                        .replaceQueryParam("authoritative",
-                                false).build();
-
-                // Redirect
-                if (log.isDebugEnabled()) {
-                    log.debug("Redirecting the request call to leader - {}", 
redirect);
-                }
-                throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
-            } catch (MalformedURLException exception) {
-                log.error("The leader broker url is malformed - {}", 
leaderBrokerUrl);
-                throw new RestException(exception);
-            } catch (ExecutionException | InterruptedException exception) {
-                log.error("Leader broker not found - {}", leaderBrokerUrl);
-                throw new RestException(exception.getCause());
-            } catch (TimeoutException exception) {
-                log.error("Leader broker not found within timeout - {}", 
leaderBrokerUrl);
-                throw new RestException(exception);
-            }
+    private CompletableFuture<Void> validateLeaderBrokerAsync() {
+        if (this.isLeaderBroker()) {
+            return CompletableFuture.completedFuture(null);
         }
+        Optional<LeaderBroker> currentLeaderOpt = 
pulsar().getLeaderElectionService().getCurrentLeader();
+        if (currentLeaderOpt.isEmpty()) {
+            String errorStr = "The current leader is empty.";
+            log.error(errorStr);
+            return FutureUtil.failedFuture(new 
RestException(Response.Status.PRECONDITION_FAILED, errorStr));
+        }
+        LeaderBroker leaderBroker = 
pulsar().getLeaderElectionService().getCurrentLeader().get();
+        String leaderBrokerUrl = leaderBroker.getServiceUrl();
+        return pulsar().getNamespaceService()
+                .createLookupResult(leaderBrokerUrl, false, null)
+                .thenCompose(lookupResult -> {
+                    String redirectUrl = isRequestHttps() ? 
lookupResult.getLookupData().getHttpUrlTls()
+                            : lookupResult.getLookupData().getHttpUrl();
+                    if (redirectUrl == null) {
+                        log.error("Redirected broker's service url is not 
configured");
+                        return FutureUtil.failedFuture(new 
RestException(Response.Status.PRECONDITION_FAILED,
+                                "Redirected broker's service url is not 
configured."));
+                    }
+
+                    try {
+                        URL url = new URL(redirectUrl);
+                        URI redirect = 
UriBuilder.fromUri(uri.getRequestUri()).host(url.getHost())
+                                .port(url.getPort())
+                                .replaceQueryParam("authoritative",
+                                        false).build();
+                        // Redirect
+                        if (log.isDebugEnabled()) {
+                            log.debug("Redirecting the request call to leader 
- {}", redirect);
+                        }
+                        return FutureUtil.failedFuture((
+                                new 
WebApplicationException(Response.temporaryRedirect(redirect).build())));
+                    } catch (MalformedURLException exception) {
+                        log.error("The leader broker url is malformed - {}", 
leaderBrokerUrl);
+                        return FutureUtil.failedFuture(new 
RestException(exception));
+                    }
+                });
     }
 
-    public void setNamespaceBundleAffinity (String bundleRange, String 
destinationBroker) {
+    public CompletableFuture<Void> setNamespaceBundleAffinityAsync(String 
bundleRange, String destinationBroker) {
         if (StringUtils.isBlank(destinationBroker)) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
-        validateLeaderBroker();
-        
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, 
destinationBroker);
+        return pulsar().getLoadManager().get().getAvailableBrokersAsync()
+                .thenCompose(brokers -> {
+                    if (!brokers.contains(destinationBroker)) {
+                        log.warn("[{}] Failed to unload namespace bundle {}/{} 
to inactive broker {}.",
+                                clientAppId(), namespaceName, bundleRange, 
destinationBroker);
+                        return FutureUtil.failedFuture(new 
BrokerServiceException.NotAllowedException(
+                                "Not allowed unload namespace bundle to 
inactive destination broker"));
+                    }
+                    return CompletableFuture.completedFuture(null);
+                })
+                .thenCompose(__ -> validateLeaderBrokerAsync())
+                .thenAccept(__ -> {
+                    
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, 
destinationBroker);
+                });
     }
 
-    public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String 
bundleRange, boolean authoritative) {
+    public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String 
bundleRange,
+                                                                      String 
destinationBroker,
+                                                                      boolean 
authoritative) {
         return validateSuperUserAccessAsync()
+                .thenCompose(__ -> 
setNamespaceBundleAffinityAsync(bundleRange, destinationBroker))
                 .thenAccept(__ -> {
                     checkNotNull(bundleRange, "BundleRange should not be 
null");
                     log.info("[{}] Unloading namespace bundle {}/{}", 
clientAppId(), namespaceName, bundleRange);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index c13441db3df..59613558eb8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -46,7 +46,6 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.admin.impl.NamespacesBase;
-import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
@@ -891,34 +890,13 @@ public class Namespaces extends NamespacesBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @QueryParam("destinationBroker") String destinationBroker) {
         validateNamespaceName(property, cluster, namespace);
-        pulsar().getLoadManager().get().getAvailableBrokersAsync()
-                .thenApply(brokers ->
-                        StringUtils.isNotBlank(destinationBroker) ? 
brokers.contains(destinationBroker) : true)
-                .thenAccept(isActiveDestination -> {
-                    if (isActiveDestination) {
-                        setNamespaceBundleAffinity(bundleRange, 
destinationBroker);
-                        internalUnloadNamespaceBundleAsync(bundleRange, 
authoritative)
-                                .thenAccept(__ -> {
-                                    log.info("[{}] Successfully unloaded 
namespace bundle {}",
-                                            clientAppId(), bundleRange);
-                                    
asyncResponse.resume(Response.noContent().build());
-                                })
-                                .exceptionally(ex -> {
-                                    if (!isRedirectException(ex)) {
-                                        log.error("[{}] Failed to unload 
namespace bundle {}/{}",
-                                                clientAppId(), namespaceName, 
bundleRange, ex);
-                                    }
-                                    
resumeAsyncResponseExceptionally(asyncResponse, ex);
-                                    return null;
-                                });
-                    } else {
-                        log.warn("[{}] Failed to unload namespace bundle {}/{} 
to inactive broker {}.",
-                                clientAppId(), namespaceName, bundleRange, 
destinationBroker);
-                        resumeAsyncResponseExceptionally(asyncResponse,
-                                new BrokerServiceException.NotAllowedException(
-                                        "Not allowed unload namespace bundle 
to inactive destination broker"));
-                    }
-                }).exceptionally(ex -> {
+        internalUnloadNamespaceBundleAsync(bundleRange, destinationBroker, 
authoritative)
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully unloaded namespace bundle {}",
+                            clientAppId(), bundleRange);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
                     if (!isRedirectException(ex)) {
                         log.error("[{}] Failed to unload namespace bundle 
{}/{}",
                                 clientAppId(), namespaceName, bundleRange, ex);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 80af5f4ad45..f5e23db79b9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -46,10 +46,8 @@ import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.admin.impl.NamespacesBase;
 import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils;
-import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -817,34 +815,13 @@ public class Namespaces extends NamespacesBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
                                       @QueryParam("destinationBroker") String 
destinationBroker) {
         validateNamespaceName(tenant, namespace);
-        pulsar().getLoadManager().get().getAvailableBrokersAsync()
-                .thenApply(brokers ->
-                        StringUtils.isNotBlank(destinationBroker) ? 
brokers.contains(destinationBroker) : true)
-                .thenAccept(isActiveDestination -> {
-                    if (isActiveDestination) {
-                        setNamespaceBundleAffinity(bundleRange, 
destinationBroker);
-                        internalUnloadNamespaceBundleAsync(bundleRange, 
authoritative)
-                                .thenAccept(__ -> {
-                                    log.info("[{}] Successfully unloaded 
namespace bundle {}",
-                                            clientAppId(), bundleRange);
-                                    
asyncResponse.resume(Response.noContent().build());
-                                })
-                                .exceptionally(ex -> {
-                                    if (!isRedirectException(ex)) {
-                                        log.error("[{}] Failed to unload 
namespace bundle {}/{}",
-                                                clientAppId(), namespaceName, 
bundleRange, ex);
-                                    }
-                                    
resumeAsyncResponseExceptionally(asyncResponse, ex);
-                                    return null;
-                                });
-                    } else {
-                        log.warn("[{}] Failed to unload namespace bundle {}/{} 
to inactive broker {}.",
-                                clientAppId(), namespaceName, bundleRange, 
destinationBroker);
-                        resumeAsyncResponseExceptionally(asyncResponse,
-                                new BrokerServiceException.NotAllowedException(
-                                        "Not allowed unload namespace bundle 
to inactive destination broker"));
-                    }
-                }).exceptionally(ex -> {
+        internalUnloadNamespaceBundleAsync(bundleRange, destinationBroker, 
authoritative)
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully unloaded namespace bundle {}",
+                            clientAppId(), bundleRange);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
                     if (!isRedirectException(ex)) {
                         log.error("[{}] Failed to unload namespace bundle 
{}/{}",
                                 clientAppId(), namespaceName, bundleRange, ex);

Reply via email to