This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b0b8693c725 KAFKA-15536: Dynamically resize remoteIndexCache (#14511)
b0b8693c725 is described below
commit b0b8693c725c1edd6256f707d6bee903d5d5fa19
Author: hudeqi <[email protected]>
AuthorDate: Mon Oct 16 15:24:36 2023 +0800
KAFKA-15536: Dynamically resize remoteIndexCache (#14511)
Dynamically resize remoteIndexCache
Reviewers: Christo Lolov <[email protected]>, Luke Chen
<[email protected]>, Divij Vaidya <[email protected]>, Kamal Chandraprakash
<[email protected]>
---
.../kafka/log/remote/RemoteIndexCacheTest.scala | 133 +++++++++++++++++----
.../storage/internals/log/RemoteIndexCache.java | 17 ++-
2 files changed, 120 insertions(+), 30 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 07f07757b31..36d21d24999 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.{TopicIdPartition,
TopicPartition, Uuid}
import
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId,
RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
import org.apache.kafka.server.util.MockTime
-import
org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD,
remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile,
remoteTimeIndexFileName, remoteTransactionIndexFile,
remoteTransactionIndexFileName}
+import org.apache.kafka.storage.internals.log.RemoteIndexCache.{Entry,
REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteDeletedSuffixIndexFileName,
remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile,
remoteTimeIndexFileName, remoteTransactionIndexFile,
remoteTransactionIndexFileName}
import org.apache.kafka.storage.internals.log.{AbortedTxn,
CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition,
RemoteIndexCache, TimeIndex, TransactionIndex}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
@@ -35,8 +35,8 @@ import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.slf4j.{Logger, LoggerFactory}
-import java.io.{File, FileInputStream, FileNotFoundException, IOException,
PrintWriter}
-import java.nio.file.{Files, Paths}
+import java.io.{File, FileInputStream, IOException, PrintWriter}
+import java.nio.file.{Files, NoSuchFileException, Paths}
import java.util
import java.util.{Collections, Optional}
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
@@ -517,29 +517,80 @@ class RemoteIndexCacheTest {
@Test
def testClearCacheAndIndexFilesWhenResizeCache(): Unit = {
+ val tpId = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0))
+ val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
- def getIndexFileFromRemoteCacheDir(suffix: String) = {
- try {
- Files.walk(cache.cacheDir().toPath())
- .filter(Files.isRegularFile(_))
- .filter(path => path.getFileName.toString.endsWith(suffix))
- .findAny()
- } catch {
- case _: FileNotFoundException => Optional.empty()
- }
+ assertCacheSize(0)
+ // getIndex for first time will call rsm#fetchIndex
+ val cacheEntry = cache.getIndexEntry(metadataList.head)
+ assertCacheSize(1)
+ assertTrue(getIndexFileFromRemoteCacheDir(cache,
LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+ assertTrue(getIndexFileFromRemoteCacheDir(cache,
LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+ assertTrue(getIndexFileFromRemoteCacheDir(cache,
LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+ cache.resizeCacheSize(1L)
+
+ // wait until entry is marked for deletion
+ TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+ "Failed to mark cache entry for cleanup after resizing cache.")
+ TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+ "Failed to cleanup cache entry after resizing cache.")
+
+ // verify no index files on remote cache dir
+ TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache,
LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+ s"Offset index file should not be present on disk at
${cache.cacheDir()}")
+ TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache,
LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+ s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+ TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache,
LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+ s"Time index file should not be present on disk at ${cache.cacheDir()}")
+ TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache,
LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+ s"Index file marked for deletion should not be present on disk at
${cache.cacheDir()}")
+
+ assertCacheSize(0)
+ }
+
+ @Test
+ def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+ def verifyEntryIsEvicted(metadataToVerify: RemoteLogSegmentMetadata,
entryToVerify: Entry): Unit = {
+ // wait until `entryToVerify` is marked for deletion
+ TestUtils.waitUntilTrue(() => entryToVerify.isMarkedForCleanup,
+ "Failed to mark evicted cache entry for cleanup after resizing cache.")
+ TestUtils.waitUntilTrue(() => entryToVerify.isCleanStarted,
+ "Failed to cleanup evicted cache entry after resizing cache.")
+ // verify no index files for `entryToVerify` on remote cache dir
+ TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache,
remoteOffsetIndexFileName(metadataToVerify)).isPresent,
+ s"Offset index file for evicted entry should not be present on disk at
${cache.cacheDir()}")
+ TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache,
remoteTimeIndexFileName(metadataToVerify)).isPresent,
+ s"Time index file for evicted entry should not be present on disk at
${cache.cacheDir()}")
+ TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache,
remoteTransactionIndexFileName(metadataToVerify)).isPresent,
+ s"Txn index file for evicted entry should not be present on disk at
${cache.cacheDir()}")
+ TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache,
remoteDeletedSuffixIndexFileName(metadataToVerify)).isPresent,
+ s"Index file marked for deletion for evicted entry should not be
present on disk at ${cache.cacheDir()}")
}
+ def verifyEntryIsKept(metadataToVerify: RemoteLogSegmentMetadata): Unit = {
+ assertTrue(getIndexFileFromRemoteCacheDir(cache,
remoteOffsetIndexFileName(metadataToVerify)).isPresent)
+ assertTrue(getIndexFileFromRemoteCacheDir(cache,
remoteTimeIndexFileName(metadataToVerify)).isPresent)
+ assertTrue(getIndexFileFromRemoteCacheDir(cache,
remoteTransactionIndexFileName(metadataToVerify)).isPresent)
+ assertTrue(!getIndexFileFromRemoteCacheDir(cache,
remoteDeletedSuffixIndexFileName(metadataToVerify)).isPresent)
+ }
+
+ // The test process for resizing is: put 1 entry -> evict to empty -> put
3 entries with limited capacity of 2 entries ->
+ // evict to 1 entry -> resize to 1 entry size -> resize to 2 entries size
+ val estimateEntryBytesSize = estimateOneEntryBytesSize()
val tpId = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0))
- val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
+ val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
assertCacheSize(0)
// getIndex for first time will call rsm#fetchIndex
val cacheEntry = cache.getIndexEntry(metadataList.head)
assertCacheSize(1)
-
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
-
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
-
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+ assertTrue(getIndexFileFromRemoteCacheDir(cache,
LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+ assertTrue(getIndexFileFromRemoteCacheDir(cache,
LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+ assertTrue(getIndexFileFromRemoteCacheDir(cache,
LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+ // Reduce the cache size to 1 byte to ensure that all the entries are
evicted from it.
cache.resizeCacheSize(1L)
// wait until entry is marked for deletion
@@ -549,16 +600,45 @@ class RemoteIndexCacheTest {
"Failed to cleanup cache entry after resizing cache.")
// verify no index files on remote cache dir
- TestUtils.waitUntilTrue(() =>
!getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+ TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache,
LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
s"Offset index file should not be present on disk at
${cache.cacheDir()}")
- TestUtils.waitUntilTrue(() =>
!getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+ TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache,
LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
s"Txn index file should not be present on disk at ${cache.cacheDir()}")
- TestUtils.waitUntilTrue(() =>
!getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+ TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache,
LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
s"Time index file should not be present on disk at ${cache.cacheDir()}")
- TestUtils.waitUntilTrue(() =>
!getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+ TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache,
LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
s"Index file marked for deletion should not be present on disk at
${cache.cacheDir()}")
- assertTrue(cache.internalCache().estimatedSize() == 0)
+ assertCacheSize(0)
+
+ // Increase cache capacity to only store 2 entries
+ cache.resizeCacheSize(2 * estimateEntryBytesSize)
+ assertCacheSize(0)
+
+ val entry0 = cache.getIndexEntry(metadataList(0))
+ val entry1 = cache.getIndexEntry(metadataList(1))
+ cache.getIndexEntry(metadataList(2))
+ assertCacheSize(2)
+ verifyEntryIsEvicted(metadataList(0), entry0)
+
+  // Reduce cache capacity to only store 1 entry
+ cache.resizeCacheSize(1 * estimateEntryBytesSize)
+ assertCacheSize(1)
+ verifyEntryIsEvicted(metadataList(1), entry1)
+
+ // resize to the same size, all entries should be kept
+ cache.resizeCacheSize(1 * estimateEntryBytesSize)
+
+ // verify all existing entries (`cache.getIndexEntry(metadataList(2))`)
are kept
+ verifyEntryIsKept(metadataList(2))
+ assertCacheSize(1)
+
+ // increase the size
+ cache.resizeCacheSize(2 * estimateEntryBytesSize)
+
+ // verify all existing entries (`cache.getIndexEntry(metadataList(2))`)
are kept
+ verifyEntryIsKept(metadataList(2))
+ assertCacheSize(1)
}
@ParameterizedTest
@@ -928,4 +1008,15 @@ class RemoteIndexCacheTest {
createCorruptTxnIndexForSegmentMetadata(dir, rlsMetadata)
}
}
+
+ private def getIndexFileFromRemoteCacheDir(cache: RemoteIndexCache, suffix:
String) = {
+ try {
+ Files.walk(cache.cacheDir().toPath())
+ .filter(Files.isRegularFile(_))
+ .filter(path => path.getFileName.toString.endsWith(suffix))
+ .findAny()
+ } catch {
+ case _: NoSuchFileException => Optional.empty()
+ }
+ }
}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index 42871698318..60202c35ee0 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -47,6 +47,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -137,15 +138,8 @@ public class RemoteIndexCache implements Closeable {
public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
lock.writeLock().lock();
try {
- // When resizing the cache, we always start with an empty cache.
There are two main reasons:
- // 1. Resizing the cache is not a high-frequency operation, and
there is no need to fill the data in the old
- // cache to the new cache in time when resizing inside.
- // 2. Since the eviction of the caffeine cache is cleared
asynchronously, it is possible that after the entry
- // in the old cache is filled in the new cache, the old cache will
clear the entry, and the data in the two caches
- // will be inconsistent.
- internalCache.invalidateAll();
- log.info("Invalidated all entries in the cache and triggered the
cleaning of all index files in the cache dir.");
- internalCache = initEmptyCache(remoteLogIndexFileCacheSize);
+ internalCache.policy().eviction().orElseThrow(() -> new
NoSuchElementException("No eviction policy is set for the remote index cache.")
+ ).setMaximum(remoteLogIndexFileCacheSize);
} finally {
lock.writeLock().unlock();
}
@@ -716,4 +710,9 @@ public class RemoteIndexCache implements Closeable {
return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) +
LogFileUtils.TXN_INDEX_FILE_SUFFIX;
}
+ // Visible for testing
+ public static String
remoteDeletedSuffixIndexFileName(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) {
+ return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) +
LogFileUtils.DELETED_FILE_SUFFIX;
+ }
+
}
\ No newline at end of file