This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ed10ec33dc5 [improve][broker]Part-2 Add Admin API to delete topic policies (#24602) ed10ec33dc5 is described below commit ed10ec33dc554d1bf9722d419f83e95fc0122157 Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Sat Aug 9 12:18:28 2025 +0800 [improve][broker]Part-2 Add Admin API to delete topic policies (#24602) --- .../pulsar/broker/admin/v2/PersistentTopics.java | 36 +++++++++++++++ .../broker/service/OneWayReplicatorTestBase.java | 22 +++++++--- .../service/OneWayReplicatorUsingGlobalZKTest.java | 51 +++++++++++++++++++++- .../broker/service/TopicPolicyTestUtils.java | 8 ++++ .../apache/pulsar/client/admin/TopicPolicies.java | 11 +++++ .../client/admin/internal/TopicPoliciesImpl.java | 11 +++++ .../apache/pulsar/admin/cli/CmdTopicPolicies.java | 16 ++++++- 7 files changed, 146 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index c539dad7426..5acb21f15a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1203,6 +1203,42 @@ public class PersistentTopics extends PersistentTopicsBase { } + @DELETE + @Path("/{tenant}/{namespace}/{topic}/policies") + @ApiOperation(value = "Delete policies for a topic.") + @ApiResponses(value = { + @ApiResponse(code = 204, message = "Operation successful"), + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace or topic does not exist"), + @ApiResponse(code = 500, message = "Internal server error")}) + public void deleteTopicPolicies( + @Suspended AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE) + .thenCompose(__ -> pulsar().getTopicPoliciesService().deleteTopicPoliciesAsync(topicName, false)) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + Throwable t = FutureUtil.unwrapCompletionException(ex); + if (t instanceof IllegalStateException){ + ex = new RestException(422/* Unprocessable entity*/, t.getMessage()); + } else if (isNot307And4xxException(ex)) { + log.error("[{}] Failed to delete topic {}", clientAppId(), topicName, t); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + @GET @Path("/{tenant}/{namespace}/{topic}/subscriptions") @ApiOperation( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 58d8bcf7045..0196100b8ba 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -467,27 +467,34 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport { protected void setTopicLevelClusters(String topic, List<String> clusters, PulsarAdmin admin, PulsarService pulsar) throws Exception { + setTopicLevelClusters(topic, clusters, admin, pulsar, false); + } + + protected void setTopicLevelClusters(String topic, List<String> clusters, PulsarAdmin admin, + PulsarService pulsar, boolean global) throws Exception { Set<String> expected = new HashSet<>(clusters); TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName()); int partitions = ensurePartitionsAreSame(topic); - admin.topics().setReplicationClusters(topic, clusters); + admin.topicPolicies(global).setReplicationClusters(topic, clusters); Awaitility.await().untilAsserted(() -> { - TopicPolicies policies = TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), topicName); + TopicPolicies policies = TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), topicName, + global); assertEquals(new HashSet<>(policies.getReplicationClusters()), expected); if (partitions == 0) { - checkNonPartitionedTopicLevelClusters(topicName.toString(), clusters, admin, pulsar.getBrokerService()); + checkNonPartitionedTopicLevelClusters(topicName.toString(), clusters, admin, pulsar, + global); } else { for (int i = 0; i < partitions; i++) { checkNonPartitionedTopicLevelClusters(topicName.getPartition(i).toString(), clusters, admin, - pulsar.getBrokerService()); + pulsar, global); } } }); } protected void checkNonPartitionedTopicLevelClusters(String topic, List<String> clusters, PulsarAdmin admin, - BrokerService broker) throws Exception { - CompletableFuture<Optional<Topic>> future = broker.getTopic(topic, false); + PulsarService pulsar, boolean global) throws Exception { + CompletableFuture<Optional<Topic>> future = pulsar.getBrokerService().getTopic(topic, false); if (future == null) { return; } @@ -497,7 +504,8 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport { } PersistentTopic persistentTopic = (PersistentTopic) optional.get(); Set<String> expected = new HashSet<>(clusters); - Set<String> act = new HashSet<>(TopicPolicyTestUtils.getTopicPolicies(persistentTopic) + Set<String> act = new HashSet<>(TopicPolicyTestUtils + .getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(persistentTopic.topic), global) .getReplicationClusters()); assertEquals(act, expected); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index e14cc5045d6..5837de8c809 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.service.TopicPoliciesService.GetType.GLOBAL_ONLY; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -40,6 +41,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TopicPolicies; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -317,9 +319,56 @@ public class OneWayReplicatorUsingGlobalZKTest extends OneWayReplicatorTest { super.testDeleteNonPartitionedTopic(); } + @Override @Test public void testDeletePartitionedTopic() throws Exception { - super.testDeletePartitionedTopic(); + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); + admin1.topics().createPartitionedTopic(topicName, 2); + + // Verify replicator works. + verifyReplicationWorks(topicName); + + // Remove remote cluster from remote cluster. + setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1, true); + Awaitility.await().untilAsserted(() -> { + assertTrue(pulsar1.getPulsarResources().getTopicResources() + .persistentTopicExists(TopicName.get(topicName).getPartition(0)).join()); + assertTrue(pulsar1.getPulsarResources().getTopicResources() + .persistentTopicExists(TopicName.get(topicName).getPartition(1)).join()); + assertFalse(pulsar2.getPulsarResources().getTopicResources() + .persistentTopicExists(TopicName.get(topicName).getPartition(0)).join()); + assertFalse(pulsar2.getPulsarResources().getTopicResources() + .persistentTopicExists(TopicName.get(topicName).getPartition(1)).join()); + }); + + + // Delete topic. + admin1.topics().deletePartitionedTopic(topicName); + Awaitility.await().untilAsserted(() -> { + assertFalse(pulsar1.getPulsarResources().getTopicResources() + .persistentTopicExists(TopicName.get(topicName).getPartition(0)).join()); + assertFalse(pulsar1.getPulsarResources().getTopicResources() + .persistentTopicExists(TopicName.get(topicName).getPartition(1)).join()); + assertFalse(pulsar2.getPulsarResources().getTopicResources() + .persistentTopicExists(TopicName.get(topicName).getPartition(0)).join()); + assertFalse(pulsar2.getPulsarResources().getTopicResources() + .persistentTopicExists(TopicName.get(topicName).getPartition(1)).join()); + }); + + Awaitility.await().untilAsserted(() -> { + Optional<TopicPolicies> op1 = pulsar1.getTopicPoliciesService() + .getTopicPoliciesAsync(TopicName.get(topicName), GLOBAL_ONLY).join(); + assertFalse(op1.isPresent()); + Optional<TopicPolicies> op2 = pulsar2.getTopicPoliciesService() + .getTopicPoliciesAsync(TopicName.get(topicName), GLOBAL_ONLY).join(); + assertTrue(op2.isPresent()); + }); + admin2.topicPolicies().deleteTopicPolicies(topicName); + Awaitility.await().untilAsserted(() -> { + Optional<TopicPolicies> op2 = pulsar2.getTopicPoliciesService() + .getTopicPoliciesAsync(TopicName.get(topicName), GLOBAL_ONLY).join(); + assertFalse(op2.isPresent()); + }); } @Test(enabled = false) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java index 0aa8e070d31..b5adaeccad7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java @@ -48,6 +48,14 @@ public class TopicPolicyTestUtils { .orElse(null); } + public static TopicPolicies getTopicPolicies(TopicPoliciesService topicPoliciesService, TopicName topicName, + boolean global) throws ExecutionException, InterruptedException { + TopicPoliciesService.GetType getType = global ? TopicPoliciesService.GetType.GLOBAL_ONLY + : TopicPoliciesService.GetType.LOCAL_ONLY; + return topicPoliciesService.getTopicPoliciesAsync(topicName, getType).get() + .orElse(null); + } + public static TopicPolicies getLocalTopicPolicies(TopicPoliciesService topicPoliciesService, TopicName topicName) throws ExecutionException, InterruptedException { return topicPoliciesService.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.LOCAL_ONLY).get() diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java index 7a5623f849f..3e985dd7281 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java @@ -1936,7 +1936,18 @@ public interface TopicPolicies { */ CompletableFuture<Void> setReplicationClusters(String topic, List<String> clusterIds); + /** + * get the replication clusters for the topic. + */ Set<String> getReplicationClusters(String topic, boolean applied) throws PulsarAdminException; + /** + * get the replication clusters for the topic. + */ void removeReplicationClusters(String topic) throws PulsarAdminException; + + /** + * Delete topic policies, it works even if the topic has been deleted. + */ + void deleteTopicPolicies(String topic) throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java index 0a4a816640f..6cfa981f1c4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java @@ -1312,6 +1312,17 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies { return asyncDeleteRequest(path); } + @Override + public void deleteTopicPolicies(String topic) throws PulsarAdminException { + sync(() -> deleteTopicPoliciesAsync(topic)); + } + + public CompletableFuture<Void> deleteTopicPoliciesAsync(String topic) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "policies"); + return asyncDeleteRequest(path); + } + /* * returns topic name with encoded Local Name */ diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java index b49c4d40a53..5730722a486 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java @@ -60,7 +60,7 @@ public class CmdTopicPolicies extends CmdBase { public CmdTopicPolicies(Supplier<PulsarAdmin> admin) { super("topicPolicies", admin); - + addCommand("delete", new DeletePolicies()); addCommand("get-message-ttl", new GetMessageTTL()); addCommand("set-message-ttl", new SetMessageTTL()); addCommand("remove-message-ttl", new RemoveMessageTTL()); @@ -2058,6 +2058,20 @@ public class CmdTopicPolicies extends CmdBase { } } + @Command(description = "Remove the all policies for a topic, it will not remove policies from the remote" + + "cluster") + private class DeletePolicies extends CliCommand { + + @Parameters(description = "persistent://tenant/namespace/topic", arity = "1") + private String topicName; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(topicName); + getTopicPolicies(false).deleteTopicPolicies(persistentTopic); + } + } + private TopicPolicies getTopicPolicies(boolean isGlobal) { return getAdmin().topicPolicies(isGlobal); }