This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 29c244a74b5 [feat][admin] PIP-219 Part-1 Add admin API for trimming 
topic (#19094)
29c244a74b5 is described below

commit 29c244a74b5464eb6e40e3c3662effb9fa0a76a2
Author: feynmanlin <[email protected]>
AuthorDate: Thu Jan 19 18:08:11 2023 +0800

    [feat][admin] PIP-219 Part-1 Add admin API for trimming topic (#19094)
---
 .../authorization/PulsarAuthorizationProvider.java |  1 +
 .../broker/admin/impl/PersistentTopicsBase.java    | 63 ++++++++++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 38 +++++++++++
 .../broker/service/ConsumedLedgersTrimTest.java    | 77 +++++++++++++++++++++-
 .../org/apache/pulsar/client/admin/Topics.java     | 13 ++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 12 ++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  3 +
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 13 ++++
 .../common/policies/data/NamespaceOperation.java   |  1 +
 .../common/policies/data/TopicOperation.java       |  1 +
 10 files changed, 221 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index ce6c07f40c1..e0b2335089d 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -480,6 +480,7 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
                             case GET_BUNDLE:
                                 return 
allowConsumeOrProduceOpsAsync(namespaceName, role, authData);
                             case UNSUBSCRIBE:
+                            case TRIM_TOPIC:
                             case CLEAR_BACKLOG:
                                 return allowTheSpecifiedActionOpsAsync(
                                         namespaceName, role, authData, 
AuthAction.consume);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 73f25914d7a..195e2af1473 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4612,6 +4612,69 @@ public class PersistentTopicsBase extends AdminResource {
                     return null;
         });
     }
