This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b9717c34d9be1c68b1cc4c0e2f22e57d9d1fcca7
Author: fengyubiao <[email protected]>
AuthorDate: Thu Jun 5 17:48:22 2025 +0800

    [improve][ml]Release idle offloaded read handle only the ref count is 0 
(#24381)
    
    (cherry picked from commit 21406b594a05a6362afdf68bbe5d041a5907394d)
---
 .../bookkeeper/mledger/OffloadedLedgerHandle.java  |  4 ++++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 12 +++++++----
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 24 +++++++++++++++++-----
 3 files changed, 31 insertions(+), 9 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadedLedgerHandle.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadedLedgerHandle.java
index f45d115090f..64464187895 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadedLedgerHandle.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadedLedgerHandle.java
@@ -26,4 +26,8 @@ public interface OffloadedLedgerHandle {
     default long lastAccessTimestamp() {
         return -1;
     }
+
+    default int getPendingRead() {
+        return 0;
+    }
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index b2065476c3b..2b6a596b479 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2658,10 +2658,10 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
             ledgerCache.forEach((ledgerId, ledger) -> {
                 if (ledger.isDone() && !ledger.isCompletedExceptionally()) {
                     ReadHandle readHandle = ledger.join();
-                    if (readHandle instanceof OffloadedLedgerHandle) {
-                        long lastAccessTimestamp = ((OffloadedLedgerHandle) 
readHandle).lastAccessTimestamp();
-                        if (lastAccessTimestamp >= 0) {
-                            long delta = now - lastAccessTimestamp;
+                    if (readHandle instanceof OffloadedLedgerHandle 
offloadedLedgerHandle) {
+                        int pendingRead = 
offloadedLedgerHandle.getPendingRead();
+                        if (pendingRead == 0) {
+                            long delta = now - 
offloadedLedgerHandle.lastAccessTimestamp();
                             if (delta >= 
inactiveOffloadedLedgerEvictionTimeMs) {
                                 log.info("[{}] Offloaded ledger {} can be 
released ({} ms elapsed since last access)",
                                         name, ledgerId, delta);
@@ -2671,6 +2671,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                                         "[{}] Offloaded ledger {} cannot be 
released ({} ms elapsed since last access)",
                                         name, ledgerId, delta);
                             }
+                        } else if (pendingRead < 0) {
+                            log.error("[{}] Offloaded ledger {} went to a 
wrong state because its pending read is a"
+                                + " negative value {}. Please raise an issue 
to https://github.com/apache/pulsar", name,
+                                ledgerId, pendingRead);
                         }
                     }
                 }
diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 1f2f901f514..84d9d118728 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
@@ -56,6 +57,9 @@ import org.slf4j.LoggerFactory;
 public class BlobStoreBackedReadHandleImpl implements ReadHandle, 
OffloadedLedgerHandle {
     private static final Logger log = 
LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class);
 
+    protected static final 
AtomicIntegerFieldUpdater<BlobStoreBackedReadHandleImpl> PENDING_READ_UPDATER =
+            
AtomicIntegerFieldUpdater.newUpdater(BlobStoreBackedReadHandleImpl.class, 
"pendingRead");
+
     private final long ledgerId;
     private final OffloadIndexBlock index;
     private final BackedInputStream inputStream;
@@ -71,6 +75,8 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle, OffloadedLedge
 
     private volatile State state = null;
 
+    private volatile int pendingRead;
+
     private volatile long lastAccessTimestamp = System.currentTimeMillis();
 
     private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock 
index,
@@ -122,9 +128,17 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle, OffloadedLedge
                     getId(), firstEntry, lastEntry, (1 + lastEntry - 
firstEntry));
         }
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
-        touch();
+
+        // A ledger handle is only marked idle when "pendingRead" is "0", so there is no need to update
+        // "lastAccessTimestamp" while "pendingRead" is larger than "0".
+        // Rather than updating "lastAccessTimestamp" when a read starts, it is better to update it when the
+        // read task finishes.
+        PENDING_READ_UPDATER.incrementAndGet(this);
+        promise.whenComplete((__, ex) -> {
+            
PENDING_READ_UPDATER.decrementAndGet(BlobStoreBackedReadHandleImpl.this);
+            lastAccessTimestamp = System.currentTimeMillis();
+        });
         executor.execute(() -> {
-            touch();
             if (state == State.Closed) {
                 log.warn("Reading a closed read handler. Ledger ID: {}, Read 
range: {}-{}",
                         ledgerId, firstEntry, lastEntry);
@@ -213,7 +227,6 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle, OffloadedLedge
     }
 
     private void seekToEntry(long nextExpectedId) throws IOException {
-        touch();
         Long knownOffset = entryOffsetsCache.getIfPresent(ledgerId, 
nextExpectedId);
         if (knownOffset != null) {
             inputStream.seek(knownOffset);
@@ -324,7 +337,8 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle, OffloadedLedge
         return lastAccessTimestamp;
     }
 
-    private void touch() {
-        lastAccessTimestamp = System.currentTimeMillis();
+    @Override
+    public int getPendingRead() {
+        return PENDING_READ_UPDATER.get(this);
     }
 }

Reply via email to