iit2009060 commented on code in PR #14482:
URL: https://github.com/apache/kafka/pull/14482#discussion_r1348387812


##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -524,23 +527,239 @@ class RemoteIndexCacheTest {
     }
   }
 
-  @Test
-  def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
-    // create Corrupt Offset Index File
-    createCorruptRemoteIndexCacheOffsetFile()
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", 
"TRANSACTION"))
+  def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit 
= {
+    // create Corrupt  Index File in remote index cache
+    if (indexType == IndexType.OFFSET) {
+      createCorruptOffsetIndexFile(cache.cacheDir())
+    }
+    else if (indexType == IndexType.TIMESTAMP) {
+      createCorruptTimeIndexOffsetFile(cache.cacheDir())
+    }
+    else if (indexType == IndexType.TRANSACTION) {
+      createCorruptTxnIndexForSegmentMetadata(cache.cacheDir(), rlsMetadata)
+    }
     val entry = cache.getIndexEntry(rlsMetadata)
     // Test would fail if it throws corrupt Exception
-    val expectedOffsetIndexFileName: String = 
remoteOffsetIndexFileName(rlsMetadata)
     val offsetIndexFile = entry.offsetIndex.file().toPath
+    val txnIndexFile = entry.txnIndex.file().toPath
+    val timeIndexFile = entry.timeIndex.file().toPath
+
+    val expectedOffsetIndexFileName: String = 
remoteOffsetIndexFileName(rlsMetadata)
+    val expectedTimeIndexFileName: String = 
remoteTimeIndexFileName(rlsMetadata)
+    val expectedTxnIndexFileName: String = 
remoteTransactionIndexFileName(rlsMetadata)
 
     assertEquals(expectedOffsetIndexFileName, 
offsetIndexFile.getFileName.toString)
+    assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName.toString)
+    assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString)
+
     // assert that parent directory for the index files is correct
     assertEquals(RemoteIndexCache.DIR_NAME, 
offsetIndexFile.getParent.getFileName.toString,
-      s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
+      s"offsetIndex=entry.offsetIndex().file().toPath is created under 
incorrect parent")
+    assertEquals(RemoteIndexCache.DIR_NAME, 
txnIndexFile.getParent.getFileName.toString,
+      s"txnIndex=$txnIndexFile is created under incorrect parent")
+    assertEquals(RemoteIndexCache.DIR_NAME, 
timeIndexFile.getParent.getFileName.toString,
+      s"timeIndex=$timeIndexFile is created under incorrect parent")
+
     // file is corrupted it should fetch from remote storage again
     verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
+    reset(rsm)
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), 
any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        // Create corrupt index file which would be returned by rsm
+        createCorruptTimeIndexOffsetFile(tpDir)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not 
accessed.
+        }
+      })
+
+    assertThrows(classOf[CorruptIndexException], () => 
cache.getIndexEntry(rlsMetadata))
+    
assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
+    verifyFetchIndexInvocation(1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
+    verifyFetchIndexInvocation(0, Seq(IndexType.TRANSACTION))
+    // However cache fetch failed
+    // but it has already created offset and time index file in remote cache 
dir.

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to