This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7feb7fe1665 [improve][broker] Make `unloadNamespaceBundle` async. (#16313)
7feb7fe1665 is described below

commit 7feb7fe16652e794f2e9fc055847a58e3f34f404
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Jul 6 10:46:49 2022 +0800

    [improve][broker] Make `unloadNamespaceBundle` async. (#16313)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 150 +++++++++++----------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  31 ++++-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  36 ++++-
 .../pulsar/broker/web/PulsarWebResource.java       |  52 ++++++-
 .../apache/pulsar/broker/admin/NamespacesTest.java |  27 ++--
 5 files changed, 191 insertions(+), 105 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0472a048f9c..5bdae87c24f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1006,6 +1006,37 @@ public abstract class NamespacesBase extends 
AdminResource {
         });
     }
 
+    protected CompletableFuture<Void> internalUnloadNamespaceAsync() {
+        return validateSuperUserAccessAsync()
+                .thenCompose(__ -> {
+                    log.info("[{}] Unloading namespace {}", clientAppId(), 
namespaceName);
+                    if (namespaceName.isGlobal()) {
+                        // check cluster ownership for a given global 
namespace: redirect if peer-cluster owns it
+                        return 
validateGlobalNamespaceOwnershipAsync(namespaceName);
+                    } else {
+                        return 
validateClusterOwnershipAsync(namespaceName.getCluster())
+                                .thenCompose(ignore -> 
validateClusterForTenantAsync(namespaceName.getTenant(),
+                                        namespaceName.getCluster()));
+                    }
+                })
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenCompose(policies -> {
+                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+                    List<String> boundaries = policies.bundles.getBoundaries();
+                    for (int i = 0; i < boundaries.size() - 1; i++) {
+                        String bundle = String.format("%s_%s", 
boundaries.get(i), boundaries.get(i + 1));
+                        try {
+                            
futures.add(pulsar().getAdminClient().namespaces().unloadNamespaceBundleAsync(
+                                    namespaceName.toString(), bundle));
+                        } catch (PulsarServerException e) {
+                            log.error("[{}] Failed to unload namespace {}", 
clientAppId(), namespaceName, e);
+                            throw new RestException(e);
+                        }
+                    }
+                    return FutureUtil.waitForAll(futures);
+                });
+    }
+
 
     protected void internalSetBookieAffinityGroup(BookieAffinityGroupData 
bookieAffinityGroup) {
         validateSuperUserAccess();
@@ -1076,76 +1107,55 @@ public abstract class NamespacesBase extends 
AdminResource {
         }
     }
 
