This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6001c41 Delete offloaded ledger when ledger deleted (#1641)
6001c41 is described below
commit 6001c417c7edf5892c55bc64e299023f16a5fbd9
Author: Ivan Kelly <[email protected]>
AuthorDate: Wed Apr 25 23:33:21 2018 +0200
Delete offloaded ledger when ledger deleted (#1641)
* Delete offloaded ledger when ledger deleted
When a managed ledger trims a ledger that has been offloaded to
long-term storage, delete it from long-term storage as well.
Currently, the managed ledger always tries to delete the BookKeeper
ledger as well, even if the ledger has already been offloaded. Handling
for this case will be added in a later patch, along with delayed
BookKeeper ledger deletion in the case of offload.
Master Issue: #1511
* Fixup for delete
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 18 ++--
.../bookkeeper/mledger/impl/OffloadPrefixTest.java | 98 ++++++++++++++++++++++
2 files changed, 110 insertions(+), 6 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index db644b2..50bc85d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1666,7 +1666,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
for (LedgerInfo ls : ledgersToDelete) {
log.info("[{}] Removing ledger {} - size: {}", name,
ls.getLedgerId(), ls.getSize());
- asyncDeleteLedger(ls.getLedgerId());
+ asyncDeleteLedger(ls.getLedgerId(), ls);
}
}
@@ -1758,8 +1758,14 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
}
- private void asyncDeleteLedger(long ledgerId) {
+ private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
+
+ if (info.getOffloadContext().hasUidMsb()) {
+ UUID uuid = new UUID(info.getOffloadContext().getUidMsb(),
+ info.getOffloadContext().getUidLsb());
+ cleanupOffloaded(ledgerId, uuid, "Trimming");
+ }
}
private void asyncDeleteLedger(long ledgerId, long retry) {
@@ -1963,7 +1969,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
scheduledExecutor, name)
.whenComplete((ignore2, exception) -> {
if (exception != null) {
- cleanupOffloadedOnFailure(ledgerId,
uuid, "Metastore failure");
+ cleanupOffloaded(ledgerId, uuid,
"Metastore failure");
}
});
})
@@ -2056,7 +2062,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
oldInfo.getOffloadContext().getUidLsb());
log.info("[{}] Found previous
offload attempt for ledger {}, uuid {}"
+ ", cleaning up", name,
ledgerId, uuid);
- cleanupOffloadedOnFailure(ledgerId,
oldUuid, "Previous failed offload");
+ cleanupOffloaded(ledgerId, oldUuid,
"Previous failed offload");
}
LedgerInfo.Builder builder =
oldInfo.toBuilder();
builder.getOffloadContextBuilder()
@@ -2078,7 +2084,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
return transformLedgerInfo(ledgerId,
(oldInfo) -> {
UUID existingUuid = new
UUID(oldInfo.getOffloadContext().getUidMsb(),
-
oldInfo.getOffloadContext().getUidLsb());
+
oldInfo.getOffloadContext().getUidLsb());
if (existingUuid.equals(uuid)) {
LedgerInfo.Builder builder =
oldInfo.toBuilder();
builder.getOffloadContextBuilder()
@@ -2101,7 +2107,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
});
}
- private void cleanupOffloadedOnFailure(long ledgerId, UUID uuid, String
cleanupReason) {
+ private void cleanupOffloaded(long ledgerId, UUID uuid, String
cleanupReason) {
Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1),
TimeUnit.SECONDS.toHours(1)).limit(10),
Retries.NonFatalPredicate,
() ->
config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid),
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 230456d..311a82e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -541,6 +541,98 @@ public class OffloadPrefixTest extends
MockedBookKeeperTestCase {
Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete());
}
+ @Test
+ public void testOffloadDelete() throws Exception {
+ Set<Pair<Long, UUID>> deleted = ConcurrentHashMap.newKeySet();
+ CompletableFuture<Set<Long>> errorLedgers = new CompletableFuture<>();
+ Set<Pair<Long, UUID>> failedOffloads = ConcurrentHashMap.newKeySet();
+
+ MockLedgerOffloader offloader = new MockLedgerOffloader();
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(0, TimeUnit.MINUTES);
+ config.setLedgerOffloader(offloader);
+ ManagedLedgerImpl ledger =
(ManagedLedgerImpl)factory.open("my_test_ledger", config);
+ ManagedCursor cursor = ledger.openCursor("foobar");
+ for (int i = 0; i < 15; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+
+ Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+ ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+ Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+
+ Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
+ .filter(e ->
e.getOffloadContext().getComplete()).count(), 1);
+
Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete());
+ long firstLedger = ledger.getLedgersInfoAsList().get(0).getLedgerId();
+ long secondLedger = ledger.getLedgersInfoAsList().get(1).getLedgerId();
+
+ cursor.markDelete(ledger.getLastConfirmedEntry());
+ assertEventuallyTrue(() -> ledger.getLedgersInfoAsList().size() == 1);
+
Assert.assertEquals(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
secondLedger);
+
+ assertEventuallyTrue(() ->
offloader.deletedOffloads().contains(firstLedger));
+ }
+
+ @Test
+ public void testOffloadDeleteIncomplete() throws Exception {
+ Set<Pair<Long, UUID>> deleted = ConcurrentHashMap.newKeySet();
+ CompletableFuture<Set<Long>> errorLedgers = new CompletableFuture<>();
+ Set<Pair<Long, UUID>> failedOffloads = ConcurrentHashMap.newKeySet();
+
+ MockLedgerOffloader offloader = new MockLedgerOffloader() {
+ @Override
+ public CompletableFuture<Void> offload(ReadHandle ledger,
+ UUID uuid,
+ Map<String, String>
extraMetadata) {
+ return super.offload(ledger, uuid, extraMetadata)
+ .thenCompose((res) -> {
+ CompletableFuture<Void> f = new
CompletableFuture<>();
+ f.completeExceptionally(new Exception("Fail
after offload occurred"));
+ return f;
+ });
+ }
+ };
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(0, TimeUnit.MINUTES);
+ config.setLedgerOffloader(offloader);
+ ManagedLedgerImpl ledger =
(ManagedLedgerImpl)factory.open("my_test_ledger", config);
+ ManagedCursor cursor = ledger.openCursor("foobar");
+ for (int i = 0; i < 15; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+
+ Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+ try {
+ ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+ } catch (ManagedLedgerException mle) {
+ // expected
+ }
+
+ Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+
+ Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
+ .filter(e ->
e.getOffloadContext().getComplete()).count(), 0);
+ Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
+ .filter(e ->
e.getOffloadContext().hasUidMsb()).count(), 1);
+
Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().hasUidMsb());
+
+ long firstLedger = ledger.getLedgersInfoAsList().get(0).getLedgerId();
+ long secondLedger = ledger.getLedgersInfoAsList().get(1).getLedgerId();
+
+ cursor.markDelete(ledger.getLastConfirmedEntry());
+ assertEventuallyTrue(() -> ledger.getLedgersInfoAsList().size() == 1);
+
Assert.assertEquals(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
secondLedger);
+
+ assertEventuallyTrue(() ->
offloader.deletedOffloads().contains(firstLedger));
+ }
+
void assertEventuallyTrue(BooleanSupplier predicate) throws Exception {
// wait up to 3 seconds
for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) {
@@ -563,11 +655,16 @@ public class OffloadPrefixTest extends
MockedBookKeeperTestCase {
static class MockLedgerOffloader implements LedgerOffloader {
ConcurrentHashMap<Long, UUID> offloads = new ConcurrentHashMap<Long,
UUID>();
+ ConcurrentHashMap<Long, UUID> deletes = new ConcurrentHashMap<Long,
UUID>();
Set<Long> offloadedLedgers() {
return offloads.keySet();
}
+ Set<Long> deletedOffloads() {
+ return deletes.keySet();
+ }
+
@Override
public CompletableFuture<Void> offload(ReadHandle ledger,
UUID uuid,
@@ -592,6 +689,7 @@ public class OffloadPrefixTest extends
MockedBookKeeperTestCase {
public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID
uuid) {
CompletableFuture<Void> promise = new CompletableFuture<>();
if (offloads.remove(ledgerId, uuid)) {
+ deletes.put(ledgerId, uuid);
promise.complete(null);
} else {
promise.completeExceptionally(new Exception("Not found"));
--
To stop receiving notification emails like this one, please contact
[email protected].