iit2009060 commented on code in PR #14482: URL: https://github.com/apache/kafka/pull/14482#discussion_r1348388097
########## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ########## @@ -524,23 +527,239 @@ class RemoteIndexCacheTest { } } - @Test - def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = { - // create Corrupt Offset Index File - createCorruptRemoteIndexCacheOffsetFile() + @ParameterizedTest + @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION")) + def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = { + // create Corrupt Index File in remote index cache + if (indexType == IndexType.OFFSET) { + createCorruptOffsetIndexFile(cache.cacheDir()) + } + else if (indexType == IndexType.TIMESTAMP) { + createCorruptTimeIndexOffsetFile(cache.cacheDir()) + } + else if (indexType == IndexType.TRANSACTION) { + createCorruptTxnIndexForSegmentMetadata(cache.cacheDir(), rlsMetadata) + } val entry = cache.getIndexEntry(rlsMetadata) // Test would fail if it throws corrupt Exception - val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata) val offsetIndexFile = entry.offsetIndex.file().toPath + val txnIndexFile = entry.txnIndex.file().toPath + val timeIndexFile = entry.timeIndex.file().toPath + + val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata) + val expectedTimeIndexFileName: String = remoteTimeIndexFileName(rlsMetadata) + val expectedTxnIndexFileName: String = remoteTransactionIndexFileName(rlsMetadata) assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName.toString) + assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName.toString) + assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString) + // assert that parent directory for the index files is correct assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString, - s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent") + s"offsetIndex=entry.offsetIndex().file().toPath is created under incorrect 
parent") + assertEquals(RemoteIndexCache.DIR_NAME, txnIndexFile.getParent.getFileName.toString, + s"txnIndex=$txnIndexFile is created under incorrect parent") + assertEquals(RemoteIndexCache.DIR_NAME, timeIndexFile.getParent.getFileName.toString, + s"timeIndex=$timeIndexFile is created under incorrect parent") + // file is corrupted it should fetch from remote storage again verifyFetchIndexInvocation(count = 1) } + @Test + def testMultipleIndexEntriesExecutionInCorruptException(): Unit = { + reset(rsm) + when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType]))) + .thenAnswer(ans => { + val metadata = ans.getArgument[RemoteLogSegmentMetadata](0) + val indexType = ans.getArgument[IndexType](1) + val offsetIdx = createOffsetIndexForSegmentMetadata(metadata) + val timeIdx = createTimeIndexForSegmentMetadata(metadata) + val txnIdx = createTxIndexForSegmentMetadata(metadata) + maybeAppendIndexEntries(offsetIdx, timeIdx) + // Create corrupt index file which would be returned by rsm + createCorruptTimeIndexOffsetFile(tpDir) + indexType match { + case IndexType.OFFSET => new FileInputStream(offsetIdx.file) + case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file) + case IndexType.TRANSACTION => new FileInputStream(txnIdx.file) + case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed. + case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed. + } + }) + + assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata)) + assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id())) + verifyFetchIndexInvocation(1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP)) + verifyFetchIndexInvocation(0, Seq(IndexType.TRANSACTION)) + // However cache fetch failed + // but it has already created offset and time index file in remote cache dir. + // Current status + // (cache is null) + // RemoteCacheDir contain + // 1. Offset Index File is fine and not corrupted + // 2. 
Time Index File is corrupted + // What should be the code flow in the next execution + // 1. No rsm call for fetching Offset Index File. + // 2. Time index file should be fetched from remote storage again as it is corrupted in the first execution. Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org