This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e57828  fix delete authentication policies when delete topic. (#12215)
3e57828 is described below

commit 3e578280539aa55c084a35b601902f0e16c5fc2f
Author: Bowen Li <[email protected]>
AuthorDate: Wed Oct 27 11:41:44 2021 +0800

    fix delete authentication policies when delete topic. (#12215)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 99 ++++++++++++++--------
 .../pulsar/broker/service/BrokerService.java       | 76 +++++++++++++++--
 .../broker/service/persistent/PersistentTopic.java | 10 ++-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  4 +-
 .../api/AuthenticatedProducerConsumerTest.java     | 80 +++++++++++++++--
 5 files changed, 213 insertions(+), 56 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0271abd..ed1dc89 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -131,6 +131,7 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -260,7 +261,7 @@ public class PersistentTopicsBase extends AdminResource {
             });
             log.info("[{}] Successfully granted access for role {}: {} - topic 
{}", clientAppId(), role, actions,
                     topicUri);
-        } catch 
(org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) {
+        } catch (MetadataStoreException.NotFoundException e) {
             log.warn("[{}] Failed to grant permissions on topic {}: Namespace 
does not exist", clientAppId(), topicUri);
             throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
         } catch (Exception e) {
@@ -574,42 +575,72 @@ public class PersistentTopicsBase extends AdminResource {
                                 }
                             });
                 }
