This is an automated email from the ASF dual-hosted git repository.

huangqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 299bd70fdfa [fix][broker] Avoid bundle unload destination broker be 
set as an inactive broker. (#19244)
299bd70fdfa is described below

commit 299bd70fdfa023768e94a8ee4347d39337b6cbd4
Author: lixinyang <[email protected]>
AuthorDate: Tue Jan 17 17:14:26 2023 +0800

    [fix][broker] Avoid bundle unload destination broker be set as an inactive 
broker. (#19244)
    
    Co-authored-by: nicklixinyang <[email protected]>
---
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 36 +++++++++++++++++----
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 37 ++++++++++++++++++----
 .../apache/pulsar/broker/admin/NamespacesTest.java |  7 ++++
 3 files changed, 66 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index fd67de41172..c13441db3df 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -46,6 +46,7 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.admin.impl.NamespacesBase;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
@@ -890,13 +891,34 @@ public class Namespaces extends NamespacesBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @QueryParam("destinationBroker") String destinationBroker) {
         validateNamespaceName(property, cluster, namespace);
-        setNamespaceBundleAffinity(bundleRange, destinationBroker);
-        internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
-                .thenAccept(__ -> {
-                    log.info("[{}] Successfully unloaded namespace bundle {}", 
clientAppId(), bundleRange);
-                    asyncResponse.resume(Response.noContent().build());
-                })
-                .exceptionally(ex -> {
+        pulsar().getLoadManager().get().getAvailableBrokersAsync()
+                .thenApply(brokers ->
+                        StringUtils.isNotBlank(destinationBroker) ? 
brokers.contains(destinationBroker) : true)
+                .thenAccept(isActiveDestination -> {
+                    if (isActiveDestination) {
+                        setNamespaceBundleAffinity(bundleRange, 
destinationBroker);
+                        internalUnloadNamespaceBundleAsync(bundleRange, 
authoritative)
+                                .thenAccept(__ -> {
+                                    log.info("[{}] Successfully unloaded 
namespace bundle {}",
+                                            clientAppId(), bundleRange);
+                                    
asyncResponse.resume(Response.noContent().build());
+                                })
+                                .exceptionally(ex -> {
+                                    if (!isRedirectException(ex)) {
+                                        log.error("[{}] Failed to unload 
namespace bundle {}/{}",
+                                                clientAppId(), namespaceName, 
bundleRange, ex);
+                                    }
+                                    
resumeAsyncResponseExceptionally(asyncResponse, ex);
+                                    return null;
+                                });
+                    } else {
+                        log.warn("[{}] Failed to unload namespace bundle {}/{} 
to inactive broker {}.",
+                                clientAppId(), namespaceName, bundleRange, 
destinationBroker);
+                        resumeAsyncResponseExceptionally(asyncResponse,
+                                new BrokerServiceException.NotAllowedException(
+                                        "Not allowed unload namespace bundle 
to inactive destination broker"));
+                    }
+                }).exceptionally(ex -> {
                     if (!isRedirectException(ex)) {
                         log.error("[{}] Failed to unload namespace bundle 
{}/{}",
                                 clientAppId(), namespaceName, bundleRange, ex);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index c05c7ea946d..efaf038d632 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -46,8 +46,10 @@ import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.admin.impl.NamespacesBase;
 import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -815,13 +817,34 @@ public class Namespaces extends NamespacesBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
                                       @QueryParam("destinationBroker") String 
destinationBroker) {
         validateNamespaceName(tenant, namespace);
-        setNamespaceBundleAffinity(bundleRange, destinationBroker);
-        internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
-                .thenAccept(__ -> {
-                    log.info("[{}] Successfully unloaded namespace bundle {}", 
clientAppId(), bundleRange);
-                    asyncResponse.resume(Response.noContent().build());
-                })
-                .exceptionally(ex -> {
+        pulsar().getLoadManager().get().getAvailableBrokersAsync()
+                .thenApply(brokers ->
+                        StringUtils.isNotBlank(destinationBroker) ? 
brokers.contains(destinationBroker) : true)
+                .thenAccept(isActiveDestination -> {
+                    if (isActiveDestination) {
+                        setNamespaceBundleAffinity(bundleRange, 
destinationBroker);
+                        internalUnloadNamespaceBundleAsync(bundleRange, 
authoritative)
+                                .thenAccept(__ -> {
+                                    log.info("[{}] Successfully unloaded 
namespace bundle {}",
+                                            clientAppId(), bundleRange);
+                                    
asyncResponse.resume(Response.noContent().build());
+                                })
+                                .exceptionally(ex -> {
+                                    if (!isRedirectException(ex)) {
+                                        log.error("[{}] Failed to unload 
namespace bundle {}/{}",
+                                                clientAppId(), namespaceName, 
bundleRange, ex);
+                                    }
+                                    
resumeAsyncResponseExceptionally(asyncResponse, ex);
+                                    return null;
+                                });
+                    } else {
+                        log.warn("[{}] Failed to unload namespace bundle {}/{} 
to inactive broker {}.",
+                                clientAppId(), namespaceName, bundleRange, 
destinationBroker);
+                        resumeAsyncResponseExceptionally(asyncResponse,
+                                new BrokerServiceException.NotAllowedException(
+                                        "Not allowed unload namespace bundle 
to inactive destination broker"));
+                    }
+                }).exceptionally(ex -> {
                     if (!isRedirectException(ex)) {
                         log.error("[{}] Failed to unload namespace bundle 
{}/{}",
                                 clientAppId(), namespaceName, bundleRange, ex);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 0a92e0cc8bc..22200fdbf23 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -733,6 +733,13 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(captor.getValue().getResponse().getLocation().toString(),
                 
UriBuilder.fromUri(uri).host("broker-usc.com").port(8080).toString());
 
+        // check the bundle should not unload to an inactive destination broker
+        namespaces.unloadNamespaceBundle(response, this.testTenant, 
this.testOtherCluster,
+                this.testLocalNamespaces.get(2).getLocalName(), 
"0x00000000_0xffffffff", false, "inactive_destination:8080");
+        captor = ArgumentCaptor.forClass(WebApplicationException.class);
+        verify(response, timeout(5000).atLeast(1)).resume(captor.capture());
+        assertEquals(captor.getValue().getResponse().getStatus(), 
Status.CONFLICT.getStatusCode());
+
         uri = URI.create(pulsar.getWebServiceAddress() + "/admin/namespace/"
                 + this.testGlobalNamespaces.get(0).toString() + 
"/configversion");
         doReturn(uri).when(uriInfo).getRequestUri();

Reply via email to