This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ffe574d535b1233ecb8fbc047daecc05c76537cd Author: chenhang <[email protected]> AuthorDate: Thu Nov 4 19:16:17 2021 +0800 fix cherry-pick issue --- .../broker/admin/impl/PersistentTopicsBase.java | 41 ++++++++++------------ .../pulsar/broker/service/AbstractTopic.java | 1 + .../pulsar/broker/service/BrokerService.java | 5 +-- .../pulsar/broker/service/PersistentTopicTest.java | 4 +-- .../api/AuthenticatedProducerConsumerTest.java | 14 ++++---- 5 files changed, 31 insertions(+), 34 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 7abf9e3..94c0a16 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -590,26 +590,26 @@ public class PersistentTopicsBase extends AdminResource { // delete authentication policies of the partitioned topic CompletableFuture<Void> deleteAuthFuture = new CompletableFuture<>(); pulsar().getPulsarResources().getNamespaceResources() - .setPoliciesAsync(topicName.getNamespaceObject(), p -> { - for (int i = 0; i < numPartitions; i++) { - p.auth_policies.getTopicAuthentication().remove(topicName.getPartition(i).toString()); - } - p.auth_policies.getTopicAuthentication().remove(topicName.toString()); - return p; - }).thenAccept(v -> { - log.info("Successfully delete authentication policies for partitioned topic {}", topicName); + .setAsync(topicName.getNamespace(), p -> { + for (int i = 0; i < numPartitions; ++i) { + p.auth_policies.getTopicAuthentication().remove(topicName.getPartition(i).toString()); + } + p.auth_policies.getTopicAuthentication().remove(topicName.toString()); + return p; + }).thenAccept(v -> { + log.info("Successfully delete authentication policies for partitioned topic {}", topicName); + deleteAuthFuture.complete(null); + }).exceptionally(ex -> { + if (ex.getCause() instanceof MetadataStoreException.NotFoundException) { + log.warn("Namespace policies of {} not found", topicName.getNamespaceObject()); deleteAuthFuture.complete(null); - }).exceptionally(ex -> { - if (ex.getCause() instanceof MetadataStoreException.NotFoundException) { - log.warn("Namespace policies of {} not found", topicName.getNamespaceObject()); - deleteAuthFuture.complete(null); - } else { - log.error("Failed to delete authentication policies for partitioned topic {}", + } else { + log.error("Failed to delete authentication policies for partitioned topic {}", topicName, ex); - deleteAuthFuture.completeExceptionally(ex); - } - return null; - }); + deleteAuthFuture.completeExceptionally(ex); + } + return null; + }); deleteAuthFuture.whenComplete((r, ex) -> { if (ex != null) { @@ -2609,13 +2609,8 @@ public class PersistentTopicsBase extends AdminResource { // Validate that namespace exists, throw 404 if it doesn't exist // note that we do not want to load the topic and hence skip authorization check try { -<<<<<<< HEAD namespaceResources().get(path(POLICIES, namespaceName.toString())); - } catch (org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) { -======= - namespaceResources().getPolicies(namespaceName); } catch (MetadataStoreException.NotFoundException e) { ->>>>>>> 3e578280539 (fix delete authentication policies when delete topic. (#12215)) log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(), namespaceName); throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 615ec08..ebd05b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantReadWriteLock; +import lombok.Getter; import org.apache.bookkeeper.mledger.util.StatsBuckets; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.ServiceConfiguration; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b6bfc25..0b81625 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -166,6 +166,7 @@ import org.apache.pulsar.common.util.netty.ChannelFutures; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.common.util.netty.NettyFutureUtil; import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy; import org.apache.pulsar.zookeeper.ZooKeeperCacheListener; @@ -1020,7 +1021,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies } NamespaceName namespaceName = TopicName.get(topic).getNamespaceObject(); // Check whether there are auth policies for the topic - pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespaceName).thenAccept(optPolicies -> { + pulsar.getPulsarResources().getNamespaceResources().getAsync(namespaceName.toString()).thenAccept(optPolicies -> { if (!optPolicies.isPresent() || !optPolicies.get().auth_policies.getTopicAuthentication() .containsKey(topic)) { // if there is no auth policy for the topic, just complete and return @@ -1031,7 +1032,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies return; } pulsar.getPulsarResources().getNamespaceResources() - .setPoliciesAsync(TopicName.get(topic).getNamespaceObject(), p -> { + .setAsync(TopicName.get(topic).getNamespace(), p -> { p.auth_policies.getTopicAuthentication().remove(topic); return p; }).thenAccept(v -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 51d8936..a174edb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -128,8 +128,8 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicContext; import org.apache.pulsar.compaction.Compactor; -import org.apache.pulsar.metadata.api.MetadataStore; -import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.apache.pulsar.zookeeper.ZooKeeperCache; +import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.apache.zookeeper.ZooKeeper; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java index 046b268..a12c62f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java @@ -359,14 +359,14 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { admin.topics().grantPermission(topic, "test-user", EnumSet.of(AuthAction.consume)); Awaitility.await().untilAsserted(() -> { - assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1")) - .get().auth_policies.getTopicAuthentication().containsKey(topic)); + assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString()) + .get().auth_policies.getTopicAuthentication().containsKey(topic)); }); admin.topics().delete(topic); Awaitility.await().untilAsserted(() -> { - assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1")) + assertFalse(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString()) .get().auth_policies.getTopicAuthentication().containsKey(topic)); }); @@ -379,10 +379,10 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { .grantPermission(partitionedTopic, "test-user", EnumSet.of(AuthAction.consume)); Awaitility.await().untilAsserted(() -> { - assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1")) + assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString()) .get().auth_policies.getTopicAuthentication().containsKey(partitionedTopic)); for (int i = 0; i < numPartitions; i++) { - assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1")) + assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString()) .get().auth_policies.getTopicAuthentication() .containsKey(TopicName.get(partitionedTopic).getPartition(i).toString())); } @@ -390,10 +390,10 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { admin.topics().deletePartitionedTopic("persistent://p1/ns1/partitioned-topic"); Awaitility.await().untilAsserted(() -> { - assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1")) + assertFalse(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString()) .get().auth_policies.getTopicAuthentication().containsKey(partitionedTopic)); for (int i = 0; i < numPartitions; i++) { - assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1")) + assertFalse(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString()) .get().auth_policies.getTopicAuthentication() .containsKey(TopicName.get(partitionedTopic).getPartition(i).toString())); }
