This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 77502c8 Allow ML offload immediately after ledger closed (#1965)
77502c8 is described below
commit 77502c8b9e1851f5e419146e2d5f67218fef38d1
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Jun 14 18:18:48 2018 +0200
Allow ML offload immediately after ledger closed (#1965)
The documentation said that only negative values disabled. If a user
wants data to be offloaded as soon as possible, the obvious thing is
to set the threshold to 0. Previously this disabled as the check
was > 0, rather than >=.
This patch changes the ML implementation to accept 0 as a threshold
and adds a test to ensure if specified, the ledgers are offloaded as
soon as possible.
Master Issue: #1511
---
.../bookkeeper/mledger/ManagedLedgerConfig.java | 2 +-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +-
.../bookkeeper/mledger/impl/OffloadPrefixTest.java | 31 ++++++++++++++++++++++
3 files changed, 33 insertions(+), 2 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 13d259c..698d245 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -412,7 +412,7 @@ public class ManagedLedgerConfig {
/**
* Size, in bytes, at which the managed ledger will start to automatically
offload ledgers to longterm storage.
- * A negative value disables autotriggering.
+ * A negative value disables autotriggering. A threshold of 0 offloads
data as soon as possible.
* Offloading will not occur if no offloader has been set {@link
#setLedgerOffloader(LedgerOffloader)}.
* Automatical offloading occurs when the ledger is rolled, and the
ledgers up to that point exceed the threshold.
*
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 3e24eb3..ac546ce 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1582,7 +1582,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
private void maybeOffloadInBackground(CompletableFuture<PositionImpl>
promise) {
- if (config.getOffloadAutoTriggerSizeThresholdBytes() > 0) {
+ if (config.getOffloadAutoTriggerSizeThresholdBytes() >= 0) {
executor.executeOrdered(name, safeRun(() ->
maybeOffload(promise)));
}
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 27fe14b..6d21ee2 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -865,6 +865,37 @@ public class OffloadPrefixTest extends
MockedBookKeeperTestCase {
ledger.getLedgersInfoAsList().get(2).getLedgerId()));
}
+ @Test
+ public void offloadAsSoonAsClosed() throws Exception {
+
+ MockLedgerOffloader offloader = new MockLedgerOffloader();
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ config.setOffloadAutoTriggerSizeThresholdBytes(0);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setLedgerOffloader(offloader);
+
+ ManagedLedgerImpl ledger =
(ManagedLedgerImpl)factory.open("my_test_ledger", config);
+
+ for (int i = 0; i < 11; i++) {
+ ledger.addEntry(buildEntry(10, "entry-" + i));
+ }
+
+ assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 1);
+ Assert.assertEquals(offloader.offloadedLedgers(),
+
ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId()));
+
+ for (int i = 0; i < 10; i++) {
+ ledger.addEntry(buildEntry(10, "entry-" + i));
+ }
+
+ assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
+ Assert.assertEquals(offloader.offloadedLedgers(),
+
ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
+
ledger.getLedgersInfoAsList().get(1).getLedgerId()));
+ }
+
+
static void assertEventuallyTrue(BooleanSupplier predicate) throws
Exception {
// wait up to 3 seconds
for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) {
--
To stop receiving notification emails like this one, please contact
[email protected].