-                for (int i = 0; i < numPartitions; i++) {
-                    TopicName topicNamePartition = topicName.getPartition(i);
-                    try {
-                        pulsar().getAdminClient().topics()
-                                .deleteAsync(topicNamePartition.toString(), 
force)
-                                .whenComplete((r, ex) -> {
-                                    if (ex != null) {
-                                        if (ex instanceof NotFoundException) {
-                                            // if the sub-topic is not found, 
the client might not have called create
-                                            // producer or it might have been 
deleted earlier,
-                                            //so we ignore the 404 error.
-                                            // For all other exception,
-                                            //we fail the delete partition 
method even if a single
-                                            // partition is failed to be 
deleted
-                                            if (log.isDebugEnabled()) {
-                                                log.debug("[{}] Partition not 
found: {}", clientAppId(),
-                                                        topicNamePartition);
+                // delete authentication policies of the partitioned topic
+                CompletableFuture<Void> deleteAuthFuture = new 
CompletableFuture<>();
+                pulsar().getPulsarResources().getNamespaceResources()
+                        .setPoliciesAsync(topicName.getNamespaceObject(), p -> 
{
+                            for (int i = 0; i < numPartitions; i++) {
+                                
p.auth_policies.getTopicAuthentication().remove(topicName.getPartition(i).toString());
+                            }
+                            
p.auth_policies.getTopicAuthentication().remove(topicName.toString());
+                            return p;
+                        }).thenAccept(v -> {
+                            log.info("Successfully delete authentication 
policies for partitioned topic {}", topicName);
+                            deleteAuthFuture.complete(null);
+                        }).exceptionally(ex -> {
+                            if (ex.getCause() instanceof 
MetadataStoreException.NotFoundException) {
+                                log.warn("Namespace policies of {} not found", 
topicName.getNamespaceObject());
+                                deleteAuthFuture.complete(null);
+                            } else {
+                                log.error("Failed to delete authentication 
policies for partitioned topic {}",
+                                        topicName, ex);
+                                deleteAuthFuture.completeExceptionally(ex);
+                            }
+                            return null;
+                        });
+
+                deleteAuthFuture.whenComplete((r, ex) -> {
+                    if (ex != null) {
+                        future.completeExceptionally(ex);
+                        return;
+                    }
+                    for (int i = 0; i < numPartitions; i++) {
+                        TopicName topicNamePartition = 
topicName.getPartition(i);
+                        try {
+                            pulsar().getAdminClient().topics()
+                                    
.deleteAsync(topicNamePartition.toString(), force)
+                                    .whenComplete((r1, ex1) -> {
+                                        if (ex1 != null) {
+                                            if (ex1 instanceof 
NotFoundException) {
+                                                // if the sub-topic is not 
found, the client might not have called
+                                                // create producer or it might 
have been deleted earlier,
+                                                //so we ignore the 404 error.
+                                                // For all other exception,
+                                                //we fail the delete partition 
method even if a single
+                                                // partition is failed to be 
deleted
+                                                if (log.isDebugEnabled()) {
+                                                    log.debug("[{}] Partition 
not found: {}", clientAppId(),
+                                                            
topicNamePartition);
+                                                }
+                                            } else {
+                                                log.error("[{}] Failed to 
delete partition {}", clientAppId(),
+                                                        topicNamePartition, 
ex1);
+                                                
future.completeExceptionally(ex1);
+                                                return;
                                             }
                                         } else {
-                                            log.error("[{}] Failed to delete 
partition {}", clientAppId(),
-                                                    topicNamePartition, ex);
-                                            future.completeExceptionally(ex);
-                                            return;
+                                            log.info("[{}] Deleted partition 
{}", clientAppId(), topicNamePartition);
                                         }
-                                    } else {
-                                        log.info("[{}] Deleted partition {}", 
clientAppId(), topicNamePartition);
-                                    }
-                                    if (count.decrementAndGet() == 0) {
-                                        future.complete(null);
-                                    }
-                                });
-                    } catch (Exception e) {
-                        log.error("[{}] Failed to delete partition {}", 
clientAppId(), topicNamePartition, e);
-                        future.completeExceptionally(e);
+                                        if (count.decrementAndGet() == 0) {
+                                            future.complete(null);
+                                        }
+                                    });
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to delete partition {}", 
clientAppId(), topicNamePartition, e);
+                            future.completeExceptionally(e);
+                        }
                     }
-                }
+                });
             } else {
                 future.complete(null);
             }
@@ -2640,7 +2671,7 @@ public class PersistentTopicsBase extends AdminResource {
         // note that we do not want to load the topic and hence skip 
authorization check
         try {
             namespaceResources().getPolicies(namespaceName);
-        } catch 
(org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) {
+        } catch (MetadataStoreException.NotFoundException e) {
             log.warn("[{}] Failed to get topic backlog {}: Namespace does not 
exist", clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
         } catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c3a241d..14c211b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -970,6 +970,9 @@ public class BrokerService implements Closeable {
             }
         }
 
+        if (log.isDebugEnabled()) {
+            log.debug("Topic {} is not loaded, try to delete from metadata", 
topic);
+        }
         // Topic is not loaded, though we still might be able to delete from 
metadata
         TopicName tn = TopicName.get(topic);
         if (!tn.isPersistent()) {
@@ -978,21 +981,76 @@ public class BrokerService implements Closeable {
         }
 
         CompletableFuture<Void> future = new CompletableFuture<>();
-        managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), 
new DeleteLedgerCallback() {
-            @Override
-            public void deleteLedgerComplete(Object ctx) {
-                future.complete(null);
-            }
 
-            @Override
-            public void deleteLedgerFailed(ManagedLedgerException exception, 
Object ctx) {
-                future.completeExceptionally(exception);
+        CompletableFuture<Void> deleteTopicAuthenticationFuture = new 
CompletableFuture<>();
+        deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
+        deleteTopicAuthenticationFuture.whenComplete((v, ex) -> {
+            if (ex != null) {
+                future.completeExceptionally(ex);
+                return;
             }
-        }, null);
+            
managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), new 
DeleteLedgerCallback() {
+                @Override
+                public void deleteLedgerComplete(Object ctx) {
+                    future.complete(null);
+                }
+
+                @Override
+                public void deleteLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
+                    future.completeExceptionally(exception);
+                }
+            }, null);
+        });
+
 
         return future;
     }
 
