iit2009060 commented on code in PR #14482:
URL: https://github.com/apache/kafka/pull/14482#discussion_r1347382635


##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -524,23 +527,241 @@ class RemoteIndexCacheTest {
     }
   }
 
-  @Test
-  def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
-    // create Corrupt Offset Index File
-    createCorruptRemoteIndexCacheOffsetFile()
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", 
"TRANSACTION"))
+  def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit 
= {
+    // create Corrupt  Index File in remote index cache
+    if (indexType == IndexType.OFFSET) {
+      createCorruptOffsetIndexFile(cache.cacheDir())
+    }
+    else if (indexType == IndexType.TIMESTAMP) {
+      createCorruptTimeIndexOffsetFile(cache.cacheDir())
+    }
+    else if (indexType == IndexType.TRANSACTION) {
+      createCorruptTxnIndexForSegmentMetadata(cache.cacheDir(), rlsMetadata)
+    }
     val entry = cache.getIndexEntry(rlsMetadata)
     // Test would fail if it throws corrupt Exception
-    val expectedOffsetIndexFileName: String = 
remoteOffsetIndexFileName(rlsMetadata)
     val offsetIndexFile = entry.offsetIndex.file().toPath
+    val txnIndexFile = entry.txnIndex.file().toPath
+    val timeIndexFile = entry.timeIndex.file().toPath
+
+    val expectedOffsetIndexFileName: String = 
remoteOffsetIndexFileName(rlsMetadata)
+    val expectedTimeIndexFileName: String = 
remoteTimeIndexFileName(rlsMetadata)
+    val expectedTxnIndexFileName: String = 
remoteTransactionIndexFileName(rlsMetadata)
 
     assertEquals(expectedOffsetIndexFileName, 
offsetIndexFile.getFileName.toString)
+    assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName.toString)
+    assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString)
+
     // assert that parent directory for the index files is correct
     assertEquals(RemoteIndexCache.DIR_NAME, 
offsetIndexFile.getParent.getFileName.toString,
-      s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
+      s"offsetIndex=entry.offsetIndex().file().toPath is created under 
incorrect parent")
+    assertEquals(RemoteIndexCache.DIR_NAME, 
txnIndexFile.getParent.getFileName.toString,
+      s"txnIndex=$txnIndexFile is created under incorrect parent")
+    assertEquals(RemoteIndexCache.DIR_NAME, 
timeIndexFile.getParent.getFileName.toString,
+      s"timeIndex=$timeIndexFile is created under incorrect parent")
+
     // file is corrupted it should fetch from remote storage again
     verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
+    reset(rsm)
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), 
any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        // Create corrupt index file which would be returned by rsm
+        createCorruptTimeIndexOffsetFile(tpDir)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not 
accessed.
+        }
+      })
+
+    assertThrows(classOf[CorruptIndexException], () => 
cache.getIndexEntry(rlsMetadata))
+    
assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
+    verifyFetchIndexInvocation(1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
+    verifyFetchIndexInvocation(0, Seq(IndexType.TRANSACTION))
+    // However cache fetch failed
+    // but it has  already created offset and time index file in remote cache 
dir.
+    // Current status
+    // ( cache  is null)
+    // RemoteCacheDir contain
+    // 1. Offset Index File which is fine and not corrupted
+    // 2. Time Index File which is corrupted
+    // What should be the code flow in next execution
+    // 1. No rsm call for fetching OffSet Index File.
+    // 2. Time index file should be fetched from remote storage again as it is 
corrupted in the first execution.

Review Comment:
   done



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -524,23 +527,241 @@ class RemoteIndexCacheTest {
     }
   }
 
-  @Test
-  def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
-    // create Corrupt Offset Index File
-    createCorruptRemoteIndexCacheOffsetFile()
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", 
"TRANSACTION"))
+  def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit 
= {
+    // create Corrupt  Index File in remote index cache
+    if (indexType == IndexType.OFFSET) {
+      createCorruptOffsetIndexFile(cache.cacheDir())
+    }
+    else if (indexType == IndexType.TIMESTAMP) {
+      createCorruptTimeIndexOffsetFile(cache.cacheDir())
+    }
+    else if (indexType == IndexType.TRANSACTION) {
+      createCorruptTxnIndexForSegmentMetadata(cache.cacheDir(), rlsMetadata)
+    }
     val entry = cache.getIndexEntry(rlsMetadata)
     // Test would fail if it throws corrupt Exception
-    val expectedOffsetIndexFileName: String = 
remoteOffsetIndexFileName(rlsMetadata)
     val offsetIndexFile = entry.offsetIndex.file().toPath
+    val txnIndexFile = entry.txnIndex.file().toPath
+    val timeIndexFile = entry.timeIndex.file().toPath
+
+    val expectedOffsetIndexFileName: String = 
remoteOffsetIndexFileName(rlsMetadata)
+    val expectedTimeIndexFileName: String = 
remoteTimeIndexFileName(rlsMetadata)
+    val expectedTxnIndexFileName: String = 
remoteTransactionIndexFileName(rlsMetadata)
 
     assertEquals(expectedOffsetIndexFileName, 
offsetIndexFile.getFileName.toString)
+    assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName.toString)
+    assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString)
+
     // assert that parent directory for the index files is correct
     assertEquals(RemoteIndexCache.DIR_NAME, 
offsetIndexFile.getParent.getFileName.toString,
-      s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
+      s"offsetIndex=entry.offsetIndex().file().toPath is created under 
incorrect parent")
+    assertEquals(RemoteIndexCache.DIR_NAME, 
txnIndexFile.getParent.getFileName.toString,
+      s"txnIndex=$txnIndexFile is created under incorrect parent")
+    assertEquals(RemoteIndexCache.DIR_NAME, 
timeIndexFile.getParent.getFileName.toString,
+      s"timeIndex=$timeIndexFile is created under incorrect parent")
+
     // file is corrupted it should fetch from remote storage again
     verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
+    reset(rsm)
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), 
any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        // Create corrupt index file which would be returned by rsm
+        createCorruptTimeIndexOffsetFile(tpDir)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not 
accessed.
+        }
+      })
+
+    assertThrows(classOf[CorruptIndexException], () => 
cache.getIndexEntry(rlsMetadata))
+    
assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
+    verifyFetchIndexInvocation(1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
+    verifyFetchIndexInvocation(0, Seq(IndexType.TRANSACTION))
+    // However cache fetch failed
+    // but it has  already created offset and time index file in remote cache 
dir.
+    // Current status
+    // ( cache  is null)
+    // RemoteCacheDir contain
+    // 1. Offset Index File which is fine and not corrupted
+    // 2. Time Index File which is corrupted

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to