This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b0b8693c725 KAFKA-15536: Dynamically resize remoteIndexCache (#14511)
b0b8693c725 is described below

commit b0b8693c725c1edd6256f707d6bee903d5d5fa19
Author: hudeqi <[email protected]>
AuthorDate: Mon Oct 16 15:24:36 2023 +0800

    KAFKA-15536: Dynamically resize remoteIndexCache (#14511)
    
    Dynamically resize remoteIndexCache
    
    Reviewers: Christo Lolov <[email protected]>, Luke Chen 
<[email protected]>, Divij Vaidya <[email protected]>, Kamal Chandraprakash 
<[email protected]>
---
 .../kafka/log/remote/RemoteIndexCacheTest.scala    | 133 +++++++++++++++++----
 .../storage/internals/log/RemoteIndexCache.java    |  17 ++-
 2 files changed, 120 insertions(+), 30 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala 
b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 07f07757b31..36d21d24999 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.{TopicIdPartition, 
TopicPartition, Uuid}
 import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, 
RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
 import org.apache.kafka.server.util.MockTime
-import 
org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD,
 remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, 
remoteTimeIndexFileName, remoteTransactionIndexFile, 
remoteTransactionIndexFileName}
+import org.apache.kafka.storage.internals.log.RemoteIndexCache.{Entry, 
REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteDeletedSuffixIndexFileName, 
remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, 
remoteTimeIndexFileName, remoteTransactionIndexFile, 
remoteTransactionIndexFileName}
 import org.apache.kafka.storage.internals.log.{AbortedTxn, 
CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition, 
RemoteIndexCache, TimeIndex, TransactionIndex}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
@@ -35,8 +35,8 @@ import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.io.{File, FileInputStream, FileNotFoundException, IOException, 
PrintWriter}
-import java.nio.file.{Files, Paths}
+import java.io.{File, FileInputStream, IOException, PrintWriter}
+import java.nio.file.{Files, NoSuchFileException, Paths}
 import java.util
 import java.util.{Collections, Optional}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
@@ -517,29 +517,80 @@ class RemoteIndexCacheTest {
 
   @Test
   def testClearCacheAndIndexFilesWhenResizeCache(): Unit = {
+    val tpId = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0))
+    val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
 
-    def getIndexFileFromRemoteCacheDir(suffix: String) = {
-      try {
-        Files.walk(cache.cacheDir().toPath())
-          .filter(Files.isRegularFile(_))
-          .filter(path => path.getFileName.toString.endsWith(suffix))
-          .findAny()
-      } catch {
-        case _: FileNotFoundException => Optional.empty()
-      }
+    assertCacheSize(0)
+    // getIndex for first time will call rsm#fetchIndex
+    val cacheEntry = cache.getIndexEntry(metadataList.head)
+    assertCacheSize(1)
+    assertTrue(getIndexFileFromRemoteCacheDir(cache, 
LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+    assertTrue(getIndexFileFromRemoteCacheDir(cache, 
LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+    assertTrue(getIndexFileFromRemoteCacheDir(cache, 
LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+    cache.resizeCacheSize(1L)
+
+    // wait until entry is marked for deletion
+    TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+      "Failed to mark cache entry for cleanup after resizing cache.")
+    TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+      "Failed to cleanup cache entry after resizing cache.")
+
+    // verify no index files on remote cache dir
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+      s"Offset index file should not be present on disk at 
${cache.cacheDir()}")
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+      s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+      s"Time index file should not be present on disk at ${cache.cacheDir()}")
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+      s"Index file marked for deletion should not be present on disk at 
${cache.cacheDir()}")
+
+    assertCacheSize(0)
+  }
+
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+    def verifyEntryIsEvicted(metadataToVerify: RemoteLogSegmentMetadata, 
entryToVerify: Entry): Unit = {
+      // wait until `entryToVerify` is marked for deletion
+      TestUtils.waitUntilTrue(() => entryToVerify.isMarkedForCleanup,
+        "Failed to mark evicted cache entry for cleanup after resizing cache.")
+      TestUtils.waitUntilTrue(() => entryToVerify.isCleanStarted,
+        "Failed to cleanup evicted cache entry after resizing cache.")
+      // verify no index files for `entryToVerify` on remote cache dir
+      TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteOffsetIndexFileName(metadataToVerify)).isPresent,
+        s"Offset index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
+      TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteTimeIndexFileName(metadataToVerify)).isPresent,
+        s"Time index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
+      TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteTransactionIndexFileName(metadataToVerify)).isPresent,
+        s"Txn index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
+      TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteDeletedSuffixIndexFileName(metadataToVerify)).isPresent,
+        s"Index file marked for deletion for evicted entry should not be 
present on disk at ${cache.cacheDir()}")
     }
 
+    def verifyEntryIsKept(metadataToVerify: RemoteLogSegmentMetadata): Unit = {
+      assertTrue(getIndexFileFromRemoteCacheDir(cache, 
remoteOffsetIndexFileName(metadataToVerify)).isPresent)
+      assertTrue(getIndexFileFromRemoteCacheDir(cache, 
remoteTimeIndexFileName(metadataToVerify)).isPresent)
+      assertTrue(getIndexFileFromRemoteCacheDir(cache, 
remoteTransactionIndexFileName(metadataToVerify)).isPresent)
+      assertTrue(!getIndexFileFromRemoteCacheDir(cache, 
remoteDeletedSuffixIndexFileName(metadataToVerify)).isPresent)
+    }
+
+    // The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries ->
+    // evict to 1 entry -> resize to 1 entry size -> resize to 2 entries size
+    val estimateEntryBytesSize = estimateOneEntryBytesSize()
     val tpId = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0))