+    public void deleteTopicAuthenticationWithRetry(String topic, 
CompletableFuture<Void> future, int count) {
+        if (count == 0) {
+            log.error("The number of retries has exhausted for topic {}", 
topic);
+            future.completeExceptionally(new MetadataStoreException("The 
number of retries has exhausted"));
+            return;
+        }
+        NamespaceName namespaceName = 
TopicName.get(topic).getNamespaceObject();
+        // Check whether there are auth policies for the topic
+        
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespaceName).thenAccept(optPolicies
 -> {
+            if (!optPolicies.isPresent() || 
!optPolicies.get().auth_policies.getTopicAuthentication()
+                    .containsKey(topic)) {
+                // if there is no auth policy for the topic, just complete and 
return
+                if (log.isDebugEnabled()) {
+                    log.debug("Authentication policies not found for topic 
{}", topic);
+                }
+                future.complete(null);
+                return;
+            }
+            pulsar.getPulsarResources().getNamespaceResources()
+                    
.setPoliciesAsync(TopicName.get(topic).getNamespaceObject(), p -> {
+                        p.auth_policies.getTopicAuthentication().remove(topic);
+                        return p;
+                    }).thenAccept(v -> {
+                        log.info("Successfully delete authentication policies 
for topic {}", topic);
+                        future.complete(null);
+                    }).exceptionally(ex1 -> {
+                        if (ex1.getCause() instanceof 
MetadataStoreException.BadVersionException) {
+                            log.warn(
+                                    "Failed to delete authentication policies 
because of bad version. "
+                                            + "Retry to delete authentication 
policies for topic {}",
+                                    topic);
+                            deleteTopicAuthenticationWithRetry(topic, future, 
count - 1);
+                        } else {
+                            log.error("Failed to delete authentication 
policies for topic {}", topic, ex1);
+                            future.completeExceptionally(ex1);
+                        }
+                        return null;
+                    });
+        }).exceptionally(ex -> {
+            log.error("Failed to get policies for topic {}", topic, ex);
+            future.completeExceptionally(ex);
+            return null;
+        });
+    }
+
     private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String 
topic) {
         if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
             if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9f2a567..b1ee3e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -143,7 +143,6 @@ import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
-import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -1128,10 +1127,12 @@ public class PersistentTopic extends AbstractTopic
                 //  2. We want to kick out everyone and forcefully delete the 
topic.
                 //     In this case, we shouldn't care if the usageCount is 0 
or not, just proceed
                 if (currentUsageCount() ==  0 || (closeIfClientsConnected && 
!failIfHasSubscriptions)) {
-                    CompletableFuture<SchemaVersion> deleteSchemaFuture =
-                            deleteSchema ? deleteSchema() : 
CompletableFuture.completedFuture(null);
+                    CompletableFuture<Void> deleteTopicAuthenticationFuture = 
new CompletableFuture<>();
+                    brokerService.deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
 
-                    deleteSchemaFuture.thenAccept(__ -> deleteTopicPolicies())
+                    deleteTopicAuthenticationFuture.thenCompose(
+                                    __ -> deleteSchema ? deleteSchema() : 
CompletableFuture.completedFuture(null))
+                            .thenAccept(__ -> deleteTopicPolicies())
                             .thenCompose(__ -> 
transactionBuffer.clearSnapshot()).whenComplete((v, ex) -> {
                         if (ex != null) {
                             log.error("[{}] Error deleting topic", topic, ex);
@@ -1158,6 +1159,7 @@ public class PersistentTopic extends AbstractTopic
 
                                             
brokerService.pulsar().getTopicPoliciesService()
                                                     
.clean(TopicName.get(topic));
+
                                             log.info("[{}] Topic deleted", 
topic);
                                             deleteFuture.complete(null);
                                         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 7e1c576..2c3f704 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -782,9 +782,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         }
 
         // Force topic creation and namespace being loaded
-        producer = 
pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/use/ns2/my-topic").create();
+        producer = 
pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/ns2/my-topic").create();
         producer.close();
-        admin.topics().delete("persistent://prop-xyz/use/ns2/my-topic");
+        admin.topics().delete("persistent://prop-xyz/ns2/my-topic");
 
         // both unload and delete should succeed for ns2 on other broker with 
a redirect
         // otheradmin.namespaces().unload("prop-xyz/use/ns2");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 6d76ce8..046b268 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -19,7 +19,10 @@
 package org.apache.pulsar.client.api;
 
 import static org.mockito.Mockito.spy;
-
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Sets;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -27,20 +30,20 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
 import javax.ws.rs.InternalServerErrorException;
-
 import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.zookeeper.KeeperException.Code;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -49,8 +52,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Sets;
-
 @Test(groups = "broker-api")
 public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(AuthenticatedProducerConsumerTest.class);
@@ -87,9 +88,10 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         superUserRoles.add("admin");
         conf.setSuperUserRoles(superUserRoles);
 
+        conf.setBrokerClientTlsEnabled(true);
         
conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
         conf.setBrokerClientAuthenticationParameters(
-                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + 
"tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
+                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + 
"tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
 
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
@@ -337,4 +339,68 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         mockZooKeeperGlobal.unsetAlwaysFail();
     }
 
+    @Test
+    public void testDeleteAuthenticationPoliciesOfTopic() throws Exception {
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+        internalSetup(authTls);
+
+        admin.clusters().createCluster("test", ClusterData.builder().build());
+        admin.tenants().createTenant("p1",
+                new TenantInfoImpl(Collections.emptySet(), new 
HashSet<>(admin.clusters().getClusters())));
+        admin.namespaces().createNamespace("p1/ns1");
+
+        // test for non-partitioned topic
+        String topic = "persistent://p1/ns1/topic";
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().grantPermission(topic, "test-user", 
EnumSet.of(AuthAction.consume));
+
+        Awaitility.await().untilAsserted(() -> {
+            
assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+                    
.get().auth_policies.getTopicAuthentication().containsKey(topic));
+        });
+
+        admin.topics().delete(topic);
+
+        Awaitility.await().untilAsserted(() -> {
+            
assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+                    
.get().auth_policies.getTopicAuthentication().containsKey(topic));
+        });
+
+        // test for partitioned topic
+        String partitionedTopic = "persistent://p1/ns1/partitioned-topic";
+        int numPartitions = 5;
+
+        admin.topics().createPartitionedTopic(partitionedTopic, numPartitions);
+        admin.topics()
+                .grantPermission(partitionedTopic, "test-user", 
EnumSet.of(AuthAction.consume));
+
+        Awaitility.await().untilAsserted(() -> {
+            
assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+                    
.get().auth_policies.getTopicAuthentication().containsKey(partitionedTopic));
+            for (int i = 0; i < numPartitions; i++) {
+                
assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+                        .get().auth_policies.getTopicAuthentication()
+                        
.containsKey(TopicName.get(partitionedTopic).getPartition(i).toString()));
+            }
+        });
+
+        
admin.topics().deletePartitionedTopic("persistent://p1/ns1/partitioned-topic");
+        Awaitility.await().untilAsserted(() -> {
+            
assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+                    
.get().auth_policies.getTopicAuthentication().containsKey(partitionedTopic));
+            for (int i = 0; i < numPartitions; i++) {
+                
assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
+                        .get().auth_policies.getTopicAuthentication()
+                        
.containsKey(TopicName.get(partitionedTopic).getPartition(i).toString()));
+            }
+        });
+
+        admin.namespaces().deleteNamespace("p1/ns1");
+        admin.tenants().deleteTenant("p1");
+        admin.clusters().deleteCluster("test");
+    }
 }

Reply via email to