eolivelli commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r946919802
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java:
##########
@@ -1079,6 +1080,34 @@ public void deleteTopic(
});
}
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/ledger/delete")
+ @ApiOperation(value = "Delete a topic.",
+ notes = "The topic cannot be deleted if delete is not forcefully
and there's any active "
+ + "subscription or producer connected to the it. "
+ + "Force delete ignores connected clients and deletes
topic by explicitly closing them.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Don't have permission to
administrate resources on this tenant"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 412, message = "Topic has active
producers/subscriptions"),
+ @ApiResponse(code = 500, message = "Internal server error")})
+ public void deleteLedger(
Review Comment:
Why do we need a new API?
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2570,70 +2651,107 @@ void internalTrimLedgers(boolean isTruncate,
CompletableFuture<?> promise) {
advanceCursorsIfNecessary(ledgersToDelete);
PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
- // Update metadata
- for (LedgerInfo ls : ledgersToDelete) {
- if (currentLastConfirmedEntry != null && ls.getLedgerId() ==
currentLastConfirmedEntry.getLedgerId()) {
- // this info is relevant because the lastMessageId won't
be available anymore
- log.info("[{}] Ledger {} contains the current last
confirmed entry {}, and it is going to be "
- + "deleted", name, ls.getLedgerId(),
currentLastConfirmedEntry);
- }
-
- invalidateReadHandle(ls.getLedgerId());
-
- ledgers.remove(ls.getLedgerId());
- NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
- TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
- entryCache.invalidateAllEntries(ls.getLedgerId());
- }
- for (LedgerInfo ls : offloadedLedgersToDelete) {
- LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
-
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
- String driverName = OffloadUtils.getOffloadDriverName(ls,
- config.getLedgerOffloader().getOffloadDriverName());
- Map<String, String> driverMetadata =
OffloadUtils.getOffloadDriverMetadata(ls,
-
config.getLedgerOffloader().getOffloadDriverMetadata());
- OffloadUtils.setOffloadDriverMetadata(newInfoBuilder,
driverName, driverMetadata);
- ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
- }
+ // Update metadata
+ // Mark deletable ledgers
+ Set<Long> deletableLedgers =
+ Stream.concat(ledgersToDelete.stream().filter(ls ->
!ls.getOffloadContext().getBookkeeperDeleted()),
+
offloadedLedgersToDelete.stream()).map(LedgerInfo::getLedgerId).collect(Collectors.toSet());
+
+ // Mark deletable offloaded ledgers
+ Set<Long> deletableOffloadedLedgers = ledgersToDelete.stream()
+ .filter(ls -> ls.getOffloadContext().hasUidMsb())
+ .map(LedgerInfo::getLedgerId).collect(Collectors.toSet());
+
+ CompletableFuture<?> appendDeleteLedgerFuture =
+ appendPendingDeleteLedger(deletableLedgers,
deletableOffloadedLedgers);
+ appendDeleteLedgerFuture.thenAccept(ignore -> {
+ believedDeleteIds.addAll(deletableLedgers);
+ for (LedgerInfo ls : ledgersToDelete) {
+ if (currentLastConfirmedEntry != null
+ && ls.getLedgerId() ==
currentLastConfirmedEntry.getLedgerId()) {
+ // this info is relevant because the lastMessageId
won't be available anymore
+ log.info("[{}] Ledger {} contains the current last
confirmed entry {}, and it is going to be "
+ + "deleted", name, ls.getLedgerId(),
currentLastConfirmedEntry);
+ }
+ invalidateReadHandle(ls.getLedgerId());
+ ledgers.remove(ls.getLedgerId());
+ entryCache.invalidateAllEntries(ls.getLedgerId());
- if (log.isDebugEnabled()) {
- log.debug("[{}] Updating of ledgers list after trimming",
name);
- }
+ NUMBER_OF_ENTRIES_UPDATER.addAndGet(this,
-ls.getEntries());
+ TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
+ }
+ for (LedgerInfo ls : offloadedLedgersToDelete) {
+ LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
+
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
+ String driverName = OffloadUtils.getOffloadDriverName(ls,
+
config.getLedgerOffloader().getOffloadDriverName());
+ Map<String, String> driverMetadata =
OffloadUtils.getOffloadDriverMetadata(ls,
+
config.getLedgerOffloader().getOffloadDriverMetadata());
+ OffloadUtils.setOffloadDriverMetadata(newInfoBuilder,
driverName, driverMetadata);
+ ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
+ }
- store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(),
ledgersStat, new MetaStoreCallback<Void>() {
- @Override
- public void operationComplete(Void result, Stat stat) {
- log.info("[{}] End TrimConsumedLedgers. ledgers={}
totalSize={}", name, ledgers.size(),
- TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
- ledgersStat = stat;
- metadataMutex.unlock();
- trimmerMutex.unlock();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Updating of ledgers list after trimming",
name);
+ }
- for (LedgerInfo ls : ledgersToDelete) {
- log.info("[{}] Removing ledger {} - size: {}", name,
ls.getLedgerId(), ls.getSize());
- asyncDeleteLedger(ls.getLedgerId(), ls);
- }
- for (LedgerInfo ls : offloadedLedgersToDelete) {
- log.info("[{}] Deleting offloaded ledger {} from
bookkeeper - size: {}", name, ls.getLedgerId(),
- ls.getSize());
- asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
+ store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(),
ledgersStat, new MetaStoreCallback<Void>() {
+ @Override
+ public void operationComplete(Void result, Stat stat) {
+ log.info("[{}] End TrimConsumedLedgers. ledgers={}
totalSize={}", name, ledgers.size(),
+
TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
+ ledgersStat = stat;
+ metadataMutex.unlock();
+ trimmerMutex.unlock();
+ if (ledgerDeletionService instanceof
LedgerDeletionService.LedgerDeletionServiceDisable) {
Review Comment:
Using `instanceof` here is bad practice.
We should add a method to LedgerDeletionService and override it in the
various implementations.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]