This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ffe574d535b1233ecb8fbc047daecc05c76537cd
Author: chenhang <[email protected]>
AuthorDate: Thu Nov 4 19:16:17 2021 +0800

    fix cherry-pick issue
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 41 ++++++++++------------
 .../pulsar/broker/service/AbstractTopic.java       |  1 +
 .../pulsar/broker/service/BrokerService.java       |  5 +--
 .../pulsar/broker/service/PersistentTopicTest.java |  4 +--
 .../api/AuthenticatedProducerConsumerTest.java     | 14 ++++----
 5 files changed, 31 insertions(+), 34 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 7abf9e3..94c0a16 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -590,26 +590,26 @@ public class PersistentTopicsBase extends AdminResource {
                 // delete authentication policies of the partitioned topic
                 CompletableFuture<Void> deleteAuthFuture = new 
CompletableFuture<>();
                 pulsar().getPulsarResources().getNamespaceResources()
-                        .setPoliciesAsync(topicName.getNamespaceObject(), p -> 
{
-                            for (int i = 0; i < numPartitions; i++) {
-                                
p.auth_policies.getTopicAuthentication().remove(topicName.getPartition(i).toString());
-                            }
-                            
p.auth_policies.getTopicAuthentication().remove(topicName.toString());
-                            return p;
-                        }).thenAccept(v -> {
-                            log.info("Successfully delete authentication 
policies for partitioned topic {}", topicName);
+                    .setAsync(topicName.getNamespace(), p -> {
+                        for (int i = 0; i < numPartitions; ++i) {
+                            
p.auth_policies.getTopicAuthentication().remove(topicName.getPartition(i).toString());
+                        }
+                        
p.auth_policies.getTopicAuthentication().remove(topicName.toString());
+                        return p;
+                    }).thenAccept(v -> {
+                        log.info("Successfully delete authentication policies 
for partitioned topic {}", topicName);
+                        deleteAuthFuture.complete(null);
+                    }).exceptionally(ex -> {
+                        if (ex.getCause() instanceof 
MetadataStoreException.NotFoundException) {
+                            log.warn("Namespace policies of {} not found", 
topicName.getNamespaceObject());
                             deleteAuthFuture.complete(null);
-                        }).exceptionally(ex -> {
-                            if (ex.getCause() instanceof 
MetadataStoreException.NotFoundException) {
-                                log.warn("Namespace policies of {} not found", 
topicName.getNamespaceObject());
-                                deleteAuthFuture.complete(null);
-                            } else {
-                                log.error("Failed to delete authentication 
policies for partitioned topic {}",
+                        } else {
+                            log.error("Failed to delete authentication 
policies for partitioned topic {}",
                                         topicName, ex);
-                                deleteAuthFuture.completeExceptionally(ex);
-                            }
-                            return null;
-                        });
+                            deleteAuthFuture.completeExceptionally(ex);
+                        }
+                        return null;
+                    });
 
                 deleteAuthFuture.whenComplete((r, ex) -> {
                     if (ex != null) {
@@ -2609,13 +2609,8 @@ public class PersistentTopicsBase extends AdminResource {
         // Validate that namespace exists, throw 404 if it doesn't exist
         // note that we do not want to load the topic and hence skip 
authorization check
         try {
-<<<<<<< HEAD
             namespaceResources().get(path(POLICIES, namespaceName.toString()));
-        } catch 
(org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) {
-=======
-            namespaceResources().getPolicies(namespaceName);
         } catch (MetadataStoreException.NotFoundException e) {
->>>>>>> 3e578280539 (fix delete authentication policies when delete topic. 
(#12215))
             log.warn("[{}] Failed to get topic backlog {}: Namespace does not 
exist", clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
         } catch (Exception e) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 615ec08..ebd05b6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import lombok.Getter;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b6bfc25..0b81625 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -166,6 +166,7 @@ import org.apache.pulsar.common.util.netty.ChannelFutures;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.common.util.netty.NettyFutureUtil;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
 import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
@@ -1020,7 +1021,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         }
         NamespaceName namespaceName = 
TopicName.get(topic).getNamespaceObject();
         // Check whether there are auth policies for the topic
-        
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespaceName).thenAccept(optPolicies
 -> {
+        
pulsar.getPulsarResources().getNamespaceResources().getAsync(namespaceName.toString()).thenAccept(optPolicies
 -> {
             if (!optPolicies.isPresent() || 
!optPolicies.get().auth_policies.getTopicAuthentication()
                     .containsKey(topic)) {
                 // if there is no auth policy for the topic, just complete and 
return
@@ -1031,7 +1032,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                 return;
             }
             pulsar.getPulsarResources().getNamespaceResources()
-                    
.setPoliciesAsync(TopicName.get(topic).getNamespaceObject(), p -> {
+                    .setAsync(TopicName.get(topic).getNamespace(), p -> {
                         p.auth_policies.getTopicAuthentication().remove(topic);
                         return p;
                     }).thenAccept(v -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 51d8936..a174edb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -128,8 +128,8 @@ import 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.Compactor;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.zookeeper.ZooKeeperCache;
+import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.ZooKeeper;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 046b268..a12c62f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -359,14 +359,14 @@ public class AuthenticatedProducerConsumerTest extends 
ProducerConsumerBase {
         admin.topics().grantPermission(topic, "test-user", 
EnumSet.of(AuthAction.consume));
 
         Awaitility.await().untilAsserted(() -> {
-            
assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
-                    
.get().auth_policies.getTopicAuthentication().containsKey(topic));
+            
assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
+                
.get().auth_policies.getTopicAuthentication().containsKey(topic));
         });
 
         admin.topics().delete(topic);
 
         Awaitility.await().untilAsserted(() -> {
-            
assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+            
assertFalse(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
                     
.get().auth_policies.getTopicAuthentication().containsKey(topic));
         });
 
@@ -379,10 +379,10 @@ public class AuthenticatedProducerConsumerTest extends 
ProducerConsumerBase {
                 .grantPermission(partitionedTopic, "test-user", 
EnumSet.of(AuthAction.consume));
 
         Awaitility.await().untilAsserted(() -> {
-            
assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+            
assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
                     
.get().auth_policies.getTopicAuthentication().containsKey(partitionedTopic));
             for (int i = 0; i < numPartitions; i++) {
-                
assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+                
assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
                         .get().auth_policies.getTopicAuthentication()
                         
.containsKey(TopicName.get(partitionedTopic).getPartition(i).toString()));
             }
@@ -390,10 +390,10 @@ public class AuthenticatedProducerConsumerTest extends 
ProducerConsumerBase {
 
         
admin.topics().deletePartitionedTopic("persistent://p1/ns1/partitioned-topic");
         Awaitility.await().untilAsserted(() -> {
-            
assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+            
assertFalse(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
                     
.get().auth_policies.getTopicAuthentication().containsKey(partitionedTopic));
             for (int i = 0; i < numPartitions; i++) {
-                
assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+                
assertFalse(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
                         .get().auth_policies.getTopicAuthentication()
                         
.containsKey(TopicName.get(partitionedTopic).getPartition(i).toString()));
             }

Reply via email to