iit2009060 commented on code in PR #14482:
URL: https://github.com/apache/kafka/pull/14482#discussion_r1348399758


##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -524,23 +527,239 @@ class RemoteIndexCacheTest {
     }
   }
 
-  @Test
-  def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
-    // create Corrupt Offset Index File
-    createCorruptRemoteIndexCacheOffsetFile()
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
+  def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = {
+    // create a corrupt index file of the given type in the remote index cache
+    if (indexType == IndexType.OFFSET) {
+      createCorruptOffsetIndexFile(cache.cacheDir())
+    } else if (indexType == IndexType.TIMESTAMP) {
+      createCorruptTimeIndexOffsetFile(cache.cacheDir())
+    } else if (indexType == IndexType.TRANSACTION) {
+      createCorruptTxnIndexForSegmentMetadata(cache.cacheDir(), rlsMetadata)
+    }
     val entry = cache.getIndexEntry(rlsMetadata)
     // the test would fail if the above call threw a CorruptIndexException
-    val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
     val offsetIndexFile = entry.offsetIndex.file().toPath
+    val txnIndexFile = entry.txnIndex.file().toPath
+    val timeIndexFile = entry.timeIndex.file().toPath
+
+    val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
+    val expectedTimeIndexFileName: String = remoteTimeIndexFileName(rlsMetadata)
+    val expectedTxnIndexFileName: String = remoteTransactionIndexFileName(rlsMetadata)
 
     assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName.toString)
+    assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName.toString)
+    assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString)
+
     // assert that parent directory for the index files is correct
     assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString,
-      s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
+      s"offsetIndex=$offsetIndexFile is created under incorrect parent")
+    assertEquals(RemoteIndexCache.DIR_NAME, txnIndexFile.getParent.getFileName.toString,
+      s"txnIndex=$txnIndexFile is created under incorrect parent")
+    assertEquals(RemoteIndexCache.DIR_NAME, timeIndexFile.getParent.getFileName.toString,
+      s"timeIndex=$timeIndexFile is created under incorrect parent")
+
     // since the file is corrupted, it should be fetched from remote storage again
     verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
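+    // First execution: stub the rsm so the time index it returns is corrupted; building the cache entry should fail.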
+    reset(rsm)
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        // create a corrupt time index file, which will be returned by the rsm
+        createCorruptTimeIndexOffsetFile(tpDir)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+        }
+      })
+
+    assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata))
+    assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
+    verifyFetchIndexInvocation(1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
+    verifyFetchIndexInvocation(0, Seq(IndexType.TRANSACTION))
+    // Although the cache fetch failed, the offset and time index files have already been created in the remote cache dir.
+    // Current status:
+    // (cache entry is null)
+    // The remote cache dir contains:
+    // 1. an offset index file that is intact (not corrupted)
+    // 2. a time index file that is corrupted
+    // Expected flow in the next execution:
+    // 1. no rsm call to fetch the offset index file
+    // 2. the time index file should be fetched from remote storage again, as it was corrupted in the first execution
+    reset(rsm)
+    // delete all files created in tpDir
+    Files.walk(tpDir.toPath, 1)
+      .filter(Files.isRegularFile(_))
+      .forEach(path => Files.deleteIfExists(path))
+    // rsm should return no corrupted file in the 2nd execution
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+        }
+      })
+    cache.getIndexEntry(rlsMetadata)
+    // No exception should occur
+    // rsm should not be called for offset Index
+    verifyFetchIndexInvocation(0, Seq(IndexType.OFFSET))
+    verifyFetchIndexInvocation(1, Seq(IndexType.TIMESTAMP))
+    verifyFetchIndexInvocation(1, Seq(IndexType.TRANSACTION))
+  }
+
+  @Test
+  def testIndexFileAlreadyExistOnDiskButNotInCache(): Unit = {
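+    // Back up the on-disk index files, invalidate the cache entry so cleanup removes the originals,
+    // then restore the backups and verify a later lookup reuses them without another rsm fetch.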
+    val remoteIndexCacheDir = cache.cacheDir()
+    val tempSuffix = ".tmptest"
+
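+    // returns any file in the remote cache dir whose name ends with the given suffix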
+    def getRemoteCacheIndexFileFromDisk(suffix: String) = {
+      Files.walk(remoteIndexCacheDir.toPath)
+        .filter(Files.isRegularFile(_))
+        .filter(path => path.getFileName.toString.endsWith(suffix))
+        .findAny()
+    }
+
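+    // renames files ending with the given suffix back to their original index file names by stripping the temp suffix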
+    def renameRemoteCacheIndexFileFromDisk(suffix: String) = {
+      Files.walk(remoteIndexCacheDir.toPath)
+        .filter(Files.isRegularFile(_))
+        .filter(path => path.getFileName.toString.endsWith(suffix))
+        .forEach(f => Utils.atomicMoveWithFallback(f, f.resolveSibling(f.getFileName().toString().stripSuffix(tempSuffix))))
+    }
+
+    val entry = cache.getIndexEntry(rlsMetadata)
+    // copy files with temporary name
+    Files.copy(entry.offsetIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(), "", tempSuffix)))
+    Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", tempSuffix)))
+    Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", tempSuffix)))
+
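+    // evict the entry from the in-memory cache; its on-disk files are cleaned up asynchronously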
+    cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
+
+    // wait until entry is marked for deletion
+    TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
+      "Failed to mark cache entry for cleanup after invalidation")
+    TestUtils.waitUntilTrue(() => entry.isCleanStarted,
+      "Failed to cleanup cache entry after invalidation")
+
+    // restore index files
+    renameRemoteCacheIndexFileFromDisk(tempSuffix)
+    // validate that the cache entry for the above key is null
+    assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
+    cache.getIndexEntry(rlsMetadata)
+    // the index files already exist on disk, so rsm should not be called again;
+    // instead, the files on disk should be used
+    verifyFetchIndexInvocation(count = 1)
+    // verify index files on disk
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
+  def testRSMReturnCorruptedIndexFile(testIndexType: IndexType): Unit = {
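+    // stub the rsm so that the index of the type under test is corrupted when fetched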
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        var txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        // create a corrupt index file of the type under test, to be returned from the RSM
+        if (testIndexType == IndexType.OFFSET) {
+          createCorruptOffsetIndexFile(tpDir)
+        } else if (testIndexType == IndexType.TIMESTAMP) {
+          createCorruptTimeIndexOffsetFile(tpDir)
+        } else if (testIndexType == IndexType.TRANSACTION) {
+          txnIdx = createCorruptTxnIndexForSegmentMetadata(tpDir, rlsMetadata)
+        }

Review Comment:
   done


