This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7feb7fe1665 [improve][broker] Make `unloadNamespaceBundle` async. (#16313)
7feb7fe1665 is described below
commit 7feb7fe16652e794f2e9fc055847a58e3f34f404
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Jul 6 10:46:49 2022 +0800
[improve][broker] Make `unloadNamespaceBundle` async. (#16313)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 150 +++++++++++----------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 31 ++++-
.../apache/pulsar/broker/admin/v2/Namespaces.java | 36 ++++-
.../pulsar/broker/web/PulsarWebResource.java | 52 ++++++-
.../apache/pulsar/broker/admin/NamespacesTest.java | 27 ++--
5 files changed, 191 insertions(+), 105 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0472a048f9c..5bdae87c24f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1006,6 +1006,37 @@ public abstract class NamespacesBase extends AdminResource {
});
}
+ protected CompletableFuture<Void> internalUnloadNamespaceAsync() {
+ return validateSuperUserAccessAsync()
+ .thenCompose(__ -> {
+ log.info("[{}] Unloading namespace {}", clientAppId(),
namespaceName);
+ if (namespaceName.isGlobal()) {
+ // check cluster ownership for a given global
namespace: redirect if peer-cluster owns it
+ return
validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return
validateClusterOwnershipAsync(namespaceName.getCluster())
+ .thenCompose(ignore ->
validateClusterForTenantAsync(namespaceName.getTenant(),
+ namespaceName.getCluster()));
+ }
+ })
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenCompose(policies -> {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+ List<String> boundaries = policies.bundles.getBoundaries();
+ for (int i = 0; i < boundaries.size() - 1; i++) {
+ String bundle = String.format("%s_%s",
boundaries.get(i), boundaries.get(i + 1));
+ try {
+
futures.add(pulsar().getAdminClient().namespaces().unloadNamespaceBundleAsync(
+ namespaceName.toString(), bundle));
+ } catch (PulsarServerException e) {
+ log.error("[{}] Failed to unload namespace {}",
clientAppId(), namespaceName, e);
+ throw new RestException(e);
+ }
+ }
+ return FutureUtil.waitForAll(futures);
+ });
+ }
+
protected void internalSetBookieAffinityGroup(BookieAffinityGroupData
bookieAffinityGroup) {
validateSuperUserAccess();
@@ -1076,76 +1107,55 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- @SuppressWarnings("deprecation")
- public void internalUnloadNamespaceBundle(AsyncResponse asyncResponse,
String bundleRange, boolean authoritative) {
- validateSuperUserAccess();
- checkNotNull(bundleRange, "BundleRange should not be null");
- log.info("[{}] Unloading namespace bundle {}/{}", clientAppId(),
namespaceName, bundleRange);
-
- Policies policies = getNamespacePolicies(namespaceName);
-
- NamespaceBundle bundle =
- pulsar().getNamespaceService().getNamespaceBundleFactory()
- .getBundle(namespaceName.toString(), bundleRange);
- boolean isOwnedByLocalCluster = false;
- try {
- isOwnedByLocalCluster =
pulsar().getNamespaceService().isNamespaceBundleOwned(bundle).get();
- } catch (Exception e) {
- if (log.isDebugEnabled()) {
- log.debug("Failed to validate cluster ownership for {}-{}, {}",
- namespaceName.toString(), bundleRange, e.getMessage(),
e);
- }
- }
-
- // validate namespace ownership only if namespace is not owned by
local-cluster (it happens when broker doesn't
- // receive replication-cluster change watch and still owning bundle
- if (!isOwnedByLocalCluster) {
- if (namespaceName.isGlobal()) {
- // check cluster ownership for a given global namespace:
redirect if peer-cluster owns it
- validateGlobalNamespaceOwnership(namespaceName);
- } else {
- validateClusterOwnership(namespaceName.getCluster());
- validateClusterForTenant(namespaceName.getTenant(),
namespaceName.getCluster());
- }
- }
-
- validatePoliciesReadOnlyAccess();
-
- isBundleOwnedByAnyBroker(namespaceName, policies.bundles,
bundleRange).thenAccept(flag -> {
- if (!flag) {
- log.info("[{}] Namespace bundle is not owned by any broker
{}/{}", clientAppId(), namespaceName,
- bundleRange);
- asyncResponse.resume(Response.noContent().build());
- return;
- }
- NamespaceBundle nsBundle;
-
- try {
- nsBundle = validateNamespaceBundleOwnership(namespaceName,
policies.bundles, bundleRange,
- authoritative, true);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- return;
- }
-
- pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)
- .thenRun(() -> {
- log.info("[{}] Successfully unloaded namespace bundle
{}", clientAppId(), nsBundle.toString());
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- log.error("[{}] Failed to unload namespace bundle {}/{}",
clientAppId(), namespaceName, bundleRange,
- ex);
- asyncResponse.resume(new RestException(ex));
- return null;
- });
- }).exceptionally((ex) -> {
- if (ex.getCause() instanceof WebApplicationException) {
- asyncResponse.resume(ex.getCause());
- } else {
- asyncResponse.resume(new RestException(ex.getCause()));
- }
- return null;
- });
+ public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String
bundleRange, boolean authoritative) {
+ return validateSuperUserAccessAsync()
+ .thenAccept(__ -> {
+ checkNotNull(bundleRange, "BundleRange should not be
null");
+ log.info("[{}] Unloading namespace bundle {}/{}",
clientAppId(), namespaceName, bundleRange);
+ })
+ .thenApply(__ ->
+ pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getBundle(namespaceName.toString(),
bundleRange)
+ )
+ .thenCompose(bundle ->
+
pulsar().getNamespaceService().isNamespaceBundleOwned(bundle)
+ .exceptionally(ex -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to validate cluster
ownership for {}-{}, {}",
+ namespaceName.toString(), bundleRange,
ex.getMessage(), ex);
+ }
+ return false;
+ })
+ )
+ .thenCompose(isOwnedByLocalCluster -> {
+ if (!isOwnedByLocalCluster) {
+ if (namespaceName.isGlobal()) {
+ // check cluster ownership for a given global
namespace: redirect if peer-cluster owns it
+ return
validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return
validateClusterOwnershipAsync(namespaceName.getCluster())
+ .thenCompose(__ ->
validateClusterForTenantAsync(namespaceName.getTenant(),
+ namespaceName.getCluster()));
+ }
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ })
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenCompose(policies ->
+ isBundleOwnedByAnyBroker(namespaceName, policies.bundles,
bundleRange)
+ .thenCompose(flag -> {
+ if (!flag) {
+ log.info("[{}] Namespace bundle is not owned
by any broker {}/{}", clientAppId(),
+ namespaceName, bundleRange);
+ return CompletableFuture.completedFuture(null);
+ }
+ return
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles,
bundleRange,
+ authoritative, true)
+ .thenCompose(nsBundle ->
+
pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle));
+ }));
}
@SuppressWarnings("deprecation")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index a2e3970a9ac..a86357198e0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -760,12 +760,23 @@ public class Namespaces extends NamespacesBase {
@PathParam("cluster") String cluster, @PathParam("namespace")
String namespace) {
try {
validateNamespaceName(property, cluster, namespace);
- internalUnloadNamespace(asyncResponse);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
+ return;
}
+ internalUnloadNamespaceAsync()
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully unloaded all the bundles in
namespace {}", clientAppId(),
+ namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to unload namespace {}",
clientAppId(), namespaceName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
@@ -779,7 +790,19 @@ public class Namespaces extends NamespacesBase {
@PathParam("namespace") String namespace, @PathParam("bundle")
String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateNamespaceName(property, cluster, namespace);
- internalUnloadNamespaceBundle(asyncResponse, bundleRange,
authoritative);
+ internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully unloaded namespace bundle {}",
clientAppId(), bundleRange);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to unload namespace bundle
{}/{}",
+ clientAppId(), namespaceName, bundleRange, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 370466bf8c1..e4ed338e16d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -685,16 +685,28 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or namespace doesn't
exist"),
@ApiResponse(code = 412, message = "Namespace is already unloaded
or Namespace has bundles activated")})
- public void unloadNamespace(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace) {
+ public void unloadNamespace(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
try {
validateNamespaceName(tenant, namespace);
- internalUnloadNamespace(asyncResponse);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
+ return;
}
+ internalUnloadNamespaceAsync()
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully unloaded all the bundles in
namespace {}", clientAppId(),
+ namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to unload namespace {}",
clientAppId(), namespaceName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
@@ -708,7 +720,19 @@ public class Namespaces extends NamespacesBase {
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateNamespaceName(tenant, namespace);
- internalUnloadNamespaceBundle(asyncResponse, bundleRange,
authoritative);
+ internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully unloaded namespace bundle {}",
clientAppId(), bundleRange);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to unload namespace bundle
{}/{}",
+ clientAppId(), namespaceName, bundleRange, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 743ba4f954b..11837927297 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -597,12 +597,7 @@ public abstract class PulsarWebResource {
.requestHttps(isRequestHttps())
.readOnly(true)
.loadTopicsInBundle(false).build();
- try {
- return nsService.getWebServiceUrlAsync(nsBundle,
options).thenApply(optionUrl -> optionUrl.isPresent());
- } catch (Exception e) {
- log.error("Failed to check whether namespace bundle is owned
{}/{}", fqnn.toString(), bundleRange, e);
- throw new RestException(e);
- }
+ return nsService.getWebServiceUrlAsync(nsBundle,
options).thenApply(optionUrl -> optionUrl.isPresent());
}
protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName
fqnn, BundlesData bundles,
@@ -620,6 +615,18 @@ public abstract class PulsarWebResource {
}
}
+ protected CompletableFuture<NamespaceBundle>
validateNamespaceBundleOwnershipAsync(NamespaceName fqnn,
+ BundlesData bundles, String bundleRange, boolean authoritative,
boolean readOnly) {
+ NamespaceBundle nsBundle;
+ try {
+ nsBundle = validateNamespaceBundleRange(fqnn, bundles,
bundleRange);
+ } catch (WebApplicationException wae) {
+ return CompletableFuture.failedFuture(wae);
+ }
+ return validateBundleOwnershipAsync(nsBundle, authoritative, readOnly)
+ .thenApply(__ -> nsBundle);
+ }
+
public void validateBundleOwnership(NamespaceBundle bundle, boolean
authoritative, boolean readOnly)
throws Exception {
NamespaceService nsService = pulsar().getNamespaceService();
@@ -680,6 +687,39 @@ public abstract class PulsarWebResource {
}
}
+ public CompletableFuture<Void>
validateBundleOwnershipAsync(NamespaceBundle bundle, boolean authoritative,
+ boolean
readOnly) {
+ NamespaceService nsService = pulsar().getNamespaceService();
+ LookupOptions options = LookupOptions.builder()
+ .authoritative(authoritative)
+ .requestHttps(isRequestHttps())
+ .readOnly(readOnly)
+ .loadTopicsInBundle(false).build();
+ return nsService.getWebServiceUrlAsync(bundle, options)
+ .thenCompose(webUrl -> {
+ if (webUrl == null || !webUrl.isPresent()) {
+ log.warn("Unable to get web service url");
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Failed to find ownership for ServiceUnit:" +
bundle.toString());
+ }
+ return nsService.isServiceUnitOwnedAsync(bundle)
+ .thenAccept(owned -> {
+ if (!owned) {
+ boolean newAuthoritative =
this.isLeaderBroker();
+ // Replace the host and port of the
current request and redirect
+ URI redirect =
UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost())
+
.port(webUrl.get().getPort()).replaceQueryParam("authoritative",
+ newAuthoritative).build();
+
+ log.debug("{} is not a service unit
owned", bundle);
+ // Redirect
+ log.debug("Redirecting the rest call to
{}", redirect);
+ throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
+ }
+ });
+ });
+ }
+
/**
* Checks whether the broker is the owner of the namespace. Otherwise it
will raise an exception to redirect the
* client to the appropriate broker. If no broker owns the namespace yet,
this function will try to acquire the
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 232fda2a327..e8284ce2b5a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -25,7 +25,6 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -672,16 +671,13 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
+ this.testLocalNamespaces.get(2).toString() + "/unload");
doReturn(uri).when(uriInfo).getRequestUri();
- try {
- namespaces.unloadNamespaceBundle(response, this.testTenant,
this.testOtherCluster,
- this.testLocalNamespaces.get(2).getLocalName(),
"0x00000000_0xffffffff", false);
- fail("Should have raised exception to redirect request");
- } catch (WebApplicationException wae) {
- // OK
- assertEquals(wae.getResponse().getStatus(),
Status.TEMPORARY_REDIRECT.getStatusCode());
- assertEquals(wae.getResponse().getLocation().toString(),
-
UriBuilder.fromUri(uri).host("broker-usc.com").port(8080).toString());
- }
+ namespaces.unloadNamespaceBundle(response, this.testTenant,
this.testOtherCluster,
+ this.testLocalNamespaces.get(2).getLocalName(),
"0x00000000_0xffffffff", false);
+ captor = ArgumentCaptor.forClass(WebApplicationException.class);
+ verify(response, timeout(5000).atLeast(1)).resume(captor.capture());
+ assertEquals(captor.getValue().getResponse().getStatus(),
Status.TEMPORARY_REDIRECT.getStatusCode());
+ assertEquals(captor.getValue().getResponse().getLocation().toString(),
+
UriBuilder.fromUri(uri).host("broker-usc.com").port(8080).toString());
uri = URI.create(pulsar.getWebServiceAddress() + "/admin/namespace/"
+ this.testGlobalNamespaces.get(0).toString() +
"/configversion");
@@ -986,14 +982,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
AsyncResponse response = mock(AsyncResponse.class);
namespaces.unloadNamespaceBundle(response, testTenant,
testLocalCluster, bundledNsLocal, "0x00000000_0x80000000",
false);
- verify(nsSvc, times(1)).unloadNamespaceBundle(testBundle);
- try {
- namespaces.unloadNamespaceBundle(response, testTenant,
testLocalCluster, bundledNsLocal, "0x00000000_0x88000000",
- false);
- fail("should have failed");
- } catch (RestException re) {
- // ok
- }
+ verify(response,
timeout(5000).times(1)).resume(any(RestException.class));
}
private void createBundledTestNamespaces(String property, String cluster,
String namespace, BundlesData bundle)