This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 6abb3027a50 [branch-2.8] Fix delete namespace issue (revert and fix #14619) (#15040)
6abb3027a50 is described below

commit 6abb3027a50adea332b56310607cd3a90753909c
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Apr 12 12:23:53 2022 +0200

    [branch-2.8] Fix delete namespace issue (revert and fix #14619) (#15040)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 55 ++++++++--------------
 .../apache/pulsar/broker/admin/NamespacesTest.java |  9 +---
 2 files changed, 22 insertions(+), 42 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 264e800b715..e04b5bc0069 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -287,15 +287,15 @@ public abstract class NamespacesBase extends 
AdminResource {
             return pulsar().getNamespaceService().getNamespaceBundleFactory()
                     .getBundlesAsync(namespaceName).thenCompose(bundles -> {
                         for (NamespaceBundle bundle : bundles.getBundles()) {
-                            // check if the bundle is owned by any broker, if 
not then we do not need to delete
-                            // the bundle
-                            
deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle)
-                                    .thenCompose(ownership -> {
+                            // check if the bundle is owned by any broker,
+                            // if not then we do not need to delete the bundle
+                            deleteBundleFutures.add(
+                                    
pulsar().getNamespaceService().getOwnerAsync(bundle).thenCompose(ownership -> {
                                 if (ownership.isPresent()) {
                                     try {
                                         return 
pulsar().getAdminClient().namespaces()
-                                                
.deleteNamespaceBundleAsync(namespaceName.toString(),
-                                                        
bundle.getBundleRange());
+                                                .deleteNamespaceBundleAsync(
+                                                        
namespaceName.toString(), bundle.getBundleRange());
                                     } catch (PulsarServerException e) {
                                         throw new RestException(e);
                                     }
@@ -307,36 +307,21 @@ public abstract class NamespacesBase extends 
AdminResource {
                         return FutureUtil.waitForAll(deleteBundleFutures);
                     });
         })
-        .handle((result, exception) -> {
-            if (exception != null) {
-                if (exception.getCause() instanceof PulsarAdminException) {
-                    asyncResponse.resume(new 
RestException((PulsarAdminException) exception.getCause()));
-                    return null;
-                } else {
-                    log.error("[{}] Failed to remove owned namespace {}", 
clientAppId(), namespaceName, exception);
-                    asyncResponse.resume(new 
RestException(exception.getCause()));
-                    return null;
-                }
-            }
-
-            try {
-                // we have successfully removed all the ownership for the 
namespace, the policies znode can be deleted
-                // now
-                final String globalZkPolicyPath = path(POLICIES, 
namespaceName.toString());
-                final String localZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT, 
namespaceName.toString());
-                namespaceResources().delete(globalZkPolicyPath);
-                try {
-                    getLocalPolicies().delete(localZkPolicyPath);
-                } catch (NotFoundException nne) {
-                    // If the z-node with the modified information is not 
there anymore, we're already good
-                }
-            } catch (Exception e) {
-                log.error("[{}] Failed to remove owned namespace {} from ZK", 
clientAppId(), namespaceName, e);
-                asyncResponse.resume(new RestException(e));
-                return null;
-            }
-
+        .thenCompose(__ -> {
+            // we have successfully removed all the ownership for the 
namespace, the policies znode can be deleted
+            // now
+            final String globalZkPolicyPath = path(POLICIES, 
namespaceName.toString());
+            final String localZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT, 
namespaceName.toString());
+            return namespaceResources().deleteAsync(globalZkPolicyPath)
+                    .thenCompose((ignore -> 
getLocalPolicies().deleteAsync(localZkPolicyPath)));
+        })
+        .thenAccept(__ -> {
+            log.info("[{}] Remove namespace successfully {}", clientAppId(), 
namespaceName);
             asyncResponse.resume(Response.noContent().build());
+        })
+        .exceptionally(ex -> {
+            log.error("[{}] Failed to remove namespace {}", clientAppId(), 
namespaceName, ex.getCause());
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
             return null;
         });
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index fb23dd5765a..3b27ff787b1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -805,7 +805,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
                 return bundle.getNamespaceObject().equals(testNs);
             }
         }));
-        doReturn(CompletableFuture.completedFuture(Optional.of(new 
NamespaceEphemeralData()))).when(nsSvc)
+        
doReturn(CompletableFuture.completedFuture(Optional.of(mock(NamespaceEphemeralData.class)))).when(nsSvc)
                 .getOwnerAsync(Mockito.argThat(new 
ArgumentMatcher<NamespaceBundle>() {
                     @Override
                     public boolean matches(NamespaceBundle bundle) {
@@ -836,15 +836,12 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         ArgumentCaptor<RestException> captor = 
ArgumentCaptor.forClass(RestException.class);
         verify(response, timeout(5000).times(1)).resume(captor.capture());
         assertEquals(captor.getValue().getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
-
-
         // make one bundle owned
         LookupOptions optionsHttps = 
LookupOptions.builder().authoritative(false).requestHttps(true).readOnly(false).build();
         
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0),
 optionsHttps);
         
doReturn(true).when(nsSvc).isServiceUnitOwned(nsBundles.getBundles().get(0));
         
doReturn(CompletableFuture.completedFuture(null)).when(namespacesAdmin).deleteNamespaceBundleAsync(
                 testTenant + "/" + testLocalCluster + "/" + bundledNsLocal, 
"0x00000000_0x80000000");
-
         try {
             namespaces.deleteNamespaceBundle(testTenant, testLocalCluster, 
bundledNsLocal, "0x80000000_0xffffffff",
                     false, false);
@@ -853,13 +850,11 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
             assertEquals(re.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
         }
         response = mock(AsyncResponse.class);
+        namespaces.deleteNamespace(response, testTenant, testLocalCluster, 
bundledNsLocal, false, false);
         
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class),
 any(LookupOptions.class));
-
-        // ensure all three bundles are owned by the local broker
         for (NamespaceBundle bundle : nsBundles.getBundles()) {
             doReturn(true).when(nsSvc).isServiceUnitOwned(bundle);
         }
-        namespaces.deleteNamespace(response, testTenant, testLocalCluster, 
bundledNsLocal, false, false);
         ArgumentCaptor<Response> captor2 = 
ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(captor2.capture());
         assertEquals(captor2.getValue().getStatus(), 
Status.NO_CONTENT.getStatusCode());

Reply via email to