This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new c205839088c KAFKA-15481: Fix concurrency bug in RemoteIndexCache (#14483)
c205839088c is described below

commit c205839088ca3c0aa5a1d1d8c249dcbccd2a3029
Author: Jotaniya Jeel <[email protected]>
AuthorDate: Mon Oct 23 13:50:46 2023 +0100

    KAFKA-15481: Fix concurrency bug in RemoteIndexCache (#14483)
    
    RemoteIndexCache has a concurrency bug which leads to an IOException while fetching data from the remote tier.
    
    The bug can be reproduced with the following order of events (an illustrative sketch follows the steps):
    
    Thread 1 (cache thread): invalidates the entry; the removalListener is invoked asynchronously, so the files have not been renamed with the "deleted" suffix yet.
    Thread 2 (fetch thread): tries to find the entry in the cache, does not find it because Thread 1 removed it, fetches the entry from S3 and writes it to the existing file (using "replace existing").
    Thread 1: the async removalListener is invoked and acquires a lock on the old entry (which has already been removed from the cache); it renames the file with the "deleted" suffix and starts deleting it.
    Thread 2: tries to create the in-memory/mmapped index, but does not find the file and hence creates a new file of size 2GB in the AbstractIndex constructor. The JVM returns an error because it will not allow creation of a 2GB random-access file.
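
    The asynchronous listener timing can be illustrated with a minimal Caffeine sketch. This is not part of the patch: ListenerTimingDemo and its keys are hypothetical, and it assumes Caffeine 2.9+ (which introduced evictionListener) on the classpath.

        import com.github.benmanes.caffeine.cache.Cache;
        import com.github.benmanes.caffeine.cache.Caffeine;
        import com.github.benmanes.caffeine.cache.RemovalCause;

        public class ListenerTimingDemo {
            public static void main(String[] args) throws InterruptedException {
                Cache<String, String> cache = Caffeine.newBuilder()
                        .maximumSize(1)
                        // Invoked after the entry is already gone from the map,
                        // typically on a ForkJoinPool thread -- this is the
                        // window the race above exploits.
                        .removalListener((String k, String v, RemovalCause c) ->
                                System.out.printf("removal (async): %s %s%n", k, c))
                        // Invoked atomically inside the operation that evicts the entry.
                        .evictionListener((String k, String v, RemovalCause c) ->
                                System.out.printf("eviction (atomic): %s %s%n", k, c))
                        .build();
                cache.put("a", "1");
                cache.put("b", "2");   // may evict an entry (cause SIZE); both listeners fire
                cache.invalidate("b"); // EXPLICIT removal; only the removalListener fires
                Thread.sleep(200);     // give the async removal listener time to run
            }
        }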
    
    This commit fixes the bug by using an EvictionListener instead of a RemovalListener, so that eviction and the file rename happen atomically. Manual removal (which the EvictionListener does not cover) is handled with computeIfPresent(), enforcing that cache removal and the file rename are likewise atomic.
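
    The manual-removal path reduces to the following sketch; the real code is RemoteIndexCache.remove()/removeAll() plus enqueueEntryForCleanup() in the diff below, while AtomicRemovalSketch and its String entry type are illustrative stand-ins.

        import java.util.concurrent.ConcurrentMap;

        import com.github.benmanes.caffeine.cache.Cache;

        public class AtomicRemovalSketch {
            static void removeAtomically(Cache<String, String> cache, String key) {
                ConcurrentMap<String, String> map = cache.asMap();
                // computeIfPresent runs the remapping function while holding the
                // map's lock for this key, so a concurrent reader observes either
                // the live entry or no entry -- never files that are mid-rename.
                map.computeIfPresent(key, (k, v) -> {
                    markForCleanup(v); // stand-in for Entry.markForCleanup() + cleaner queue
                    return null;       // returning null removes the mapping atomically
                });
            }

            private static void markForCleanup(String entry) {
                // Placeholder: the patch renames the index files to the ".deleted"
                // suffix and offers the entry to the cleaner thread's queue.
            }
        }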
    
    Reviewers: Luke Chen <[email protected]>, Divij Vaidya <[email protected]>, Arpit Goyal <[email protected]>, Kamal Chandraprakash <[email protected]>
---
 .../kafka/log/remote/RemoteIndexCacheTest.scala    | 177 +++++++++++++++------
 .../storage/internals/log/RemoteIndexCache.java    |  42 +++--
 2 files changed, 159 insertions(+), 60 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index e82c35ffb97..1424d1297ff 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
+import org.apache.kafka.storage.internals.log.RemoteIndexCache.{DIR_NAME, REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
 import org.apache.kafka.storage.internals.log.{AbortedTxn, CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
@@ -32,14 +32,15 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.any
+import org.mockito.invocation.InvocationOnMock
 import org.mockito.Mockito._
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.io.{File, FileInputStream, IOException, PrintWriter}
-import java.nio.file.{Files, Paths}
+import java.io.{File, FileInputStream, IOException, PrintWriter, UncheckedIOException}
+import java.nio.file.{Files, NoSuchFileException, Paths}
 import java.util
-import java.util.Collections
-import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+import java.util.{Collections, Optional}
+import java.util.concurrent.{CountDownLatch, Executors, Future, TimeUnit}
 import scala.collection.mutable
 
 class RemoteIndexCacheTest {
@@ -73,9 +74,9 @@ class RemoteIndexCacheTest {
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         indexType match {
           case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
@@ -152,8 +153,8 @@ class RemoteIndexCacheTest {
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         indexType match {
           case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
@@ -262,7 +263,7 @@ class RemoteIndexCacheTest {
   }
 
   @Test
-  def testCacheEntryIsDeletedOnInvalidation(): Unit = {
+  def testCacheEntryIsDeletedOnRemoval(): Unit = {
     def getIndexFileFromDisk(suffix: String) = {
       Files.walk(tpDir.toPath)
         .filter(Files.isRegularFile(_))
@@ -284,8 +285,8 @@ class RemoteIndexCacheTest {
     // no expired entries yet
     assertEquals(0, cache.expiredIndexes.size, "expiredIndex queue should be zero at start of test")
 
-    // invalidate the cache. it should async mark the entry for removal
-    cache.internalCache.invalidate(internalIndexKey)
+    // call remove function to mark the entry for removal
+    cache.remove(internalIndexKey)
 
     // wait until entry is marked for deletion
     TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
@@ -304,13 +305,13 @@ class RemoteIndexCacheTest {
     verify(cacheEntry.txnIndex).renameTo(any(classOf[File]))
 
     // verify no index files on disk
-    assertFalse(getIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+    assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
       s"Offset index file should not be present on disk at ${tpDir.toPath}")
-    assertFalse(getIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+    assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
       s"Txn index file should not be present on disk at ${tpDir.toPath}")
-    assertFalse(getIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+    assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
       s"Time index file should not be present on disk at ${tpDir.toPath}")
-    assertFalse(getIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+    assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
       s"Index file marked for deletion should not be present on disk at ${tpDir.toPath}")
   }
 
@@ -558,6 +559,84 @@ class RemoteIndexCacheTest {
     verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+    // Create a spy Cache Entry
+    val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset,
+      time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
+
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+
+    val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
+    cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+    assertCacheSize(1)
+
+    var entry: RemoteIndexCache.Entry = null
+
+    val latchForCacheRead = new CountDownLatch(1)
+    val latchForCacheRemove = new CountDownLatch(1)
+    val latchForTestWait = new CountDownLatch(1)
+
+    var markForCleanupCallCount = 0
+
+    doAnswer((invocation: InvocationOnMock) => {
+      markForCleanupCallCount += 1
+
+      if (markForCleanupCallCount == 1) {
+        // Signal the CacheRead to unblock itself
+        latchForCacheRead.countDown()
+        // Wait for signal to start renaming the files
+        latchForCacheRemove.await()
+        // Call the actual markForCleanup() method to start renaming the files
+        invocation.callRealMethod()
+        // Signal TestWait to unblock itself so that test can be completed
+        latchForTestWait.countDown()
+      }
+    }).when(spyEntry).markForCleanup()
+
+    val removeCache = (() => {
+      cache.remove(rlsMetadata.remoteLogSegmentId().id())
+    }): Runnable
+
+    val readCache = (() => {
+      // Wait for signal to start CacheRead
+      latchForCacheRead.await()
+      entry = cache.getIndexEntry(rlsMetadata)
+      // Signal the CacheRemove to start renaming the files
+      latchForCacheRemove.countDown()
+    }): Runnable
+
+    val executor = Executors.newFixedThreadPool(2)
+    try {
+      val removeCacheFuture: Future[_] = executor.submit(removeCache: Runnable)
+      val readCacheFuture: Future[_] = executor.submit(readCache: Runnable)
+
+      // Verify both tasks are completed without any exception
+      removeCacheFuture.get()
+      readCacheFuture.get()
+
+      // Wait for signal to complete the test
+      latchForTestWait.await()
+
+      // We can't determine whether the read thread or the remove thread will run first, so:
+      // 1. If the read thread runs first, the cache file should not exist and the cache size should be zero.
+      // 2. If the remove thread runs first, the cache file should be present and the cache size should be one.
+      // In short, we are making sure that the cache entry and the cache file
+      // either both exist or both do not.
+      if (getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent) {
+        assertCacheSize(1)
+      } else {
+        assertCacheSize(0)
+      }
+    } finally {
+      executor.shutdownNow()
+    }
+
+  }
+
   @Test
   def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
     reset(rsm)
@@ -565,9 +644,9 @@ class RemoteIndexCacheTest {
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         // Create corrupted index file
         createCorruptTimeIndexOffsetFile(tpDir)
@@ -603,9 +682,9 @@ class RemoteIndexCacheTest {
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         indexType match {
           case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
@@ -629,13 +708,6 @@ class RemoteIndexCacheTest {
     val remoteIndexCacheDir = cache.cacheDir()
     val tempSuffix = ".tmptest"
 
-    def getRemoteCacheIndexFileFromDisk(suffix: String) = {
-      Files.walk(remoteIndexCacheDir.toPath)
-        .filter(Files.isRegularFile(_))
-        .filter(path => path.getFileName.toString.endsWith(suffix))
-        .findAny()
-    }
-
     def renameRemoteCacheIndexFileFromDisk(suffix: String) = {
       Files.walk(remoteIndexCacheDir.toPath)
         .filter(Files.isRegularFile(_))
@@ -650,7 +722,7 @@ class RemoteIndexCacheTest {
     Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", tempSuffix)))
     Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", tempSuffix)))
 
-    cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
+    cache.remove(rlsMetadata.remoteLogSegmentId().id())
 
     // wait until entry is marked for deletion
     TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
@@ -666,9 +738,9 @@ class RemoteIndexCacheTest {
     // Index  Files already exist ,rsm should not fetch them again.
     verifyFetchIndexInvocation(count = 1)
     // verify index files on disk
-    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
-    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
-    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
   }
 
   @ParameterizedTest
@@ -678,9 +750,9 @@ class RemoteIndexCacheTest {
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         // Create corrupt index file return from RSM
         createCorruptedIndexFile(testIndexType, tpDir)
@@ -725,7 +797,7 @@ class RemoteIndexCacheTest {
     // verify deleted file exists on disk
     assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, s"Deleted Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
 
-    cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
+    cache.remove(rlsMetadata.remoteLogSegmentId().id())
 
     // wait until entry is marked for deletion
     TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
@@ -748,9 +820,9 @@
                                     = RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
     val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
       time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
-    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata))
-    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata))
-    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata))
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, tpDir))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, tpDir))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, tpDir))
     spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
   }
 
@@ -778,8 +850,8 @@ class RemoteIndexCacheTest {
     }
   }
 
-  private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TransactionIndex = {
-    val txnIdxFile = remoteTransactionIndexFile(tpDir, metadata)
+  private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File): TransactionIndex = {
+    val txnIdxFile = remoteTransactionIndexFile(dir, metadata)
     txnIdxFile.createNewFile()
     new TransactionIndex(metadata.startOffset(), txnIdxFile)
   }
@@ -800,14 +872,14 @@ class RemoteIndexCacheTest {
     return new TransactionIndex(100L, txnIdxFile)
   }
 
-  private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex = {
+  private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File): TimeIndex = {
     val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
-    new TimeIndex(remoteTimeIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 12)
+    new TimeIndex(remoteTimeIndexFile(dir, metadata), metadata.startOffset(), maxEntries * 12)
   }
 
-  private def createOffsetIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata) = {
+  private def createOffsetIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File) = {
     val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
-    new OffsetIndex(remoteOffsetIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 8)
+    new OffsetIndex(remoteOffsetIndexFile(dir, metadata), metadata.startOffset(), maxEntries * 8)
   }
 
   private def generateRemoteLogSegmentMetadata(size: Int,
@@ -860,4 +932,15 @@ class RemoteIndexCacheTest {
       createCorruptTxnIndexForSegmentMetadata(dir, rlsMetadata)
     }
   }
+
+  private def getIndexFileFromRemoteCacheDir(cache: RemoteIndexCache, suffix: String) = {
+    try {
+      Files.walk(cache.cacheDir().toPath())
+        .filter(Files.isRegularFile(_))
+        .filter(path => path.getFileName.toString.endsWith(suffix))
+        .findAny()
+    } catch {
+      case e @ (_ : NoSuchFileException | _ : UncheckedIOException) => Optional.empty()
+    }
+    }
+  }
 }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index 5cdaf77c294..51b6ee3e5bc 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -132,19 +132,16 @@ public class RemoteIndexCache implements Closeable {
 
         internalCache = Caffeine.newBuilder()
                 .maximumSize(maxSize)
-                // removeListener is invoked when either the entry is invalidated (means manual removal by the caller) or
-                // evicted (means removal due to the policy)
-                .removalListener((Uuid key, Entry entry, RemovalCause cause) -> {
+                // This listener is invoked each time an entry is being automatically removed due to eviction. The cache will invoke this listener
+                // during the atomic operation to remove the entry (refer: https://github.com/ben-manes/caffeine/wiki/Removal),
+                // hence, care must be taken to ensure that this operation is not expensive. Note that this listener is not invoked when
+                // RemovalCause from cache is EXPLICIT or REPLACED (e.g. on Cache.invalidate(), Cache.put() etc.) For a complete list see:
+                // https://github.com/ben-manes/caffeine/blob/0cef55168986e3816314e7fdba64cb0b996dd3cc/caffeine/src/main/java/com/github/benmanes/caffeine/cache/RemovalCause.java#L23
+                // Hence, any operation required after removal from cache must be performed manually for these scenarios.
+                .evictionListener((Uuid key, Entry entry, RemovalCause cause) -> {
                     // Mark the entries for cleanup and add them to the queue to be garbage collected later by the background thread.
                     if (entry != null) {
-                        try {
-                            entry.markForCleanup();
-                        } catch (IOException e) {
-                            throw new KafkaException(e);
-                        }
-                        if (!expiredIndexes.offer(entry)) {
-                            log.error("Error while inserting entry {} for key {} into the cleaner queue because queue is full.", entry, key);
-                        }
+                        enqueueEntryForCleanup(entry, key);
                     } else {
                         log.error("Received entry as null for key {} when the it is removed from the cache.", key);
                     }
@@ -174,7 +171,11 @@ public class RemoteIndexCache implements Closeable {
     public void remove(Uuid key) {
         lock.readLock().lock();
         try {
-            internalCache.invalidate(key);
+            internalCache.asMap().computeIfPresent(key, (k, v) -> {
+                enqueueEntryForCleanup(v, k);
+                // Returning null to remove the key from the cache
+                return null;
+            });
         } finally {
             lock.readLock().unlock();
         }
@@ -183,12 +184,27 @@ public class RemoteIndexCache implements Closeable {
     public void removeAll(Collection<Uuid> keys) {
         lock.readLock().lock();
         try {
-            internalCache.invalidateAll(keys);
+            keys.forEach(key -> internalCache.asMap().computeIfPresent(key, (k, v) -> {
+                enqueueEntryForCleanup(v, k);
+                // Returning null to remove the key from the cache
+                return null;
+            }));
         } finally {
             lock.readLock().unlock();
         }
     }
 
+    private void enqueueEntryForCleanup(Entry entry, Uuid key) {
+        try {
+            entry.markForCleanup();
+            if (!expiredIndexes.offer(entry)) {
+                log.error("Error while inserting entry {} for key {} into the 
cleaner queue because queue is full.", entry, key);
+            }
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
     // Visible for testing
     public ShutdownableThread cleanerThread() {
         return cleanerThread;
