This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new eedf7026ae6 [improve][broker] PIP-192: Support delete namespace bundle 
admin API (#19851)
eedf7026ae6 is described below

commit eedf7026ae60f39bcf74ce67728b47d966fe237f
Author: Kai Wang <[email protected]>
AuthorDate: Wed Mar 29 10:23:10 2023 +0800

    [improve][broker] PIP-192: Support delete namespace bundle admin API 
(#19851)
    
    PIP: https://github.com/apache/pulsar/issues/16691
    
    ### Motivation
    Raising a PR to implement https://github.com/apache/pulsar/issues/16691.
    
    We need to support delete namespace bundle admin API.
    
    ### Modifications
    
    * Support delete namespace bundle admin API.
    * Add units test.
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 12 +++---
 .../extensions/ExtensibleLoadManagerImpl.java      | 11 ++++-
 .../extensions/data/BrokerLookupData.java          |  6 +++
 .../pulsar/broker/namespace/NamespaceService.java  | 47 ++++++++++++++++----
 .../extensions/ExtensibleLoadManagerImplTest.java  | 50 ++++++++++++++++++++++
 5 files changed, 111 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index cffc94b1892..19f8d7c437d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -297,11 +297,11 @@ public abstract class NamespacesBase extends 
AdminResource {
                 .thenCompose(ignore -> pulsar().getNamespaceService()
                         
.getNamespaceBundleFactory().getBundlesAsync(namespaceName))
                 .thenCompose(bundles -> 
FutureUtil.waitForAll(bundles.getBundles().stream()
-                        .map(bundle -> 
pulsar().getNamespaceService().getOwnerAsync(bundle)
-                                .thenCompose(owner -> {
+                        .map(bundle -> 
pulsar().getNamespaceService().checkOwnershipPresentAsync(bundle)
+                                .thenCompose(present -> {
                                     // check if the bundle is owned by any 
broker,
                                     // if not then we do not need to delete 
the bundle
-                                    if (owner.isPresent()) {
+                                    if (present) {
                                         PulsarAdmin admin;
                                         try {
                                             admin = pulsar().getAdminClient();
@@ -1411,7 +1411,7 @@ public abstract class NamespacesBase extends 
AdminResource {
                     .getBundles(namespaceName);
             for (NamespaceBundle nsBundle : bundles.getBundles()) {
                 // check if the bundle is owned by any broker, if not then 
there is no backlog on this bundle to clear
-                if 
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
+                if 
(pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
                     futures.add(pulsar().getAdminClient().namespaces()
                             
.clearNamespaceBundleBacklogAsync(namespaceName.toString(), 
nsBundle.getBundleRange()));
                 }
@@ -1476,7 +1476,7 @@ public abstract class NamespacesBase extends 
AdminResource {
                     .getBundles(namespaceName);
             for (NamespaceBundle nsBundle : bundles.getBundles()) {
                 // check if the bundle is owned by any broker, if not then 
there is no backlog on this bundle to clear
-                if 
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
+                if 
(pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
                     
futures.add(pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscriptionAsync(
                             namespaceName.toString(), 
nsBundle.getBundleRange(), subscription));
                 }
@@ -1543,7 +1543,7 @@ public abstract class NamespacesBase extends 
AdminResource {
                     .getBundles(namespaceName);
             for (NamespaceBundle nsBundle : bundles.getBundles()) {
                 // check if the bundle is owned by any broker, if not then 
there are no subscriptions
-                if 
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
+                if 
(pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
                     
futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync(
                             namespaceName.toString(), 
nsBundle.getBundleRange(), subscription));
                 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index fedcad4d009..7aefef596e7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -383,7 +383,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
                 .thenApply(broker -> 
brokerRegistry.getBrokerId().equals(broker.orElse(null)));
     }
 
-    private CompletableFuture<Optional<String>> 
getOwnershipAsync(Optional<ServiceUnitId> topic,
+    public CompletableFuture<Optional<String>> 
getOwnershipAsync(Optional<ServiceUnitId> topic,
                                                                  ServiceUnitId 
bundleUnit) {
         final String bundle = bundleUnit.toString();
         CompletableFuture<Optional<String>> owner;
@@ -395,6 +395,15 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
         return owner;
     }
 
+    public CompletableFuture<Optional<BrokerLookupData>> 
getOwnershipWithLookupDataAsync(ServiceUnitId bundleUnit) {
+        return getOwnershipAsync(Optional.empty(), 
bundleUnit).thenCompose(broker -> {
+            if (broker.isEmpty()) {
+                return CompletableFuture.completedFuture(Optional.empty());
+            }
+            return getBrokerRegistry().lookupAsync(broker.get());
+        });
+    }
+
     public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId 
bundle,
                                                               Optional<String> 
destinationBroker) {
         return getOwnershipAsync(Optional.empty(), bundle)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
index 504ae13003e..4c9e503129e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.loadbalance.extensions.data;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 
@@ -70,4 +71,9 @@ public record BrokerLookupData (String webServiceUrl,
         return new LookupResult(webServiceUrl, webServiceUrlTls, 
pulsarServiceUrl, pulsarServiceUrlTls,
                 LookupResult.Type.BrokerUrl, false);
     }
+
+    public NamespaceEphemeralData toNamespaceEphemeralData() {
+        return new NamespaceEphemeralData(pulsarServiceUrl, 
pulsarServiceUrlTls, webServiceUrl, webServiceUrlTls,
+                false, advertisedListeners);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index de18d50f3e1..d092ef04018 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1165,8 +1165,14 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle 
nsBundle) {
-        return ownershipCache.removeOwnership(nsBundle)
-                .thenRun(() -> 
bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject()));
+        CompletableFuture<Void> future;
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+            ExtensibleLoadManagerImpl extensibleLoadManager = 
ExtensibleLoadManagerImpl.get(loadManager.get());
+            future = 
extensibleLoadManager.unloadNamespaceBundleAsync(nsBundle, Optional.empty());
+        } else {
+            future = ownershipCache.removeOwnership(nsBundle);
+        }
+        return future.thenRun(() -> 
bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject()));
     }
 
     protected void onNamespaceBundleOwned(NamespaceBundle bundle) {
@@ -1445,15 +1451,40 @@ public class NamespaceService implements AutoCloseable {
         });
     }
 
-    public Optional<NamespaceEphemeralData> getOwner(NamespaceBundle bundle) 
throws Exception {
-        // if there is no znode for the service unit, it is not owned by any 
broker
-        return 
getOwnerAsync(bundle).get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(),
 SECONDS);
-    }
-
     public CompletableFuture<Optional<NamespaceEphemeralData>> 
getOwnerAsync(NamespaceBundle bundle) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+            ExtensibleLoadManagerImpl extensibleLoadManager = 
ExtensibleLoadManagerImpl.get(loadManager.get());
+            return 
extensibleLoadManager.getOwnershipWithLookupDataAsync(bundle)
+                    .thenCompose(lookupData -> {
+                        if (lookupData.isPresent()) {
+                            return CompletableFuture.completedFuture(
+                                    
Optional.of(lookupData.get().toNamespaceEphemeralData()));
+                        } else {
+                            return 
CompletableFuture.completedFuture(Optional.empty());
+                        }
+                    });
+        }
         return ownershipCache.getOwnerAsync(bundle);
     }
 
+    public boolean checkOwnershipPresent(NamespaceBundle bundle) throws 
Exception {
+        return checkOwnershipPresentAsync(bundle).get(pulsar.getConfiguration()
+                        .getMetadataStoreOperationTimeoutSeconds(), SECONDS);
+    }
+
+    public CompletableFuture<Boolean> 
checkOwnershipPresentAsync(NamespaceBundle bundle) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+            if (bundle.getNamespaceObject().equals(SYSTEM_NAMESPACE)) {
+                return FutureUtil.failedFuture(new 
UnsupportedOperationException(
+                        "Ownership check for system namespace is not 
supported"));
+            }
+            ExtensibleLoadManagerImpl extensibleLoadManager = 
ExtensibleLoadManagerImpl.get(loadManager.get());
+            return extensibleLoadManager.getOwnershipAsync(Optional.empty(), 
bundle)
+                    .thenApply(Optional::isPresent);
+        }
+        return getOwnerAsync(bundle).thenApply(Optional::isPresent);
+    }
+
     public void unloadSLANamespace() throws Exception {
         PulsarAdmin adminClient = null;
         NamespaceName namespaceName = getSLAMonitorNamespace(host, config);
@@ -1461,7 +1492,7 @@ public class NamespaceService implements AutoCloseable {
         LOG.info("Checking owner for SLA namespace {}", namespaceName);
 
         NamespaceBundle nsFullBundle = getFullBundle(namespaceName);
-        if (!getOwner(nsFullBundle).isPresent()) {
+        if (!checkOwnershipPresent(nsFullBundle)) {
             // No one owns the namespace so no point trying to unload it
             // Next lookup will assign the bundle to this broker.
             return;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index d977e633305..f8a7a9b629f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -134,6 +134,7 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
     @BeforeClass
     @Override
     public void setup() throws Exception {
+        conf.setForceDeleteNamespaceAllowed(true);
         conf.setAllowAutoTopicCreation(true);
         
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
         
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
@@ -142,6 +143,7 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         pulsar1 = pulsar;
         ServiceConfiguration defaultConf = getDefaultConf();
         defaultConf.setAllowAutoTopicCreation(true);
+        defaultConf.setForceDeleteNamespaceAllowed(true);
         
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
         
defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
         defaultConf.setLoadBalancerSheddingEnabled(false);
@@ -433,6 +435,54 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         assertTrue(bundlesData.getBoundaries().contains(midBundle));
         assertTrue(bundlesData.getBoundaries().contains(highBundle));
     }
+    @Test(timeOut = 30 * 1000)
+    public void testDeleteNamespaceBundle() throws Exception {
+        TopicName topicName = TopicName.get("test-delete-namespace-bundle");
+        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+
+        String broker = admin.lookups().lookupTopic(topicName.toString());
+        log.info("Assign the bundle {} to {}", bundle, broker);
+
+        checkOwnershipState(broker, bundle);
+
+        admin.namespaces().deleteNamespaceBundle(topicName.getNamespace(), 
bundle.getBundleRange());
+        assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testDeleteNamespace() throws Exception {
+        String namespace = "public/test-delete-namespace";
+        TopicName topicName = TopicName.get(namespace + 
"/test-delete-namespace-topic");
+        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet(this.conf.getClusterName()));
+        
assertTrue(admin.namespaces().getNamespaces("public").contains(namespace));
+        admin.topics().createPartitionedTopic(topicName.toString(), 2);
+        admin.lookups().lookupTopic(topicName.toString());
+        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+        try {
+            admin.namespaces().deleteNamespaceBundle(namespace, 
bundle.getBundleRange());
+            fail();
+        } catch (Exception ex) {
+            assertTrue(ex.getMessage().contains("Cannot delete non empty 
bundle"));
+        }
+        admin.namespaces().deleteNamespaceBundle(namespace, 
bundle.getBundleRange(), true);
+        admin.lookups().lookupTopic(topicName.toString());
+
+        admin.namespaces().deleteNamespace(namespace, true);
+        
assertFalse(admin.namespaces().getNamespaces("public").contains(namespace));
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testCheckOwnershipPresentWithSystemNamespace() throws 
Exception {
+        NamespaceBundle namespaceBundle =
+                getBundleAsync(pulsar1, 
TopicName.get(NamespaceName.SYSTEM_NAMESPACE + "/test")).get();
+        try {
+            
pulsar1.getNamespaceService().checkOwnershipPresent(namespaceBundle);
+        } catch (Exception ex) {
+            log.info("Got exception", ex);
+            assertTrue(ex.getCause() instanceof UnsupportedOperationException);
+        }
+    }
 
     @Test
     public void testMoreThenOneFilter() throws Exception {

Reply via email to