This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 566330ca8d0 [fix][offload] Fix OOM in tiered storage, caused by unbounded offsets cache (#22679)
566330ca8d0 is described below

commit 566330ca8d0b3419853e0252276ef42c643d3465
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Thu May 9 10:25:13 2024 +0300

    [fix][offload] Fix OOM in tiered storage, caused by unbounded offsets cache (#22679)
    
    Co-authored-by: Jiwe Guo <techno...@apache.org>
---
 .../bookkeeper/mledger/LedgerOffloaderFactory.java |  7 +-
 .../bookkeeper/mledger/offload/Offloaders.java     |  6 ++
 .../jcloud/JCloudLedgerOffloaderFactory.java       | 16 ++--
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 24 +++---
 .../impl/BlobStoreManagedLedgerOffloader.java      | 14 +++-
 .../mledger/offload/jcloud/impl/OffsetsCache.java  | 85 ++++++++++++++++++++++
 .../impl/BlobStoreManagedLedgerOffloaderBase.java  |  9 +++
 ...obStoreManagedLedgerOffloaderStreamingTest.java |  4 +-
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  |  6 +-
 .../offload/jcloud/impl/OffsetsCacheTest.java      | 45 ++++++++++++
 10 files changed, 185 insertions(+), 31 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
index 7ecb8f08d57..9fbf9b73c05 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
@@ -31,7 +31,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaStorage;
  */
 @LimitedPrivate
 @Evolving
-public interface LedgerOffloaderFactory<T extends LedgerOffloader> {
+public interface LedgerOffloaderFactory<T extends LedgerOffloader> extends 
AutoCloseable {
 
     /**
      * Check whether the provided driver <tt>driverName</tt> is supported.
@@ -111,4 +111,9 @@ public interface LedgerOffloaderFactory<T extends 
LedgerOffloader> {
             throws IOException {
         return create(offloadPolicies, userMetadata, scheduler, 
offloaderStats);
     }
+
+    @Override
+    default void close() throws Exception {
+        // no-op
+    }
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
index 6910439e091..cec15599242 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
@@ -46,6 +46,12 @@ public class Offloaders implements AutoCloseable {
     @Override
     public void close() throws Exception {
         offloaders.forEach(offloader -> {
+            try {
+                offloader.getRight().close();
+            } catch (Exception e) {
+                log.warn("Failed to close offloader '{}': {}",
+                        offloader.getRight().getClass(), e.getMessage());
+            }
             try {
                 offloader.getLeft().close();
             } catch (IOException e) {
diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
index 2c916567444..60363cf8406 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
@@ -25,6 +25,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
 import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.bookkeeper.mledger.LedgerOffloaderStatsDisable;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffsetsCache;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -33,12 +34,7 @@ import 
org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
  * A jcloud based offloader factory.
  */
 public class JCloudLedgerOffloaderFactory implements 
LedgerOffloaderFactory<BlobStoreManagedLedgerOffloader> {
-
-    public static JCloudLedgerOffloaderFactory of() {
-        return INSTANCE;
-    }
-
-    private static final JCloudLedgerOffloaderFactory INSTANCE = new 
JCloudLedgerOffloaderFactory();
+    private final OffsetsCache entryOffsetsCache = new OffsetsCache();
 
     @Override
     public boolean isDriverSupported(String driverName) {
@@ -58,6 +54,12 @@ public class JCloudLedgerOffloaderFactory implements 
LedgerOffloaderFactory<Blob
 
         TieredStorageConfiguration config =
                 
TieredStorageConfiguration.create(offloadPolicies.toProperties());
-        return BlobStoreManagedLedgerOffloader.create(config, userMetadata, 
scheduler, offloaderStats);
+        return BlobStoreManagedLedgerOffloader.create(config, userMetadata, 
scheduler, offloaderStats,
+                entryOffsetsCache);
+    }
+
+    @Override
+    public void close() throws Exception {
+        entryOffsetsCache.close();
     }
 }
diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 4f68f90370e..e050d74a332 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -19,8 +19,6 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import io.netty.buffer.ByteBuf;
 import java.io.DataInputStream;
 import java.io.IOException;
@@ -56,19 +54,13 @@ import org.slf4j.LoggerFactory;
 
 public class BlobStoreBackedReadHandleImpl implements ReadHandle {
     private static final Logger log = 
LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class);
-    private static final int CACHE_TTL_SECONDS =
-            
Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds", 
30 * 60);
 
     private final long ledgerId;
     private final OffloadIndexBlock index;
     private final BackedInputStream inputStream;
     private final DataInputStream dataStream;
     private final ExecutorService executor;
-    // this Cache is accessed only by one thread
-    private final Cache<Long, Long> entryOffsets = CacheBuilder
-            .newBuilder()
-            .expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS)
-            .build();
+    private final OffsetsCache entryOffsetsCache;
     private final AtomicReference<CompletableFuture<Void>> closeFuture = new 
AtomicReference<>();
 
     enum State {
@@ -79,12 +71,14 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle {
     private volatile State state = null;
 
     private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock 
index,
-                                          BackedInputStream inputStream, 
ExecutorService executor) {
+                                          BackedInputStream inputStream, 
ExecutorService executor,
+                                          OffsetsCache entryOffsetsCache) {
         this.ledgerId = ledgerId;
         this.index = index;
         this.inputStream = inputStream;
         this.dataStream = new DataInputStream(inputStream);
         this.executor = executor;
+        this.entryOffsetsCache = entryOffsetsCache;
         state = State.Opened;
     }
 
@@ -109,7 +103,6 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle {
             try {
                 index.close();
                 inputStream.close();
-                entryOffsets.invalidateAll();
                 state = State.Closed;
                 promise.complete(null);
             } catch (IOException t) {
@@ -164,7 +157,7 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle {
                     long entryId = dataStream.readLong();
 
                     if (entryId == nextExpectedId) {
-                        entryOffsets.put(entryId, currentPosition);
+                        entryOffsetsCache.put(ledgerId, entryId, 
currentPosition);
                         ByteBuf buf = 
PulsarByteBufAllocator.DEFAULT.buffer(length, length);
                         entries.add(LedgerEntryImpl.create(ledgerId, entryId, 
length, buf));
                         int toWrite = length;
@@ -215,7 +208,7 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle {
     }
 
     private void seekToEntry(long nextExpectedId) throws IOException {
-        Long knownOffset = entryOffsets.getIfPresent(nextExpectedId);
+        Long knownOffset = entryOffsetsCache.getIfPresent(ledgerId, 
nextExpectedId);
         if (knownOffset != null) {
             inputStream.seek(knownOffset);
         } else {
@@ -269,7 +262,8 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle {
                                   BlobStore blobStore, String bucket, String 
key, String indexKey,
                                   VersionCheck versionCheck,
                                   long ledgerId, int readBufferSize,
-                                  LedgerOffloaderStats offloaderStats, String 
managedLedgerName)
+                                  LedgerOffloaderStats offloaderStats, String 
managedLedgerName,
+                                  OffsetsCache entryOffsetsCache)
             throws IOException, BKException.BKNoSuchLedgerExistsException {
         int retryCount = 3;
         OffloadIndexBlock index = null;
@@ -310,7 +304,7 @@ public class BlobStoreBackedReadHandleImpl implements 
ReadHandle {
         BackedInputStream inputStream = new 
BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
                 versionCheck, index.getDataObjectLength(), readBufferSize, 
offloaderStats, managedLedgerName);
 
-        return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, 
executor);
+        return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, 
executor, entryOffsetsCache);
     }
 
     // for testing
diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 1b6062ffa03..9f89bd52a86 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -108,6 +108,7 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
     private AtomicLong bufferLength = new AtomicLong(0);
     private AtomicLong segmentLength = new AtomicLong(0);
     private final long maxBufferLength;
+    private final OffsetsCache entryOffsetsCache;
     private final ConcurrentLinkedQueue<Entry> offloadBuffer = new 
ConcurrentLinkedQueue<>();
     private CompletableFuture<OffloadResult> offloadResult;
     private volatile PositionImpl lastOfferedPosition = PositionImpl.LATEST;
@@ -123,13 +124,16 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
     public static BlobStoreManagedLedgerOffloader 
create(TieredStorageConfiguration config,
                                                          Map<String, String> 
userMetadata,
                                                          OrderedScheduler 
scheduler,
-                                                         LedgerOffloaderStats 
offloaderStats) throws IOException {
+                                                         LedgerOffloaderStats 
offloaderStats,
+                                                         OffsetsCache 
entryOffsetsCache)
+            throws IOException {
 
-        return new BlobStoreManagedLedgerOffloader(config, scheduler, 
userMetadata, offloaderStats);
+        return new BlobStoreManagedLedgerOffloader(config, scheduler, 
userMetadata, offloaderStats, entryOffsetsCache);
     }
 
     BlobStoreManagedLedgerOffloader(TieredStorageConfiguration config, 
OrderedScheduler scheduler,
-                                    Map<String, String> userMetadata, 
LedgerOffloaderStats offloaderStats) {
+                                    Map<String, String> userMetadata, 
LedgerOffloaderStats offloaderStats,
+                                    OffsetsCache entryOffsetsCache) {
 
         this.scheduler = scheduler;
         this.userMetadata = userMetadata;
@@ -140,6 +144,7 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
         this.minSegmentCloseTimeMillis = 
Duration.ofSeconds(config.getMinSegmentTimeInSecond()).toMillis();
         //ensure buffer can have enough content to fill a block
         this.maxBufferLength = Math.max(config.getWriteBufferSizeInBytes(), 
config.getMinBlockSizeInBytes());
+        this.entryOffsetsCache = entryOffsetsCache;
         this.segmentBeginTimeMillis = System.currentTimeMillis();
         if (!Strings.isNullOrEmpty(config.getRegion())) {
             this.writeLocation = new LocationBuilder()
@@ -555,7 +560,8 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
                         readBucket, key, indexKey,
                         DataBlockUtils.VERSION_CHECK,
                         ledgerId, config.getReadBufferSizeInBytes(),
-                        this.offloaderStats, 
offloadDriverMetadata.get(MANAGED_LEDGER_NAME)));
+                        this.offloaderStats, 
offloadDriverMetadata.get(MANAGED_LEDGER_NAME),
+                        this.entryOffsetsCache));
             } catch (Throwable t) {
                 log.error("Failed readOffloaded: ", t);
                 promise.completeExceptionally(t);
diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCache.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCache.java
new file mode 100644
index 00000000000..fa13afa8ff0
--- /dev/null
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCache.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import io.grpc.netty.shaded.io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class OffsetsCache implements AutoCloseable {
+    private static final int CACHE_TTL_SECONDS =
+            
Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds", 5 
* 60);
+    // limit the cache size to avoid OOM
+    // 1 million entries consumes about 60MB of heap space
+    private static final int CACHE_MAX_SIZE =
+            
Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.max.size", 
1_000_000);
+    private final ScheduledExecutorService cacheEvictionExecutor;
+
+    record Key(long ledgerId, long entryId) {
+
+    }
+
+    private final Cache<OffsetsCache.Key, Long> entryOffsetsCache;
+
+    public OffsetsCache() {
+        if (CACHE_MAX_SIZE > 0) {
+            entryOffsetsCache = CacheBuilder
+                    .newBuilder()
+                    .expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS)
+                    .maximumSize(CACHE_MAX_SIZE)
+                    .build();
+            cacheEvictionExecutor =
+                    Executors.newSingleThreadScheduledExecutor(
+                            new 
DefaultThreadFactory("jcloud-offsets-cache-eviction"));
+            int period = Math.max(CACHE_TTL_SECONDS / 2, 1);
+            cacheEvictionExecutor.scheduleAtFixedRate(() -> {
+                entryOffsetsCache.cleanUp();
+            }, period, period, TimeUnit.SECONDS);
+        } else {
+            cacheEvictionExecutor = null;
+            entryOffsetsCache = null;
+        }
+    }
+
+    public void put(long ledgerId, long entryId, long currentPosition) {
+        if (entryOffsetsCache != null) {
+            entryOffsetsCache.put(new Key(ledgerId, entryId), currentPosition);
+        }
+    }
+
+    public Long getIfPresent(long ledgerId, long entryId) {
+        return entryOffsetsCache != null ? entryOffsetsCache.getIfPresent(new 
Key(ledgerId, entryId)) : null;
+    }
+
+    public void clear() {
+        if (entryOffsetsCache != null) {
+            entryOffsetsCache.invalidateAll();
+        }
+    }
+
+    @Override
+    public void close() {
+        if (cacheEvictionExecutor != null) {
+            cacheEvictionExecutor.shutdownNow();
+        }
+    }
+}
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
index 89d9021d36d..75faf098b40 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
@@ -33,6 +33,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.domain.Credentials;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 
 public abstract class BlobStoreManagedLedgerOffloaderBase {
@@ -46,6 +47,7 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
     protected final JCloudBlobStoreProvider provider;
     protected TieredStorageConfiguration config;
     protected BlobStore blobStore = null;
+    protected final OffsetsCache entryOffsetsCache = new OffsetsCache();
 
     protected BlobStoreManagedLedgerOffloaderBase() throws Exception {
         scheduler = 
OrderedScheduler.newSchedulerBuilder().numThreads(5).name("offloader").build();
@@ -56,6 +58,13 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
     @AfterMethod(alwaysRun = true)
     public void cleanupMockBookKeeper() {
         bk.getLedgerMap().clear();
+        entryOffsetsCache.clear();
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void cleanup() throws Exception {
+        entryOffsetsCache.close();
+        scheduler.shutdownNow();
     }
 
     protected static MockManagedLedger createMockManagedLedger() {
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
index ad1529072f8..e706e4254cb 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
@@ -82,7 +82,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest 
extends BlobStoreManag
         mockedConfig = mock(TieredStorageConfiguration.class, 
delegatesTo(getConfiguration(bucket, additionalConfig)));
         Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use 
the REAL blobStore
         BlobStoreManagedLedgerOffloader offloader = 
BlobStoreManagedLedgerOffloader
-                .create(mockedConfig, new HashMap<String, String>(), 
scheduler, this.offloaderStats);
+                .create(mockedConfig, new HashMap<String, String>(), 
scheduler, this.offloaderStats, entryOffsetsCache);
         return offloader;
     }
 
@@ -91,7 +91,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest 
extends BlobStoreManag
         mockedConfig = mock(TieredStorageConfiguration.class, 
delegatesTo(getConfiguration(bucket, additionalConfig)));
         Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
         BlobStoreManagedLedgerOffloader offloader = 
BlobStoreManagedLedgerOffloader
-                .create(mockedConfig, new HashMap<String, String>(), 
scheduler, this.offloaderStats);
+                .create(mockedConfig, new HashMap<String, String>(), 
scheduler, this.offloaderStats, entryOffsetsCache);
         return offloader;
     }
 
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 4419210c251..bf6ede896ab 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -98,14 +98,16 @@ public class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreManagedLedgerO
     private BlobStoreManagedLedgerOffloader getOffloader(String bucket) throws 
IOException {
         mockedConfig = mock(TieredStorageConfiguration.class, 
delegatesTo(getConfiguration(bucket)));
         Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use 
the REAL blobStore
-        BlobStoreManagedLedgerOffloader offloader = 
BlobStoreManagedLedgerOffloader.create(mockedConfig, new 
HashMap<String,String>(), scheduler, this.offloaderStats);
+        BlobStoreManagedLedgerOffloader offloader = 
BlobStoreManagedLedgerOffloader.create(mockedConfig, new 
HashMap<String,String>(), scheduler, this.offloaderStats,
+                entryOffsetsCache);
         return offloader;
     }
 
     private BlobStoreManagedLedgerOffloader getOffloader(String bucket, 
BlobStore mockedBlobStore) throws IOException {
         mockedConfig = mock(TieredStorageConfiguration.class, 
delegatesTo(getConfiguration(bucket)));
         Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
-        BlobStoreManagedLedgerOffloader offloader = 
BlobStoreManagedLedgerOffloader.create(mockedConfig, new 
HashMap<String,String>(), scheduler, this.offloaderStats);
+        BlobStoreManagedLedgerOffloader offloader = 
BlobStoreManagedLedgerOffloader.create(mockedConfig, new 
HashMap<String,String>(), scheduler, this.offloaderStats,
+                entryOffsetsCache);
         return offloader;
     }
 
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCacheTest.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCacheTest.java
new file mode 100644
index 00000000000..86a72c7b554
--- /dev/null
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffsetsCacheTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+@Slf4j
+public class OffsetsCacheTest {
+
+    @Test
+    public void testCache() throws Exception {
+        
System.setProperty("pulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds", 
"1");
+        OffsetsCache offsetsCache = new OffsetsCache();
+        assertNull(offsetsCache.getIfPresent(1, 2));
+        offsetsCache.put(1, 1, 1);
+        assertEquals(offsetsCache.getIfPresent(1, 1), 1);
+        offsetsCache.clear();
+        assertNull(offsetsCache.getIfPresent(1, 1));
+        // test ttl
+        offsetsCache.put(1, 2, 2);
+        assertEquals(offsetsCache.getIfPresent(1, 2), 2);
+        Thread.sleep(1500);
+        assertNull(offsetsCache.getIfPresent(1, 2));
+        offsetsCache.close();
+    }
+}

Reply via email to