This is an automated email from the ASF dual-hosted git repository.

yuruguo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7075a5ce0d4 [broker][admin] Add cmd to remove topic properties (#17337)
7075a5ce0d4 is described below

commit 7075a5ce0d4a70f52625ac8c3d0c48894442b72a
Author: Ruguo Yu <[email protected]>
AuthorDate: Fri Sep 2 00:05:53 2022 +0800

    [broker][admin] Add cmd to remove topic properties (#17337)
    
    * [broker][admin] Add cmd to remove topic properties
    
    * address comment
    
    * address comment
---
 .../authorization/PulsarAuthorizationProvider.java |  1 +
 .../broker/admin/impl/PersistentTopicsBase.java    | 52 ++++++++++++++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 35 +++++++++++++++
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 20 +++++++++
 .../org/apache/pulsar/client/admin/Topics.java     | 18 ++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 13 ++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  4 ++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 16 +++++++
 .../common/policies/data/TopicOperation.java       |  1 +
 9 files changed, 160 insertions(+)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 399ecc4a9f0..de586345fd0 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -553,6 +553,7 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
                             case COMPACT:
                             case OFFLOAD:
                             case UNLOAD:
+                            case DELETE_METADATA:
                             case ADD_BUNDLE_RANGE:
                             case GET_BUNDLE_RANGE:
                             case DELETE_BUNDLE_RANGE:
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 72249fdc1e0..9d13c43ed3d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -658,6 +658,58 @@ public class PersistentTopicsBase extends AdminResource {
         return future;
     }
 
+    /**
+     * Removes one entry, identified by {@code key}, from the properties of the topic addressed by
+     * this request.
+     *
+     * <p>Topic ownership and the {@code DELETE_METADATA} topic operation are validated first. The
+     * key is then removed either through the topic itself or, when the partitioned-topic metadata
+     * reports at least one partition, from the properties held in that metadata.
+     *
+     * @param authoritative forwarded to the topic ownership validation
+     * @param key the property key to remove
+     * @return a future that completes once the removal has been applied and logged
+     */
+    protected CompletableFuture<Void> internalRemovePropertiesAsync(boolean authoritative, String key) {
+        return validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.DELETE_METADATA))
+                .thenCompose(__ -> {
+                    // NOTE(review): presumably isPartitioned() is true when the name already refers to a
+                    // single partition, which is why it is handled like a plain topic - confirm.
+                    if (topicName.isPartitioned()) {
+                        return internalRemoveNonPartitionedTopicProperties(key);
+                    }
+                    return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
+                            .thenCompose(metadata -> {
+                                // Zero partitions: the properties are removed through the topic itself.
+                                if (metadata.partitions == 0) {
+                                    return internalRemoveNonPartitionedTopicProperties(key);
+                                }
+                                return namespaceResources().getPartitionedTopicResources()
+                                        .updatePartitionedTopicAsync(topicName, current -> {
+                                            if (current.properties != null) {
+                                                current.properties.remove(key);
+                                            }
+                                            return new PartitionedTopicMetadata(current.partitions,
+                                                    current.properties);
+                                        });
+                            });
+                })
+                .thenAccept(__ -> log.info("[{}] remove [{}] properties success with key {}",
+                        clientAppId(), topicName, key));
+    }
+
+    /**
+     * Deletes {@code key} from the properties kept by the managed ledger of the current topic.
+     *
+     * <p>The returned future completes exceptionally when the topic does not exist (a
+     * {@link RestException} with status 404), when the topic lookup fails, or when the managed
+     * ledger reports a failure through the callback.
+     *
+     * @param key the property key to delete
+     * @return a future that completes when the managed ledger confirms the deletion
+     */
+    private CompletableFuture<Void> internalRemoveNonPartitionedTopicProperties(String key) {
+        // Compose on the lookup instead of completing a detached future: this way a missing topic,
+        // a failed lookup or an unexpected topic type fails the returned future rather than leaving
+        // it pending forever (which would hang the HTTP request).
+        return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
+                .thenCompose(opt -> {
+                    if (!opt.isPresent()) {
+                        throw new RestException(Status.NOT_FOUND,
+                                getTopicNotFoundErrorMessage(topicName.toString()));
+                    }
+                    CompletableFuture<Void> future = new CompletableFuture<>();
+                    ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger();
+                    managedLedger.asyncDeleteProperty(key, new AsyncCallbacks.UpdatePropertiesCallback() {
+
+                        @Override
+                        public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+                            future.complete(null);
+                        }
+
+                        @Override
+                        public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+                            future.completeExceptionally(exception);
+                        }
+                    }, null);
+                    return future;
+                });
+    }
+
     protected CompletableFuture<Void> internalCheckTopicExists(TopicName 
topicName) {
         return pulsar().getNamespaceService().checkTopicExists(topicName)
                 .thenAccept(exist -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index f9aff2baf6f..f283bb7aa5c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1003,6 +1003,41 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             });
     }
 
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/properties")
+    @ApiOperation(value = "Remove the key in properties on the given topic.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to 
administrate resources on this tenant"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Partitioned topic does not 
exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "Partitioned topic name is 
invalid"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    public void removeProperties(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("key") String key,
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+        validatePersistentTopicName(tenant, namespace, encodedTopic);
+        internalRemovePropertiesAsync(authoritative, key)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to remove key {} in properties 
on topic {}",
+                                clientAppId(), key, topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
     @DELETE
     @Path("/{tenant}/{namespace}/{topic}/partitions")
     @ApiOperation(value = "Delete a partitioned topic.",
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index d492ad73d78..2c2d7057cfa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -892,6 +892,7 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
     public void testUpdatePartitionedTopicProperties() throws Exception {
         final String namespace = "prop-xyz/ns2";
         final String topicName = "persistent://" + namespace + 
"/testUpdatePartitionedTopicProperties";
+        final String topicNameTwo = "persistent://" + namespace + 
"/testUpdatePartitionedTopicProperties2";
         admin.namespaces().createNamespace(namespace, 20);
 
         // create partitioned topic without properties
@@ -924,6 +925,25 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         Assert.assertEquals(properties.size(), 2);
         Assert.assertEquals(properties.get("key1"), "value11");
         Assert.assertEquals(properties.get("key2"), "value2");
+
+        // create topic without properties
+        admin.topics().createPartitionedTopic(topicNameTwo, 2);
+        properties = admin.topics().getProperties(topicNameTwo);
+        Assert.assertNull(properties);
+        // remove key of properties on this topic
+        admin.topics().removeProperties(topicNameTwo, "key1");
+        properties = admin.topics().getProperties(topicNameTwo);
+        Assert.assertNull(properties);
+        Map<String, String> topicProp = new HashMap<>();
+        topicProp.put("key1", "value1");
+        topicProp.put("key2", "value2");
+        admin.topics().updateProperties(topicNameTwo, topicProp);
+        properties = admin.topics().getProperties(topicNameTwo);
+        Assert.assertEquals(properties, topicProp);
+        admin.topics().removeProperties(topicNameTwo, "key1");
+        topicProp.remove("key1");
+        properties = admin.topics().getProperties(topicNameTwo);
+        Assert.assertEquals(properties, topicProp);
     }
 
     @Test
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 98a071c4680..177cae9a9a4 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -754,6 +754,24 @@ public interface Topics {
      */
     CompletableFuture<Void> updatePropertiesAsync(String topic, Map<String, 
String> properties);
 
+    /**
+     * Remove the key in properties on a topic.
+     *
+     * @param topic
+     *            Topic name
+     * @param key
+     *            The property key to remove
+     * @throws PulsarAdminException
+     *             if the removal cannot be carried out
+     */
+    void removeProperties(String topic, String key) throws 
PulsarAdminException;
+
+    /**
+     * Remove the key in properties on a topic asynchronously.
+     *
+     * @param topic
+     *            Topic name
+     * @param key
+     *            The property key to remove
+     * @return a future that tracks the completion of the removal
+     */
+    CompletableFuture<Void> removePropertiesAsync(String topic, String key);
+
     /**
      * Delete a partitioned topic and its schemas.
      * <p/>
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 45ca53cbd0a..87be258dbec 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -476,6 +476,19 @@ public class TopicsImpl extends BaseResource implements 
Topics {
         deletePartitionedTopic(topic, false);
     }
 
+    // Delegates to removePropertiesAsync and hands the resulting future to sync(...).
+    @Override
+    public void removeProperties(String topic, String key) throws PulsarAdminException {
+        sync(() -> this.removePropertiesAsync(topic, key));
+    }
+
+    @Override
+    public CompletableFuture<Void> removePropertiesAsync(String topic, String key) {
+        // DELETE on the topic's "properties" resource, with the key passed as a query parameter.
+        return asyncDeleteRequest(topicPath(validateTopic(topic), "properties").queryParam("key", key));
+    }
+
     @Override
     public CompletableFuture<Void> deletePartitionedTopicAsync(String topic) {
         return deletePartitionedTopicAsync(topic, false);
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index deda7c41d1b..c339da80e8f 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1519,6 +1519,10 @@ public class PulsarAdminToolTest {
         props.put("x", "y,z");
         
verify(mockTopics).updateProperties("persistent://myprop/clust/ns1/ds1", props);
 
+        cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("remove-properties 
persistent://myprop/clust/ns1/ds1 --key a"));
+        
verify(mockTopics).removeProperties("persistent://myprop/clust/ns1/ds1", "a");
+
         cmdTopics = new CmdTopics(() -> admin);
         cmdTopics.run(split("get-subscription-properties 
persistent://myprop/clust/ns1/ds1 -s sub1"));
         
verify(mockTopics).getSubscriptionProperties("persistent://myprop/clust/ns1/ds1",
 "sub1");
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index dc6313b1be1..8b12d70d057 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -123,6 +123,7 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("get-partitioned-topic-metadata", new 
GetPartitionedTopicMetadataCmd());
         jcommander.addCommand("get-properties", new GetPropertiesCmd());
         jcommander.addCommand("update-properties", new UpdateProperties());
+        jcommander.addCommand("remove-properties", new RemoveProperties());
 
         jcommander.addCommand("delete-partitioned-topic", new 
DeletePartitionedCmd());
         jcommander.addCommand("peek-messages", new PeekMessages());
@@ -649,6 +650,21 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Remove the key in properties of a topic")
+    private class RemoveProperties extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--key", "-k"}, description = "The key to remove 
in the properties of topic")
+        private String key;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(params);
+            getTopics().removeProperties(topic, key);
+        }
+    }
+
     @Parameters(commandDescription = "Delete a partitioned topic. "
             + "It will also delete all the partitions of the topic if it 
exists."
             + "And the application is not able to connect to the topic(delete 
then re-create with same name) again "
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
index d4de706e607..0184e0efb82 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
@@ -50,6 +50,7 @@ public enum TopicOperation {
 
     GET_STATS,
     GET_METADATA,
+    DELETE_METADATA,
     GET_BACKLOG_SIZE,
 
     SET_REPLICATED_SUBSCRIPTION_STATUS,

Reply via email to