This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2554db41ca346d032a20d436600db5e97d1e5b14 Author: Jiwei Guo <[email protected]> AuthorDate: Tue Feb 15 22:49:17 2022 +0800 Fix validateGlobalNamespaceOwnership wrap exception issue. (#14269) ### Motivation When a REST API call invokes `AdminResource#validateGlobalNamespaceOwnership`, the broker executes `PulsarWebResource#checkLocalOrGetPeerReplicationCluster`. In `PulsarWebResource#checkLocalOrGetPeerReplicationCluster`: https://github.com/apache/pulsar/blob/6d717a08ef8cfcac032caee06105285594baf09f/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java#L773-L802 Lines 780, 794, and 801 throw a RestException. But `validateGlobalNamespaceOwnership` wraps the exception: https://github.com/apache/pulsar/blob/6d717a08ef8cfcac032caee06105285594baf09f/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java#L202-L216 This can confuse users, because the logged message does not match the error returned by the REST API. (cherry picked from commit 18d9f1b88c4ab8b3deb11b966a425da58ebd932c) --- .../apache/pulsar/broker/admin/AdminResource.java | 5 +--- .../pulsar/broker/web/PulsarWebResource.java | 4 +-- .../pulsar/broker/admin/PersistentTopicsTest.java | 30 ++++++++++++++++++++++ 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 242f3a3..547665f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -197,10 +197,7 @@ public abstract class AdminResource extends PulsarWebResource { } catch (IllegalArgumentException e) { throw new RestException(Status.PRECONDITION_FAILED, "Tenant name or namespace is not valid"); } catch (RestException re) { - if (re.getResponse().getStatus() == Status.NOT_FOUND.getStatusCode()) { - throw new
RestException(Status.NOT_FOUND, "Namespace not found"); - } - throw new RestException(Status.PRECONDITION_FAILED, "Namespace does not have any clusters configured"); + throw re; } catch (Exception e) { log.warn("Failed to validate global cluster configuration : ns={} emsg={}", namespaceName, e.getMessage()); throw new RestException(Status.SERVICE_UNAVAILABLE, "Failed to validate global cluster configuration"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 696e3d1..8b6f30b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -740,9 +740,9 @@ public abstract class PulsarWebResource { validationFuture.complete(null); } } else { - String msg = String.format("Policies not found for %s namespace", namespace.toString()); + String msg = String.format("Namespace %s not found", namespace.toString()); log.warn(msg); - validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND, msg)); + validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Namespace not found")); } }).exceptionally(ex -> { String msg = String.format("Failed to validate global cluster configuration : cluster=%s ns=%s emsg=%s", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index e41db38..00c4683 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -52,6 +52,7 @@ import org.apache.pulsar.broker.admin.v2.NonPersistentTopics; import org.apache.pulsar.broker.admin.v2.PersistentTopics; import 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; +import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.resources.TopicResources; import org.apache.pulsar.broker.service.BrokerService; @@ -100,6 +101,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { protected Field uriField; protected UriInfo uriInfo; private NonPersistentTopics nonPersistentTopic; + private NamespaceResources namespaceResources; @BeforeClass public void initPersistentTopics() throws Exception { @@ -125,6 +127,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { nonPersistentTopic = spy(new NonPersistentTopics()); nonPersistentTopic.setServletContext(new MockServletContext()); nonPersistentTopic.setPulsar(pulsar); + namespaceResources = mock(NamespaceResources.class); doReturn(false).when(nonPersistentTopic).isRequestHttps(); doReturn(null).when(nonPersistentTopic).originalPrincipal(); doReturn("test").when(nonPersistentTopic).clientAppId(); @@ -406,6 +409,33 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(metadata.partitions, 0); } + @Test + public void testCreateTopicWithReplicationCluster() { + final String topicName = "test-topic-ownership"; + NamespaceName namespaceName = NamespaceName.get(testTenant, testNamespace); + CompletableFuture<Optional<Policies>> policyFuture = new CompletableFuture<>(); + Policies policies = new Policies(); + policyFuture.complete(Optional.of(policies)); + when(pulsar.getPulsarResources().getNamespaceResources()).thenReturn(namespaceResources); + doReturn(policyFuture).when(namespaceResources).getPoliciesAsync(namespaceName); + AsyncResponse response = mock(AsyncResponse.class); + ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class); + 
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2, true); + verify(response, timeout(5000).times(1)).resume(errCaptor.capture()); + Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.PRECONDITION_FAILED.getStatusCode()); + Assert.assertTrue(errCaptor.getValue().getMessage().contains("Namespace does not have any clusters configured")); + // Test policy not exist and return 'Namespace not found' + CompletableFuture<Optional<Policies>> policyFuture2 = new CompletableFuture<>(); + policyFuture2.complete(Optional.empty()); + doReturn(policyFuture2).when(namespaceResources).getPoliciesAsync(namespaceName); + response = mock(AsyncResponse.class); + errCaptor = ArgumentCaptor.forClass(RestException.class); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2, true); + verify(response, timeout(5000).times(1)).resume(errCaptor.capture()); + Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode()); + Assert.assertTrue(errCaptor.getValue().getMessage().contains("Namespace not found")); + } + @Test(expectedExceptions = RestException.class) public void testCreateNonPartitionedTopicWithInvalidName() { final String topicName = "standard-topic-partition-10";
