Jason918 commented on code in PR #15914:
URL: https://github.com/apache/pulsar/pull/15914#discussion_r973532353


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1072,28 +1074,31 @@ public CompletableFuture<Void> deleteTopic(String 
topic, boolean forceDelete) {
         }
 
         CompletableFuture<Void> future = new CompletableFuture<>();
+        log.info("Topic {} could not load, try to delete from metadata", 
topic);
 
         CompletableFuture<Void> deleteTopicAuthenticationFuture = new 
CompletableFuture<>();
         deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
+
         deleteTopicAuthenticationFuture.whenComplete((v, ex) -> {
             if (ex != null) {
                 future.completeExceptionally(ex);
                 return;
             }
-            
managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), new 
DeleteLedgerCallback() {
-                @Override
-                public void deleteLedgerComplete(Object ctx) {
-                    future.complete(null);
-                }
+            CompletableFuture<ManagedLedgerConfig> mlConfigFuture =  
getManagedLedgerConfig(topicName);

Review Comment:
   ```suggestion
               CompletableFuture<ManagedLedgerConfig> mlConfigFuture = 
getManagedLedgerConfig(topicName);
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java:
##########
@@ -154,6 +164,18 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, 
Position startPosition, M
      */
     void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx);
 
+    /**
+     * Delete a managed ledger. If it's not open, it's metadata will get 
regardless deleted.
+     *
+     * @param name
+     * @throws InterruptedException
+     * @throws ManagedLedgerException
+     */
+    default void asyncDelete(String name, 
CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
+                             DeleteLedgerCallback callback, Object ctx) {
+        asyncDelete(name, callback, ctx);

Review Comment:
   NIT: No need to add this default implementation as we already overrode this.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -873,23 +891,101 @@ public void getInfoFailed(ManagedLedgerException 
exception, Object ctx) {
         }, ctx);
     }
 
+    private CompletableFuture<Void> cleanupOffloaded(long ledgerId, UUID uuid, 
ManagedLedgerConfig mlConfig,
+                                     Map<String, String> 
offloadDriverMetadata, String cleanupReason, String name) {
+        log.info("[{}] Cleanup offload for ledgerId {} uuid {} because of the 
reason {}.",
+                name, ledgerId, uuid.toString(), cleanupReason);
+        Map<String, String> metadataMap = new HashMap();
+        metadataMap.putAll(offloadDriverMetadata);
+        metadataMap.put("ManagedLedgerName", name);
+
+        return 
Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1),
+                        TimeUnit.SECONDS.toHours(1)).limit(10),
+                Retries.NonFatalPredicate,
+                () -> mlConfig.getLedgerOffloader().deleteOffloaded(ledgerId, 
uuid, metadataMap),
+                scheduledExecutor, name).whenComplete((ignored, exception) -> {
+            if (exception != null) {
+                log.warn("[{}] Error cleaning up offload for {}, (cleanup 
reason: {})",
+                        name, ledgerId, cleanupReason, exception);
+            }
+        });
+    }
+
     private void deleteManagedLedgerData(BookKeeper bkc, String 
managedLedgerName, ManagedLedgerInfo info,
-            DeleteLedgerCallback callback, Object ctx) {
+                                         
CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
+                                         DeleteLedgerCallback callback, Object 
ctx) {
+        final CompletableFuture<Map<Long, 
MLDataFormats.ManagedLedgerInfo.LedgerInfo>>
+                ledgerInfosFuture = new CompletableFuture<>();
+        store.getManagedLedgerInfo(managedLedgerName, false, null,

Review Comment:
   Why do we read `ManagedLedgerInfo` from store instead of just use `info` in 
the parameter list? 



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1072,28 +1074,31 @@ public CompletableFuture<Void> deleteTopic(String 
topic, boolean forceDelete) {
         }
 
         CompletableFuture<Void> future = new CompletableFuture<>();
+        log.info("Topic {} could not load, try to delete from metadata", 
topic);

Review Comment:
   Duplicate with debug log in L1067?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -873,23 +891,101 @@ public void getInfoFailed(ManagedLedgerException 
exception, Object ctx) {
         }, ctx);
     }
 
+    private CompletableFuture<Void> cleanupOffloaded(long ledgerId, UUID uuid, 
ManagedLedgerConfig mlConfig,

Review Comment:
   Can reuse the code with `ManagedLedgerImpl#cleanupOffloaded`, it's just the 
same.
   Maybe put the codes in `OffloaderUtils`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to