This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 29c244a74b5 [feat][admin] PIP-219 Part-1 Add admin API for trimming
topic (#19094)
29c244a74b5 is described below
commit 29c244a74b5464eb6e40e3c3662effb9fa0a76a2
Author: feynmanlin <[email protected]>
AuthorDate: Thu Jan 19 18:08:11 2023 +0800
[feat][admin] PIP-219 Part-1 Add admin API for trimming topic (#19094)
---
.../authorization/PulsarAuthorizationProvider.java | 1 +
.../broker/admin/impl/PersistentTopicsBase.java | 63 ++++++++++++++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 38 +++++++++++
.../broker/service/ConsumedLedgersTrimTest.java | 77 +++++++++++++++++++++-
.../org/apache/pulsar/client/admin/Topics.java | 13 ++++
.../pulsar/client/admin/internal/TopicsImpl.java | 12 ++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 3 +
.../org/apache/pulsar/admin/cli/CmdTopics.java | 13 ++++
.../common/policies/data/NamespaceOperation.java | 1 +
.../common/policies/data/TopicOperation.java | 1 +
10 files changed, 221 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index ce6c07f40c1..e0b2335089d 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -480,6 +480,7 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
case GET_BUNDLE:
return
allowConsumeOrProduceOpsAsync(namespaceName, role, authData);
case UNSUBSCRIBE:
+ case TRIM_TOPIC:
case CLEAR_BACKLOG:
return allowTheSpecifiedActionOpsAsync(
namespaceName, role, authData,
AuthAction.consume);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 73f25914d7a..195e2af1473 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4612,6 +4612,69 @@ public class PersistentTopicsBase extends AdminResource {
return null;
});
}
+ protected CompletableFuture<Void> internalTrimTopic(AsyncResponse
asyncResponse, boolean authoritative) {
+ if (!topicName.isPersistent()) {
+ log.info("[{}] Trim on a non-persistent topic {} is not allowed",
clientAppId(), topicName);
+ asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+ "Trim on a non-persistent topic is not allowed"));
+ return null;
+ }
+ if (topicName.isPartitioned()) {
+ return validateTopicOperationAsync(topicName,
TopicOperation.TRIM_TOPIC).thenCompose((x)
+ -> trimNonPartitionedTopic(asyncResponse, topicName,
authoritative));
+ }
+ return validateTopicOperationAsync(topicName,
TopicOperation.TRIM_TOPIC)
+ .thenCompose(__ ->
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
+ .thenCompose(metadata -> {
+ if (metadata.partitions > 0) {
+ return trimPartitionedTopic(asyncResponse, metadata);
+ }
+ return trimNonPartitionedTopic(asyncResponse, topicName,
authoritative);
+ });
+ }
+
+ private CompletableFuture<Void> trimNonPartitionedTopic(AsyncResponse
asyncResponse,
+ TopicName
topicName, boolean authoritative) {
+ return validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> {
+ if (!(topic instanceof PersistentTopic persistentTopic)) {
+ log.info("[{}] Trim on a non-persistent topic {} is
not allowed", clientAppId(), topicName);
+ asyncResponse.resume(new
RestException(Status.METHOD_NOT_ALLOWED,
+ "Trim on a non-persistent topic is not
allowed"));
+ return CompletableFuture.completedFuture(null);
+ }
+ ManagedLedger managedLedger =
persistentTopic.getManagedLedger();
+ if (managedLedger == null) {
+ asyncResponse.resume(null);
+ return CompletableFuture.completedFuture(null);
+ }
+ CompletableFuture<Void> result = new CompletableFuture<>();
+ managedLedger.trimConsumedLedgersInBackground(result);
+ return result.whenComplete((res, e) -> {
+ if (e != null) {
+ asyncResponse.resume(e);
+ } else {
+ asyncResponse.resume(res);
+ }
+ });
+ });
+ }
+
+ private CompletableFuture<Void> trimPartitionedTopic(AsyncResponse
asyncResponse,
+
PartitionedTopicMetadata metadata) {
+ List<CompletableFuture<Void>> futures = new
ArrayList<>(metadata.partitions);
+ for (int i = 0; i < metadata.partitions; i++) {
+ TopicName topicNamePartition = topicName.getPartition(i);
+ try {
+
futures.add(pulsar().getAdminClient().topics().trimTopicAsync(topicNamePartition.toString()));
+ } catch (Exception e) {
+ log.error("[{}] Failed to trim topic {}", clientAppId(),
topicNamePartition, e);
+ throw new RestException(e);
+ }
+ }
+ return
FutureUtil.waitForAll(futures).thenAccept(asyncResponse::resume);
+ }
protected CompletableFuture<DispatchRateImpl>
internalGetDispatchRate(boolean applied, boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index b25e4d17353..2c02955c5ab 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -3269,6 +3269,44 @@ public class PersistentTopics extends
PersistentTopicsBase {
}
}
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/trim")
+ @ApiOperation(value = " Trim a topic")
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Don't have permission to
administrate resources on this tenant or"
+ + "subscriber is not authorized to access this operation"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace or topic does not
exist"),
+ @ApiResponse(code = 405, message = "Operation is not allowed on
the persistent topic"),
+ @ApiResponse(code = 412, message = "Topic name is not valid"),
+ @ApiResponse(code = 500, message = "Internal server error"),
+ @ApiResponse(code = 503, message = "Failed to validate global
cluster configuration")})
+ public void trimTopic(
+ @Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalTrimTopic(asyncResponse, authoritative).exceptionally(ex
-> {
+ // If the exception is not redirect exception we need to log
it.
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to trim topic {}", clientAppId(),
topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+
@GET
@Path("/{tenant}/{namespace}/{topic}/dispatchRate")
@ApiOperation(value = "Get dispatch rate configuration for specified
topic.")
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
index f2ee85d3fd6..099a9028c46 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
@@ -21,17 +21,22 @@ package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
-
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -179,4 +184,74 @@ public class ConsumedLedgersTrimTest extends
BrokerTestBase {
assertEquals(messageIdAfterTrim, MessageId.earliest);
}
+
+ @Test
+ public void TestAdminTrimLedgers() throws Exception {
+ conf.setRetentionCheckIntervalInSeconds(Integer.MAX_VALUE / 2);
+ conf.setDefaultNumberOfNamespaceBundles(1);
+ super.baseSetup();
+ final String topicName =
"persistent://prop/ns-abc/TestAdminTrimLedgers" + UUID.randomUUID();
+ final String subscriptionName = "my-sub";
+ final int maxEntriesPerLedger = 2;
+ final int partitionedNum = 3;
+
+ admin.topics().createPartitionedTopic(topicName, partitionedNum);
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .producerName("producer-name")
+ .create();
+ @Cleanup
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+ .subscribe();
+ for (int i = 0; i < partitionedNum; i++) {
+ String topic = TopicName.get(topicName).getPartition(i).toString();
+ Topic topicRef =
pulsar.getBrokerService().getTopicReference(topic).get();
+ Assert.assertNotNull(topicRef);
+ }
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService()
+
.getTopicReference(TopicName.get(topicName).getPartition(0).toString()).get();
+ ManagedLedgerConfig managedLedgerConfig =
persistentTopic.getManagedLedger().getConfig();
+ managedLedgerConfig.setRetentionSizeInMB(-1);
+ managedLedgerConfig.setRetentionTime(1, TimeUnit.MILLISECONDS);
+ managedLedgerConfig.setMaxEntriesPerLedger(maxEntriesPerLedger);
+ managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+ int msgNum = 50;
+ for (int i = 0; i < msgNum; i++) {
+ producer.send(new byte[0]);
+ }
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
+ Assert.assertTrue(managedLedger.getLedgersInfoAsList().size() > 1);
+ for (int i = 0; i < msgNum; i++) {
+ Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ consumer.acknowledge(msg);
+ }
+ //consumed ledger should be cleaned
+ admin.topics().trimTopic(topicName);
+ Awaitility.await().untilAsserted(() ->
+
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1));
+
+ }
+
+ @Test
+ public void trimNonPersistentTopic() throws Exception {
+ super.baseSetup();
+ String topicName =
"non-persistent://prop/ns-abc/trimNonPersistentTopic" + UUID.randomUUID();
+ int partitionedNum = 3;
+ admin.topics().createPartitionedTopic(topicName, partitionedNum);
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .producerName("producer-name")
+ .create();
+ try {
+ admin.topics().trimTopic(topicName);
+ fail("should failed");
+ } catch (Exception e) {
+ assertTrue(e instanceof PulsarAdminException.NotAllowedException);
+ }
+ }
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 16b9afca5f2..0d5d14eb734 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1979,6 +1979,19 @@ public interface Topics {
*/
CompletableFuture<Void> triggerCompactionAsync(String topic);
+ /**
+ * Trigger topic trimming.
+ * @param topic The topic to trim
+ * @throws PulsarAdminException
+ */
+ void trimTopic(String topic) throws PulsarAdminException;
+
+ /**
+ * Trigger topic trimming asynchronously.
+ * @param topic The topic to trim
+ */
+ CompletableFuture<Void> trimTopicAsync(String topic);
+
/**
* Check the status of an ongoing compaction for a topic.
*
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 3b50737ffc0..33d1cd17858 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1145,6 +1145,18 @@ public class TopicsImpl extends BaseResource implements
Topics {
return asyncPutRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
}
+ @Override
+ public void trimTopic(String topic) throws PulsarAdminException {
+ sync(() -> trimTopicAsync(topic));
+ }
+
+ @Override
+ public CompletableFuture<Void> trimTopicAsync(String topic) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "trim");
+ return asyncPostRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
+ }
+
@Override
public LongRunningProcessStatus compactionStatus(String topic)
throws PulsarAdminException {
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 3ec0d1cb237..2a16d09ad0a 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1584,6 +1584,9 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("analyze-backlog persistent://myprop/clust/ns1/ds1
-s sub1"));
verify(mockTopics).analyzeSubscriptionBacklog("persistent://myprop/clust/ns1/ds1",
"sub1", Optional.empty());
+ cmdTopics.run(split("trim-topic persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopics).trimTopic("persistent://myprop/clust/ns1/ds1");
+
// jcommander is stateful, you cannot parse the same command twice
cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("create-subscription
persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest --property a=b
-p x=y,z"));
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 197b42d0363..19277025892 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -261,6 +261,8 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-schema-validation-enforce", new
GetSchemaValidationEnforced());
jcommander.addCommand("set-schema-validation-enforce", new
SetSchemaValidationEnforced());
+ jcommander.addCommand("trim-topic", new TrimTopic());
+
initDeprecatedCommands();
}
@@ -3111,4 +3113,15 @@ public class CmdTopics extends CmdBase {
getAdmin().topics().setSchemaValidationEnforced(topic, enable);
}
}
+ @Parameters(commandDescription = "Trim a topic")
+ private class TrimTopic extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String topic = validateTopicName(params);
+ getAdmin().topics().trimTopic(topic);
+ }
+ }
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
index dfcecfab9d6..732072747a4 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceOperation.java
@@ -39,4 +39,5 @@ public enum NamespaceOperation {
UNSUBSCRIBE,
PACKAGES,
+ TRIM_TOPIC,
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
index ad16b58e013..4c74ffcd8f0 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
@@ -55,4 +55,5 @@ public enum TopicOperation {
SET_REPLICATED_SUBSCRIPTION_STATUS,
GET_REPLICATED_SUBSCRIPTION_STATUS,
+ TRIM_TOPIC,
}