This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b9717c34d9be1c68b1cc4c0e2f22e57d9d1fcca7 Author: fengyubiao <[email protected]> AuthorDate: Thu Jun 5 17:48:22 2025 +0800 [improve][ml]Release idle offloaded read handle only the ref count is 0 (#24381) (cherry picked from commit 21406b594a05a6362afdf68bbe5d041a5907394d) --- .../bookkeeper/mledger/OffloadedLedgerHandle.java | 4 ++++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 12 +++++++---- .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 24 +++++++++++++++++----- 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadedLedgerHandle.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadedLedgerHandle.java index f45d115090f..64464187895 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadedLedgerHandle.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadedLedgerHandle.java @@ -26,4 +26,8 @@ public interface OffloadedLedgerHandle { default long lastAccessTimestamp() { return -1; } + + default int getPendingRead() { + return 0; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index b2065476c3b..2b6a596b479 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2658,10 +2658,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { ledgerCache.forEach((ledgerId, ledger) -> { if (ledger.isDone() && !ledger.isCompletedExceptionally()) { ReadHandle readHandle = ledger.join(); - if (readHandle instanceof OffloadedLedgerHandle) { - long lastAccessTimestamp = ((OffloadedLedgerHandle) readHandle).lastAccessTimestamp(); - if (lastAccessTimestamp >= 0) { - long delta = now - lastAccessTimestamp; + if (readHandle instanceof OffloadedLedgerHandle offloadedLedgerHandle) { + int pendingRead = offloadedLedgerHandle.getPendingRead(); + if (pendingRead == 0) { + long delta = now - offloadedLedgerHandle.lastAccessTimestamp(); if (delta >= inactiveOffloadedLedgerEvictionTimeMs) { log.info("[{}] Offloaded ledger {} can be released ({} ms elapsed since last access)", name, ledgerId, delta); @@ -2671,6 +2671,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { "[{}] Offloaded ledger {} cannot be released ({} ms elapsed since last access)", name, ledgerId, delta); } + } else if (pendingRead < 0) { + log.error("[{}] Offloaded ledger {} went to a wrong state because its pending read is a" + + " negative value {}. Please raise an issue to https://github.com/apache/pulsar", name, + ledgerId, pendingRead); } } } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index 1f2f901f514..84d9d118728 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; @@ -56,6 +57,9 @@ import org.slf4j.LoggerFactory; public class BlobStoreBackedReadHandleImpl implements ReadHandle, OffloadedLedgerHandle { private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class); + protected static final AtomicIntegerFieldUpdater<BlobStoreBackedReadHandleImpl> PENDING_READ_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(BlobStoreBackedReadHandleImpl.class, "pendingRead"); + private final long ledgerId; private final OffloadIndexBlock index; private final BackedInputStream inputStream; @@ -71,6 +75,8 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle, OffloadedLedge private volatile State state = null; + private volatile int pendingRead; + private volatile long lastAccessTimestamp = System.currentTimeMillis(); private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index, @@ -122,9 +128,17 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle, OffloadedLedge getId(), firstEntry, lastEntry, (1 + lastEntry - firstEntry)); } CompletableFuture<LedgerEntries> promise = new CompletableFuture<>(); - touch(); + + // Ledger handles will be only marked idle when "pendingRead" is "0", it is not needed to update + // "lastAccessTimestamp" if "pendingRead" is larger than "0". + // Rather than update "lastAccessTimestamp" when starts a reading, updating it when a reading task is finished + // is better. + PENDING_READ_UPDATER.incrementAndGet(this); + promise.whenComplete((__, ex) -> { + PENDING_READ_UPDATER.decrementAndGet(BlobStoreBackedReadHandleImpl.this); + lastAccessTimestamp = System.currentTimeMillis(); + }); executor.execute(() -> { - touch(); if (state == State.Closed) { log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", ledgerId, firstEntry, lastEntry); @@ -213,7 +227,6 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle, OffloadedLedge } private void seekToEntry(long nextExpectedId) throws IOException { - touch(); Long knownOffset = entryOffsetsCache.getIfPresent(ledgerId, nextExpectedId); if (knownOffset != null) { inputStream.seek(knownOffset); @@ -324,7 +337,8 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle, OffloadedLedge return lastAccessTimestamp; } - private void touch() { - lastAccessTimestamp = System.currentTimeMillis(); + @Override + public int getPendingRead() { + return PENDING_READ_UPDATER.get(this); } }