-    val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
+    val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
 
     assertCacheSize(0)
     // getIndex for first time will call rsm#fetchIndex
     val cacheEntry = cache.getIndexEntry(metadataList.head)
     assertCacheSize(1)
-    
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
-    
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
-    
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+    assertTrue(getIndexFileFromRemoteCacheDir(cache, 
LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+    assertTrue(getIndexFileFromRemoteCacheDir(cache, 
LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+    assertTrue(getIndexFileFromRemoteCacheDir(cache, 
LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
 
+    // Reduce the cache size to 1 byte to ensure that all the entries are 
evicted from it.
     cache.resizeCacheSize(1L)
 
     // wait until entry is marked for deletion
@@ -549,16 +600,45 @@ class RemoteIndexCacheTest {
       "Failed to cleanup cache entry after resizing cache.")
 
     // verify no index files on remote cache dir
-    TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
       s"Offset index file should not be present on disk at 
${cache.cacheDir()}")
-    TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
       s"Txn index file should not be present on disk at ${cache.cacheDir()}")
-    TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
       s"Time index file should not be present on disk at ${cache.cacheDir()}")
-    TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
       s"Index file marked for deletion should not be present on disk at 
${cache.cacheDir()}")
 
-    assertTrue(cache.internalCache().estimatedSize() == 0)
+    assertCacheSize(0)
+
+    // Increase cache capacity to only store 2 entries
+    cache.resizeCacheSize(2 * estimateEntryBytesSize)
+    assertCacheSize(0)
+
+    val entry0 = cache.getIndexEntry(metadataList(0))
+    val entry1 = cache.getIndexEntry(metadataList(1))
+    cache.getIndexEntry(metadataList(2))
+    assertCacheSize(2)
+    verifyEntryIsEvicted(metadataList(0), entry0)
+
+    // Reduce cache capacity to only store 1 entry
+    cache.resizeCacheSize(1 * estimateEntryBytesSize)
+    assertCacheSize(1)
+    verifyEntryIsEvicted(metadataList(1), entry1)
+
+    // resize to the same size, all entries should be kept
+    cache.resizeCacheSize(1 * estimateEntryBytesSize)
+
+    // verify all existing entries (`cache.getIndexEntry(metadataList(2))`) are kept
+    verifyEntryIsKept(metadataList(2))
+    assertCacheSize(1)
+
+    // increase the size
+    cache.resizeCacheSize(2 * estimateEntryBytesSize)
+
+    // verify all existing entries (`cache.getIndexEntry(metadataList(2))`) are kept
+    verifyEntryIsKept(metadataList(2))
+    assertCacheSize(1)
   }
 
   @ParameterizedTest
@@ -928,4 +1008,15 @@ class RemoteIndexCacheTest {
       createCorruptTxnIndexForSegmentMetadata(dir, rlsMetadata)
     }
   }
+
+  private def getIndexFileFromRemoteCacheDir(cache: RemoteIndexCache, suffix: 
String) = {
+    try {
+      Files.walk(cache.cacheDir().toPath())
+        .filter(Files.isRegularFile(_))
+        .filter(path => path.getFileName.toString.endsWith(suffix))
+        .findAny()
+    } catch {
+      case _: NoSuchFileException => Optional.empty()
+    }
+  }
 }
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index 42871698318..60202c35ee0 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -47,6 +47,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -137,15 +138,8 @@ public class RemoteIndexCache implements Closeable {
     public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
         lock.writeLock().lock();
         try {
-            // When resizing the cache, we always start with an empty cache. 
There are two main reasons:
-            // 1. Resizing the cache is not a high-frequency operation, and 
there is no need to fill the data in the old
-            // cache to the new cache in time when resizing inside.
-            // 2. Since the eviction of the caffeine cache is cleared 
asynchronously, it is possible that after the entry
-            // in the old cache is filled in the new cache, the old cache will 
clear the entry, and the data in the two caches
-            // will be inconsistent.
-            internalCache.invalidateAll();
-            log.info("Invalidated all entries in the cache and triggered the 
cleaning of all index files in the cache dir.");
-            internalCache = initEmptyCache(remoteLogIndexFileCacheSize);
+            internalCache.policy().eviction().orElseThrow(() -> new 
NoSuchElementException("No eviction policy is set for the remote index cache.")
+            ).setMaximum(remoteLogIndexFileCacheSize);
         } finally {
             lock.writeLock().unlock();
         }
@@ -716,4 +710,9 @@ public class RemoteIndexCache implements Closeable {
         return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + 
LogFileUtils.TXN_INDEX_FILE_SUFFIX;
     }
 
+    // Visible for testing
+    public static String 
remoteDeletedSuffixIndexFileName(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+        return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + 
LogFileUtils.DELETED_FILE_SUFFIX;
+    }
+
 }
\ No newline at end of file

Reply via email to