This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ed10ec33dc5 [improve][broker]Part-2 Add Admin API to delete topic 
policies (#24602)
ed10ec33dc5 is described below

commit ed10ec33dc554d1bf9722d419f83e95fc0122157
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Sat Aug 9 12:18:28 2025 +0800

    [improve][broker]Part-2 Add Admin API to delete topic policies (#24602)
---
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 36 +++++++++++++++
 .../broker/service/OneWayReplicatorTestBase.java   | 22 +++++++---
 .../service/OneWayReplicatorUsingGlobalZKTest.java | 51 +++++++++++++++++++++-
 .../broker/service/TopicPolicyTestUtils.java       |  8 ++++
 .../apache/pulsar/client/admin/TopicPolicies.java  | 11 +++++
 .../client/admin/internal/TopicPoliciesImpl.java   | 11 +++++
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  | 16 ++++++-
 7 files changed, 146 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index c539dad7426..5acb21f15a8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1203,6 +1203,42 @@ public class PersistentTopics extends 
PersistentTopicsBase {
 
     }
 
+    /**
+     * REST endpoint that deletes the policies of a topic.
+     *
+     * <p>Runs {@code validateTopicPolicyOperationAsync} with {@code PolicyOperation.WRITE}, then calls
+     * {@code deleteTopicPoliciesAsync(topicName, false)} on the topic policies service and answers 204
+     * on success. An {@link IllegalStateException} is reported as HTTP 422; every failure is handed to
+     * {@code resumeAsyncResponseExceptionally}.
+     */
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/policies")
+    @ApiOperation(value = "Delete policies for a topic.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Operation successful"),
+            @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to 
administrate resources on this tenant"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace or topic does not 
exist"),
+            @ApiResponse(code = 422, message = "Unprocessable entity"),
+            @ApiResponse(code = 500, message = "Internal server error")})
+    public void deleteTopicPolicies(
+            @Suspended AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        // NOTE(review): PolicyName.MAX_PRODUCERS is used as the policy name for a delete of all
+        // policies - confirm this is the intended authorization scope.
+        validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, 
PolicyOperation.WRITE)
+                .thenCompose(__ -> 
pulsar().getTopicPoliciesService().deleteTopicPoliciesAsync(topicName, false))
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    Throwable t = FutureUtil.unwrapCompletionException(ex);
+                    if (t instanceof IllegalStateException){
+                        // Surface the service's IllegalStateException to the caller as HTTP 422.
+                        ex = new RestException(422/* Unprocessable entity*/, 
t.getMessage());
+                    } else if (isNot307And4xxException(ex)) {
+                        log.error("[{}] Failed to delete topic policies {}", 
clientAppId(), topicName, t);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
+
     @GET
     @Path("/{tenant}/{namespace}/{topic}/subscriptions")
     @ApiOperation(
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 58d8bcf7045..0196100b8ba 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -467,27 +467,34 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
 
     protected void setTopicLevelClusters(String topic, List<String> clusters, 
PulsarAdmin admin,
                                          PulsarService pulsar) throws 
Exception {
+        setTopicLevelClusters(topic, clusters, admin, pulsar, false);
+    }
+
+    protected void setTopicLevelClusters(String topic, List<String> clusters, 
PulsarAdmin admin,
+                                         PulsarService pulsar, boolean global) 
throws Exception {
         Set<String> expected = new HashSet<>(clusters);
         TopicName topicName = 
TopicName.get(TopicName.get(topic).getPartitionedTopicName());
         int partitions = ensurePartitionsAreSame(topic);
-        admin.topics().setReplicationClusters(topic, clusters);
+        admin.topicPolicies(global).setReplicationClusters(topic, clusters);
         Awaitility.await().untilAsserted(() -> {
-            TopicPolicies policies = 
TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), 
topicName);
+            TopicPolicies policies = 
TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), 
topicName,
+                    global);
             assertEquals(new HashSet<>(policies.getReplicationClusters()), 
expected);
             if (partitions == 0) {
-                checkNonPartitionedTopicLevelClusters(topicName.toString(), 
clusters, admin, pulsar.getBrokerService());
+                checkNonPartitionedTopicLevelClusters(topicName.toString(), 
clusters, admin, pulsar,
+                        global);
             } else {
                 for (int i = 0; i < partitions; i++) {
                     
checkNonPartitionedTopicLevelClusters(topicName.getPartition(i).toString(), 
clusters, admin,
-                            pulsar.getBrokerService());
+                            pulsar, global);
                 }
             }
         });
     }
 
     protected void checkNonPartitionedTopicLevelClusters(String topic, 
List<String> clusters, PulsarAdmin admin,
-                                           BrokerService broker) throws 
Exception {
-        CompletableFuture<Optional<Topic>> future = broker.getTopic(topic, 
false);
+                                                         PulsarService pulsar, 
boolean global) throws Exception {
+        CompletableFuture<Optional<Topic>> future = 
pulsar.getBrokerService().getTopic(topic, false);
         if (future == null) {
             return;
         }
@@ -497,7 +504,8 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
         }
         PersistentTopic persistentTopic = (PersistentTopic) optional.get();
         Set<String> expected = new HashSet<>(clusters);