+    protected CompletableFuture<Void> internalTrimTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
+        if (!topicName.isPersistent()) {
+            log.info("[{}] Trim on a non-persistent topic {} is not allowed", 
clientAppId(), topicName);
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Trim on a non-persistent topic is not allowed"));
+            return null;
+        }
+        if (topicName.isPartitioned()) {
+            return validateTopicOperationAsync(topicName, 
TopicOperation.TRIM_TOPIC).thenCompose((x)
+                    -> trimNonPartitionedTopic(asyncResponse, topicName, 
authoritative));
+        }
+        return validateTopicOperationAsync(topicName, 
TopicOperation.TRIM_TOPIC)
+                .thenCompose(__ -> 
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
+                .thenCompose(metadata -> {
+                    if (metadata.partitions > 0) {
+                        return trimPartitionedTopic(asyncResponse, metadata);
+                    }
+                    return trimNonPartitionedTopic(asyncResponse, topicName, 
authoritative);
+                });
+    }
+
+    private CompletableFuture<Void> trimNonPartitionedTopic(AsyncResponse 
asyncResponse,
+                                                            TopicName 
topicName, boolean authoritative) {
+        return  validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    if (!(topic instanceof PersistentTopic persistentTopic)) {
+                        log.info("[{}] Trim on a non-persistent topic {} is 
not allowed", clientAppId(), topicName);
+                        asyncResponse.resume(new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                "Trim on a non-persistent topic is not 
allowed"));
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    ManagedLedger managedLedger = 
persistentTopic.getManagedLedger();
+                    if (managedLedger == null) {
+                        asyncResponse.resume(null);
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    CompletableFuture<Void> result = new CompletableFuture<>();
+                    managedLedger.trimConsumedLedgersInBackground(result);
+                    return result.whenComplete((res, e) -> {
+                        if (e != null) {
+                            asyncResponse.resume(e);
+                        } else {
+                            asyncResponse.resume(res);
+                        }
+                    });
+                });
+    }
+
+    private CompletableFuture<Void> trimPartitionedTopic(AsyncResponse 
asyncResponse,
+                                                         
PartitionedTopicMetadata metadata) {
+        List<CompletableFuture<Void>> futures = new 
ArrayList<>(metadata.partitions);
+        for (int i = 0; i < metadata.partitions; i++) {
+            TopicName topicNamePartition = topicName.getPartition(i);
+            try {
+                
futures.add(pulsar().getAdminClient().topics().trimTopicAsync(topicNamePartition.toString()));
+            } catch (Exception e) {
+                log.error("[{}] Failed to trim topic {}", clientAppId(), 
topicNamePartition, e);
+                throw new RestException(e);
+            }
+        }
+        return 
FutureUtil.waitForAll(futures).thenAccept(asyncResponse::resume);
+    }
 
     protected CompletableFuture<DispatchRateImpl> 
internalGetDispatchRate(boolean applied, boolean isGlobal) {
         return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index b25e4d17353..2c02955c5ab 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -3269,6 +3269,44 @@ public class PersistentTopics extends 
PersistentTopicsBase {
         }
     }
 
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/trim")
+    @ApiOperation(value = " Trim a topic")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to 
administrate resources on this tenant or"
+                    + "subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace or topic does not 
exist"),
+            @ApiResponse(code = 405, message = "Operation is not allowed on 
the persistent topic"),
+            @ApiResponse(code = 412, message = "Topic name is not valid"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global 
cluster configuration")})
+    public void trimTopic(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalTrimTopic(asyncResponse, authoritative).exceptionally(ex 
-> {
+                // If the exception is not redirect exception we need to log 
it.
+                if (!isRedirectException(ex)) {
+                    log.error("[{}] Failed to trim topic {}", clientAppId(), 
topicName, ex);
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/{topic}/dispatchRate")
     @ApiOperation(value = "Get dispatch rate configuration for specified 
topic.")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
index f2ee85d3fd6..099a9028c46 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
@@ -21,17 +21,22 @@ package org.apache.pulsar.broker.service;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
-
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -179,4 +184,74 @@ public class ConsumedLedgersTrimTest extends 
BrokerTestBase {
         assertEquals(messageIdAfterTrim, MessageId.earliest);
 
     }
+
+    @Test
+    public void TestAdminTrimLedgers() throws Exception {
+        conf.setRetentionCheckIntervalInSeconds(Integer.MAX_VALUE / 2);
+        conf.setDefaultNumberOfNamespaceBundles(1);
+        super.baseSetup();
+        final String topicName = 
"persistent://prop/ns-abc/TestAdminTrimLedgers" + UUID.randomUUID();
+        final String subscriptionName = "my-sub";
+        final int maxEntriesPerLedger = 2;
+        final int partitionedNum = 3;
+
+        admin.topics().createPartitionedTopic(topicName, partitionedNum);
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .producerName("producer-name")
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                .subscribe();
+        for (int i = 0; i < partitionedNum; i++) {
+            String topic = TopicName.get(topicName).getPartition(i).toString();
+            Topic topicRef = 
pulsar.getBrokerService().getTopicReference(topic).get();
+            Assert.assertNotNull(topicRef);
+        }
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService()
+                
.getTopicReference(TopicName.get(topicName).getPartition(0).toString()).get();
+        ManagedLedgerConfig managedLedgerConfig = 
persistentTopic.getManagedLedger().getConfig();
+        managedLedgerConfig.setRetentionSizeInMB(-1);
+        managedLedgerConfig.setRetentionTime(1, TimeUnit.MILLISECONDS);
+        managedLedgerConfig.setMaxEntriesPerLedger(maxEntriesPerLedger);
+        managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+        int msgNum = 50;
+        for (int i = 0; i < msgNum; i++) {
+            producer.send(new byte[0]);
+        }
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        Assert.assertTrue(managedLedger.getLedgersInfoAsList().size() > 1);
+        for (int i = 0; i < msgNum; i++) {
+            Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            consumer.acknowledge(msg);
+        }
+        //consumed ledger should be cleaned
+        admin.topics().trimTopic(topicName);
+        Awaitility.await().untilAsserted(() ->
+                
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1));
+
+    }
+
+    @Test
+    public void trimNonPersistentTopic() throws Exception {
+        super.baseSetup();
+        String topicName = 
"non-persistent://prop/ns-abc/trimNonPersistentTopic" + UUID.randomUUID();
+        int partitionedNum = 3;
+        admin.topics().createPartitionedTopic(topicName, partitionedNum);
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .producerName("producer-name")
+                .create();
+        try {
+            admin.topics().trimTopic(topicName);
+            fail("should failed");
+        } catch (Exception e) {
+            assertTrue(e instanceof PulsarAdminException.NotAllowedException);
+        }
+    }
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 16b9afca5f2..0d5d14eb734 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1979,6 +1979,19 @@ public interface Topics {
      */
     CompletableFuture<Void> triggerCompactionAsync(String topic);
 
+    /**
+     * Trigger topic trimming.
+     * @param topic The topic to trim
+     * @throws PulsarAdminException
+     */
+    void trimTopic(String topic) throws PulsarAdminException;
+
+    /**
+     * Trigger topic trimming asynchronously.
+     * @param topic The topic to trim
+     */
+    CompletableFuture<Void> trimTopicAsync(String topic);
+
     /**
      * Check the status of an ongoing compaction for a topic.
      *
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 3b50737ffc0..33d1cd17858 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1145,6 +1145,18 @@ public class TopicsImpl extends BaseResource implements 
Topics {
         return asyncPutRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
     }
 
+    @Override
+    public void trimTopic(String topic) throws PulsarAdminException {
+        sync(() -> trimTopicAsync(topic));
+    }
+
+    @Override
+    public CompletableFuture<Void> trimTopicAsync(String topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "trim");
+        return asyncPostRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
+    }
+
     @Override
     public LongRunningProcessStatus compactionStatus(String topic)
             throws PulsarAdminException {
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 3ec0d1cb237..2a16d09ad0a 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1584,6 +1584,9 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("analyze-backlog persistent://myprop/clust/ns1/ds1 
-s sub1"));
         
verify(mockTopics).analyzeSubscriptionBacklog("persistent://myprop/clust/ns1/ds1",
 "sub1", Optional.empty());
 
+        cmdTopics.run(split("trim-topic persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).trimTopic("persistent://myprop/clust/ns1/ds1");
+
         // jcommander is stateful, you cannot parse the same command twice
         cmdTopics = new CmdTopics(() -> admin);
         cmdTopics.run(split("create-subscription 
persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest --property a=b 
-p x=y,z"));
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 197b42d0363..19277025892 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -261,6 +261,8 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("get-schema-validation-enforce", new 
GetSchemaValidationEnforced());
         jcommander.addCommand("set-schema-validation-enforce", new 
SetSchemaValidationEnforced());
 
+        jcommander.addCommand("trim-topic", new TrimTopic());
+
         initDeprecatedCommands();
     }
 
@@ -3111,4 +3113,15 @@ public class CmdTopics extends CmdBase {
             getAdmin().topics().setSchemaValidationEnforced(topic, enable);
         }
     }
+    @Parameters(commandDescription = "Trim a topic")
+    private class TrimTopic extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String topic = validateTopicName(params);
+            getAdmin().topics().trimTopic(topic);
+        }
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
index dfcecfab9d6..732072747a4 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
@@ -39,4 +39,5 @@ public enum NamespaceOperation {
     UNSUBSCRIBE,
 
     PACKAGES,
+    TRIM_TOPIC,
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
index ad16b58e013..4c74ffcd8f0 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
@@ -55,4 +55,5 @@ public enum TopicOperation {
 
     SET_REPLICATED_SUBSCRIPTION_STATUS,
     GET_REPLICATED_SUBSCRIPTION_STATUS,
+    TRIM_TOPIC,
 }

Reply via email to