jeel2420 commented on code in PR #14483: URL: https://github.com/apache/kafka/pull/14483#discussion_r1347075039
########## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ########## @@ -538,7 +533,75 @@ class RemoteIndexCacheTest { assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString, s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent") // file is corrupted it should fetch from remote storage again - verifyFetchIndexInvocation(count = 1) + verifyFetchIndexInvocation(count = 1, indexTypes = Seq(IndexType.OFFSET)) + } + + @Test + def testConcurrentRemoveReadForCache(): Unit = { + // Create a spy Cache Entry + val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset, + time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L)) + + val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata)) + val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata)) + val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata)) + + val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex)) + cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry) + + assertCacheSize(1) + + val latchForCacheRead = new CountDownLatch(1) + val latchForCacheRemove = new CountDownLatch(1) + val latchForTestWait = new CountDownLatch(1) + + var markForCleanupCallCount = 0 + + doAnswer((invocation: InvocationOnMock) => { + markForCleanupCallCount += 1 + + if (markForCleanupCallCount == 1) { + // Signal the CacheRead to unblock itself + latchForCacheRead.countDown() + // Wait for signal to start renaming the files + latchForCacheRemove.await() + // Calling the markForCleanup() actual method to start renaming the files + invocation.callRealMethod() + // Signal TestWait to unblock itself so that test can be completed + latchForTestWait.countDown() + } else { + // Subsequent call for markForCleanup method + latchForCacheRead.countDown() + latchForCacheRemove.countDown() + } + }).when(spyEntry).markForCleanup() + + val removeCache = (() => { + cache.remove(rlsMetadata.remoteLogSegmentId().id()) + }): Runnable + + val readCache = (() => { + // Wait for signal to start CacheRead + latchForCacheRead.await() + cache.getIndexEntry(rlsMetadata) + // Signal the CacheRemove to start renaming the files + latchForCacheRemove.countDown() + }): Runnable + + val executor = Executors.newFixedThreadPool(2) + try { + executor.submit(removeCache: Runnable) + executor.submit(readCache: Runnable) + + // Wait for signal to complete the test + latchForTestWait.await() + assertCacheSize(1) + val entry = cache.getIndexEntry(rlsMetadata) + assertTrue(Files.exists(entry.offsetIndex().file().toPath)) Review Comment: Here we want to prove that the entry is present in the cache so when we call getIndexEntry we should get the entry and index files should be present in the cacheDir. This reproduce the actual issue by replicating the scenario where we can have inconsistency where key is present in the cache but reference files are already renamed by that time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org