This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new ced5779  [Branch-2.8] Fix validateGlobalNamespaceOwnership wrap exception issue. (#14612)
ced5779 is described below

commit ced57798e4a45427a738e8734a1de924b42045d5
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Mar 9 13:23:30 2022 +0800

    [Branch-2.8] Fix validateGlobalNamespaceOwnership wrap exception issue. (#14612)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  5 +--
 .../pulsar/broker/web/PulsarWebResource.java       |  4 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 48 +++++++++++++++++++++-
 3 files changed, 50 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 9d9cf76..04e8c50 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -233,10 +233,7 @@ public abstract class AdminResource extends 
PulsarWebResource {
         } catch (IllegalArgumentException e) {
             throw new RestException(Status.PRECONDITION_FAILED, "Tenant name 
or namespace is not valid");
         } catch (RestException re) {
-            if (re.getResponse().getStatus() == 
Status.NOT_FOUND.getStatusCode()) {
-                throw new RestException(Status.NOT_FOUND, "Namespace not 
found");
-            }
-            throw new RestException(Status.PRECONDITION_FAILED, "Namespace 
does not have any clusters configured");
+            throw re;
         } catch (Exception e) {
             log.warn("Failed to validate global cluster configuration : ns={}  
emsg={}", namespace, e.getMessage());
             throw new RestException(Status.SERVICE_UNAVAILABLE, "Failed to 
validate global cluster configuration");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index ac2e4da..71eae45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -754,9 +754,9 @@ public abstract class PulsarWebResource {
                     validationFuture.complete(null);
                 }
             } else {
-                String msg = String.format("Policies not found for %s 
namespace", namespace.toString());
+                String msg = String.format("Namespace %s not found", 
namespace.toString());
                 log.warn(msg);
-                validationFuture.completeExceptionally(new 
RestException(Status.NOT_FOUND, msg));
+                validationFuture.completeExceptionally(new 
RestException(Status.NOT_FOUND, "Namespace not found"));
             }
         }).exceptionally(ex -> {
             String msg = String.format("Failed to validate global cluster 
configuration : cluster=%s ns=%s  emsg=%s",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 8cb5b76..4fe10cf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.admin;
 
-import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -29,6 +28,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -37,6 +37,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -49,7 +50,10 @@ import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
 import org.apache.pulsar.broker.admin.v2.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
@@ -74,6 +78,7 @@ import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.zookeeper.KeeperException;
@@ -99,6 +104,8 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
     protected Field uriField;
     protected UriInfo uriInfo;
     private NonPersistentTopics nonPersistentTopic;
+    private NamespaceResources namespaceResources;
+    private PulsarResources pulsarResources;
 
     @BeforeClass
     public void initPersistentTopics() throws Exception {
@@ -125,6 +132,8 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         nonPersistentTopic = spy(NonPersistentTopics.class);
         nonPersistentTopic.setServletContext(new MockServletContext());
         nonPersistentTopic.setPulsar(pulsar);
+        pulsarResources = mock(PulsarResources.class);
+        namespaceResources = mock(NamespaceResources.class);
         doReturn(mockZooKeeper).when(nonPersistentTopic).localZk();
         doReturn(false).when(nonPersistentTopic).isRequestHttps();
         doReturn(null).when(nonPersistentTopic).originalPrincipal();
@@ -374,6 +383,43 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         Assert.assertEquals(metadata.partitions, 0);
     }
 
+    @Test
+    public void testCreateTopicWithReplicationCluster() {
+        final String topicName = "test-topic-ownership";
+        NamespaceName namespaceName = NamespaceName.get(testTenant, 
testNamespace);
+        CompletableFuture<Optional<Policies>> policyFuture = new 
CompletableFuture<>();
+        Policies policies = new Policies();
+        policyFuture.complete(Optional.of(policies));
+        when(pulsar.getPulsarResources()).thenReturn(pulsarResources);
+        
when(pulsar.getPulsarResources().getNamespaceResources()).thenReturn(namespaceResources);
+        NamespaceResources.PartitionedTopicResources partitionedTopicResources 
= mock(NamespaceResources.PartitionedTopicResources.class);
+        
when(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()).thenReturn(partitionedTopicResources);
+        when(partitionedTopicResources.createAsync(any(String.class), 
any(PartitionedTopicMetadata.class))).thenReturn(new CompletableFuture<>());
+        String prefix = "/admin/policies/";
+        ConfigurationCacheService configurationCacheService = 
spy(pulsar.getConfigurationCache());
+        
when(pulsar.getConfigurationCache()).thenReturn(configurationCacheService);
+        ZooKeeperDataCache<Policies> policiesZooKeeperDataCache = 
spy(pulsar.getConfigurationCache().policiesCache());
+        
when(pulsar.getConfigurationCache().policiesCache()).thenReturn(policiesZooKeeperDataCache);
+        
doReturn(policyFuture).when(policiesZooKeeperDataCache).getAsync(any(String.class));
+        doReturn(policyFuture).when(namespaceResources).getAsync(prefix + 
namespaceName.toString());
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<RestException> errCaptor = 
ArgumentCaptor.forClass(RestException.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, topicName, 2, true);
+        verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
+        Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), 
Response.Status.PRECONDITION_FAILED.getStatusCode());
+        
Assert.assertTrue(errCaptor.getValue().getMessage().contains("Namespace does 
not have any clusters configured"));
+        // Test policy not exist and return 'Namespace not found'
+        CompletableFuture<Optional<Policies>> policyFuture2 = new 
CompletableFuture<>();
+        policyFuture2.complete(Optional.empty());
+        
doReturn(policyFuture2).when(policiesZooKeeperDataCache).getAsync(any(String.class));
+        response = mock(AsyncResponse.class);
+        errCaptor = ArgumentCaptor.forClass(RestException.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, topicName, 2, true);
+        verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
+        Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), 
Response.Status.NOT_FOUND.getStatusCode());
+        
Assert.assertTrue(errCaptor.getValue().getMessage().contains("Namespace not 
found"));
+    }
+
     @Test(expectedExceptions = RestException.class)
     public void testCreateNonPartitionedTopicWithInvalidName() {
         final String topicName = "standard-topic-partition-10";

Reply via email to