This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 6b5fdbf2282 [fix][ml]Still got BK ledger, even though it has been deleted after offloaded (#24432)
6b5fdbf2282 is described below

commit 6b5fdbf2282efcfed30badab2c79bab3a6f92393
Author: fengyubiao <[email protected]>
AuthorDate: Sat Jun 21 00:03:39 2025 +0800

    [fix][ml]Still got BK ledger, even though it has been deleted after offloaded (#24432)
    
    (cherry picked from commit 73a4ae4f886936ec33e5c8086b6c0c0151ea7f76)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  34 ++++--
 .../mledger/impl/OffloadLedgerDeleteTest.java      | 128 +++++++++++++++++++++
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java |   2 +-
 3 files changed, 155 insertions(+), 9 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7a6d7b884bd..034a4dc10cf 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2890,7 +2890,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     for (LedgerInfo ls : offloadedLedgersToDelete) {
                         log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
                                 ls.getSize());
-                        asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
+                        invalidateReadHandle(ls.getLedgerId());
+                        asyncDeleteLedgerFromBookKeeper(ls.getLedgerId()).thenAccept(__ -> {
+                            log.info("[{}] Deleted and invalidated offloaded ledger {} from bookkeeper - size: {}",
+                                    name, ls.getLedgerId(), ls.getSize());
+                        }).exceptionally(ex -> {
+                            log.error("[{}] Failed to delete offloaded ledger {} from bookkeeper - size: {}",
+                                    name, ls.getLedgerId(), ls.getSize(), ex);
+                            return null;
+                        });
                     }
                     promise.complete(null);
                 }
@@ -3111,8 +3119,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
-    private void asyncDeleteLedgerFromBookKeeper(long ledgerId) {
-        asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
+    private CompletableFuture<Void> asyncDeleteLedgerFromBookKeeper(long ledgerId) {
+        return asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
     }
 
     private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
@@ -3129,22 +3137,32 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
-    private void asyncDeleteLedger(long ledgerId, long retry) {
-        if (retry <= 0) {
-            log.warn("[{}] Failed to delete ledger after retries {}", name, ledgerId);
-            return;
-        }
+    private CompletableFuture<Void> asyncDeleteLedger(long ledgerId, long retry) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        asyncDeleteLedgerWithRetry(future, ledgerId, retry);
+        return future;
+    }
+
+    private void asyncDeleteLedgerWithRetry(CompletableFuture<Void> future, long ledgerId, long retry) {
         bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
             if (isNoSuchLedgerExistsException(rc)) {
                 log.warn("[{}] Ledger was already deleted {}", name, ledgerId);
+                future.complete(null);
             } else if (rc != BKException.Code.OK) {
                 log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc));
+                if (retry <= 1) {
+                    // The latest once of retry has failed
+                    log.warn("[{}] Failed to delete ledger after retries {}, code: {}", name, ledgerId, rc);
+                    future.completeExceptionally(BKException.create(rc));
+                    return;
+                }
                 scheduledExecutor.schedule(() -> asyncDeleteLedger(ledgerId, retry - 1),
                         DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Deleted ledger {}", name, ledgerId);
                 }
+                future.complete(null);
             }
         }, null);
     }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
index b46f06106cf..3fd20490beb 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
@@ -26,18 +26,27 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.util.MockClock;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
+import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -207,6 +216,125 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
         assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedgerId));
     }
 
