This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 46d8e9068046f7d6811292247ccac46571a20d73
Author: feynmanlin <[email protected]>
AuthorDate: Wed Jul 14 14:43:00 2021 +0800

    Fix using partitioned topic name to get Policy (#11294)
    
    ### Motivation
    In the master branch, the REST API no longer allows a partition's topic 
name to be used to set a topic policy, but partition names are still used 
internally in many places.
    
    Suppose we set a topic policy for `persistent://tenant/namespace/topic`.
    That policy cannot be obtained through 
`persistent://tenant/namespace/topic-partition-0`, so the policy does not 
take effect.
    
    For example: `PersistentTopic.checkSubscriptionTypesEnable`
    
    ### Modifications
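    Convert the partition name to the partitioned topic name in 
`SystemTopicBasedTopicPoliciesService`, and remove the now-redundant 
conversion from `BrokerService`.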
    
    (cherry picked from commit 35d29b9d67df27c9238c15ce66052efff12dedb7)
---
 .../pulsar/broker/service/BrokerService.java       | 12 +----
 .../SystemTopicBasedTopicPoliciesService.java      |  4 +-
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 57 +++++++++++++++-------
 3 files changed, 44 insertions(+), 29 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2ebaee2..2c8c660 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1331,12 +1331,8 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
             OffloadPoliciesImpl topicLevelOffloadPolicies = null;
 
             if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
-                TopicName cloneTopicName = topicName;
-                if (topicName.isPartitioned()) {
-                    cloneTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
-                }
                 try {
-                    TopicPolicies topicPolicies = 
pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName);
+                    TopicPolicies topicPolicies = 
pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
                     if (topicPolicies != null) {
                         persistencePolicies = topicPolicies.getPersistence();
                         retentionPolicies = 
topicPolicies.getRetentionPolicies();
@@ -2630,12 +2626,8 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
             return Optional.empty();
         }
-        TopicName cloneTopicName = topicName;
-        if (topicName.isPartitioned()) {
-            cloneTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
-        }
         try {
-            return 
Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName));
+            return 
Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName));
         } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
             log.debug("Topic {} policies have not been initialized yet.", 
topicName.getPartitionedTopicName());
             return Optional.empty();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index a21eb07..49f934a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -104,7 +104,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                 .domain(topicName.getDomain().toString())
                                 .tenant(topicName.getTenant())
                                 
.namespace(topicName.getNamespaceObject().getLocalName())
-                                .topic(topicName.getLocalName())
+                                
.topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName())
                                 .policies(policies)
                                 .build())
                         .build()).whenComplete(((messageId, e) -> {
@@ -154,7 +154,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 && !policyCacheInitMap.get(topicName.getNamespaceObject())) {
             throw new TopicPoliciesCacheNotInitException();
         }
-        return policiesCache.get(topicName);
+        return 
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index b2a6e83..c40b38a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -18,8 +18,23 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -43,6 +58,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -63,23 +79,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
 @Slf4j
 @Test(groups = "broker")
 public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
@@ -2214,6 +2213,30 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         assertNull(admin.topics().getMessageTTL(topic));
     }
 
+    @Test
+    public void testSubscriptionTypesWithPartitionedTopic() throws Exception {
+        final String topic = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 1);
+        
pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe().close();
+        Awaitility.await()
+                .until(() -> 
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        Set<SubscriptionType> subscriptionTypeSet = new HashSet<>();
+        subscriptionTypeSet.add(SubscriptionType.Key_Shared);
+        admin.topics().setSubscriptionTypesEnabled(topic, subscriptionTypeSet);
+        Awaitility.await().untilAsserted(() -> 
assertNotNull(admin.topics().getSubscriptionTypesEnabled(topic)));
+
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService()
+                
.getTopicReference(TopicName.get(topic).getPartition(0).toString()).get();
+        Set<String> old = new 
HashSet<>(pulsar.getConfiguration().getSubscriptionTypesEnabled());
+        try {
+            pulsar.getConfiguration().getSubscriptionTypesEnabled().clear();
+            
assertTrue(persistentTopic.checkSubscriptionTypesEnable(CommandSubscribe.SubType.Key_Shared));
+        } finally {
+            //restore
+            
pulsar.getConfiguration().getSubscriptionTypesEnabled().addAll(old);
+        }
+    }
+
     @Test(timeOut = 30000)
     public void testSubscriptionTypesEnabled() throws Exception {
         final String topic = "persistent://" + myNamespace + "/test-" + 
UUID.randomUUID();

Reply via email to