This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 051c676dc0d48bf4d5a99077cccf9cf87e23cdee
Author: Enrico Olivelli <[email protected]>
AuthorDate: Mon Jun 2 09:41:46 2025 +0200

    [improve][offloaders] Automatically evict Offloaded Ledgers from memory 
(#19783)
    
    Co-authored-by: Lari Hotari <[email protected]>
    (cherry picked from commit a1a2b363cfaa1bbc38933a742484a70a0a56e761)
---
 conf/broker.conf                                   |   4 +
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |   9 ++
 .../bookkeeper/mledger/OffloadedLedgerHandle.java  |  29 ++++++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  74 ++++++++++++++-
 .../impl/OffloadEvictUnusedLedgersTest.java        | 104 +++++++++++++++++++++
 .../mledger/impl/OffloadPrefixReadTest.java        |  20 +++-
 .../apache/pulsar/broker/ServiceConfiguration.java |   8 ++
 .../pulsar/broker/service/BrokerService.java       |   4 +
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java |  17 +++-
 9 files changed, 261 insertions(+), 8 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 0a8eddbb3c7..b60870f109f 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1212,6 +1212,10 @@ managedLedgerMaxLedgerRolloverTimeMinutes=240
 # Disable rollover with value 0 (Default value 0)
 managedLedgerInactiveLedgerRolloverTimeSeconds=0
 
+# Idle time (in seconds) after which an inactive offloaded ledger is evicted from memory for an inactive topic
+# Disable eviction with value 0
+managedLedgerInactiveOffloadedLedgerEvictionTimeSeconds=600
+
 # Maximum ledger size before triggering a rollover for a topic (MB)
 managedLedgerMaxSizePerLedgerMbytes=2048
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 721654b0529..89cc7e4fde4 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -79,6 +79,7 @@ public class ManagedLedgerConfig {
     private ManagedLedgerInterceptor managedLedgerInterceptor;
     private Map<String, String> properties;
     private int inactiveLedgerRollOverTimeMs = 0;
+    private long inactiveOffloadedLedgerEvictionTimeMs = 0;
     @Getter
     @Setter
     private boolean cacheEvictionByMarkDeletedPosition = false;
@@ -701,6 +702,14 @@ public class ManagedLedgerConfig {
         this.inactiveLedgerRollOverTimeMs = (int) 
unit.toMillis(inactiveLedgerRollOverTimeMs);
     }
 
+    public long getInactiveOffloadedLedgerEvictionTimeMs() {
+        return inactiveOffloadedLedgerEvictionTimeMs;
+    }
+
+    public void setInactiveOffloadedLedgerEvictionTime(long 
inactiveOffloadedLedgerEvictionTime, TimeUnit unit) {
+        this.inactiveOffloadedLedgerEvictionTimeMs = 
unit.toMillis(inactiveOffloadedLedgerEvictionTime);
+    }
+
     /**
      * Minimum cursors with backlog after which broker is allowed to cache 
read entries to reuse them for other cursors'
      * backlog reads. (Default = 0, broker will not cache backlog reads)
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadedLedgerHandle.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadedLedgerHandle.java
new file mode 100644
index 00000000000..f45d115090f
--- /dev/null
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadedLedgerHandle.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+/**
+ * Marker interface for ledger handles that represent offloaded data.
+ */
+public interface OffloadedLedgerHandle {
+
+    default long lastAccessTimestamp() {
+        return -1;
+    }
+}
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ede725c1a01..b2065476c3b 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -112,6 +112,7 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundExce
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
+import org.apache.bookkeeper.mledger.OffloadedLedgerHandle;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
@@ -335,6 +336,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     @VisibleForTesting
     Map<String, byte[]> createdLedgerCustomMetadata;
 
+    private long lastEvictOffloadedLedgers;
+    private static final int MINIMUM_EVICTION_INTERVAL_DIVIDER = 10;
+
     public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper 
bookKeeper, MetaStore store,
             ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
             final String name) {
@@ -1979,6 +1983,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 // TODO: improve this to load ledger offloader by driver name 
recorded in metadata
                 Map<String, String> offloadDriverMetadata = 
OffloadUtils.getOffloadDriverMetadata(info);
                 offloadDriverMetadata.put("ManagedLedgerName", name);
+                log.info("[{}] Opening ledger {} from offload driver {} with 
uid {}", name, ledgerId,
+                        config.getLedgerOffloader().getOffloadDriverName(), 
uid);
                 openFuture = 
config.getLedgerOffloader().readOffloaded(ledgerId, uid,
                         offloadDriverMetadata);
             } else {
@@ -2004,11 +2010,20 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
     void invalidateReadHandle(long ledgerId) {
         CompletableFuture<ReadHandle> rhf = ledgerCache.remove(ledgerId);
         if (rhf != null) {
-            rhf.thenAccept(ReadHandle::closeAsync)
-                    .exceptionally(ex -> {
-                        log.warn("[{}] Failed to close a Ledger ReadHandle:", 
name, ex);
-                        return null;
-                    });
+            rhf.thenCompose(r -> {
+                if (r instanceof OffloadedLedgerHandle) {
+                    log.info("[{}] Closing ledger {} from offload driver {}", 
name, ledgerId,
+                            
config.getLedgerOffloader().getOffloadDriverName());
+                }
+                return r.closeAsync().exceptionally(ex -> {
+                    log.warn("[{}] Failed to close ledger {} ReadHandle with 
type {}", name, ledgerId,
+                            r.getClass().getName(), ex);
+                    return null;
+                });
+            }).exceptionally(ex -> {
+                log.warn("[{}] Failed to close Ledger ReadHandle {}:", name, 
ledgerId, ex);
+                return null;
+            });
         }
     }
 
@@ -2623,7 +2638,56 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         return Optional.ofNullable(ledgerOffloader.getOffloadPolicies());
     }
 
+    @VisibleForTesting
+    synchronized List<Long> internalEvictOffloadedLedgers() {
+        long inactiveOffloadedLedgerEvictionTimeMs = 
config.getInactiveOffloadedLedgerEvictionTimeMs();
+        if (inactiveOffloadedLedgerEvictionTimeMs <= 0) {
+            return Collections.emptyList();
+        }
+
+        long now = clock.millis();
+        long minimumEvictionIntervalMs = inactiveOffloadedLedgerEvictionTimeMs 
/ MINIMUM_EVICTION_INTERVAL_DIVIDER;
+        if (now - lastEvictOffloadedLedgers < minimumEvictionIntervalMs) {
+            // skip eviction if we have done it recently
+            return Collections.emptyList();
+        }
+
+        try {
+            List<Long> ledgersToRelease = new ArrayList<>();
+
+            ledgerCache.forEach((ledgerId, ledger) -> {
+                if (ledger.isDone() && !ledger.isCompletedExceptionally()) {
+                    ReadHandle readHandle = ledger.join();
+                    if (readHandle instanceof OffloadedLedgerHandle) {
+                        long lastAccessTimestamp = ((OffloadedLedgerHandle) 
readHandle).lastAccessTimestamp();
+                        if (lastAccessTimestamp >= 0) {
+                            long delta = now - lastAccessTimestamp;
+                            if (delta >= 
inactiveOffloadedLedgerEvictionTimeMs) {
+                                log.info("[{}] Offloaded ledger {} can be 
released ({} ms elapsed since last access)",
+                                        name, ledgerId, delta);
+                                ledgersToRelease.add(ledgerId);
+                            } else if (log.isDebugEnabled()) {
+                                log.debug(
+                                        "[{}] Offloaded ledger {} cannot be 
released ({} ms elapsed since last access)",
+                                        name, ledgerId, delta);
+                            }
+                        }
+                    }
+                }
+            });
+            for (Long ledgerId : ledgersToRelease) {
+                invalidateReadHandle(ledgerId);
+            }
+            return ledgersToRelease;
+        } finally {
+            lastEvictOffloadedLedgers = now;
+        }
+    }
+
     void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) 
{
+
+        internalEvictOffloadedLedgers();
+
         if (!factory.isMetadataServiceAvailable()) {
             // Defer trimming of ledger if we cannot connect to metadata 
service
             promise.completeExceptionally(new MetaStoreException("Metadata 
service is not available"));
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadEvictUnusedLedgersTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadEvictUnusedLedgersTest.java
new file mode 100644
index 00000000000..d4fce5585e3
--- /dev/null
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadEvictUnusedLedgersTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.awaitility.Awaitility;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+public class OffloadEvictUnusedLedgersTest extends MockedBookKeeperTestCase {
+    private static final Logger log = 
LoggerFactory.getLogger(OffloadEvictUnusedLedgersTest.class);
+
+    @Test
+    public void testEvictUnusedLedgers() throws Exception {
+        OffloadPrefixReadTest.MockLedgerOffloader offloader =
+                new OffloadPrefixReadTest.MockLedgerOffloader();
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        config.setRetentionTime(10, TimeUnit.MINUTES);
+        config.setRetentionSizeInMB(10);
+        long inactiveOffloadedLedgerEvictionTimeMs = 1000;
+        
config.setInactiveOffloadedLedgerEvictionTime(inactiveOffloadedLedgerEvictionTimeMs,
 TimeUnit.MILLISECONDS);
+        config.setLedgerOffloader(offloader);
+        ManagedLedgerImpl ledger = 
(ManagedLedgerImpl)factory.open("my_test_ledger_evict", config);
+
+        // nothing is evicted when there are no offloaded ledgers
+        assertTrue(ledger.internalEvictOffloadedLedgers().isEmpty());
+
+        int i = 0;
+        for (; i < 25; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
+        assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
+        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+
+        assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+        assertEquals(ledger.getLedgersInfoAsList().stream()
+                            .filter(e -> e.getOffloadContext().getComplete())
+                            .map(e -> 
e.getLedgerId()).collect(Collectors.toSet()),
+                            offloader.offloadedLedgers());
+
+        // ledgers should be marked as offloaded (NOTE(review): result is discarded, nothing is asserted)
+        ledger.getLedgersInfoAsList().stream().allMatch(l -> 
l.hasOffloadContext());
+
+        // nothing is evicted when no offloaded ledgers are marked as inactive
+        assertTrue(ledger.internalEvictOffloadedLedgers().isEmpty());
+
+        ManagedCursor cursor = 
ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+        int j = 0;
+        for (Entry e : cursor.readEntries(25)) {
+            assertEquals(new String(e.getData()), "entry-" + j++);
+        }
+        cursor.close();
+
+        // move the first cached handle's last access time 2x inactiveOffloadedLedgerEvictionTimeMs into the past
+        AtomicLong first = new AtomicLong(-1);
+        assertTrue(!ledger.ledgerCache.isEmpty());
+        ledger.ledgerCache.forEach((id, l) -> {
+            if (first.compareAndSet(-1, id)) {
+                OffloadPrefixReadTest.MockOffloadReadHandle handle =
+                        (OffloadPrefixReadTest.MockOffloadReadHandle) l.join();
+                handle.setLastAccessTimestamp(System.currentTimeMillis() - 
inactiveOffloadedLedgerEvictionTimeMs * 2);
+            }
+        });
+        assertNotEquals(first.get(), -1L);
+
+        Awaitility.await().untilAsserted(() -> {
+            List<Long> evicted = ledger.internalEvictOffloadedLedgers();
+            assertEquals(evicted.size(), 1);
+            assertEquals(first.get(), evicted.get(0).longValue());
+        });
+    }
+
+}
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index af5c46e328b..34a4ade9531 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -31,17 +31,18 @@ import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import lombok.SneakyThrows;
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
@@ -57,6 +58,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.OffloadedLedgerHandle;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.mledger.util.MockClock;
 import org.apache.bookkeeper.net.BookieId;
@@ -312,6 +314,10 @@ public class OffloadPrefixReadTest extends 
MockedBookKeeperTestCase {
                 OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
                 OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY);
 
+        Set<Long> offloadedLedgers() {
+            return 
offloads.values().stream().map(ReadHandle::getId).collect(Collectors.toSet());
+        }
+
 
         @Override
         public String getOffloadDriverName() {
@@ -372,10 +378,11 @@ public class OffloadPrefixReadTest extends 
MockedBookKeeperTestCase {
         }
     }
 
-    static class MockOffloadReadHandle implements ReadHandle {
+    static class MockOffloadReadHandle implements ReadHandle, 
OffloadedLedgerHandle {
         final long id;
         final List<ByteBuf> entries = new ArrayList();
         final LedgerMetadata metadata;
+        long lastAccessTimestamp = System.currentTimeMillis();
 
         MockOffloadReadHandle(ReadHandle toCopy) throws Exception {
             id = toCopy.getId();
@@ -453,6 +460,15 @@ public class OffloadPrefixReadTest extends 
MockedBookKeeperTestCase {
             future.completeExceptionally(new UnsupportedOperationException());
             return future;
         }
+
+        @Override
+        public long lastAccessTimestamp() {
+            return lastAccessTimestamp;
+        }
+
+        public void setLastAccessTimestamp(long lastAccessTimestamp) {
+            this.lastAccessTimestamp = lastAccessTimestamp;
+        }
     }
 
     static class MockMetadata implements LedgerMetadata {
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 57ea9838b2d..c25d9e9aae1 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3245,6 +3245,14 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         )
     private int managedLedgerInactiveLedgerRolloverTimeSeconds = 0;
 
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_STORAGE_ML,
+            doc = "Time to evict inactive offloaded ledger for inactive topic. 
"
+                    + "Disable eviction with value 0 (Default value 600)"
+    )
+    private int managedLedgerInactiveOffloadedLedgerEvictionTimeSeconds = 600;
+
     @FieldContext(
             category = CATEGORY_STORAGE_ML,
             doc = "Evicting cache data by the slowest markDeletedPosition or 
readPosition. "
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7b115b155a5..d1d60095b5b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2021,6 +2021,10 @@ public class BrokerService implements Closeable {
             
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
             managedLedgerConfig.setInactiveLedgerRollOverTime(
                     
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), 
TimeUnit.SECONDS);
+            managedLedgerConfig.setInactiveOffloadedLedgerEvictionTime(
+                    
serviceConfig.getManagedLedgerInactiveOffloadedLedgerEvictionTimeSeconds(),
+                    TimeUnit.SECONDS);
+
             managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
                     serviceConfig.isCacheEvictionByMarkDeletedPosition());
             managedLedgerConfig.setMinimumBacklogCursorsForCaching(
diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index e050d74a332..1f2f901f514 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.OffloadedLedgerHandle;
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
@@ -52,7 +53,7 @@ import org.jclouds.blobstore.domain.Blob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BlobStoreBackedReadHandleImpl implements ReadHandle {
+public class BlobStoreBackedReadHandleImpl implements ReadHandle, 
OffloadedLedgerHandle {
     private static final Logger log = 
LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class);
 
     private final long ledgerId;
@@ -70,6 +71,8 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle {
 
     private volatile State state = null;
 
+    private volatile long lastAccessTimestamp = System.currentTimeMillis();
+
     private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock 
index,
                                           BackedInputStream inputStream, 
ExecutorService executor,
                                           OffsetsCache entryOffsetsCache) {
@@ -119,7 +122,9 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle {
                     getId(), firstEntry, lastEntry, (1 + lastEntry - 
firstEntry));
         }
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
+        touch();
         executor.execute(() -> {
+            touch();
             if (state == State.Closed) {
                 log.warn("Reading a closed read handler. Ledger ID: {}, Read 
range: {}-{}",
                         ledgerId, firstEntry, lastEntry);
@@ -208,6 +213,7 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle {
     }
 
     private void seekToEntry(long nextExpectedId) throws IOException {
+        touch();
         Long knownOffset = entryOffsetsCache.getIfPresent(ledgerId, 
nextExpectedId);
         if (knownOffset != null) {
             inputStream.seek(knownOffset);
@@ -312,4 +318,13 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle {
     State getState() {
         return this.state;
     }
+
+    @Override
+    public long lastAccessTimestamp() {
+        return lastAccessTimestamp;
+    }
+
+    private void touch() {
+        lastAccessTimestamp = System.currentTimeMillis();
+    }
 }

Reply via email to