Jason918 commented on code in PR #15914:
URL: https://github.com/apache/pulsar/pull/15914#discussion_r973532353
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1072,28 +1074,31 @@ public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete) {
}
CompletableFuture<Void> future = new CompletableFuture<>();
+ log.info("Topic {} could not load, try to delete from metadata", topic);
CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
+
deleteTopicAuthenticationFuture.whenComplete((v, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
return;
}
- managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), new DeleteLedgerCallback() {
- @Override
- public void deleteLedgerComplete(Object ctx) {
- future.complete(null);
- }
+ CompletableFuture<ManagedLedgerConfig> mlConfigFuture = getManagedLedgerConfig(topicName);
Review Comment:
```suggestion
CompletableFuture<ManagedLedgerConfig> mlConfigFuture = getManagedLedgerConfig(topicName);
```
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java:
##########
@@ -154,6 +164,18 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M
*/
void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx);
+ /**
+ * Delete a managed ledger. If it's not open, it's metadata will get regardless deleted.
+ *
+ * @param name
+ * @throws InterruptedException
+ * @throws ManagedLedgerException
+ */
+ default void asyncDelete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
+ DeleteLedgerCallback callback, Object ctx) {
+ asyncDelete(name, callback, ctx);
Review Comment:
NIT: This default implementation isn't needed, since we already override the method.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -873,23 +891,101 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
}, ctx);
}
+ private CompletableFuture<Void> cleanupOffloaded(long ledgerId, UUID uuid, ManagedLedgerConfig mlConfig,
+ Map<String, String> offloadDriverMetadata, String cleanupReason, String name) {
+ log.info("[{}] Cleanup offload for ledgerId {} uuid {} because of the reason {}.",
+ name, ledgerId, uuid.toString(), cleanupReason);
+ Map<String, String> metadataMap = new HashMap();
+ metadataMap.putAll(offloadDriverMetadata);
+ metadataMap.put("ManagedLedgerName", name);
+
+ return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1),
+ TimeUnit.SECONDS.toHours(1)).limit(10),
+ Retries.NonFatalPredicate,
+ () -> mlConfig.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, metadataMap),
+ scheduledExecutor, name).whenComplete((ignored, exception) -> {
+ if (exception != null) {
+ log.warn("[{}] Error cleaning up offload for {}, (cleanup reason: {})",
+ name, ledgerId, cleanupReason, exception);
+ }
+ });
+ }
+
private void deleteManagedLedgerData(BookKeeper bkc, String managedLedgerName, ManagedLedgerInfo info,
- DeleteLedgerCallback callback, Object ctx) {
+ CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
+ DeleteLedgerCallback callback, Object ctx) {
+ final CompletableFuture<Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>>
+ ledgerInfosFuture = new CompletableFuture<>();
+ store.getManagedLedgerInfo(managedLedgerName, false, null,
Review Comment:
Why do we read `ManagedLedgerInfo` from the store instead of just using the
`info` parameter that is already passed in?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1072,28 +1074,31 @@ public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete) {
}
CompletableFuture<Void> future = new CompletableFuture<>();
+ log.info("Topic {} could not load, try to delete from metadata", topic);
Review Comment:
Doesn't this duplicate the debug log at L1067?
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -873,23 +891,101 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
}, ctx);
}
+ private CompletableFuture<Void> cleanupOffloaded(long ledgerId, UUID uuid, ManagedLedgerConfig mlConfig,
Review Comment:
This could reuse `ManagedLedgerImpl#cleanupOffloaded`; the code is identical.
Maybe move the shared code into `OffloaderUtils`?
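A rough sketch of what such a shared helper might look like, so both call sites delegate to one place. Everything here is hypothetical: `OffloaderUtilsSketch` and the nested `Offloader` interface are stand-ins (not the real `OffloaderUtils` or `LedgerOffloader`), and the retry/backoff wrapper from the diff is left out to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch with stand-in types; not the actual Pulsar classes.
final class OffloaderUtilsSketch {

    /** Stand-in for the single offloader call used by the cleanup code in the diff. */
    interface Offloader {
        CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid, Map<String, String> metadata);
    }

    private OffloaderUtilsSketch() {
    }

    /**
     * Shared cleanup that both the factory and the managed ledger could call.
     * The retry/backoff handling from the diff is intentionally omitted here.
     */
    static CompletableFuture<Void> cleanupOffloaded(long ledgerId, UUID uuid, Offloader offloader,
            Map<String, String> offloadDriverMetadata, String cleanupReason, String name) {
        // Copy the driver metadata so the caller's map is not mutated.
        Map<String, String> metadataMap = new HashMap<>(offloadDriverMetadata);
        metadataMap.put("ManagedLedgerName", name);

        return offloader.deleteOffloaded(ledgerId, uuid, metadataMap)
                .whenComplete((ignored, exception) -> {
                    if (exception != null) {
                        // Log and let the failure propagate to the caller's future.
                        System.err.printf("[%s] Error cleaning up offload for %d (cleanup reason: %s): %s%n",
                                name, ledgerId, cleanupReason, exception);
                    }
                });
    }
}
```

Passing the offloader in as a parameter, rather than reading it from instance state, is what would let both classes share the method.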