This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new eedf7026ae6 [improve][broker] PIP-192: Support delete namespace bundle
admin API (#19851)
eedf7026ae6 is described below
commit eedf7026ae60f39bcf74ce67728b47d966fe237f
Author: Kai Wang <[email protected]>
AuthorDate: Wed Mar 29 10:23:10 2023 +0800
[improve][broker] PIP-192: Support delete namespace bundle admin API
(#19851)
PIP: https://github.com/apache/pulsar/issues/16691
### Motivation
Raising a PR to implement https://github.com/apache/pulsar/issues/16691.
We need to support delete namespace bundle admin API.
### Modifications
* Support delete namespace bundle admin API.
* Add units test.
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 12 +++---
.../extensions/ExtensibleLoadManagerImpl.java | 11 ++++-
.../extensions/data/BrokerLookupData.java | 6 +++
.../pulsar/broker/namespace/NamespaceService.java | 47 ++++++++++++++++----
.../extensions/ExtensibleLoadManagerImplTest.java | 50 ++++++++++++++++++++++
5 files changed, 111 insertions(+), 15 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index cffc94b1892..19f8d7c437d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -297,11 +297,11 @@ public abstract class NamespacesBase extends
AdminResource {
.thenCompose(ignore -> pulsar().getNamespaceService()
.getNamespaceBundleFactory().getBundlesAsync(namespaceName))
.thenCompose(bundles ->
FutureUtil.waitForAll(bundles.getBundles().stream()
- .map(bundle ->
pulsar().getNamespaceService().getOwnerAsync(bundle)
- .thenCompose(owner -> {
+ .map(bundle ->
pulsar().getNamespaceService().checkOwnershipPresentAsync(bundle)
+ .thenCompose(present -> {
// check if the bundle is owned by any
broker,
// if not then we do not need to delete
the bundle
- if (owner.isPresent()) {
+ if (present) {
PulsarAdmin admin;
try {
admin = pulsar().getAdminClient();
@@ -1411,7 +1411,7 @@ public abstract class NamespacesBase extends
AdminResource {
.getBundles(namespaceName);
for (NamespaceBundle nsBundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then
there is no backlog on this bundle to clear
- if
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
+ if
(pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
futures.add(pulsar().getAdminClient().namespaces()
.clearNamespaceBundleBacklogAsync(namespaceName.toString(),
nsBundle.getBundleRange()));
}
@@ -1476,7 +1476,7 @@ public abstract class NamespacesBase extends
AdminResource {
.getBundles(namespaceName);
for (NamespaceBundle nsBundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then
there is no backlog on this bundle to clear
- if
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
+ if
(pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
futures.add(pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscriptionAsync(
namespaceName.toString(),
nsBundle.getBundleRange(), subscription));
}
@@ -1543,7 +1543,7 @@ public abstract class NamespacesBase extends
AdminResource {
.getBundles(namespaceName);
for (NamespaceBundle nsBundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then
there are no subscriptions
- if
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
+ if
(pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync(
namespaceName.toString(),
nsBundle.getBundleRange(), subscription));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index fedcad4d009..7aefef596e7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -383,7 +383,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
.thenApply(broker ->
brokerRegistry.getBrokerId().equals(broker.orElse(null)));
}
- private CompletableFuture<Optional<String>>
getOwnershipAsync(Optional<ServiceUnitId> topic,
+ public CompletableFuture<Optional<String>>
getOwnershipAsync(Optional<ServiceUnitId> topic,
ServiceUnitId
bundleUnit) {
final String bundle = bundleUnit.toString();
CompletableFuture<Optional<String>> owner;
@@ -395,6 +395,15 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
return owner;
}
+ public CompletableFuture<Optional<BrokerLookupData>>
getOwnershipWithLookupDataAsync(ServiceUnitId bundleUnit) {
+ return getOwnershipAsync(Optional.empty(),
bundleUnit).thenCompose(broker -> {
+ if (broker.isEmpty()) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+ return getBrokerRegistry().lookupAsync(broker.get());
+ });
+ }
+
public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId
bundle,
Optional<String>
destinationBroker) {
return getOwnershipAsync(Optional.empty(), bundle)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
index 504ae13003e..4c9e503129e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.loadbalance.extensions.data;
import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
@@ -70,4 +71,9 @@ public record BrokerLookupData (String webServiceUrl,
return new LookupResult(webServiceUrl, webServiceUrlTls,
pulsarServiceUrl, pulsarServiceUrlTls,
LookupResult.Type.BrokerUrl, false);
}
+
+ public NamespaceEphemeralData toNamespaceEphemeralData() {
+ return new NamespaceEphemeralData(pulsarServiceUrl,
pulsarServiceUrlTls, webServiceUrl, webServiceUrlTls,
+ false, advertisedListeners);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index de18d50f3e1..d092ef04018 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1165,8 +1165,14 @@ public class NamespaceService implements AutoCloseable {
}
public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle
nsBundle) {
- return ownershipCache.removeOwnership(nsBundle)
- .thenRun(() ->
bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject()));
+ CompletableFuture<Void> future;
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ ExtensibleLoadManagerImpl extensibleLoadManager =
ExtensibleLoadManagerImpl.get(loadManager.get());
+ future =
extensibleLoadManager.unloadNamespaceBundleAsync(nsBundle, Optional.empty());
+ } else {
+ future = ownershipCache.removeOwnership(nsBundle);
+ }
+ return future.thenRun(() ->
bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject()));
}
protected void onNamespaceBundleOwned(NamespaceBundle bundle) {
@@ -1445,15 +1451,40 @@ public class NamespaceService implements AutoCloseable {
});
}
- public Optional<NamespaceEphemeralData> getOwner(NamespaceBundle bundle)
throws Exception {
- // if there is no znode for the service unit, it is not owned by any
broker
- return
getOwnerAsync(bundle).get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(),
SECONDS);
- }
-
public CompletableFuture<Optional<NamespaceEphemeralData>>
getOwnerAsync(NamespaceBundle bundle) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ ExtensibleLoadManagerImpl extensibleLoadManager =
ExtensibleLoadManagerImpl.get(loadManager.get());
+ return
extensibleLoadManager.getOwnershipWithLookupDataAsync(bundle)
+ .thenCompose(lookupData -> {
+ if (lookupData.isPresent()) {
+ return CompletableFuture.completedFuture(
+
Optional.of(lookupData.get().toNamespaceEphemeralData()));
+ } else {
+ return
CompletableFuture.completedFuture(Optional.empty());
+ }
+ });
+ }
return ownershipCache.getOwnerAsync(bundle);
}
+ public boolean checkOwnershipPresent(NamespaceBundle bundle) throws
Exception {
+ return checkOwnershipPresentAsync(bundle).get(pulsar.getConfiguration()
+ .getMetadataStoreOperationTimeoutSeconds(), SECONDS);
+ }
+
+ public CompletableFuture<Boolean>
checkOwnershipPresentAsync(NamespaceBundle bundle) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (bundle.getNamespaceObject().equals(SYSTEM_NAMESPACE)) {
+ return FutureUtil.failedFuture(new
UnsupportedOperationException(
+ "Ownership check for system namespace is not
supported"));
+ }
+ ExtensibleLoadManagerImpl extensibleLoadManager =
ExtensibleLoadManagerImpl.get(loadManager.get());
+ return extensibleLoadManager.getOwnershipAsync(Optional.empty(),
bundle)
+ .thenApply(Optional::isPresent);
+ }
+ return getOwnerAsync(bundle).thenApply(Optional::isPresent);
+ }
+
public void unloadSLANamespace() throws Exception {
PulsarAdmin adminClient = null;
NamespaceName namespaceName = getSLAMonitorNamespace(host, config);
@@ -1461,7 +1492,7 @@ public class NamespaceService implements AutoCloseable {
LOG.info("Checking owner for SLA namespace {}", namespaceName);
NamespaceBundle nsFullBundle = getFullBundle(namespaceName);
- if (!getOwner(nsFullBundle).isPresent()) {
+ if (!checkOwnershipPresent(nsFullBundle)) {
// No one owns the namespace so no point trying to unload it
// Next lookup will assign the bundle to this broker.
return;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index d977e633305..f8a7a9b629f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -134,6 +134,7 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
@BeforeClass
@Override
public void setup() throws Exception {
+ conf.setForceDeleteNamespaceAllowed(true);
conf.setAllowAutoTopicCreation(true);
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
@@ -142,6 +143,7 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
pulsar1 = pulsar;
ServiceConfiguration defaultConf = getDefaultConf();
defaultConf.setAllowAutoTopicCreation(true);
+ defaultConf.setForceDeleteNamespaceAllowed(true);
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
defaultConf.setLoadBalancerSheddingEnabled(false);
@@ -433,6 +435,54 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
assertTrue(bundlesData.getBoundaries().contains(midBundle));
assertTrue(bundlesData.getBoundaries().contains(highBundle));
}
+ @Test(timeOut = 30 * 1000)
+ public void testDeleteNamespaceBundle() throws Exception {
+ TopicName topicName = TopicName.get("test-delete-namespace-bundle");
+ NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+
+ String broker = admin.lookups().lookupTopic(topicName.toString());
+ log.info("Assign the bundle {} to {}", bundle, broker);
+
+ checkOwnershipState(broker, bundle);
+
+ admin.namespaces().deleteNamespaceBundle(topicName.getNamespace(),
bundle.getBundleRange());
+ assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+ }
+
+ @Test(timeOut = 30 * 1000)
+ public void testDeleteNamespace() throws Exception {
+ String namespace = "public/test-delete-namespace";
+ TopicName topicName = TopicName.get(namespace +
"/test-delete-namespace-topic");
+ admin.namespaces().createNamespace(namespace);
+ admin.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet(this.conf.getClusterName()));
+
assertTrue(admin.namespaces().getNamespaces("public").contains(namespace));
+ admin.topics().createPartitionedTopic(topicName.toString(), 2);
+ admin.lookups().lookupTopic(topicName.toString());
+ NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+ try {
+ admin.namespaces().deleteNamespaceBundle(namespace,
bundle.getBundleRange());
+ fail();
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("Cannot delete non empty
bundle"));
+ }
+ admin.namespaces().deleteNamespaceBundle(namespace,
bundle.getBundleRange(), true);
+ admin.lookups().lookupTopic(topicName.toString());
+
+ admin.namespaces().deleteNamespace(namespace, true);
+
assertFalse(admin.namespaces().getNamespaces("public").contains(namespace));
+ }
+
+ @Test(timeOut = 30 * 1000)
+ public void testCheckOwnershipPresentWithSystemNamespace() throws
Exception {
+ NamespaceBundle namespaceBundle =
+ getBundleAsync(pulsar1,
TopicName.get(NamespaceName.SYSTEM_NAMESPACE + "/test")).get();
+ try {
+
pulsar1.getNamespaceService().checkOwnershipPresent(namespaceBundle);
+ } catch (Exception ex) {
+ log.info("Got exception", ex);
+ assertTrue(ex.getCause() instanceof UnsupportedOperationException);
+ }
+ }
@Test
public void testMoreThenOneFilter() throws Exception {