This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 6abb3027a50 [branch-2.8] Fix delete namespace issue (revert and fix #14619) (#15040)
6abb3027a50 is described below
commit 6abb3027a50adea332b56310607cd3a90753909c
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Apr 12 12:23:53 2022 +0200
[branch-2.8] Fix delete namespace issue (revert and fix #14619) (#15040)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 55 ++++++++--------------
.../apache/pulsar/broker/admin/NamespacesTest.java | 9 +---
2 files changed, 22 insertions(+), 42 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 264e800b715..e04b5bc0069 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -287,15 +287,15 @@ public abstract class NamespacesBase extends
AdminResource {
return pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundlesAsync(namespaceName).thenCompose(bundles -> {
for (NamespaceBundle bundle : bundles.getBundles()) {
- // check if the bundle is owned by any broker, if
not then we do not need to delete
- // the bundle
-
deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle)
- .thenCompose(ownership -> {
+ // check if the bundle is owned by any broker,
+ // if not then we do not need to delete the bundle
+ deleteBundleFutures.add(
+
pulsar().getNamespaceService().getOwnerAsync(bundle).thenCompose(ownership -> {
if (ownership.isPresent()) {
try {
return
pulsar().getAdminClient().namespaces()
-
.deleteNamespaceBundleAsync(namespaceName.toString(),
-
bundle.getBundleRange());
+ .deleteNamespaceBundleAsync(
+
namespaceName.toString(), bundle.getBundleRange());
} catch (PulsarServerException e) {
throw new RestException(e);
}
@@ -307,36 +307,21 @@ public abstract class NamespacesBase extends
AdminResource {
return FutureUtil.waitForAll(deleteBundleFutures);
});
})
- .handle((result, exception) -> {
- if (exception != null) {
- if (exception.getCause() instanceof PulsarAdminException) {
- asyncResponse.resume(new
RestException((PulsarAdminException) exception.getCause()));
- return null;
- } else {
- log.error("[{}] Failed to remove owned namespace {}",
clientAppId(), namespaceName, exception);
- asyncResponse.resume(new
RestException(exception.getCause()));
- return null;
- }
- }
-
- try {
- // we have successfully removed all the ownership for the
namespace, the policies znode can be deleted
- // now
- final String globalZkPolicyPath = path(POLICIES,
namespaceName.toString());
- final String localZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT,
namespaceName.toString());
- namespaceResources().delete(globalZkPolicyPath);
- try {
- getLocalPolicies().delete(localZkPolicyPath);
- } catch (NotFoundException nne) {
- // If the z-node with the modified information is not
there anymore, we're already good
- }
- } catch (Exception e) {
- log.error("[{}] Failed to remove owned namespace {} from ZK",
clientAppId(), namespaceName, e);
- asyncResponse.resume(new RestException(e));
- return null;
- }
-
+ .thenCompose(__ -> {
+ // we have successfully removed all the ownership for the
namespace, the policies znode can be deleted
+ // now
+ final String globalZkPolicyPath = path(POLICIES,
namespaceName.toString());
+ final String localZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT,
namespaceName.toString());
+ return namespaceResources().deleteAsync(globalZkPolicyPath)
+ .thenCompose((ignore ->
getLocalPolicies().deleteAsync(localZkPolicyPath)));
+ })
+ .thenAccept(__ -> {
+ log.info("[{}] Remove namespace successfully {}", clientAppId(),
namespaceName);
asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to remove namespace {}", clientAppId(),
namespaceName, ex.getCause());
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index fb23dd5765a..3b27ff787b1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -805,7 +805,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
return bundle.getNamespaceObject().equals(testNs);
}
}));
- doReturn(CompletableFuture.completedFuture(Optional.of(new
NamespaceEphemeralData()))).when(nsSvc)
+
doReturn(CompletableFuture.completedFuture(Optional.of(mock(NamespaceEphemeralData.class)))).when(nsSvc)
.getOwnerAsync(Mockito.argThat(new
ArgumentMatcher<NamespaceBundle>() {
@Override
public boolean matches(NamespaceBundle bundle) {
@@ -836,15 +836,12 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
ArgumentCaptor<RestException> captor =
ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
-
-
// make one bundle owned
LookupOptions optionsHttps =
LookupOptions.builder().authoritative(false).requestHttps(true).readOnly(false).build();
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0),
optionsHttps);
doReturn(true).when(nsSvc).isServiceUnitOwned(nsBundles.getBundles().get(0));
doReturn(CompletableFuture.completedFuture(null)).when(namespacesAdmin).deleteNamespaceBundleAsync(
testTenant + "/" + testLocalCluster + "/" + bundledNsLocal,
"0x00000000_0x80000000");
-
try {
namespaces.deleteNamespaceBundle(testTenant, testLocalCluster,
bundledNsLocal, "0x80000000_0xffffffff",
false, false);
@@ -853,13 +850,11 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
assertEquals(re.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
}
response = mock(AsyncResponse.class);
+ namespaces.deleteNamespace(response, testTenant, testLocalCluster,
bundledNsLocal, false, false);
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class),
any(LookupOptions.class));
-
- // ensure all three bundles are owned by the local broker
for (NamespaceBundle bundle : nsBundles.getBundles()) {
doReturn(true).when(nsSvc).isServiceUnitOwned(bundle);
}
- namespaces.deleteNamespace(response, testTenant, testLocalCluster,
bundledNsLocal, false, false);
ArgumentCaptor<Response> captor2 =
ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(captor2.capture());
assertEquals(captor2.getValue().getStatus(),
Status.NO_CONTENT.getStatusCode());