This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0c6f89921d2112e424db170bee5473d2e546ef3f Author: ken <[email protected]> AuthorDate: Sun Sep 4 12:06:55 2022 +0800 [improve] clean the empty topicAuthenticationMap in zk when revoke permission (#16815) Co-authored-by: fanjianye <[email protected]> (cherry picked from commit d139d884bcf38d8d9f2ff99bb355591819b85ef5) --- .../broker/admin/impl/PersistentTopicsBase.java | 8 ++++- .../broker/service/ReplicatorSubscriptionTest.java | 2 +- .../api/AuthenticatedProducerConsumerTest.java | 41 ++++++++++++++++++++++ 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 4ff236af5ac..d5952f8627e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -328,7 +328,13 @@ public class PersistentTopicsBase extends AdminResource { try { // Write the new policies to metadata store namespaceResources().setPolicies(namespaceName, p -> { - p.auth_policies.getTopicAuthentication().get(topicUri).remove(role); + p.auth_policies.getTopicAuthentication().computeIfPresent(topicUri, (k, roles) -> { + roles.remove(role); + if (roles.isEmpty()) { + return null; + } + return roles; + }); return p; }); log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role, topicUri); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java index 175574ed828..186a2491a47 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java @@ -180,7 +180,7 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase { .getTopic(topicName, false).get().get(); ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get(); // no snapshot should have been created before any messages are published - assertTrue(rsc1.getLastCompletedSnapshotId().isEmpty()); + assertFalse(rsc1.getLastCompletedSnapshotId().isPresent()); @Cleanup PulsarClient client2 = PulsarClient.builder() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java index 046b26846e2..6d9135af1ac 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java @@ -403,4 +403,45 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { admin.tenants().deleteTenant("p1"); admin.clusters().deleteCluster("test"); } + + @Test + public void testCleanupEmptyTopicAuthenticationMap() throws Exception { + Map<String, String> authParams = new HashMap<>(); + authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); + authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); + Authentication authTls = new AuthenticationTls(); + authTls.configure(authParams); + internalSetup(authTls); + + admin.clusters().createCluster("test", ClusterData.builder().build()); + admin.tenants().createTenant("p1", + new TenantInfoImpl(Collections.emptySet(), new HashSet<>(admin.clusters().getClusters()))); + admin.namespaces().createNamespace("p1/ns1"); + + String topic = "persistent://p1/ns1/topic"; + admin.topics().createNonPartitionedTopic(topic); + assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1")) + .get().auth_policies.getTopicAuthentication().containsKey(topic)); + + // grant permission + admin.topics().grantPermission(topic, "test-user-1", EnumSet.of(AuthAction.consume)); + Awaitility.await().untilAsserted(() -> { + assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1")) + .get().auth_policies.getTopicAuthentication().containsKey(topic)); + }); + + // revoke permission + admin.topics().revokePermissions(topic, "test-user-1"); + Awaitility.await().untilAsserted(() -> { + assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1")) + .get().auth_policies.getTopicAuthentication().containsKey(topic)); + }); + + // grant permission again + admin.topics().grantPermission(topic, "test-user-1", EnumSet.of(AuthAction.consume)); + Awaitility.await().untilAsserted(() -> { + assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1")) + .get().auth_policies.getTopicAuthentication().containsKey(topic)); + }); + } }
