divijvaidya commented on code in PR #14482: URL: https://github.com/apache/kafka/pull/14482#discussion_r1345355926
########## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ########## @@ -541,6 +541,126 @@ class RemoteIndexCacheTest { verifyFetchIndexInvocation(count = 1) } + @Test + def testOffsetIndexFileAlreadyExistOnDiskButNotInCache(): Unit ={ + val remoteIndexCacheDir = new File(tpDir,RemoteIndexCache.DIR_NAME) Review Comment: Please follow style guide used in rest of the code. For example, space between = and {. You can find more information here: https://kafka.apache.org/coding-guide.html I remember your previous PR also had similar comments. I would recommend to set your IDE to use this style and automatically format newly added code. ########## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ########## @@ -541,6 +541,126 @@ class RemoteIndexCacheTest { verifyFetchIndexInvocation(count = 1) } + @Test + def testOffsetIndexFileAlreadyExistOnDiskButNotInCache(): Unit ={ + val remoteIndexCacheDir = new File(tpDir,RemoteIndexCache.DIR_NAME) Review Comment: please create a getter in the RemoteIndexCache to get this dir, add javadoc for it `// visible for testing` and use that here. 
It will make this test independent of the remote cache location in case we change that in future, ########## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ########## @@ -541,6 +541,126 @@ class RemoteIndexCacheTest { verifyFetchIndexInvocation(count = 1) } + @Test + def testOffsetIndexFileAlreadyExistOnDiskButNotInCache(): Unit ={ + val remoteIndexCacheDir = new File(tpDir,RemoteIndexCache.DIR_NAME) + val tempSuffix = ".tmptest" + def getRemoteCacheIndexFileFromDisk(suffix: String) = { + Files.walk(remoteIndexCacheDir.toPath) + .filter(Files.isRegularFile(_)) + .filter(path => path.getFileName.toString.endsWith(suffix)) + .findAny() + } + def renameRemoteCacheIndexFileFromDisk(suffix: String) = { + Files.walk(remoteIndexCacheDir.toPath) + .filter(Files.isRegularFile(_)) + .filter(path => path.getFileName.toString.endsWith(suffix)) + .forEach(f => Files.move(f,f.resolveSibling(f.getFileName().toString().stripSuffix(tempSuffix)))) + } + val entry = cache.getIndexEntry(rlsMetadata) + // copy files with temporary name + Files.copy(entry.offsetIndex().file().toPath(),Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(),"",tempSuffix))) + Files.copy(entry.txnIndex().file().toPath(),Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(),"",tempSuffix))) + Files.copy(entry.timeIndex().file().toPath(),Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(),"",tempSuffix))) + + cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id()) + + // wait until entry is marked for deletion + TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup, + "Failed to mark cache entry for cleanup after invalidation") + TestUtils.waitUntilTrue(() => entry.isCleanStarted, + "Failed to cleanup cache entry after invalidation") + + // restore index files + renameRemoteCacheIndexFileFromDisk(tempSuffix) + // validate cache entry for the above key should be null + 
assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id())) + cache.getIndexEntry(rlsMetadata) + // Index Files already exist + // rsm should not be called again + // instead files exist on disk + // should be used + verifyFetchIndexInvocation(count = 1) Review Comment: should this be 0 if there is going to be no fetch invocation? ########## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ########## @@ -541,6 +541,126 @@ class RemoteIndexCacheTest { verifyFetchIndexInvocation(count = 1) } + @Test + def testOffsetIndexFileAlreadyExistOnDiskButNotInCache(): Unit ={ + val remoteIndexCacheDir = new File(tpDir,RemoteIndexCache.DIR_NAME) + val tempSuffix = ".tmptest" + def getRemoteCacheIndexFileFromDisk(suffix: String) = { + Files.walk(remoteIndexCacheDir.toPath) + .filter(Files.isRegularFile(_)) + .filter(path => path.getFileName.toString.endsWith(suffix)) + .findAny() + } + def renameRemoteCacheIndexFileFromDisk(suffix: String) = { + Files.walk(remoteIndexCacheDir.toPath) + .filter(Files.isRegularFile(_)) + .filter(path => path.getFileName.toString.endsWith(suffix)) + .forEach(f => Files.move(f,f.resolveSibling(f.getFileName().toString().stripSuffix(tempSuffix)))) + } + val entry = cache.getIndexEntry(rlsMetadata) + // copy files with temporary name + Files.copy(entry.offsetIndex().file().toPath(),Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(),"",tempSuffix))) + Files.copy(entry.txnIndex().file().toPath(),Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(),"",tempSuffix))) + Files.copy(entry.timeIndex().file().toPath(),Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(),"",tempSuffix))) + + cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id()) + + // wait until entry is marked for deletion + TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup, + "Failed to mark cache entry for cleanup after invalidation") + TestUtils.waitUntilTrue(() => 
entry.isCleanStarted, + "Failed to cleanup cache entry after invalidation") + + // restore index files + renameRemoteCacheIndexFileFromDisk(tempSuffix) + // validate cache entry for the above key should be null + assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id())) + cache.getIndexEntry(rlsMetadata) + // Index Files already exist + // rsm should not be called again + // instead files exist on disk + // should be used + verifyFetchIndexInvocation(count = 1) + // verify index files on disk + assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}") + assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}") + assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}") + } + + @Test + def testRSMReturnCorruptedIndexFile(): Unit ={ + + when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType]))) + .thenAnswer(ans => { + val metadata = ans.getArgument[RemoteLogSegmentMetadata](0) + val indexType = ans.getArgument[IndexType](1) + val pw = new PrintWriter(remoteOffsetIndexFile(tpDir, metadata)) + pw.write("Hello, world") + // The size of the string written in the file is 12 bytes, + // but it should be multiple of Offset Index EntrySIZE which is equal to 8. + pw.close() + val offsetIdx = createOffsetIndexForSegmentMetadata(metadata) Review Comment: could we have 3 variants of this test where we are corrupting all 3 indexes one by one. You can use `@Parameterized` to run those scenarios in the same test. 
########## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ########## @@ -541,6 +541,126 @@ class RemoteIndexCacheTest { verifyFetchIndexInvocation(count = 1) } + @Test + def testOffsetIndexFileAlreadyExistOnDiskButNotInCache(): Unit ={ + val remoteIndexCacheDir = new File(tpDir,RemoteIndexCache.DIR_NAME) + val tempSuffix = ".tmptest" + def getRemoteCacheIndexFileFromDisk(suffix: String) = { + Files.walk(remoteIndexCacheDir.toPath) + .filter(Files.isRegularFile(_)) + .filter(path => path.getFileName.toString.endsWith(suffix)) + .findAny() + } + def renameRemoteCacheIndexFileFromDisk(suffix: String) = { + Files.walk(remoteIndexCacheDir.toPath) + .filter(Files.isRegularFile(_)) + .filter(path => path.getFileName.toString.endsWith(suffix)) + .forEach(f => Files.move(f,f.resolveSibling(f.getFileName().toString().stripSuffix(tempSuffix)))) Review Comment: Instead of files.move, please use our custom utility function `Utils.atomicMoveWithFallback` ########## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ########## @@ -541,6 +541,126 @@ class RemoteIndexCacheTest { verifyFetchIndexInvocation(count = 1) } + @Test + def testOffsetIndexFileAlreadyExistOnDiskButNotInCache(): Unit ={ + val remoteIndexCacheDir = new File(tpDir,RemoteIndexCache.DIR_NAME) + val tempSuffix = ".tmptest" + def getRemoteCacheIndexFileFromDisk(suffix: String) = { + Files.walk(remoteIndexCacheDir.toPath) + .filter(Files.isRegularFile(_)) + .filter(path => path.getFileName.toString.endsWith(suffix)) + .findAny() + } + def renameRemoteCacheIndexFileFromDisk(suffix: String) = { + Files.walk(remoteIndexCacheDir.toPath) + .filter(Files.isRegularFile(_)) + .filter(path => path.getFileName.toString.endsWith(suffix)) + .forEach(f => Files.move(f,f.resolveSibling(f.getFileName().toString().stripSuffix(tempSuffix)))) + } + val entry = cache.getIndexEntry(rlsMetadata) + // copy files with temporary name + 
Files.copy(entry.offsetIndex().file().toPath(),Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(),"",tempSuffix))) + Files.copy(entry.txnIndex().file().toPath(),Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(),"",tempSuffix))) + Files.copy(entry.timeIndex().file().toPath(),Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(),"",tempSuffix))) + + cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id()) + + // wait until entry is marked for deletion + TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup, + "Failed to mark cache entry for cleanup after invalidation") + TestUtils.waitUntilTrue(() => entry.isCleanStarted, + "Failed to cleanup cache entry after invalidation") + + // restore index files + renameRemoteCacheIndexFileFromDisk(tempSuffix) + // validate cache entry for the above key should be null + assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id())) + cache.getIndexEntry(rlsMetadata) + // Index Files already exist + // rsm should not be called again + // instead files exist on disk + // should be used + verifyFetchIndexInvocation(count = 1) + // verify index files on disk + assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}") + assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}") + assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}") + } + + @Test + def testRSMReturnCorruptedIndexFile(): Unit ={ + + when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType]))) + .thenAnswer(ans => { + val metadata = ans.getArgument[RemoteLogSegmentMetadata](0) + val indexType = ans.getArgument[IndexType](1) + val pw = 
new PrintWriter(remoteOffsetIndexFile(tpDir, metadata)) + pw.write("Hello, world") + // The size of the string written in the file is 12 bytes, + // but it should be multiple of Offset Index EntrySIZE which is equal to 8. + pw.close() + val offsetIdx = createOffsetIndexForSegmentMetadata(metadata) Review Comment: there is a bug in the existing `createOffsetIndexForSegmentMetadata` and similar functions. They create the cache at the wrong place in the line: `new OffsetIndex(remoteOffsetIndexFile(tpDir, metadata)` Instead of `tpDir`, it should point to the remote cache directory, which is a folder inside `tpDir`. Please fix that and see if it impacts your test in any way. ########## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ########## @@ -541,6 +541,126 @@ class RemoteIndexCacheTest { verifyFetchIndexInvocation(count = 1) } + @Test + def testOffsetIndexFileAlreadyExistOnDiskButNotInCache(): Unit ={ + val remoteIndexCacheDir = new File(tpDir,RemoteIndexCache.DIR_NAME) + val tempSuffix = ".tmptest" + def getRemoteCacheIndexFileFromDisk(suffix: String) = { + Files.walk(remoteIndexCacheDir.toPath) + .filter(Files.isRegularFile(_)) + .filter(path => path.getFileName.toString.endsWith(suffix)) + .findAny() + } + def renameRemoteCacheIndexFileFromDisk(suffix: String) = { + Files.walk(remoteIndexCacheDir.toPath) + .filter(Files.isRegularFile(_)) + .filter(path => path.getFileName.toString.endsWith(suffix)) + .forEach(f => Files.move(f,f.resolveSibling(f.getFileName().toString().stripSuffix(tempSuffix)))) + } + val entry = cache.getIndexEntry(rlsMetadata) + // copy files with temporary name + Files.copy(entry.offsetIndex().file().toPath(),Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(),"",tempSuffix))) + Files.copy(entry.txnIndex().file().toPath(),Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(),"",tempSuffix))) + 
Files.copy(entry.timeIndex().file().toPath(),Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(),"",tempSuffix))) + + cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id()) + + // wait until entry is marked for deletion + TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup, + "Failed to mark cache entry for cleanup after invalidation") + TestUtils.waitUntilTrue(() => entry.isCleanStarted, + "Failed to cleanup cache entry after invalidation") + + // restore index files + renameRemoteCacheIndexFileFromDisk(tempSuffix) + // validate cache entry for the above key should be null + assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id())) + cache.getIndexEntry(rlsMetadata) + // Index Files already exist + // rsm should not be called again + // instead files exist on disk + // should be used + verifyFetchIndexInvocation(count = 1) + // verify index files on disk + assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}") + assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}") + assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}") + } + + @Test + def testRSMReturnCorruptedIndexFile(): Unit ={ + + when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType]))) + .thenAnswer(ans => { + val metadata = ans.getArgument[RemoteLogSegmentMetadata](0) + val indexType = ans.getArgument[IndexType](1) + val pw = new PrintWriter(remoteOffsetIndexFile(tpDir, metadata)) + pw.write("Hello, world") + // The size of the string written in the file is 12 bytes, + // but it should be multiple of Offset Index EntrySIZE which is equal to 8. 
+ pw.close() + val offsetIdx = createOffsetIndexForSegmentMetadata(metadata) + val timeIdx = createTimeIndexForSegmentMetadata(metadata) + val txnIdx = createTxIndexForSegmentMetadata(metadata) + maybeAppendIndexEntries(offsetIdx, timeIdx) + indexType match { + case IndexType.OFFSET => new FileInputStream(offsetIdx.file) + case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file) + case IndexType.TRANSACTION => new FileInputStream(txnIdx.file) + case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed. + case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed. + } + }) + assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata)) + } + + @Test + def testConcurrentCacheDeletedFileExists(): Unit = { + val remoteIndexCacheDir = new File(tpDir,RemoteIndexCache.DIR_NAME) + def getRemoteCacheIndexFileFromDisk(suffix: String) = { + Files.walk(remoteIndexCacheDir.toPath) + .filter(Files.isRegularFile(_)) + .filter(path => path.getFileName.toString.endsWith(suffix)) + .findAny() + } + val entry = cache.getIndexEntry(rlsMetadata) + // verify index files on disk + assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}") + assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}") + assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}") + + // Simulating a concurrency issue where deleted files already exist on disk + // This happen when cleanerThread is slow and not able to delete index entries + // while same index Entry is cached again and invalidated. + // The new deleted file created should be replaced by existing deleted file. 
+ + // create deleted suffix file + Files.copy(entry.offsetIndex().file().toPath(),Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(),"",LogFileUtils.DELETED_FILE_SUFFIX))) + Files.copy(entry.txnIndex().file().toPath(),Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(),"",LogFileUtils.DELETED_FILE_SUFFIX))) + Files.copy(entry.timeIndex().file().toPath(),Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(),"",LogFileUtils.DELETED_FILE_SUFFIX))) + + // verify deleted file exists on disk + assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, s"Deleted Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}") + + cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id()) + + // wait until entry is marked for deletion + TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup, + "Failed to mark cache entry for cleanup after invalidation") + TestUtils.waitUntilTrue(() => entry.isCleanStarted, Review Comment: Clean started doesn't mean that the index has been deleted. There could be a case where the next line executes even though the file has not been deleted yet. I would suggest wrapping the next assertions in `waitUntilTrue()`. It would make the test non-flaky. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org