-    @SuppressWarnings("deprecation")
-    public void internalUnloadNamespaceBundle(AsyncResponse asyncResponse, 
String bundleRange, boolean authoritative) {
-        validateSuperUserAccess();
-        checkNotNull(bundleRange, "BundleRange should not be null");
-        log.info("[{}] Unloading namespace bundle {}/{}", clientAppId(), 
namespaceName, bundleRange);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-
-        NamespaceBundle bundle =
-                pulsar().getNamespaceService().getNamespaceBundleFactory()
-                        .getBundle(namespaceName.toString(), bundleRange);
-        boolean isOwnedByLocalCluster = false;
-        try {
-            isOwnedByLocalCluster = 
pulsar().getNamespaceService().isNamespaceBundleOwned(bundle).get();
-        } catch (Exception e) {
-            if (log.isDebugEnabled()) {
-                log.debug("Failed to validate cluster ownership for {}-{}, {}",
-                        namespaceName.toString(), bundleRange, e.getMessage(), 
e);
-            }
-        }
-
-        // validate namespace ownership only if namespace is not owned by 
local-cluster (it happens when broker doesn't
-        // receive replication-cluster change watch and still owning bundle
-        if (!isOwnedByLocalCluster) {
-            if (namespaceName.isGlobal()) {
-                // check cluster ownership for a given global namespace: 
redirect if peer-cluster owns it
-                validateGlobalNamespaceOwnership(namespaceName);
-            } else {
-                validateClusterOwnership(namespaceName.getCluster());
-                validateClusterForTenant(namespaceName.getTenant(), 
namespaceName.getCluster());
-            }
-        }
-
-        validatePoliciesReadOnlyAccess();
-
-        isBundleOwnedByAnyBroker(namespaceName, policies.bundles, 
bundleRange).thenAccept(flag -> {
-            if (!flag) {
-                log.info("[{}] Namespace bundle is not owned by any broker 
{}/{}", clientAppId(), namespaceName,
-                        bundleRange);
-                asyncResponse.resume(Response.noContent().build());
-                return;
-            }
-            NamespaceBundle nsBundle;
-
-            try {
-                nsBundle = validateNamespaceBundleOwnership(namespaceName, 
policies.bundles, bundleRange,
-                    authoritative, true);
-            } catch (WebApplicationException wae) {
-                asyncResponse.resume(wae);
-                return;
-            }
-
-            pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)
-                    .thenRun(() -> {
-                        log.info("[{}] Successfully unloaded namespace bundle 
{}", clientAppId(), nsBundle.toString());
-                        asyncResponse.resume(Response.noContent().build());
-                    }).exceptionally(ex -> {
-                log.error("[{}] Failed to unload namespace bundle {}/{}", 
clientAppId(), namespaceName, bundleRange,
-                        ex);
-                asyncResponse.resume(new RestException(ex));
-                return null;
-            });
-        }).exceptionally((ex) -> {
-            if (ex.getCause() instanceof WebApplicationException) {
-                asyncResponse.resume(ex.getCause());
-            } else {
-                asyncResponse.resume(new RestException(ex.getCause()));
-            }
-            return null;
-        });
+    public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String 
bundleRange, boolean authoritative) {
+        return validateSuperUserAccessAsync()
+                .thenAccept(__ -> {
+                    checkNotNull(bundleRange, "BundleRange should not be 
null");
+                    log.info("[{}] Unloading namespace bundle {}/{}", 
clientAppId(), namespaceName, bundleRange);
+                })
+                .thenApply(__ ->
+                    pulsar().getNamespaceService().getNamespaceBundleFactory()
+                                    .getBundle(namespaceName.toString(), 
bundleRange)
+                )
+                .thenCompose(bundle ->
+                    
pulsar().getNamespaceService().isNamespaceBundleOwned(bundle)
+                          .exceptionally(ex -> {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Failed to validate cluster 
ownership for {}-{}, {}",
+                                        namespaceName.toString(), bundleRange, 
ex.getMessage(), ex);
+                            }
+                            return false;
+                          })
+                )
+                .thenCompose(isOwnedByLocalCluster -> {
+                    if (!isOwnedByLocalCluster) {
+                        if (namespaceName.isGlobal()) {
+                            // check cluster ownership for a given global 
namespace: redirect if peer-cluster owns it
+                            return 
validateGlobalNamespaceOwnershipAsync(namespaceName);
+                        } else {
+                            return 
validateClusterOwnershipAsync(namespaceName.getCluster())
+                                    .thenCompose(__ -> 
validateClusterForTenantAsync(namespaceName.getTenant(),
+                                            namespaceName.getCluster()));
+                        }
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                })
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenCompose(policies ->
+                     isBundleOwnedByAnyBroker(namespaceName, policies.bundles, 
bundleRange)
+                        .thenCompose(flag -> {
+                            if (!flag) {
+                                log.info("[{}] Namespace bundle is not owned 
by any broker {}/{}", clientAppId(),
+                                        namespaceName, bundleRange);
+                                return CompletableFuture.completedFuture(null);
+                            }
+                            return 
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, 
bundleRange,
+                                    authoritative, true)
+                                    .thenCompose(nsBundle ->
+                                            
pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle));
+                        }));
     }
 
     @SuppressWarnings("deprecation")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index a2e3970a9ac..a86357198e0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -760,12 +760,23 @@ public class Namespaces extends NamespacesBase {
             @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace) {
         try {
             validateNamespaceName(property, cluster, namespace);
-            internalUnloadNamespace(asyncResponse);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
+            return;
         }
+        internalUnloadNamespaceAsync()
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully unloaded all the bundles in 
namespace {}", clientAppId(),
+                            namespaceName);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to unload namespace {}", 
clientAppId(), namespaceName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
@@ -779,7 +790,19 @@ public class Namespaces extends NamespacesBase {
             @PathParam("namespace") String namespace, @PathParam("bundle") 
String bundleRange,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateNamespaceName(property, cluster, namespace);
-        internalUnloadNamespaceBundle(asyncResponse, bundleRange, 
authoritative);
+        internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully unloaded namespace bundle {}", 
clientAppId(), bundleRange);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to unload namespace bundle 
{}/{}",
+                                clientAppId(), namespaceName, bundleRange, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 370466bf8c1..e4ed338e16d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -685,16 +685,28 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or namespace doesn't 
exist"),
             @ApiResponse(code = 412, message = "Namespace is already unloaded 
or Namespace has bundles activated")})
-    public void unloadNamespace(@Suspended final AsyncResponse asyncResponse, 
@PathParam("tenant") String tenant,
-            @PathParam("namespace") String namespace) {
+    public void unloadNamespace(@Suspended final AsyncResponse asyncResponse,
+                                @PathParam("tenant") String tenant,
+                                @PathParam("namespace") String namespace) {
         try {
             validateNamespaceName(tenant, namespace);
-            internalUnloadNamespace(asyncResponse);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
+            return;
         }
+        internalUnloadNamespaceAsync()
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully unloaded all the bundles in 
namespace {}", clientAppId(),
+                            namespaceName);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to unload namespace {}", 
clientAppId(), namespaceName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
@@ -708,7 +720,19 @@ public class Namespaces extends NamespacesBase {
             @PathParam("bundle") String bundleRange,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateNamespaceName(tenant, namespace);
-        internalUnloadNamespaceBundle(asyncResponse, bundleRange, 
authoritative);
+        internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully unloaded namespace bundle {}", 
clientAppId(), bundleRange);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to unload namespace bundle 
{}/{}",
+                                clientAppId(), namespaceName, bundleRange, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 743ba4f954b..11837927297 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -597,12 +597,7 @@ public abstract class PulsarWebResource {
                 .requestHttps(isRequestHttps())
                 .readOnly(true)
                 .loadTopicsInBundle(false).build();
-        try {
-            return nsService.getWebServiceUrlAsync(nsBundle, 
options).thenApply(optionUrl -> optionUrl.isPresent());
-        } catch (Exception e) {
-            log.error("Failed to check whether namespace bundle is owned 
{}/{}", fqnn.toString(), bundleRange, e);
-            throw new RestException(e);
-        }
+        return nsService.getWebServiceUrlAsync(nsBundle, 
options).thenApply(optionUrl -> optionUrl.isPresent());
     }
 
     protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName 
fqnn, BundlesData bundles,
@@ -620,6 +615,18 @@ public abstract class PulsarWebResource {
         }
     }
 
+    protected CompletableFuture<NamespaceBundle> 
validateNamespaceBundleOwnershipAsync(NamespaceName fqnn,
+              BundlesData bundles, String bundleRange, boolean authoritative, 
boolean readOnly) {
+        NamespaceBundle nsBundle;
+        try {
+            nsBundle = validateNamespaceBundleRange(fqnn, bundles, 
bundleRange);
+        } catch (WebApplicationException wae) {
+            return CompletableFuture.failedFuture(wae);
+        }
+        return validateBundleOwnershipAsync(nsBundle, authoritative, readOnly)
+                .thenApply(__ -> nsBundle);
+    }
+
     public void validateBundleOwnership(NamespaceBundle bundle, boolean 
authoritative, boolean readOnly)
             throws Exception {
         NamespaceService nsService = pulsar().getNamespaceService();
@@ -680,6 +687,39 @@ public abstract class PulsarWebResource {
         }
     }
 
+    public CompletableFuture<Void> 
validateBundleOwnershipAsync(NamespaceBundle bundle, boolean authoritative,
+                                                                boolean 
readOnly) {
+        NamespaceService nsService = pulsar().getNamespaceService();
+        LookupOptions options = LookupOptions.builder()
+                .authoritative(authoritative)
+                .requestHttps(isRequestHttps())
+                .readOnly(readOnly)
+                .loadTopicsInBundle(false).build();
+        return nsService.getWebServiceUrlAsync(bundle, options)
+                .thenCompose(webUrl -> {
+                    if (webUrl == null || !webUrl.isPresent()) {
+                        log.warn("Unable to get web service url");
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "Failed to find ownership for ServiceUnit:" + 
bundle.toString());
+                    }
+                    return nsService.isServiceUnitOwnedAsync(bundle)
+                            .thenAccept(owned -> {
+                                if (!owned) {
+                                    boolean newAuthoritative = 
this.isLeaderBroker();
+                                    // Replace the host and port of the 
current request and redirect
+                                    URI redirect = 
UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost())
+                                            
.port(webUrl.get().getPort()).replaceQueryParam("authoritative",
+                                                    newAuthoritative).build();
+
+                                    log.debug("{} is not a service unit 
owned", bundle);
+                                    // Redirect
+                                    log.debug("Redirecting the rest call to 
{}", redirect);
+                                    throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
+                                }
+                            });
+                });
+    }
+
     /**
      * Checks whether the broker is the owner of the namespace. Otherwise it 
will raise an exception to redirect the
      * client to the appropriate broker. If no broker owns the namespace yet, 
this function will try to acquire the
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 232fda2a327..e8284ce2b5a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -25,7 +25,6 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -672,16 +671,13 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
                 + this.testLocalNamespaces.get(2).toString() + "/unload");
         doReturn(uri).when(uriInfo).getRequestUri();
 
-        try {
-            namespaces.unloadNamespaceBundle(response, this.testTenant, 
this.testOtherCluster,
-                    this.testLocalNamespaces.get(2).getLocalName(), 
"0x00000000_0xffffffff", false);
-            fail("Should have raised exception to redirect request");
-        } catch (WebApplicationException wae) {
-            // OK
-            assertEquals(wae.getResponse().getStatus(), 
Status.TEMPORARY_REDIRECT.getStatusCode());
-            assertEquals(wae.getResponse().getLocation().toString(),
-                    
UriBuilder.fromUri(uri).host("broker-usc.com").port(8080).toString());
-        }
+        namespaces.unloadNamespaceBundle(response, this.testTenant, 
this.testOtherCluster,
+                this.testLocalNamespaces.get(2).getLocalName(), 
"0x00000000_0xffffffff", false);
+        captor = ArgumentCaptor.forClass(WebApplicationException.class);
+        verify(response, timeout(5000).atLeast(1)).resume(captor.capture());
+        assertEquals(captor.getValue().getResponse().getStatus(), 
Status.TEMPORARY_REDIRECT.getStatusCode());
+        assertEquals(captor.getValue().getResponse().getLocation().toString(),
+                
UriBuilder.fromUri(uri).host("broker-usc.com").port(8080).toString());
 
         uri = URI.create(pulsar.getWebServiceAddress() + "/admin/namespace/"
                 + this.testGlobalNamespaces.get(0).toString() + 
"/configversion");
@@ -986,14 +982,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         AsyncResponse response = mock(AsyncResponse.class);
         namespaces.unloadNamespaceBundle(response, testTenant, 
testLocalCluster, bundledNsLocal, "0x00000000_0x80000000",
                 false);
-        verify(nsSvc, times(1)).unloadNamespaceBundle(testBundle);
-        try {
-            namespaces.unloadNamespaceBundle(response, testTenant, 
testLocalCluster, bundledNsLocal, "0x00000000_0x88000000",
-                    false);
-            fail("should have failed");
-        } catch (RestException re) {
-            // ok
-        }
+        verify(response, 
timeout(5000).times(1)).resume(any(RestException.class));
     }
 
     private void createBundledTestNamespaces(String property, String cluster, 
String namespace, BundlesData bundle)

Reply via email to