hudeqi commented on code in PR #14243:
URL: https://github.com/apache/kafka/pull/14243#discussion_r1320999054


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -110,27 +113,54 @@ public class RemoteIndexCache implements Closeable {
      *
      * We use {@link Caffeine} cache instead of implementing a thread safe LRU cache on our own.
      */
-    private final Cache<Uuid, Entry> internalCache;
-    private final RemoteStorageManager remoteStorageManager;
-    private final ShutdownableThread cleanerThread;
+    private Cache<Uuid, Entry> internalCache;
 
     public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
-        this(1024, remoteStorageManager, logDir);
+        this(DEFAULT_REMOTE_INDEX_CACHE_SIZE_BYTES, remoteStorageManager, logDir);
     }
 
     /**
      * Creates RemoteIndexCache with the given configs.
      *
-     * @param maxSize              maximum number of segment index entries to be cached.
+     * @param maxSize              maximum bytes size of segment index entries to be cached.
      * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes.
      * @param logDir               log directory
      */
-    public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
+    public RemoteIndexCache(long maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
         this.remoteStorageManager = remoteStorageManager;
         cacheDir = new File(logDir, DIR_NAME);
 
-        internalCache = Caffeine.newBuilder()
-                .maximumSize(maxSize)
+        internalCache = initEmptyCache(maxSize);
+        init();
+
+        // Start cleaner thread that will clean the expired entries.
+        cleanerThread = createCleanerThread();
+        cleanerThread.start();
+    }
+
+    public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
+        lock.writeLock().lock();
+        try {
+            // When resizing the cache, we always start with an empty cache. There are two main reasons:
+            // 1. Resizing the cache is not a high-frequency operation, and there is no need to fill the data in the old
+            // cache to the new cache in time when resizing inside.
+            // 2. Since the eviction of the caffeine cache is cleared asynchronously, it is possible that after the entry
+            // in the old cache is filled in the new cache, the old cache will clear the entry, and the data in the two caches
+            // will be inconsistent.
+            internalCache.invalidateAll();
+            log.info("Invalidated all entries in the cache and triggered the cleaning of all index files in the cache dir.");
+            internalCache = initEmptyCache(remoteLogIndexFileCacheSize);

Review Comment:
   > @hudeqi, sorry, I don't want to put pressure on you, but we're planning to have the RC for the v3.6.0 release this Wednesday. At this point, I think we'd better get the current PR merged and then have a follow-up PR for the cache resize improvement. That way we will have at least a configurable index cache in v3.6.0, even if the improvement can't make it into v3.6.0 in time.
   > 
   > So please keep the current PR as is, and see if @divijvaidya has any other comments. In the meantime, you can create a separate PR for the improvement. Thanks for your effort!
   
   Okay, after this PR is merged, I will put this improvement commit into the next PR. Thanks, @showuon!
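
   For reference, the resize approach in the diff above (take the write lock, drop every entry, then swap in an empty cache with the new byte budget) can be sketched with JDK classes only. This is a minimal illustration, not the PR's code: `ResizableCacheSketch` and its methods are hypothetical names, and an access-ordered `LinkedHashMap` with synchronous LRU eviction stands in for the Caffeine cache and for the index-file cleanup.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for a byte-bounded index cache; Caffeine is NOT used here.
public class ResizableCacheSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private long maxBytes;
    private long usedBytes = 0;
    // accessOrder = true makes iteration order least-recently-used first.
    private LinkedHashMap<String, byte[]> entries = new LinkedHashMap<>(16, 0.75f, true);

    public ResizableCacheSketch(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    public void put(String key, byte[] value) {
        lock.writeLock().lock();
        try {
            byte[] old = entries.put(key, value);
            if (old != null) {
                usedBytes -= old.length;
            }
            usedBytes += value.length;
            // Evict least-recently-used entries until the byte budget is respected.
            Iterator<Map.Entry<String, byte[]>> it = entries.entrySet().iterator();
            while (usedBytes > maxBytes && it.hasNext()) {
                Map.Entry<String, byte[]> eldest = it.next();
                usedBytes -= eldest.getValue().length;
                it.remove();
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    public byte[] get(String key) {
        // get() reorders an access-ordered LinkedHashMap, so it needs the exclusive lock.
        lock.writeLock().lock();
        try {
            return entries.get(key);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public int size() {
        lock.readLock().lock();
        try {
            return entries.size();
        } finally {
            lock.readLock().unlock();
        }
    }

    // Resize by replacement: nothing is copied from the old map to the new one.
    public void resize(long newMaxBytes) {
        lock.writeLock().lock();
        try {
            entries = new LinkedHashMap<>(16, 0.75f, true);
            usedBytes = 0;
            maxBytes = newMaxBytes;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        ResizableCacheSketch cache = new ResizableCacheSketch(10);
        cache.put("a", new byte[4]);
        cache.put("b", new byte[4]);
        cache.put("c", new byte[4]); // exceeds 10 bytes, so "a" is evicted
        System.out.println("entries before resize: " + cache.size());
        cache.resize(100);
        System.out.println("entries after resize: " + cache.size());
    }
}
```

   Because eviction in this sketch is synchronous, the inconsistency described in reason 2 of the code comment cannot occur here. The sketch only shows the locking and the start-empty behaviour.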



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
