This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 1bca601 Added infinite time retention configuration option (#1135) 1bca601 is described below commit 1bca601537f7a399ea8324f4e07919b00d1fc771 Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Jan 31 10:22:38 2018 -0800 Added infinite time retention configuration option (#1135) * Added infinite time retention configuration option * Fixed test * Updated CLI docs --- .../bookkeeper/mledger/ManagedLedgerConfig.java | 25 ++++++++++-- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 19 +++++++-- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 30 ++++++++++++-- .../broker/service/persistent/PersistentTopic.java | 20 +++++---- .../broker/service/PersistentTopicE2ETest.java | 47 ++++++++++++++++++++++ .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 18 ++++++--- site/_data/cli/pulsar-admin.yaml | 8 ++-- 7 files changed, 139 insertions(+), 28 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 6f9847b..fd81ba1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -319,6 +319,16 @@ public class ManagedLedgerConfig { } /** + * Set the retention time for the ManagedLedger + * <p> + * Retention time will prevent data from being deleted for at least the specified amount of time, even if no cursors + * are created, or if all the cursors have marked the data for deletion. + * <p> + * A retention time of 0 (the default) means there is no time-based retention. + * <p> + * Specifying a negative retention time will cause the data to be retained indefinitely, based on the + * {@link #setRetentionSizeInMB(long)} value. 
+ * * @param retentionTime * duration for which messages should be retained * @param unit @@ -338,6 +348,15 @@ public class ManagedLedgerConfig { } /** + * The retention size is used to set a maximum retention size quota on the ManagedLedger. + * <p> + * This setting works in conjunction with {@link #setRetentionTime} and places a max size for retention, + * after which the data is deleted. + * <p> + * A retention size of 0 will cause data to be deleted immediately. + * <p> + * A retention size of -1 means an unlimited retention size. + * * @param retentionSizeInMB * quota for message retention */ @@ -357,7 +376,7 @@ public class ManagedLedgerConfig { /** * Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets * corrupted at bookkeeper and managed-cursor is stuck at that ledger. - * + * * @param autoSkipNonRecoverableData */ public boolean isAutoSkipNonRecoverableData() { @@ -384,10 +403,10 @@ public class ManagedLedgerConfig { this.maxUnackedRangesToPersist = maxUnackedRangesToPersist; return this; } - + /** * @return max unacked message ranges up to which it can store in Zookeeper - * + * */ public int getMaxUnackedRangesToPersistInZk() { return maxUnackedRangesToPersistInZk; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 767d3dc..9bbea72 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -161,7 +161,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { final static long WaitTimeAfterLedgerCreationFailureMs = 10000; volatile PositionImpl lastConfirmedEntry; - + protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3; protected static final int 
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60; @@ -1485,10 +1485,21 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } private boolean hasLedgerRetentionExpired(long ledgerTimestamp) { + if (config.getRetentionTimeMillis() < 0) { + // Negative retention time equates to infinite retention + return false; + } + long elapsedMs = System.currentTimeMillis() - ledgerTimestamp; return elapsedMs > config.getRetentionTimeMillis(); } + private boolean isLedgerRetentionOverSizeQuota() { + // Handle the -1 size limit as "infinite" size quota + return config.getRetentionSizeInMB() > 0 + && TOTAL_SIZE_UPDATER.get(this) > ((long) config.getRetentionSizeInMB()) * 1024 * 1024; + } + /** * Checks whether there are ledger that have been fully consumed and deletes them * @@ -1537,7 +1548,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // skip ledger if retention constraint met for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) { boolean expired = hasLedgerRetentionExpired(ls.getTimestamp()); - boolean overRetentionQuota = TOTAL_SIZE_UPDATER.get(this) > ((long) config.getRetentionSizeInMB()) * 1024 * 1024; + boolean overRetentionQuota = isLedgerRetentionOverSizeQuota(); if (log.isDebugEnabled()) { log.debug( @@ -1714,7 +1725,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } }, null); } - + private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) { List<LedgerInfo> ledgers = Lists.newArrayList(ManagedLedgerImpl.this.ledgers.values()); AtomicInteger ledgersToDelete = new AtomicInteger(ledgers.size()); @@ -2199,7 +2210,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { return new ManagedLedgerException(BKException.getMessage(bkErrorCode)); } } - + private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 821061a..2dcf8cd 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -1668,11 +1668,35 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { c1.skipEntries(1, IndividualDeletedEntries.Exclude); // let retention expire Thread.sleep(1000); - ml.close(); - // sleep for trim - Thread.sleep(100); + ml.internalTrimConsumedLedgers(); + assertTrue(ml.getLedgersInfoAsList().size() <= 1); assertTrue(ml.getTotalSize() <= "shortmessage".getBytes().length); + ml.close(); + } + + @Test + public void testInfiniteRetention() throws Exception { + ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setRetentionSizeInMB(-1); + config.setRetentionTime(-1, TimeUnit.HOURS); + config.setMaxEntriesPerLedger(1); + + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("retention_test_ledger", config); + ManagedCursor c1 = ml.openCursor("c1"); + ml.addEntry("iamaverylongmessagethatshouldberetained".getBytes()); + c1.skipEntries(1, IndividualDeletedEntries.Exclude); + ml.close(); + + // reopen ml + ml = (ManagedLedgerImpl) factory.open("retention_test_ledger", config); + c1 = ml.openCursor("c1"); + ml.addEntry("shortmessage".getBytes()); + c1.skipEntries(1, IndividualDeletedEntries.Exclude); + ml.close(); + assertTrue(ml.getLedgersInfoAsList().size() > 1); + assertTrue(ml.getTotalSize() > "shortmessage".getBytes().length); } @Test diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 7255bef..d38fa3b 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -22,10 +22,11 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; -import java.time.Instant; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; -import java.util.*; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -86,7 +87,6 @@ import org.apache.pulsar.common.policies.data.PersistentTopicStats; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublisherStats; import org.apache.pulsar.common.policies.data.ReplicatorStats; -import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.DateFormatter; @@ -1323,9 +1323,13 @@ public class PersistentTopic implements Topic, AddEntryCallback { Optional<Policies> policies = brokerService.pulsar().getConfigurationCache().policiesCache() .get(AdminResource.path(POLICIES, name.getNamespace())); // If no policies, the default is to have no retention and delete the inactive topic - return policies.map(p -> p.retention_policies) - .map(rp -> System.nanoTime() - lastActive < TimeUnit.MINUTES.toNanos(rp.getRetentionTimeInMinutes())) - .orElse(false).booleanValue(); + return policies.map(p -> p.retention_policies).map(rp -> { + long retentionTime = TimeUnit.MINUTES.toNanos(rp.getRetentionTimeInMinutes()); + + // Negative retention time means the topic 
should be retained indefinitely, + // because its own data has to be retained + return retentionTime < 0 || (System.nanoTime() - lastActive) < retentionTime; + }).orElse(false).booleanValue(); } catch (Exception e) { if (log.isDebugEnabled()) { log.debug("[{}] Error getting policies", topic); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 9086203..93ce4b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -642,6 +642,53 @@ public class PersistentTopicE2ETest extends BrokerTestBase { assertNull(pulsar.getBrokerService().getTopicReference(topicName)); } + /** + * A topic that has retention policy set to -1, should not be GCed + * until it has been inactive for at least the retention time and the data + * should never be deleted + */ + @Test + public void testInfiniteRetentionPolicy() throws Exception { + // Retain data forever + admin.namespaces().setRetention("prop/use/ns-abc", new RetentionPolicies(-1, -1)); + + // 1. Simple successful GC + String topicName = "persistent://prop/use/ns-abc/topic-10"; + Producer producer = pulsarClient.createProducer(topicName); + producer.close(); + + assertNotNull(pulsar.getBrokerService().getTopicReference(topicName)); + runGC(); + // Should not have been deleted, since we have retention + assertNotNull(pulsar.getBrokerService().getTopicReference(topicName)); + + + // Remove retention + admin.namespaces().setRetention("prop/use/ns-abc", new RetentionPolicies(0, 10)); + Thread.sleep(300); + + // 2. 
Topic is not GCed with live connection + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setSubscriptionType(SubscriptionType.Exclusive); + String subName = "sub1"; + Consumer consumer = pulsarClient.subscribe(topicName, subName, conf); + + runGC(); + assertNotNull(pulsar.getBrokerService().getTopicReference(topicName)); + + // 3. Topic with subscription is not GCed even with no connections + consumer.close(); + + runGC(); + assertNotNull(pulsar.getBrokerService().getTopicReference(topicName)); + + // 4. Topic can be GCed after unsubscribe + admin.persistentTopics().deleteSubscription(topicName, subName); + + runGC(); + assertNull(pulsar.getBrokerService().getTopicReference(topicName)); + } + @Test public void testMessageExpiry() throws Exception { int messageTTLSecs = 1; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index e04eef7..6e84bf6 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -232,7 +232,7 @@ public class CmdNamespaces extends CmdBase { private class GetAntiAffinityGroup extends CliCommand { @Parameter(description = "property/cluster/namespace\n", required = true) private java.util.List<String> params; - + @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); @@ -263,14 +263,14 @@ public class CmdNamespaces extends CmdBase { private class DeleteAntiAffinityGroup extends CliCommand { @Parameter(description = "property/cluster/namespace\n", required = true) private java.util.List<String> params; - + @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace); } } - + @Parameters(commandDescription = "Enable or disable deduplication for a 
namespace") private class SetDeduplication extends CliCommand { @@ -300,10 +300,12 @@ public class CmdNamespaces extends CmdBase { private java.util.List<String> params; @Parameter(names = { "--time", - "-t" }, description = "Retention time in minutes (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w)", required = true) + "-t" }, description = "Retention time in minutes (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w). " + + "0 means no retention and -1 means infinite time retention", required = true) private String retentionTimeStr; - @Parameter(names = { "--size", "-s" }, description = "Retention size limit (eg: 10M, 16G)", required = true) + @Parameter(names = { "--size", "-s" }, description = "Retention size limit (eg: 10M, 16G, 3T). " + + "0 means no retention and -1 means infinite size retention", required = true) private String limitStr; @Override @@ -625,6 +627,10 @@ public class CmdNamespaces extends CmdBase { case 'G': return Long.parseLong(subStr) * 1024 * 1024 * 1024; + case 't': + case 'T': + return Long.parseLong(subStr) * 1024 * 1024 * 1024 * 1024; + default: return Long.parseLong(s); } @@ -680,7 +686,7 @@ public class CmdNamespaces extends CmdBase { jcommander.addCommand("get-message-ttl", new GetMessageTTL()); jcommander.addCommand("set-message-ttl", new SetMessageTTL()); - + jcommander.addCommand("get-anti-affinity-group", new GetAntiAffinityGroup()); jcommander.addCommand("set-anti-affinity-group", new SetAntiAffinityGroup()); jcommander.addCommand("get-anti-affinity-namespaces", new GetAntiAffinityNamespaces()); diff --git a/site/_data/cli/pulsar-admin.yaml b/site/_data/cli/pulsar-admin.yaml index 7ac972f..eb7c805 100644 --- a/site/_data/cli/pulsar-admin.yaml +++ b/site/_data/cli/pulsar-admin.yaml @@ -213,9 +213,9 @@ commands: argument: property/cluster/namespace options: - flags: -s, --size - description: The retention size limits (for example `10M` or `16G`) + description: The retention size limits (for example `10M`, `16G` or `3T`). 
0 means no retention and -1 means infinite size retention - flags: -t, --time - description: "The retention time in minutes, hours, days, or weeks. Examples: `100m`, `13h`, `2d`, `5w`." + description: "The retention time in minutes, hours, days, or weeks. Examples: `100m`, `13h`, `2d`, `5w`. 0 means no retention and -1 means infinite time retention" - name: unload description: Unload a namespace or namespace bundle from the current serving broker. argument: property/cluster/namespace @@ -295,14 +295,14 @@ commands: description: Look up a topic from the current serving broker argument: persistent://property/cluster/namespace/topic - name: bundle-range - description: Get the namespace bundle which contains the given topic + description: Get the namespace bundle which contains the given topic argument: persistent://property/cluster/namespace/topic - name: delete description: Delete a topic. The topic cannot be deleted if there are any active subscriptions or producers connected to the topic. argument: persistent://property/cluster/namespace/topic - name: unload description: Unload a topic - argument: persistent://property/cluster/namespace/topic + argument: persistent://property/cluster/namespace/topic - name: subscriptions description: Get the list of subscriptions on the topic argument: persistent://property/cluster/namespace/topic -- To stop receiving notification emails like this one, please contact mme...@apache.org.