+    @Test
+    public void testGetReadLedgerHandleAfterTrimOffloadedLedgers() throws Exception {
+        // Create managed ledger.
+        final long offloadThresholdSeconds = 5;
+        final long offloadDeletionLagInSeconds = 1;
+        OffloadPrefixTest.MockLedgerOffloader offloader = new OffloadPrefixTest.MockLedgerOffloader();
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        config.setRetentionTime(10, TimeUnit.MINUTES);
+        config.setRetentionSizeInMB(10);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(offloadDeletionLagInSeconds * 1000);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(offloadThresholdSeconds);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadedReadPriority(OffloadedReadPriority.BOOKKEEPER_FIRST);
+        config.setLedgerOffloader(offloader);
+        ManagedLedgerImpl ml =
+                (ManagedLedgerImpl)factory.open("testGetReadLedgerHandleAfterTrimOffloadedLedgers", config);
+        ml.openCursor("c1");
+
+        // Write entries.
+        int i = 0;
+        for (; i < 35; i++) {
+            String content = "entry-" + i;
+            ml.addEntry(content.getBytes());
+        }
+        Assert.assertEquals(ml.getLedgersInfoAsList().size(), 4);
+        long ledger1 = ml.getLedgersInfoAsList().get(0).getLedgerId();
+        long ledger2 = ml.getLedgersInfoAsList().get(1).getLedgerId();
+        long ledger3 = ml.getLedgersInfoAsList().get(2).getLedgerId();
+        long ledger4 = ml.getLedgersInfoAsList().get(3).getLedgerId();
+
+        // Offload ledgers.
+        Thread.sleep(offloadThresholdSeconds * 2 * 1000);
+        CompletableFuture<Position> offloadFuture = new CompletableFuture<Position>();
+        ml.maybeOffloadInBackground(offloadFuture);
+        offloadFuture.join();
+
+        // Cache ledger handle.
+        CountDownLatch readCountDownLatch = new CountDownLatch(4);
+        AsyncCallbacks.ReadEntryCallback readCb = new AsyncCallbacks.ReadEntryCallback(){
+
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                readCountDownLatch.countDown();
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                readCountDownLatch.countDown();
+            }
+        };
+        ml.asyncReadEntry(PositionFactory.create(ledger1, 0), readCb, null);
+        ml.asyncReadEntry(PositionFactory.create(ledger2, 0), readCb, null);
+        ml.asyncReadEntry(PositionFactory.create(ledger3, 0), readCb, null);
+        ml.asyncReadEntry(PositionFactory.create(ledger4, 0), readCb, null);
+        readCountDownLatch.await();
+        ReadHandle originalReadHandle4 = ml.getLedgerHandle(ledger4).join();
+
+        // Trim offloaded BK ledger handles.
+        Thread.sleep(offloadDeletionLagInSeconds * 2 * 1000);
+        CompletableFuture<Position> trimLedgerFuture = new CompletableFuture<Position>();
+        ml.internalTrimLedgers(false, trimLedgerFuture);
+        trimLedgerFuture.join();
+        MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo1 = ml.getLedgerInfo(ledger1).get();
+        MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo2 = ml.getLedgerInfo(ledger2).get();
+        MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo3 = ml.getLedgerInfo(ledger3).get();
+        MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo4 = ml.getLedgerInfo(ledger4).get();
+        Assert.assertTrue(ledgerInfo1.hasOffloadContext() && ledgerInfo1.getOffloadContext().getBookkeeperDeleted());
+        Assert.assertTrue(ledgerInfo2.hasOffloadContext() && ledgerInfo2.getOffloadContext().getBookkeeperDeleted());
+        Assert.assertTrue(ledgerInfo3.hasOffloadContext() && ledgerInfo3.getOffloadContext().getBookkeeperDeleted());
+        Assert.assertFalse(ledgerInfo4.hasOffloadContext() || ledgerInfo4.getOffloadContext().getBookkeeperDeleted());
+
+        Awaitility.await().untilAsserted(() -> {
+            try {
+                factory.getBookKeeper().get().openLedger(ledger3, ml.digestType, ml.config.getPassword());
+                Assert.fail("Should fail: the ledger has been deleted");
+            } catch (BKException.BKNoSuchLedgerExistsException ex) {
+                // Expected.
+            }
+            try {
+                factory.getBookKeeper().get().openLedger(ledger2, ml.digestType, ml.config.getPassword());
+                Assert.fail("Should fail: the ledger has been deleted");
+            } catch (BKException.BKNoSuchLedgerExistsException ex) {
+                // Expected.
+            }
+            try {
+                factory.getBookKeeper().get().openLedger(ledger1, ml.digestType, ml.config.getPassword());
+                Assert.fail("Should fail: the ledger has been deleted");
+            } catch (BKException.BKNoSuchLedgerExistsException ex) {
+                // Expected.
+            }
+        });
+
+        // Verify: "ml.getLedgerHandle" returns a correct ledger handle.
+        ReadHandle currentReadHandle4 = ml.getLedgerHandle(ledger4).join();
+        Assert.assertEquals(currentReadHandle4, originalReadHandle4);
+        try {
+            ml.getLedgerHandle(ledger3).join();
+            Assert.fail("should get a failure: MockLedgerOffloader does not support read");
+        } catch (Exception ex) {
+            Assert.assertTrue(ex.getCause().getCause().getMessage()
+                    .contains("MockLedgerOffloader does not support read"));
+        }
+        try {
+            ml.getLedgerHandle(ledger2).join();
+            Assert.fail("should get a failure: MockLedgerOffloader does not support read");
+        } catch (Exception ex) {
+            Assert.assertTrue(ex.getCause().getCause().getMessage()
+                    .contains("MockLedgerOffloader does not support read"));
+        }
+        try {
+            ml.getLedgerHandle(ledger1).join();
+            Assert.fail("should get a failure: MockLedgerOffloader does not support read");
+        } catch (Exception ex) {
+            Assert.assertTrue(ex.getCause().getCause().getMessage()
+                    .contains("MockLedgerOffloader does not support read"));
+        }
+    }
+
     @Test(timeOut = 5000)
     public void testFileSystemOffloadDeletePath() throws Exception {
         MockFileSystemLedgerOffloader offloader = new MockFileSystemLedgerOffloader();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 3f9f4f8da12..2dfc846bf42 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -1293,7 +1293,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
         public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid,
                                                            Map<String, String> offloadDriverMetadata) {
             CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
-            promise.completeExceptionally(new UnsupportedOperationException());
+            promise.completeExceptionally(new UnsupportedOperationException("MockLedgerOffloader does not support read"));
             return promise;
         }
 

Reply via email to