This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 00e3089 Support configuring DeleteInactiveTopic setting in namespace
policy (#7598)
00e3089 is described below
commit 00e30895b22129d5189db1592851bdb62e8c498b
Author: feynmanlin <[email protected]>
AuthorDate: Tue Jul 28 14:25:58 2020 +0800
Support configuring DeleteInactiveTopic setting in namespace policy (#7598)
### Motivation
Support configuring DeleteInactiveTopic setting in namespace policy
### Modifications
Only two parameters, `brokerDeleteInactiveTopicsMode` and
`brokerDeleteInactiveTopicsMaxInactiveDurationSeconds`, support namespace-level
policy. These parameters are changed to a Map structure in which the key is the
namespace and the value is the parameter value.
For example: namespace1=delete_when_no_subscriptions,
namespace2=delete_when_no_subscriptions.
In addition, there is a special key named `default`. If it is set, namespaces
that do not specify their own value use the value configured for `default`.
For example: default=delete_when_no_subscriptions
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 48 ++++--
.../apache/pulsar/broker/admin/v2/Namespaces.java | 36 +++++
.../pulsar/broker/service/AbstractTopic.java | 28 +++-
.../pulsar/broker/service/BrokerService.java | 8 +-
.../org/apache/pulsar/broker/service/Topic.java | 2 +-
.../service/nonpersistent/NonPersistentTopic.java | 23 ++-
.../broker/service/persistent/PersistentTopic.java | 21 ++-
.../broker/service/persistent/SystemTopic.java | 2 +-
.../pulsar/broker/service/BrokerTestBase.java | 7 +-
.../broker/service/InactiveTopicDeleteTest.java | 165 +++++++++++++++++++++
.../service/PersistentTopicConcurrentTest.java | 4 +-
.../org/apache/pulsar/client/admin/Namespaces.java | 48 ++++++
.../client/admin/internal/NamespacesImpl.java | 81 ++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 14 +-
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 73 ++++++++-
...{PolicyName.java => InactiveTopicPolicies.java} | 34 ++---
.../pulsar/common/policies/data/Policies.java | 6 +-
.../pulsar/common/policies/data/PolicyName.java | 1 +
18 files changed, 543 insertions(+), 58 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index ec2c1bd..c76eeda 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -26,6 +26,8 @@ import static
org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_PO
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
+
+import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
import java.util.Collections;
@@ -90,6 +92,7 @@ import
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.KeeperException;
@@ -1832,39 +1835,64 @@ public abstract class NamespacesBase extends
AdminResource {
}
}
- protected void internalSetDelayedDelivery(DelayedDeliveryPolicies
delayedDeliveryPolicies) {
+ protected InactiveTopicPolicies internalGetInactiveTopic() {
+ validateNamespacePolicyOperation(namespaceName,
PolicyName.INACTIVE_TOPIC, PolicyOperation.READ);
+
+ Policies policies = getNamespacePolicies(namespaceName);
+ if (policies.inactive_topic_policies == null) {
+ return new
InactiveTopicPolicies(config().getBrokerDeleteInactiveTopicsMode()
+ ,
config().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds()
+ , config().isBrokerDeleteInactiveTopicsEnabled());
+ } else {
+ return policies.inactive_topic_policies;
+ }
+ }
+
+ protected void internalSetInactiveTopic(InactiveTopicPolicies
inactiveTopicPolicies){
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
+ internalSetPolicies("inactive_topic_policies", inactiveTopicPolicies);
+ }
+ protected void internalSetPolicies(String fieldName, Object value){
try {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content,
Policies.class);
- policies.delayed_delivery_policies = delayedDeliveryPolicies;
+ Field field = Policies.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(policies, value);
+
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies),
nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES,
namespaceName.toString()));
- log.info("[{}] Successfully updated delayed delivery messages
configuration: namespace={}, map={}", clientAppId(),
- namespaceName,
jsonMapper().writeValueAsString(policies.retention_policies));
+ log.info("[{}] Successfully updated {} configuration:
namespace={}, value={}", clientAppId(), fieldName,
+ namespaceName, jsonMapper().writeValueAsString(value));
} catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to update delayed delivery messages
configuration for namespace {}: does not exist", clientAppId(),
- namespaceName);
+ log.warn("[{}] Failed to update {} configuration for namespace {}:
does not exist", clientAppId(),
+ fieldName, namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not
exist");
} catch (KeeperException.BadVersionException e) {
- log.warn("[{}] Failed to update delayed delivery messages
configuration for namespace {}: concurrent modification",
- clientAppId(), namespaceName);
+ log.warn("[{}] Failed to update {} configuration for namespace {}:
concurrent modification",
+ clientAppId(), fieldName, namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent
modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
- log.error("[{}] Failed to update delayed delivery messages
configuration for namespace {}", clientAppId(), namespaceName,
- e);
+ log.error("[{}] Failed to update {} configuration for namespace
{}", clientAppId(), fieldName
+ , namespaceName, e);
throw new RestException(e);
}
}
+ protected void internalSetDelayedDelivery(DelayedDeliveryPolicies
delayedDeliveryPolicies) {
+ validateSuperUserAccess();
+ validatePoliciesReadOnlyAccess();
+ internalSetPolicies("delayed_delivery_policies",
delayedDeliveryPolicies);
+ }
+
protected void internalSetNamespaceAntiAffinityGroup(String
antiAffinityGroup) {
validateNamespacePolicyOperation(namespaceName,
PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be
null");
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 0a9ec96..f162207 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -68,6 +68,7 @@ import
org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrat
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.slf4j.Logger;
@@ -858,6 +859,41 @@ public class Namespaces extends NamespacesBase {
}
@GET
+ @Path("/{tenant}/{namespace}/inactiveTopicPolicies")
+ @ApiOperation(value = "Get inactive topic policies config on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"), })
+ public InactiveTopicPolicies getInactiveTopicPolicies(@PathParam("tenant")
String tenant,
+
@PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ return internalGetInactiveTopic();
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/inactiveTopicPolicies")
+ @ApiOperation(value = "Remove inactive topic policies from a namespace.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void removeInactiveTopicPolicies(@PathParam("tenant") String
tenant, @PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ internalSetInactiveTopic( null);
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/inactiveTopicPolicies")
+ @ApiOperation(value = "Set inactive topic policies config on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist"), })
+ public void setInactiveTopicPolicies(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Inactive topic policies for the specified
namespace") InactiveTopicPolicies inactiveTopicPolicies) {
+ validateNamespaceName(tenant, namespace);
+ internalSetInactiveTopic(inactiveTopicPolicies);
+ }
+
+ @GET
@Path("/{tenant}/{namespace}/maxProducersPerTopic")
@ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 4a02f1f..09a0521 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -36,6 +36,8 @@ import
org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -63,8 +65,8 @@ public abstract class AbstractTopic implements Topic {
protected volatile boolean isFenced;
- // When set to false, this inactive topic can not be deleted
- protected boolean deleteWhileInactive;
+ // Inactive topic policies
+ protected InactiveTopicPolicies inactiveTopicPolicies = new
InactiveTopicPolicies();
// Timestamp of when this topic was last seen active
protected volatile long lastActive;
@@ -98,8 +100,9 @@ public abstract class AbstractTopic implements Topic {
this.producers = new ConcurrentHashMap<>();
this.isFenced = false;
this.replicatorPrefix =
brokerService.pulsar().getConfiguration().getReplicatorPrefix();
- this.deleteWhileInactive =
-
brokerService.pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled();
+
this.inactiveTopicPolicies.setDeleteWhileInactive(brokerService.pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled());
+
this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds());
+
this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode());
this.lastActive = System.nanoTime();
Policies policies = null;
try {
@@ -132,12 +135,14 @@ public abstract class AbstractTopic implements Topic {
return false;
}
+ @Override
public void disableCnxAutoRead() {
if (producers != null) {
producers.values().forEach(producer ->
producer.getCnx().disableCnxAutoRead());
}
}
+ @Override
public void enableCnxAutoRead() {
if (producers != null) {
producers.values().forEach(producer ->
producer.getCnx().enableCnxAutoRead());
@@ -466,12 +471,23 @@ public abstract class AbstractTopic implements Topic {
}
public boolean isDeleteWhileInactive() {
- return deleteWhileInactive;
+ return this.inactiveTopicPolicies.isDeleteWhileInactive();
}
public void setDeleteWhileInactive(boolean deleteWhileInactive) {
- this.deleteWhileInactive = deleteWhileInactive;
+ this.inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
}
private static final Logger log =
LoggerFactory.getLogger(AbstractTopic.class);
+
+ public InactiveTopicPolicies getInactiveTopicPolicies() {
+ return inactiveTopicPolicies;
+ }
+
+ public void resetInactiveTopicPolicies(InactiveTopicDeleteMode
inactiveTopicDeleteMode
+ , int maxInactiveDurationSeconds, boolean deleteWhileInactive) {
+
inactiveTopicPolicies.setInactiveTopicDeleteMode(inactiveTopicDeleteMode);
+
inactiveTopicPolicies.setMaxInactiveDurationSeconds(maxInactiveDurationSeconds);
+ inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 34cca90..e6b1c81 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -442,8 +442,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
protected void startInactivityMonitor() {
if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled())
{
int interval =
pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
- int maxInactiveDurationInSec =
pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds();
- inactivityMonitor.scheduleAtFixedRate(safeRun(() ->
checkGC(maxInactiveDurationInSec)), interval, interval,
+ inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC()),
interval, interval,
TimeUnit.SECONDS);
}
@@ -1244,9 +1243,8 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
return lookupRequestSemaphore.get();
}
- public void checkGC(int maxInactiveDurationInSec) {
- forEachTopic(topic -> topic.checkGC(maxInactiveDurationInSec,
- pulsar.getConfiguration().getBrokerDeleteInactiveTopicsMode()));
+ public void checkGC() {
+ forEachTopic(Topic::checkGC);
}
public void checkMessageExpiry() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index d20e700..4d14326 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -124,7 +124,7 @@ public interface Topic {
CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);
- void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode
deleteMode);
+ void checkGC();
void checkInactiveSubscriptions();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index eb8d9d3..5a4d7cd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
@@ -68,7 +69,6 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
-import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
@@ -145,6 +145,9 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic {
.orElseThrow(() -> new KeeperException.NoNodeException());
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
+ if (policies.inactive_topic_policies != null) {
+ inactiveTopicPolicies = policies.inactive_topic_policies;
+ }
setSchemaCompatibilityStrategy(policies);
schemaValidationEnforced = policies.schema_validation_enforced;
@@ -420,7 +423,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic {
/**
* Close this topic - close all producers and subscriptions associated
with this topic
- *
+ *
* @param closeWithoutWaitingClientDisconnect
* don't wait for client disconnect and forcefully close
managed-ledger
* @return Completable future indicating completion of close operation
@@ -626,6 +629,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic {
return replicators.get(remoteCluster);
}
+ @Override
public void updateRates(NamespaceStats nsStats, NamespaceBundleStats
bundleStats,
StatsOutputStream topicStatsStream, ClusterReplicationMetrics
replStats, String namespace,
boolean hydratePublishers) {
@@ -755,6 +759,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic {
topicStatsStream.endObject();
}
+ @Override
public NonPersistentTopicStats getStats(boolean getPreciseBacklog) {
NonPersistentTopicStats stats = new NonPersistentTopicStats();
@@ -808,6 +813,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic {
return stats;
}
+ @Override
public PersistentTopicInternalStats getInternalStats() {
PersistentTopicInternalStats stats = new
PersistentTopicInternalStats();
@@ -829,11 +835,12 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic {
}
@Override
- public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode
deleteMode) {
- if (!deleteWhileInactive) {
+ public void checkGC() {
+ if (!isDeleteWhileInactive()) {
// This topic is not included in GC
return;
}
+ int maxInactiveDurationInSec =
inactiveTopicPolicies.getMaxInactiveDurationSeconds();
if (isActive()) {
lastActive = System.nanoTime();
} else {
@@ -895,6 +902,14 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic {
producer.checkEncryption();
});
subscriptions.forEach((subName, sub) ->
sub.getConsumers().forEach(Consumer::checkPermissions));
+
+ if (data.inactive_topic_policies != null) {
+ this.inactiveTopicPolicies = data.inactive_topic_policies;
+ } else {
+ ServiceConfiguration cfg =
brokerService.getPulsar().getConfiguration();
+ resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
+ ,
cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
cfg.isBrokerDeleteInactiveTopicsEnabled());
+ }
return checkReplicationAndRetryOnFailure();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8983238..377dc98 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
@@ -255,6 +256,9 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
schemaValidationEnforced = policies.schema_validation_enforced;
+ if (policies.inactive_topic_policies != null) {
+ inactiveTopicPolicies = policies.inactive_topic_policies;
+ }
maxUnackedMessagesOnConsumer =
unackedMessagesExceededOnConsumer(policies);
maxUnackedMessagesOnSubscription =
unackedMessagesExceededOnSubscription(policies);
@@ -1273,6 +1277,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return ledger;
}
+ @Override
public void updateRates(NamespaceStats nsStats, NamespaceBundleStats
bundleStats, StatsOutputStream topicStatsStream,
ClusterReplicationMetrics replStats, String namespace, boolean
hydratePublishers) {
@@ -1488,6 +1493,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return lastUpdatedAvgPublishRateInByte;
}
+ @Override
public TopicStats getStats(boolean getPreciseBacklog) {
TopicStats stats = new TopicStats();
@@ -1546,6 +1552,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return stats;
}
+ @Override
public PersistentTopicInternalStats getInternalStats() {
PersistentTopicInternalStats stats = new
PersistentTopicInternalStats();
@@ -1628,11 +1635,13 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
@Override
- public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode
deleteMode) {
- if (!deleteWhileInactive) {
+ public void checkGC() {
+ if (!isDeleteWhileInactive()) {
// This topic is not included in GC
return;
}
+ InactiveTopicDeleteMode deleteMode =
inactiveTopicPolicies.getInactiveTopicDeleteMode();
+ int maxInactiveDurationInSec =
inactiveTopicPolicies.getMaxInactiveDurationSeconds();
if (isActive(deleteMode)) {
lastActive = System.nanoTime();
} else if (System.nanoTime() - lastActive <
TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {
@@ -1787,6 +1796,13 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
delayedDeliveryTickTimeMillis =
data.delayed_delivery_policies.getTickTime();
delayedDeliveryEnabled = data.delayed_delivery_policies.isActive();
}
+ if (data.inactive_topic_policies != null) {
+ this.inactiveTopicPolicies = data.inactive_topic_policies;
+ } else {
+ ServiceConfiguration cfg =
brokerService.getPulsar().getConfiguration();
+ resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
+ ,
cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
cfg.isBrokerDeleteInactiveTopicsEnabled());
+ }
initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
@@ -1977,6 +1993,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return FutureUtil.failedFuture(new BrokerServiceException("Cursor not
found"));
}
+ @Override
public Optional<DispatchRateLimiter> getDispatchRateLimiter() {
return this.dispatchRateLimiter;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 6720209..4b338a9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -46,7 +46,7 @@ public class SystemTopic extends PersistentTopic {
}
@Override
- public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode
deleteMode) {
+ public void checkGC() {
// do nothing for system topic
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
index 314ccfa..ada2242 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
@@ -55,7 +55,12 @@ public abstract class BrokerTestBase extends
MockedPulsarServiceBaseTest {
void runGC() {
try {
- pulsar.getExecutor().submit(() ->
pulsar.getBrokerService().checkGC(0)).get();
+ pulsar.getBrokerService().forEachTopic(topic -> {
+ if (topic instanceof AbstractTopic) {
+ ((AbstractTopic)
topic).getInactiveTopicPolicies().setMaxInactiveDurationSeconds(0);
+ }
+ });
+ pulsar.getExecutor().submit(() ->
pulsar.getBrokerService().checkGC()).get();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
} catch (Exception e) {
LOG.error("GC executor error", e);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index c3d353b..143a4ff 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -19,10 +19,18 @@
package org.apache.pulsar.broker.service;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -68,6 +76,163 @@ public class InactiveTopicDeleteTest extends BrokerTestBase
{
super.internalCleanup();
}
+ @Test(timeOut = 20000)
+ public void testTopicPolicyUpdateAndClean() throws Exception {
+ final String namespace = "prop/ns-abc";
+ final String namespace2 = "prop/ns-abc2";
+ final String namespace3 = "prop/ns-abc3";
+ List<String> namespaceList = Arrays.asList(namespace2, namespace3);
+
+ super.resetConfig();
+ conf.setBrokerDeleteInactiveTopicsEnabled(true);
+ conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(1000);
+
conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ InactiveTopicPolicies defaultPolicy = new
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions
+ , 1000, true);
+
+ super.baseSetup();
+
+ for (String ns : namespaceList) {
+ admin.namespaces().createNamespace(ns);
+ admin.namespaces().setNamespaceReplicationClusters(ns,
Sets.newHashSet("test"));
+ }
+
+ final String topic = "persistent://prop/ns-abc/testDeletePolicyUpdate";
+ final String topic2 =
"persistent://prop/ns-abc2/testDeletePolicyUpdate";
+ final String topic3 =
"persistent://prop/ns-abc3/testDeletePolicyUpdate";
+ List<String> topics = Arrays.asList(topic, topic2, topic3);
+
+ for (String tp : topics) {
+ admin.topics().createNonPartitionedTopic(tp);
+ }
+
+ InactiveTopicPolicies inactiveTopicPolicies =
+ new
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1,
true);
+ admin.namespaces().setInactiveTopicPolicies(namespace,
inactiveTopicPolicies);
+
inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+ admin.namespaces().setInactiveTopicPolicies(namespace2,
inactiveTopicPolicies);
+
inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ admin.namespaces().setInactiveTopicPolicies(namespace3,
inactiveTopicPolicies);
+
+ InactiveTopicPolicies policies;
+ //wait for zk
+ while (true) {
+ policies = ((PersistentTopic)
pulsar.getBrokerService().getTopic(topic,
false).get().get()).inactiveTopicPolicies;
+ if (policies.isDeleteWhileInactive()) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ Assert.assertTrue(policies.isDeleteWhileInactive());
+ Assert.assertEquals(policies.getInactiveTopicDeleteMode(),
InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ Assert.assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
+ Assert.assertEquals(policies,
admin.namespaces().getInactiveTopicPolicies(namespace));
+
+ admin.namespaces().removeInactiveTopicPolicies(namespace);
+ while (true) {
+ Thread.sleep(500);
+ policies = ((PersistentTopic)
pulsar.getBrokerService().getTopic(topic,
false).get().get()).inactiveTopicPolicies;
+ if (policies.getMaxInactiveDurationSeconds() == 1000) {
+ break;
+ }
+ }
+ Assert.assertEquals(((PersistentTopic)
pulsar.getBrokerService().getTopic(topic,
false).get().get()).inactiveTopicPolicies
+ , defaultPolicy);
+
+ policies =
((PersistentTopic)pulsar.getBrokerService().getTopic(topic2,false).get().get()).inactiveTopicPolicies;
+ Assert.assertTrue(policies.isDeleteWhileInactive());
+ Assert.assertEquals(policies.getInactiveTopicDeleteMode(),
InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+ Assert.assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
+ Assert.assertEquals(policies,
admin.namespaces().getInactiveTopicPolicies(namespace2));
+
+ admin.namespaces().removeInactiveTopicPolicies(namespace2);
+ while (true) {
+ Thread.sleep(500);
+ policies = ((PersistentTopic)
pulsar.getBrokerService().getTopic(topic2,
false).get().get()).inactiveTopicPolicies;
+ if (policies.getMaxInactiveDurationSeconds() == 1000) {
+ break;
+ }
+ }
+ Assert.assertEquals(((PersistentTopic)
pulsar.getBrokerService().getTopic(topic2,
false).get().get()).inactiveTopicPolicies
+ , defaultPolicy);
+
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 20000)
+ public void testDeleteWhenNoSubscriptionsWithMultiConfig() throws
Exception {
+ final String namespace = "prop/ns-abc";
+ final String namespace2 = "prop/ns-abc2";
+ final String namespace3 = "prop/ns-abc3";
+ List<String> namespaceList = Arrays.asList(namespace2, namespace3);
+
+ conf.setBrokerDeleteInactiveTopicsEnabled(true);
+ conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
+ super.baseSetup();
+
+ for (String ns : namespaceList) {
+ admin.namespaces().createNamespace(ns);
+ admin.namespaces().setNamespaceReplicationClusters(ns,
Sets.newHashSet("test"));
+ }
+
+ final String topic =
"persistent://prop/ns-abc/testDeleteWhenNoSubscriptionsWithMultiConfig";
+ final String topic2 =
"persistent://prop/ns-abc2/testDeleteWhenNoSubscriptionsWithMultiConfig";
+ final String topic3 =
"persistent://prop/ns-abc3/testDeleteWhenNoSubscriptionsWithMultiConfig";
+ List<String> topics = Arrays.asList(topic, topic2, topic3);
+ //create producer/consumer and close
+ Map<String, String> topicToSub = new HashMap<>();
+ for (String tp : topics) {
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(tp).create();
+ String subName = "sub" + System.currentTimeMillis();
+ topicToSub.put(tp, subName);
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(tp).subscriptionName(subName).subscribe();
+ for (int i = 0; i < 10; i++) {
+ producer.send("Pulsar".getBytes());
+ }
+ consumer.close();
+ producer.close();
+ Thread.sleep(1);
+ }
+ // namespace use delete_when_no_subscriptions, namespace2 use
delete_when_subscriptions_caught_up
+ // namespace3 use default:delete_when_no_subscriptions
+ InactiveTopicPolicies inactiveTopicPolicies =
+ new
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,1,true);
+ admin.namespaces().setInactiveTopicPolicies(namespace,
inactiveTopicPolicies);
+
inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+ admin.namespaces().setInactiveTopicPolicies(namespace2,
inactiveTopicPolicies);
+
+ //wait for zk
+ while (true) {
+ InactiveTopicPolicies policies = ((PersistentTopic)
pulsar.getBrokerService()
+ .getTopic(topic, false).get().get()).inactiveTopicPolicies;
+ if (policies.isDeleteWhileInactive()) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+
+ // topic should still exist
+ Thread.sleep(2000);
+ Assert.assertTrue(admin.topics().getList(namespace).contains(topic));
+ Assert.assertTrue(admin.topics().getList(namespace2).contains(topic2));
+ Assert.assertTrue(admin.topics().getList(namespace3).contains(topic3));
+
+ // no backlog, trigger delete_when_subscriptions_caught_up
+ admin.topics().skipAllMessages(topic2, topicToSub.remove(topic2));
+ Thread.sleep(2000);
+
Assert.assertFalse(admin.topics().getList(namespace2).contains(topic2));
+ // delete subscription, trigger delete_when_no_subscriptions
+ for (Map.Entry<String, String> entry : topicToSub.entrySet()) {
+ admin.topics().deleteSubscription(entry.getKey(),
entry.getValue());
+ }
+ Thread.sleep(2000);
+ Assert.assertFalse(admin.topics().getList(namespace).contains(topic));
+
Assert.assertFalse(admin.topics().getList(namespace3).contains(topic3));
+
+ super.internalCleanup();
+ }
+
@Test
public void testDeleteWhenNoBacklogs() throws Exception {
conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index cfc3937..12773b0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -196,7 +196,9 @@ public class PersistentTopicConcurrentTest extends
MockedBookKeeperTestCase {
// Thread.sleep(5,0);
log.info("{} forcing topic GC ", Thread.currentThread());
for (int i = 0; i < 2000; i++) {
- topic.checkGC(0,
InactiveTopicDeleteMode.delete_when_no_subscriptions);
+
topic.getInactiveTopicPolicies().setMaxInactiveDurationSeconds(0);
+
topic.getInactiveTopicPolicies().setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ topic.checkGC();
}
log.info("GC done..");
} catch (Exception e) {
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 7cb7349..f4315e5 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -36,6 +36,7 @@ import
org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
@@ -2207,6 +2208,53 @@ public interface Namespaces {
String namespace, DelayedDeliveryPolicies delayedDeliveryPolicies);
/**
+ * Get the inactive deletion strategy for all topics within a namespace
synchronously.
+ * @param namespace
+ * @return
+ * @throws PulsarAdminException
+ */
+ InactiveTopicPolicies getInactiveTopicPolicies(String namespace) throws
PulsarAdminException;
+
+ /**
+ * Remove inactive topic policies from a namespace asynchronously.
+ * @param namespace
+ * @return
+ */
+ CompletableFuture<Void> removeInactiveTopicPoliciesAsync(String namespace);
+
+ /**
+ * Remove inactive topic policies from a namespace.
+ * @param namespace
+ * @throws PulsarAdminException
+ */
+ void removeInactiveTopicPolicies(String namespace) throws
PulsarAdminException;
+
+ /**
+ * Get the inactive deletion strategy for all topics within a namespace
asynchronously.
+ * @param namespace
+ * @return
+ */
+ CompletableFuture<InactiveTopicPolicies>
getInactiveTopicPoliciesAsync(String namespace);
+
+ /**
+ * Same as setInactiveTopicPoliciesAsync, but synchronous.
+ * @param namespace
+ * @param inactiveTopicPolicies
+ */
+ void setInactiveTopicPolicies(
+ String namespace, InactiveTopicPolicies inactiveTopicPolicies)
throws PulsarAdminException;
+
+ /**
+ * Set the inactive deletion strategy at the namespace level asynchronously.
+ * It takes priority over the inactive deletion strategy configured at the
broker level.
+ * All topics under this namespace will follow this strategy.
+ * @param namespace
+ * @param inactiveTopicPolicies
+ * @return
+ */
+ CompletableFuture<Void> setInactiveTopicPoliciesAsync(
+ String namespace, InactiveTopicPolicies inactiveTopicPolicies);
+ /**
* Set the given subscription auth mode on all topics on a namespace.
*
* @param namespace
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 1a9f010..83c6ce4 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
@@ -956,6 +957,28 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
}
@Override
+ public void removeInactiveTopicPolicies(String namespace) throws
PulsarAdminException {
+ try {
+ removeInactiveTopicPoliciesAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeInactiveTopicPoliciesAsync(String
namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "inactiveTopicPolicies");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public CompletableFuture<Void> removeBacklogQuotaAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "backlogQuota")
@@ -1775,6 +1798,64 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
}
@Override
+ public InactiveTopicPolicies getInactiveTopicPolicies(String namespace)
throws PulsarAdminException {
+ try {
+ return getInactiveTopicPoliciesAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<InactiveTopicPolicies>
getInactiveTopicPoliciesAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "inactiveTopicPolicies");
+ final CompletableFuture<InactiveTopicPolicies> future = new
CompletableFuture<>();
+ asyncGetRequest(path, new InvocationCallback<InactiveTopicPolicies>() {
+ @Override
+ public void completed(InactiveTopicPolicies
inactiveTopicPolicies) {
+ future.complete(inactiveTopicPolicies);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+
future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setInactiveTopicPolicies(
+ String namespace, InactiveTopicPolicies inactiveTopicPolicies)
throws PulsarAdminException {
+ try {
+ setInactiveTopicPoliciesAsync(namespace, inactiveTopicPolicies)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setInactiveTopicPoliciesAsync(
+ String namespace, InactiveTopicPolicies inactiveTopicPolicies) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "inactiveTopicPolicies");
+ return asyncPostRequest(path, Entity.entity(inactiveTopicPolicies,
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public int getMaxProducersPerTopic(String namespace) throws
PulsarAdminException {
try {
return getMaxProducersPerTopicAsync(namespace).
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index d5ca27e..18f0d39 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -76,6 +76,8 @@ import
org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
@@ -406,6 +408,16 @@ public class PulsarAdminToolTest {
namespaces.run(split("get-delayed-delivery myprop/clust/ns1"));
verify(mockNamespaces).getDelayedDelivery("myprop/clust/ns1");
+ namespaces.run(split("set-inactive-topic-policies myprop/clust/ns1 -e
-t 1s -m delete_when_no_subscriptions"));
+ verify(mockNamespaces).setInactiveTopicPolicies("myprop/clust/ns1"
+ , new
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,
1,true));
+
+ namespaces.run(split("get-inactive-topic-policies myprop/clust/ns1"));
+ verify(mockNamespaces).getInactiveTopicPolicies("myprop/clust/ns1");
+
+ namespaces.run(split("remove-inactive-topic-policies
myprop/clust/ns1"));
+ verify(mockNamespaces).removeInactiveTopicPolicies("myprop/clust/ns1");
+
namespaces.run(split("clear-backlog myprop/clust/ns1 -force"));
verify(mockNamespaces).clearNamespaceBacklog("myprop/clust/ns1");
@@ -484,7 +496,7 @@ public class PulsarAdminToolTest {
namespaces.run(split("get-dispatch-rate myprop/clust/ns1"));
verify(mockNamespaces).getDispatchRate("myprop/clust/ns1");
-
+
namespaces.run(split("set-publish-rate myprop/clust/ns1 -m 10 -b 20"));
verify(mockNamespaces).setPublishRate("myprop/clust/ns1", new
PublishRate(10, 20));
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 78d034f..661a56a 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -44,6 +44,8 @@ import
org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
@@ -658,7 +660,7 @@ public class CmdNamespaces extends CmdBase {
@Parameter(names = { "--relative-to-publish-rate",
"-rp" }, description = "dispatch rate relative to publish-rate
(if publish-relative flag is enabled then broker will apply throttling value to
(publish-rate + dispatch rate))\n", required = false)
private boolean relativeToPublishRate = false;
-
+
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
@@ -1014,6 +1016,67 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get the inactive topic policy for a
namespace")
+ private class GetInactiveTopicPolicies extends CliCommand {
+ @Parameter(description = "tenant/namespace\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ print(admin.namespaces().getInactiveTopicPolicies(namespace));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove inactive topic policies from a
namespace")
+ private class RemoveInactiveTopicPolicies extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().removeInactiveTopicPolicies(namespace);
+ }
+ }
+
+ @Parameters(commandDescription = "Set the inactive topic policies on a
namespace")
+ private class SetInactiveTopicPolicies extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--enable-delete-while-inactive", "-e" },
description = "Enable delete while inactive")
+ private boolean enableDeleteWhileInactive = false;
+
+ @Parameter(names = { "--disable-delete-while-inactive", "-d" },
description = "Disable delete while inactive")
+ private boolean disableDeleteWhileInactive = false;
+
+ @Parameter(names = {"--max-inactive-duration", "-t"}, description =
"Max duration of topic inactivity in seconds" +
+ ",topics that are inactive for longer than this value will be
deleted (eg: 1s, 10s, 1m, 5h, 3d)", required = true)
+ private String deleteInactiveTopicsMaxInactiveDuration;
+
+ @Parameter(names = { "--delete-mode", "-m" }, description = "Mode of
delete inactive topic" +
+ ",Valid options are: [delete_when_no_subscriptions,
delete_when_subscriptions_caught_up]", required = true)
+ private String inactiveTopicDeleteMode;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ long maxInactiveDurationInSeconds =
TimeUnit.SECONDS.toSeconds(RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration));
+
+ if (enableDeleteWhileInactive == disableDeleteWhileInactive) {
+ throw new ParameterException("Need to specify either
enable-delete-while-inactive or disable-delete-while-inactive");
+ }
+ InactiveTopicDeleteMode deleteMode = null;
+ try {
+ deleteMode =
InactiveTopicDeleteMode.valueOf(inactiveTopicDeleteMode);
+ } catch (IllegalArgumentException e) {
+ throw new ParameterException("delete mode can only be set to
delete_when_no_subscriptions or delete_when_subscriptions_caught_up");
+ }
+ admin.namespaces().setInactiveTopicPolicies(namespace, new
InactiveTopicPolicies(deleteMode, (int) maxInactiveDurationInSeconds,
enableDeleteWhileInactive));
+ }
+ }
+
@Parameters(commandDescription = "Set the delayed delivery policy on a
namespace")
private class SetDelayedDelivery extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
@@ -1662,7 +1725,7 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("get-retention", new GetRetention());
jcommander.addCommand("set-retention", new SetRetention());
-
+
jcommander.addCommand("set-bookie-affinity-group", new
SetBookieAffinityGroup());
jcommander.addCommand("get-bookie-affinity-group", new
GetBookieAffinityGroup());
jcommander.addCommand("delete-bookie-affinity-group", new
DeleteBookieAffinityGroup());
@@ -1679,7 +1742,7 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("set-subscription-dispatch-rate", new
SetSubscriptionDispatchRate());
jcommander.addCommand("get-subscription-dispatch-rate", new
GetSubscriptionDispatchRate());
-
+
jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("get-publish-rate", new GetPublishRate());
@@ -1696,6 +1759,10 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("set-delayed-delivery", new
SetDelayedDelivery());
jcommander.addCommand("get-delayed-delivery", new
GetDelayedDelivery());
+ jcommander.addCommand("get-inactive-topic-policies", new
GetInactiveTopicPolicies());
+ jcommander.addCommand("set-inactive-topic-policies", new
SetInactiveTopicPolicies());
+ jcommander.addCommand("remove-inactive-topic-policies", new
RemoveInactiveTopicPolicies());
+
jcommander.addCommand("get-max-producers-per-topic", new
GetMaxProducersPerTopic());
jcommander.addCommand("set-max-producers-per-topic", new
SetMaxProducersPerTopic());
jcommander.addCommand("get-max-consumers-per-topic", new
GetMaxConsumersPerTopic());
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java
similarity index 68%
copy from
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
copy to
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java
index 439ed7b..ac4607b 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java
@@ -16,30 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.pulsar.common.policies.data;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
/**
- * PolicyName authorization operations.
+ * Definition of the inactive topic policy.
*/
-public enum PolicyName {
- ALL,
- ANTI_AFFINITY,
- BACKLOG,
- COMPACTION,
- DELAYED_DELIVERY,
- DEDUPLICATION,
- MAX_CONSUMERS,
- MAX_PRODUCERS,
- MAX_UNACKED,
- OFFLOAD,
- PERSISTENCE,
- RATE,
- RETENTION,
- REPLICATION,
- REPLICATION_RATE,
- SCHEMA_COMPATIBILITY_STRATEGY,
- SUBSCRIPTION_AUTH_MODE,
- ENCRYPTION,
- TTL,
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class InactiveTopicPolicies {
+ private InactiveTopicDeleteMode inactiveTopicDeleteMode;
+ private int maxInactiveDurationSeconds;
+ private boolean deleteWhileInactive;
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 2946157..0fe0811 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -74,6 +74,8 @@ public class Policies {
@SuppressWarnings("checkstyle:MemberName")
public DelayedDeliveryPolicies delayed_delivery_policies = null;
@SuppressWarnings("checkstyle:MemberName")
+ public InactiveTopicPolicies inactive_topic_policies = null;
+ @SuppressWarnings("checkstyle:MemberName")
public SubscriptionAuthMode subscription_auth_mode =
SubscriptionAuthMode.None;
@SuppressWarnings("checkstyle:MemberName")
@@ -120,7 +122,7 @@ public class Policies {
autoSubscriptionCreationOverride, persistence,
bundles, latency_stats_sample_rate,
message_ttl_in_seconds, subscription_expiration_time_minutes,
retention_policies,
- encryption_required, delayed_delivery_policies,
+ encryption_required, delayed_delivery_policies,
inactive_topic_policies,
subscription_auth_mode,
antiAffinityGroup, max_producers_per_topic,
max_consumers_per_topic, max_consumers_per_subscription,
@@ -158,6 +160,7 @@ public class Policies {
&& Objects.equals(retention_policies,
other.retention_policies)
&& Objects.equals(encryption_required,
other.encryption_required)
&& Objects.equals(delayed_delivery_policies,
other.delayed_delivery_policies)
+ && Objects.equals(inactive_topic_policies,
other.inactive_topic_policies)
&& Objects.equals(subscription_auth_mode,
other.subscription_auth_mode)
&& Objects.equals(antiAffinityGroup,
other.antiAffinityGroup)
&& max_producers_per_topic == other.max_producers_per_topic
@@ -218,6 +221,7 @@ public class Policies {
.add("deleted", deleted)
.add("encryption_required", encryption_required)
.add("delayed_delivery_policies", delayed_delivery_policies)
+ .add("inactive_topic_policies", inactive_topic_policies)
.add("subscription_auth_mode", subscription_auth_mode)
.add("max_producers_per_topic", max_producers_per_topic)
.add("max_consumers_per_topic", max_consumers_per_topic)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index 439ed7b..8439a1f 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -28,6 +28,7 @@ public enum PolicyName {
BACKLOG,
COMPACTION,
DELAYED_DELIVERY,
+ INACTIVE_TOPIC,
DEDUPLICATION,
MAX_CONSUMERS,
MAX_PRODUCERS,