This is an automated email from the ASF dual-hosted git repository.
yuruguo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7075a5ce0d4 [broker][admin] Add cmd to remove topic properties (#17337)
7075a5ce0d4 is described below
commit 7075a5ce0d4a70f52625ac8c3d0c48894442b72a
Author: Ruguo Yu <[email protected]>
AuthorDate: Fri Sep 2 00:05:53 2022 +0800
[broker][admin] Add cmd to remove topic properties (#17337)
* [broker][admin] Add cmd to remove topic properties
* address comment
* address comment
---
.../authorization/PulsarAuthorizationProvider.java | 1 +
.../broker/admin/impl/PersistentTopicsBase.java | 52 ++++++++++++++++++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 35 +++++++++++++++
.../apache/pulsar/broker/admin/AdminApi2Test.java | 20 +++++++++
.../org/apache/pulsar/client/admin/Topics.java | 18 ++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 13 ++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 4 ++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 16 +++++++
.../common/policies/data/TopicOperation.java | 1 +
9 files changed, 160 insertions(+)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 399ecc4a9f0..de586345fd0 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -553,6 +553,7 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
case COMPACT:
case OFFLOAD:
case UNLOAD:
+ case DELETE_METADATA:
case ADD_BUNDLE_RANGE:
case GET_BUNDLE_RANGE:
case DELETE_BUNDLE_RANGE:
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 72249fdc1e0..9d13c43ed3d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -658,6 +658,58 @@ public class PersistentTopicsBase extends AdminResource {
return future;
}
+    /**
+     * Removes a single key from the properties of the topic addressed by {@code topicName}.
+     *
+     * <p>Topic ownership is validated first, then the {@code DELETE_METADATA} topic operation.
+     * When {@code topicName.isPartitioned()} is true, or the fetched metadata reports zero
+     * partitions, the removal is delegated to
+     * {@link #internalRemoveNonPartitionedTopicProperties(String)}; otherwise the key is removed
+     * from the properties stored in the partitioned-topic metadata.
+     *
+     * @param authoritative forwarded to the topic ownership validation
+     * @param key the property key to remove
+     * @return a future that completes after the removal succeeded and was logged
+     */
+    protected CompletableFuture<Void> internalRemovePropertiesAsync(boolean authoritative, String key) {
+        return validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(ignored -> validateTopicOperationAsync(topicName, TopicOperation.DELETE_METADATA))
+                .thenCompose(ignored -> {
+                    if (topicName.isPartitioned()) {
+                        return internalRemoveNonPartitionedTopicProperties(key);
+                    }
+                    return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
+                            .thenCompose(topicMetadata -> {
+                                if (topicMetadata.partitions == 0) {
+                                    return internalRemoveNonPartitionedTopicProperties(key);
+                                }
+                                return namespaceResources().getPartitionedTopicResources()
+                                        .updatePartitionedTopicAsync(topicName, current -> {
+                                            // Properties may never have been set on this topic.
+                                            if (current.properties != null) {
+                                                current.properties.remove(key);
+                                            }
+                                            return new PartitionedTopicMetadata(
+                                                    current.partitions, current.properties);
+                                        });
+                            });
+                })
+                .thenAccept(ignored -> log.info("[{}] remove [{}] properties success with key {}",
+                        clientAppId(), topicName, key));
+    }
+
+    /**
+     * Removes {@code key} from the managed-ledger properties of the topic named by
+     * {@code topicName}.
+     *
+     * <p>The lookup and the removal are chained with {@code thenCompose} so that every failure
+     * reaches the returned future. Completing a separately created future from inside
+     * {@code thenAccept} would lose a failed lookup or the NOT_FOUND exception thrown below,
+     * leaving the caller waiting forever.
+     *
+     * @param key the property key to remove
+     * @return a future completed with {@code null} once the property update succeeded; completed
+     *         exceptionally if the topic lookup fails, the topic does not exist (a
+     *         {@code RestException} with status NOT_FOUND), or the managed ledger reports a
+     *         {@code ManagedLedgerException}
+     */
+    private CompletableFuture<Void> internalRemoveNonPartitionedTopicProperties(String key) {
+        return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
+                .thenCompose(opt -> {
+                    if (!opt.isPresent()) {
+                        // Thrown inside the stage, so it fails the returned future.
+                        throw new RestException(Status.NOT_FOUND,
+                                getTopicNotFoundErrorMessage(topicName.toString()));
+                    }
+                    CompletableFuture<Void> future = new CompletableFuture<>();
+                    ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger();
+                    managedLedger.asyncDeleteProperty(key, new AsyncCallbacks.UpdatePropertiesCallback() {
+
+                        @Override
+                        public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+                            future.complete(null);
+                        }
+
+                        @Override
+                        public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+                            future.completeExceptionally(exception);
+                        }
+                    }, null);
+                    return future;
+                });
+    }
+
protected CompletableFuture<Void> internalCheckTopicExists(TopicName
topicName) {
return pulsar().getNamespaceService().checkTopicExists(topicName)
.thenAccept(exist -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index f9aff2baf6f..f283bb7aa5c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1003,6 +1003,41 @@ public class PersistentTopics extends
PersistentTopicsBase {
});
}
+ /**
+ * REST endpoint that removes one key from the properties of the given topic.
+ *
+ * <p>Validates the topic name, delegates to internalRemovePropertiesAsync and resumes the
+ * response with 204 No Content on success. On failure the response is resumed through
+ * resumeAsyncResponseExceptionally; the failure is logged unless it is a redirect.
+ *
+ * <p>NOTE(review): "key" is not validated here, so a request without the query parameter
+ * passes null down to the removal logic - confirm that this is intended.
+ */
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/properties")
+ @ApiOperation(value = "Remove the key in properties on the given topic.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Don't have permission to
administrate resources on this tenant"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Partitioned topic does not
exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 412, message = "Partitioned topic name is
invalid"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ public void removeProperties(
+ @Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("key") String key,
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ validatePersistentTopicName(tenant, namespace, encodedTopic);
+ internalRemovePropertiesAsync(authoritative, key)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ // Redirects are not logged as errors.
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to remove key {} in properties
on topic {}",
+ clientAppId(), key, topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
@DELETE
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Delete a partitioned topic.",
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index d492ad73d78..2c2d7057cfa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -892,6 +892,7 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
public void testUpdatePartitionedTopicProperties() throws Exception {
final String namespace = "prop-xyz/ns2";
final String topicName = "persistent://" + namespace +
"/testUpdatePartitionedTopicProperties";
+ final String topicNameTwo = "persistent://" + namespace +
"/testUpdatePartitionedTopicProperties2";
admin.namespaces().createNamespace(namespace, 20);
// create partitioned topic without properties
@@ -924,6 +925,25 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value11");
Assert.assertEquals(properties.get("key2"), "value2");
+
+ // create topic without properties
+ admin.topics().createPartitionedTopic(topicNameTwo, 2);
+ properties = admin.topics().getProperties(topicNameTwo);
+ Assert.assertNull(properties);
+ // remove key of properties on this topic
+ admin.topics().removeProperties(topicNameTwo, "key1");
+ properties = admin.topics().getProperties(topicNameTwo);
+ Assert.assertNull(properties);
+ Map<String, String> topicProp = new HashMap<>();
+ topicProp.put("key1", "value1");
+ topicProp.put("key2", "value2");
+ admin.topics().updateProperties(topicNameTwo, topicProp);
+ properties = admin.topics().getProperties(topicNameTwo);
+ Assert.assertEquals(properties, topicProp);
+ admin.topics().removeProperties(topicNameTwo, "key1");
+ topicProp.remove("key1");
+ properties = admin.topics().getProperties(topicNameTwo);
+ Assert.assertEquals(properties, topicProp);
}
@Test
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 98a071c4680..177cae9a9a4 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -754,6 +754,24 @@ public interface Topics {
*/
CompletableFuture<Void> updatePropertiesAsync(String topic, Map<String,
String> properties);
+ /**
+ * Remove the key in properties on a topic.
+ *
+ * @param topic the topic whose properties are modified
+ * @param key the property key to remove
+ * @throws PulsarAdminException if the removal request fails
+ */
+ void removeProperties(String topic, String key) throws
PulsarAdminException;
+
+ /**
+ * Remove the key in properties on a topic asynchronously.
+ *
+ * @param topic the topic whose properties are modified
+ * @param key the property key to remove
+ * @return a future tracking the removal request
+ */
+ CompletableFuture<Void> removePropertiesAsync(String topic, String key);
+
/**
* Delete a partitioned topic and its schemas.
* <p/>
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 45ca53cbd0a..87be258dbec 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -476,6 +476,19 @@ public class TopicsImpl extends BaseResource implements
Topics {
deletePartitionedTopic(topic, false);
}
+ @Override
+ public void removeProperties(String topic, String key) throws
PulsarAdminException {
+ // Blocking variant: hands the asynchronous call to sync(...).
+ sync(() -> removePropertiesAsync(topic, key));
+ }
+
+    @Override
+    public CompletableFuture<Void> removePropertiesAsync(String topic, String key) {
+        // DELETE <topic path>/properties?key=<key>; the topic is validated before building the path.
+        final TopicName validated = validateTopic(topic);
+        return asyncDeleteRequest(topicPath(validated, "properties").queryParam("key", key));
+    }
+
@Override
public CompletableFuture<Void> deletePartitionedTopicAsync(String topic) {
return deletePartitionedTopicAsync(topic, false);
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index deda7c41d1b..c339da80e8f 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1519,6 +1519,10 @@ public class PulsarAdminToolTest {
props.put("x", "y,z");
verify(mockTopics).updateProperties("persistent://myprop/clust/ns1/ds1", props);
+ cmdTopics = new CmdTopics(() -> admin);
+ cmdTopics.run(split("remove-properties
persistent://myprop/clust/ns1/ds1 --key a"));
+
verify(mockTopics).removeProperties("persistent://myprop/clust/ns1/ds1", "a");
+
cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("get-subscription-properties
persistent://myprop/clust/ns1/ds1 -s sub1"));
verify(mockTopics).getSubscriptionProperties("persistent://myprop/clust/ns1/ds1",
"sub1");
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index dc6313b1be1..8b12d70d057 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -123,6 +123,7 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-partitioned-topic-metadata", new
GetPartitionedTopicMetadataCmd());
jcommander.addCommand("get-properties", new GetPropertiesCmd());
jcommander.addCommand("update-properties", new UpdateProperties());
+ jcommander.addCommand("remove-properties", new RemoveProperties());
jcommander.addCommand("delete-partitioned-topic", new
DeletePartitionedCmd());
jcommander.addCommand("peek-messages", new PeekMessages());
@@ -649,6 +650,21 @@ public class CmdTopics extends CmdBase {
}
}
+    /**
+     * CLI command {@code remove-properties}: removes one key from the properties of a topic.
+     */
+    @Parameters(commandDescription = "Remove the key in properties of a topic")
+    private class RemoveProperties extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        // Required: without it a null key would be handed to the admin client.
+        @Parameter(names = {"--key", "-k"}, description = "The key to remove in the properties of topic",
+                required = true)
+        private String key;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(params);
+            getTopics().removeProperties(topic, key);
+        }
+    }
+
@Parameters(commandDescription = "Delete a partitioned topic. "
+ "It will also delete all the partitions of the topic if it
exists."
+ "And the application is not able to connect to the topic(delete
then re-create with same name) again "
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
index d4de706e607..0184e0efb82 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
@@ -50,6 +50,7 @@ public enum TopicOperation {
GET_STATS,
GET_METADATA,
+ DELETE_METADATA,
GET_BACKLOG_SIZE,
SET_REPLICATED_SUBSCRIPTION_STATUS,