This is an automated email from the ASF dual-hosted git repository.
huangqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 299bd70fdfa [fix][broker] Avoid bundle unload destination broker be
set as an inactive broker. (#19244)
299bd70fdfa is described below
commit 299bd70fdfa023768e94a8ee4347d39337b6cbd4
Author: lixinyang <[email protected]>
AuthorDate: Tue Jan 17 17:14:26 2023 +0800
[fix][broker] Avoid bundle unload destination broker be set as an inactive
broker. (#19244)
Co-authored-by: nicklixinyang <[email protected]>
---
.../apache/pulsar/broker/admin/v1/Namespaces.java | 36 +++++++++++++++++----
.../apache/pulsar/broker/admin/v2/Namespaces.java | 37 ++++++++++++++++++----
.../apache/pulsar/broker/admin/NamespacesTest.java | 7 ++++
3 files changed, 66 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index fd67de41172..c13441db3df 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -46,6 +46,7 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
@@ -890,13 +891,34 @@ public class Namespaces extends NamespacesBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@QueryParam("destinationBroker") String destinationBroker) {
validateNamespaceName(property, cluster, namespace);
- setNamespaceBundleAffinity(bundleRange, destinationBroker);
- internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
- .thenAccept(__ -> {
- log.info("[{}] Successfully unloaded namespace bundle {}",
clientAppId(), bundleRange);
- asyncResponse.resume(Response.noContent().build());
- })
- .exceptionally(ex -> {
+ pulsar().getLoadManager().get().getAvailableBrokersAsync()
+ .thenApply(brokers ->
+ StringUtils.isNotBlank(destinationBroker) ?
brokers.contains(destinationBroker) : true)
+ .thenAccept(isActiveDestination -> {
+ if (isActiveDestination) {
+ setNamespaceBundleAffinity(bundleRange,
destinationBroker);
+ internalUnloadNamespaceBundleAsync(bundleRange,
authoritative)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully unloaded
namespace bundle {}",
+ clientAppId(), bundleRange);
+
asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to unload
namespace bundle {}/{}",
+ clientAppId(), namespaceName,
bundleRange, ex);
+ }
+
resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } else {
+ log.warn("[{}] Failed to unload namespace bundle {}/{}
to inactive broker {}.",
+ clientAppId(), namespaceName, bundleRange,
destinationBroker);
+ resumeAsyncResponseExceptionally(asyncResponse,
+ new BrokerServiceException.NotAllowedException(
+ "Not allowed unload namespace bundle
to inactive destination broker"));
+ }
+ }).exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to unload namespace bundle
{}/{}",
clientAppId(), namespaceName, bundleRange, ex);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index c05c7ea946d..efaf038d632 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -46,8 +46,10 @@ import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
@@ -815,13 +817,34 @@ public class Namespaces extends NamespacesBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@QueryParam("destinationBroker") String
destinationBroker) {
validateNamespaceName(tenant, namespace);
- setNamespaceBundleAffinity(bundleRange, destinationBroker);
- internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
- .thenAccept(__ -> {
- log.info("[{}] Successfully unloaded namespace bundle {}",
clientAppId(), bundleRange);
- asyncResponse.resume(Response.noContent().build());
- })
- .exceptionally(ex -> {
+ pulsar().getLoadManager().get().getAvailableBrokersAsync()
+ .thenApply(brokers ->
+ StringUtils.isNotBlank(destinationBroker) ?
brokers.contains(destinationBroker) : true)
+ .thenAccept(isActiveDestination -> {
+ if (isActiveDestination) {
+ setNamespaceBundleAffinity(bundleRange,
destinationBroker);
+ internalUnloadNamespaceBundleAsync(bundleRange,
authoritative)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully unloaded
namespace bundle {}",
+ clientAppId(), bundleRange);
+
asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to unload
namespace bundle {}/{}",
+ clientAppId(), namespaceName,
bundleRange, ex);
+ }
+
resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } else {
+ log.warn("[{}] Failed to unload namespace bundle {}/{}
to inactive broker {}.",
+ clientAppId(), namespaceName, bundleRange,
destinationBroker);
+ resumeAsyncResponseExceptionally(asyncResponse,
+ new BrokerServiceException.NotAllowedException(
+ "Not allowed unload namespace bundle
to inactive destination broker"));
+ }
+ }).exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to unload namespace bundle
{}/{}",
clientAppId(), namespaceName, bundleRange, ex);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 0a92e0cc8bc..22200fdbf23 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -733,6 +733,13 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
assertEquals(captor.getValue().getResponse().getLocation().toString(),
UriBuilder.fromUri(uri).host("broker-usc.com").port(8080).toString());
+ // check the bundle should not unload to an inactive destination broker
+ namespaces.unloadNamespaceBundle(response, this.testTenant,
this.testOtherCluster,
+ this.testLocalNamespaces.get(2).getLocalName(),
"0x00000000_0xffffffff", false, "inactive_destination:8080");
+ captor = ArgumentCaptor.forClass(WebApplicationException.class);
+ verify(response, timeout(5000).atLeast(1)).resume(captor.capture());
+ assertEquals(captor.getValue().getResponse().getStatus(),
Status.CONFLICT.getStatusCode());
+
uri = URI.create(pulsar.getWebServiceAddress() + "/admin/namespace/"
+ this.testGlobalNamespaces.get(0).toString() +
"/configversion");
doReturn(uri).when(uriInfo).getRequestUri();