-        Set<String> act = new 
HashSet<>(TopicPolicyTestUtils.getTopicPolicies(persistentTopic)
+        Set<String> act = new HashSet<>(TopicPolicyTestUtils
+                .getTopicPolicies(pulsar.getTopicPoliciesService(), 
TopicName.get(persistentTopic.topic), global)
                 .getReplicationClusters());
         assertEquals(act, expected);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index e14cc5045d6..5837de8c809 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static 
org.apache.pulsar.broker.service.TopicPoliciesService.GetType.GLOBAL_ONLY;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -40,6 +41,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -317,9 +319,56 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
         super.testDeleteNonPartitionedTopic();
     }
 
+    @Override
     @Test
     public void testDeletePartitionedTopic() throws Exception {
-        super.testDeletePartitionedTopic();
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
+        admin1.topics().createPartitionedTopic(topicName, 2);
+
+        // Verify replicator works.
+        verifyReplicationWorks(topicName);
+
+        // Remove the remote cluster from the topic's global replication clusters, keeping only cluster1.
+        setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, 
pulsar1, true);
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(pulsar1.getPulsarResources().getTopicResources()
+                    
.persistentTopicExists(TopicName.get(topicName).getPartition(0)).join());
+            assertTrue(pulsar1.getPulsarResources().getTopicResources()
+                    
.persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
+            assertFalse(pulsar2.getPulsarResources().getTopicResources()
+                    
.persistentTopicExists(TopicName.get(topicName).getPartition(0)).join());
+            assertFalse(pulsar2.getPulsarResources().getTopicResources()
+                    
.persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
+        });
+
+
+        // Delete topic.
+        admin1.topics().deletePartitionedTopic(topicName);
+        Awaitility.await().untilAsserted(() -> {
+            assertFalse(pulsar1.getPulsarResources().getTopicResources()
+                    
.persistentTopicExists(TopicName.get(topicName).getPartition(0)).join());
+            assertFalse(pulsar1.getPulsarResources().getTopicResources()
+                    
.persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
+            assertFalse(pulsar2.getPulsarResources().getTopicResources()
+                    
.persistentTopicExists(TopicName.get(topicName).getPartition(0)).join());
+            assertFalse(pulsar2.getPulsarResources().getTopicResources()
+                    
.persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
+        });
+
+        Awaitility.await().untilAsserted(() -> {
+            Optional<TopicPolicies> op1 = pulsar1.getTopicPoliciesService()
+                    .getTopicPoliciesAsync(TopicName.get(topicName), 
GLOBAL_ONLY).join();
+            assertFalse(op1.isPresent());
+            Optional<TopicPolicies> op2 = pulsar2.getTopicPoliciesService()
+                    .getTopicPoliciesAsync(TopicName.get(topicName), 
GLOBAL_ONLY).join();
+            assertTrue(op2.isPresent());
+        });
+        admin2.topicPolicies().deleteTopicPolicies(topicName);
+        Awaitility.await().untilAsserted(() -> {
+            Optional<TopicPolicies> op2 = pulsar2.getTopicPoliciesService()
+                    .getTopicPoliciesAsync(TopicName.get(topicName), 
GLOBAL_ONLY).join();
+            assertFalse(op2.isPresent());
+        });
     }
 
     @Test(enabled = false)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
index 0aa8e070d31..b5adaeccad7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
@@ -48,6 +48,14 @@ public class TopicPolicyTestUtils {
                 .orElse(null);
     }
 
+    public static TopicPolicies getTopicPolicies(TopicPoliciesService 
topicPoliciesService, TopicName topicName,
+             boolean global) throws ExecutionException, InterruptedException {
+        TopicPoliciesService.GetType getType = global ? 
TopicPoliciesService.GetType.GLOBAL_ONLY
+                : TopicPoliciesService.GetType.LOCAL_ONLY;
+        return topicPoliciesService.getTopicPoliciesAsync(topicName, 
getType).get()
+                .orElse(null);
+    }
+
     public static TopicPolicies getLocalTopicPolicies(TopicPoliciesService 
topicPoliciesService, TopicName topicName)
             throws ExecutionException, InterruptedException {
         return topicPoliciesService.getTopicPoliciesAsync(topicName, 
TopicPoliciesService.GetType.LOCAL_ONLY).get()
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
index 7a5623f849f..3e985dd7281 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
@@ -1936,7 +1936,18 @@ public interface TopicPolicies {
      */
     CompletableFuture<Void> setReplicationClusters(String topic, List<String> 
clusterIds);
 
+    /**
+     * get the replication clusters for the topic.
+     */
     Set<String> getReplicationClusters(String topic, boolean applied) throws 
PulsarAdminException;
 
+    /**
+     * Remove the replication clusters for the topic.
+     */
     void removeReplicationClusters(String topic) throws PulsarAdminException;
+
+    /**
+     * Delete the topic policies. This works even if the topic has already been deleted.
+     */
+    void deleteTopicPolicies(String topic) throws PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
index 0a4a816640f..6cfa981f1c4 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
@@ -1312,6 +1312,17 @@ public class TopicPoliciesImpl extends BaseResource 
implements TopicPolicies {
         return asyncDeleteRequest(path);
     }
 
+    @Override
+    public void deleteTopicPolicies(String topic) throws PulsarAdminException {
+        sync(() -> deleteTopicPoliciesAsync(topic));
+    }
+
+    public CompletableFuture<Void> deleteTopicPoliciesAsync(String topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "policies");
+        return asyncDeleteRequest(path);
+    }
+
     /*
      * returns topic name with encoded Local Name
      */
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index b49c4d40a53..5730722a486 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -60,7 +60,7 @@ public class CmdTopicPolicies extends CmdBase {
 
     public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
         super("topicPolicies", admin);
-
+        addCommand("delete", new DeletePolicies());
         addCommand("get-message-ttl", new GetMessageTTL());
         addCommand("set-message-ttl", new SetMessageTTL());
         addCommand("remove-message-ttl", new RemoveMessageTTL());
@@ -2058,6 +2058,20 @@ public class CmdTopicPolicies extends CmdBase {
         }
     }
 
+    /**
+     * CLI command that deletes the policies of the given topic by calling
+     * {@code deleteTopicPolicies} on the non-global topic-policies admin client.
+     */
+    @Command(description = "Remove all the policies for a topic, it will not 
remove policies from the remote "
+            + "cluster")
+    private class DeletePolicies extends CliCommand {
+
+        @Parameters(description = "persistent://tenant/namespace/topic", arity 
= "1")
+        private String topicName;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(topicName);
+            getTopicPolicies(false).deleteTopicPolicies(persistentTopic);
+        }
+    }
+
+
     private TopicPolicies getTopicPolicies(boolean isGlobal) {
         return getAdmin().topicPolicies(isGlobal);
     }

Reply via email to