This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5b129924ec1 [refactor][admin] Refactor namespace bundle transfer admin api (#19525)
5b129924ec1 is described below
commit 5b129924ec13afa22ee342c7f9c2fa1bd80fe835
Author: Kai Wang <[email protected]>
AuthorDate: Fri Feb 17 18:43:51 2023 +0800
[refactor][admin] Refactor namespace bundle transfer admin api (#19525)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 104 ++++++++++++---------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 36 ++-----
.../apache/pulsar/broker/admin/v2/Namespaces.java | 37 ++------
3 files changed, 74 insertions(+), 103 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 5446060ac65..6746d29af73 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -38,9 +38,7 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -59,7 +57,7 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
-import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
@@ -886,55 +884,73 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- private void validateLeaderBroker() {
- if (!this.isLeaderBroker()) {
- LeaderBroker leaderBroker =
pulsar().getLeaderElectionService().getCurrentLeader().get();
- String leaderBrokerUrl = leaderBroker.getServiceUrl();
- CompletableFuture<LookupResult> result =
pulsar().getNamespaceService()
- .createLookupResult(leaderBrokerUrl, false, null);
- try {
- LookupResult lookupResult = result.get(2L, TimeUnit.SECONDS);
- String redirectUrl = isRequestHttps() ?
lookupResult.getLookupData().getHttpUrlTls()
- : lookupResult.getLookupData().getHttpUrl();
- if (redirectUrl == null) {
- log.error("Redirected broker's service url is not
configured");
- throw new
RestException(Response.Status.PRECONDITION_FAILED,
- "Redirected broker's service url is not
configured.");
- }
- URL url = new URL(redirectUrl);
- URI redirect =
UriBuilder.fromUri(uri.getRequestUri()).host(url.getHost())
- .port(url.getPort())
- .replaceQueryParam("authoritative",
- false).build();
-
- // Redirect
- if (log.isDebugEnabled()) {
- log.debug("Redirecting the request call to leader - {}",
redirect);
- }
- throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
- } catch (MalformedURLException exception) {
- log.error("The leader broker url is malformed - {}",
leaderBrokerUrl);
- throw new RestException(exception);
- } catch (ExecutionException | InterruptedException exception) {
- log.error("Leader broker not found - {}", leaderBrokerUrl);
- throw new RestException(exception.getCause());
- } catch (TimeoutException exception) {
- log.error("Leader broker not found within timeout - {}",
leaderBrokerUrl);
- throw new RestException(exception);
- }
+ private CompletableFuture<Void> validateLeaderBrokerAsync() {
+ if (this.isLeaderBroker()) {
+ return CompletableFuture.completedFuture(null);
}
+ Optional<LeaderBroker> currentLeaderOpt =
pulsar().getLeaderElectionService().getCurrentLeader();
+ if (currentLeaderOpt.isEmpty()) {
+ String errorStr = "The current leader is empty.";
+ log.error(errorStr);
+ return FutureUtil.failedFuture(new
RestException(Response.Status.PRECONDITION_FAILED, errorStr));
+ }
+ LeaderBroker leaderBroker =
pulsar().getLeaderElectionService().getCurrentLeader().get();
+ String leaderBrokerUrl = leaderBroker.getServiceUrl();
+ return pulsar().getNamespaceService()
+ .createLookupResult(leaderBrokerUrl, false, null)
+ .thenCompose(lookupResult -> {
+ String redirectUrl = isRequestHttps() ?
lookupResult.getLookupData().getHttpUrlTls()
+ : lookupResult.getLookupData().getHttpUrl();
+ if (redirectUrl == null) {
+ log.error("Redirected broker's service url is not
configured");
+ return FutureUtil.failedFuture(new
RestException(Response.Status.PRECONDITION_FAILED,
+ "Redirected broker's service url is not
configured."));
+ }
+
+ try {
+ URL url = new URL(redirectUrl);
+ URI redirect =
UriBuilder.fromUri(uri.getRequestUri()).host(url.getHost())
+ .port(url.getPort())
+ .replaceQueryParam("authoritative",
+ false).build();
+ // Redirect
+ if (log.isDebugEnabled()) {
+ log.debug("Redirecting the request call to leader
- {}", redirect);
+ }
+ return FutureUtil.failedFuture((
+ new
WebApplicationException(Response.temporaryRedirect(redirect).build())));
+ } catch (MalformedURLException exception) {
+ log.error("The leader broker url is malformed - {}",
leaderBrokerUrl);
+ return FutureUtil.failedFuture(new
RestException(exception));
+ }
+ });
}
- public void setNamespaceBundleAffinity (String bundleRange, String
destinationBroker) {
+ public CompletableFuture<Void> setNamespaceBundleAffinityAsync(String
bundleRange, String destinationBroker) {
if (StringUtils.isBlank(destinationBroker)) {
- return;
+ return CompletableFuture.completedFuture(null);
}
- validateLeaderBroker();
-
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange,
destinationBroker);
+ return pulsar().getLoadManager().get().getAvailableBrokersAsync()
+ .thenCompose(brokers -> {
+ if (!brokers.contains(destinationBroker)) {
+ log.warn("[{}] Failed to unload namespace bundle {}/{}
to inactive broker {}.",
+ clientAppId(), namespaceName, bundleRange,
destinationBroker);
+ return FutureUtil.failedFuture(new
BrokerServiceException.NotAllowedException(
+ "Not allowed unload namespace bundle to
inactive destination broker"));
+ }
+ return CompletableFuture.completedFuture(null);
+ })
+ .thenCompose(__ -> validateLeaderBrokerAsync())
+ .thenAccept(__ -> {
+
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange,
destinationBroker);
+ });
}
- public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String
bundleRange, boolean authoritative) {
+ public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String
bundleRange,
+ String
destinationBroker,
+ boolean
authoritative) {
return validateSuperUserAccessAsync()
+ .thenCompose(__ ->
setNamespaceBundleAffinityAsync(bundleRange, destinationBroker))
.thenAccept(__ -> {
checkNotNull(bundleRange, "BundleRange should not be
null");
log.info("[{}] Unloading namespace bundle {}/{}",
clientAppId(), namespaceName, bundleRange);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index c13441db3df..59613558eb8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -46,7 +46,6 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
-import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
@@ -891,34 +890,13 @@ public class Namespaces extends NamespacesBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@QueryParam("destinationBroker") String destinationBroker) {
validateNamespaceName(property, cluster, namespace);
- pulsar().getLoadManager().get().getAvailableBrokersAsync()
- .thenApply(brokers ->
- StringUtils.isNotBlank(destinationBroker) ?
brokers.contains(destinationBroker) : true)
- .thenAccept(isActiveDestination -> {
- if (isActiveDestination) {
- setNamespaceBundleAffinity(bundleRange,
destinationBroker);
- internalUnloadNamespaceBundleAsync(bundleRange,
authoritative)
- .thenAccept(__ -> {
- log.info("[{}] Successfully unloaded
namespace bundle {}",
- clientAppId(), bundleRange);
-
asyncResponse.resume(Response.noContent().build());
- })
- .exceptionally(ex -> {
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to unload
namespace bundle {}/{}",
- clientAppId(), namespaceName,
bundleRange, ex);
- }
-
resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
- } else {
- log.warn("[{}] Failed to unload namespace bundle {}/{}
to inactive broker {}.",
- clientAppId(), namespaceName, bundleRange,
destinationBroker);
- resumeAsyncResponseExceptionally(asyncResponse,
- new BrokerServiceException.NotAllowedException(
- "Not allowed unload namespace bundle
to inactive destination broker"));
- }
- }).exceptionally(ex -> {
+ internalUnloadNamespaceBundleAsync(bundleRange, destinationBroker,
authoritative)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully unloaded namespace bundle {}",
+ clientAppId(), bundleRange);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to unload namespace bundle
{}/{}",
clientAppId(), namespaceName, bundleRange, ex);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 80af5f4ad45..f5e23db79b9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -46,10 +46,8 @@ import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils;
-import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
@@ -817,34 +815,13 @@ public class Namespaces extends NamespacesBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@QueryParam("destinationBroker") String
destinationBroker) {
validateNamespaceName(tenant, namespace);
- pulsar().getLoadManager().get().getAvailableBrokersAsync()
- .thenApply(brokers ->
- StringUtils.isNotBlank(destinationBroker) ?
brokers.contains(destinationBroker) : true)
- .thenAccept(isActiveDestination -> {
- if (isActiveDestination) {
- setNamespaceBundleAffinity(bundleRange,
destinationBroker);
- internalUnloadNamespaceBundleAsync(bundleRange,
authoritative)
- .thenAccept(__ -> {
- log.info("[{}] Successfully unloaded
namespace bundle {}",
- clientAppId(), bundleRange);
-
asyncResponse.resume(Response.noContent().build());
- })
- .exceptionally(ex -> {
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to unload
namespace bundle {}/{}",
- clientAppId(), namespaceName,
bundleRange, ex);
- }
-
resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
- } else {
- log.warn("[{}] Failed to unload namespace bundle {}/{}
to inactive broker {}.",
- clientAppId(), namespaceName, bundleRange,
destinationBroker);
- resumeAsyncResponseExceptionally(asyncResponse,
- new BrokerServiceException.NotAllowedException(
- "Not allowed unload namespace bundle
to inactive destination broker"));
- }
- }).exceptionally(ex -> {
+ internalUnloadNamespaceBundleAsync(bundleRange, destinationBroker,
authoritative)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully unloaded namespace bundle {}",
+ clientAppId(), bundleRange);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to unload namespace bundle
{}/{}",
clientAppId(), namespaceName, bundleRange, ex);