This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 90c79f4e1f4 KAFKA-15169: Added TestCase in RemoteIndexCache (#14482)
90c79f4e1f4 is described below

commit 90c79f4e1f4730253fe180e674114af29bbb5cf7
Author: Arpit Goyal <[email protected]>
AuthorDate: Wed Oct 11 08:28:17 2023 +0530

    KAFKA-15169: Added TestCase in RemoteIndexCache (#14482)
    
    Test Cases Covered
    
        1. Index files already exist on disk but not in the cache, i.e. RemoteIndexCache should not call the remoteStorageManager to fetch them; instead it should cache them from the local index files present.
        2. RSM returns a corrupted index file, i.e. RemoteIndexCache should throw a CorruptIndexException instead of executing successfully.
        3. Deleted-suffix index files are already present on disk, i.e. if the cleaner thread is slow, there is a chance of deleted index files being present on disk while the same index entry is invalidated in parallel. For more details refer to https://issues.apache.org/jira/browse/KAFKA-15169
    
    Reviewers: Divij Vaidya <[email protected]>, Luke Chen <[email protected]>, Kamal Chandraprakash <[email protected]>
---
 .../kafka/log/remote/RemoteIndexCacheTest.scala    | 258 ++++++++++++++++++++-
 .../storage/internals/log/RemoteIndexCache.java    |   7 +-
 2 files changed, 253 insertions(+), 12 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 93e3f01511f..e82c35ffb97 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -17,23 +17,26 @@
 package kafka.log.remote
 
 import kafka.utils.TestUtils
+import kafka.utils.TestUtils.waitUntilTrue
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
-import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
 import org.slf4j.{Logger, LoggerFactory}
 
 import java.io.{File, FileInputStream, IOException, PrintWriter}
-import java.nio.file.Files
+import java.nio.file.{Files, Paths}
 import java.util
 import java.util.Collections
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
@@ -524,23 +527,223 @@ class RemoteIndexCacheTest {
     }
   }
 
-  @Test
-  def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
-    // create Corrupt Offset Index File
-    createCorruptRemoteIndexCacheOffsetFile()
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
+  def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = {
+    // create Corrupted Index File in remote index cache
+    createCorruptedIndexFile(indexType, cache.cacheDir())
     val entry = cache.getIndexEntry(rlsMetadata)
-    // Test would fail if it throws corrupt Exception
-    val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
+    // Test would fail if it throws an exception other than CorruptIndexException
     val offsetIndexFile = entry.offsetIndex.file().toPath
+    val txnIndexFile = entry.txnIndex.file().toPath
+    val timeIndexFile = entry.timeIndex.file().toPath
+
+    val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
+    val expectedTimeIndexFileName: String = remoteTimeIndexFileName(rlsMetadata)
+    val expectedTxnIndexFileName: String = remoteTransactionIndexFileName(rlsMetadata)
 
     assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName.toString)
+    assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName.toString)
+    assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString)
+
     // assert that parent directory for the index files is correct
     assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString,
-      s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
+      s"offsetIndex=$offsetIndexFile is created under incorrect parent")
+    assertEquals(RemoteIndexCache.DIR_NAME, txnIndexFile.getParent.getFileName.toString,
+      s"txnIndex=$txnIndexFile is created under incorrect parent")
+    assertEquals(RemoteIndexCache.DIR_NAME, timeIndexFile.getParent.getFileName.toString,
+      s"timeIndex=$timeIndexFile is created under incorrect parent")
+
     // file is corrupted it should fetch from remote storage again
     verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
+    reset(rsm)
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        // Create a corrupted time index file, overwriting the valid one just created
+        createCorruptTimeIndexOffsetFile(tpDir)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+        }
+      })
+
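+    // the first lookup should fail while loading the corrupted time index written above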
+    assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata))
+    assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
+    verifyFetchIndexInvocation(1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
+    verifyFetchIndexInvocation(0, Seq(IndexType.TRANSACTION))
+    // Current status:
+    // (cache is null)
+    // RemoteCacheDir contains
+    // 1. an offset index file, which is fine and not corrupted
+    // 2. a time index file, which is corrupted
+    // Expected code flow in the next execution:
+    // 1. No rsm call for fetching the offset index file.
+    // 2. The time index file should be fetched from remote storage again, as it was corrupted in the first execution.
+    // 3. The transaction index file should be fetched from remote storage.
+    reset(rsm)
+    // delete all files created in tpDir
+    Files.walk(tpDir.toPath, 1)
+      .filter(Files.isRegularFile(_))
+      .forEach(path => Files.deleteIfExists(path))
+    // rsm should return no corrupted file in the 2nd execution
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+        }
+      })
+    cache.getIndexEntry(rlsMetadata)
+    // rsm should not be called to fetch the offset index
+    verifyFetchIndexInvocation(0, Seq(IndexType.OFFSET))
+    verifyFetchIndexInvocation(1, Seq(IndexType.TIMESTAMP))
+    // Transaction index would be fetched again
+    // as previous getIndexEntry failed before fetchTransactionIndex
+    verifyFetchIndexInvocation(1, Seq(IndexType.TRANSACTION))
+  }
+
+  @Test
+  def testIndexFileAlreadyExistOnDiskButNotInCache(): Unit = {
+    val remoteIndexCacheDir = cache.cacheDir()
+    val tempSuffix = ".tmptest"
+
+    def getRemoteCacheIndexFileFromDisk(suffix: String) = {
+      Files.walk(remoteIndexCacheDir.toPath)
+        .filter(Files.isRegularFile(_))
+        .filter(path => path.getFileName.toString.endsWith(suffix))
+        .findAny()
+    }
+
+    def renameRemoteCacheIndexFileFromDisk(suffix: String) = {
+      Files.walk(remoteIndexCacheDir.toPath)
+        .filter(Files.isRegularFile(_))
+        .filter(path => path.getFileName.toString.endsWith(suffix))
+        .forEach(f => Utils.atomicMoveWithFallback(f, f.resolveSibling(f.getFileName().toString().stripSuffix(tempSuffix))))
+    }
+
+    val entry = cache.getIndexEntry(rlsMetadata)
+    verifyFetchIndexInvocation(count = 1)
+    // copy files with temporary name
+    Files.copy(entry.offsetIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(), "", tempSuffix)))
+    Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", tempSuffix)))
+    Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", tempSuffix)))
+
+    cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
+
+    // wait until entry is marked for deletion
+    TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
+      "Failed to mark cache entry for cleanup after invalidation")
+    TestUtils.waitUntilTrue(() => entry.isCleanStarted,
+      "Failed to cleanup cache entry after invalidation")
+
+    // restore index files
+    renameRemoteCacheIndexFileFromDisk(tempSuffix)
+    // validate cache entry for the above key should be null
+    assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
+    cache.getIndexEntry(rlsMetadata)
+    // Index files already exist, rsm should not fetch them again.
+    verifyFetchIndexInvocation(count = 1)
+    // verify index files on disk
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
+  def testRSMReturnCorruptedIndexFile(testIndexType: IndexType): Unit = {
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        // Create the corrupt index file to be returned from RSM
+        createCorruptedIndexFile(testIndexType, tpDir)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+        }
+      })
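+    // reading the corrupted index streamed by the mocked RSM should surface CorruptIndexException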
+    assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata))
+  }
+
+  @Test
+  def testConcurrentCacheDeletedFileExists(): Unit = {
+    val remoteIndexCacheDir = cache.cacheDir()
+
+    def getRemoteCacheIndexFileFromDisk(suffix: String) = {
+      Files.walk(remoteIndexCacheDir.toPath)
+        .filter(Files.isRegularFile(_))
+        .filter(path => path.getFileName.toString.endsWith(suffix))
+        .findAny()
+    }
+
+    val entry = cache.getIndexEntry(rlsMetadata)
+    // verify index files on disk
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+
+    // Simulate a concurrency issue where deleted files already exist on disk.
+    // This happens when the cleaner thread is slow and unable to delete index entries
+    // while the same index entry is cached again and invalidated.
+    // The newly created deleted file should replace the existing deleted file.
+
+    // create deleted suffix file
+    Files.copy(entry.offsetIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
+    Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
+    Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
+
+    // verify deleted file exists on disk
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, s"Deleted Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+
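+    // invalidate the entry; cleanup must replace the pre-existing .deleted files rather than fail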
+    cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
+
+    // wait until entry is marked for deletion
+    TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
+      "Failed to mark cache entry for cleanup after invalidation")
+    TestUtils.waitUntilTrue(() => entry.isCleanStarted,
+      "Failed to cleanup cache entry after invalidation")
+
+    // verify no index files on disk
+    waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+      s"Offset index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
+    waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+      s"Txn index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
+    waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+      s"Time index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
+    waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+      s"Index file marked for deletion should not be present on disk at ${remoteIndexCacheDir.toPath}")
+  }
+
   private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
                                     = RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
     val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
@@ -581,6 +784,22 @@ class RemoteIndexCacheTest {
     new TransactionIndex(metadata.startOffset(), txnIdxFile)
   }
 
+  private def createCorruptTxnIndexForSegmentMetadata(dir: File, metadata: RemoteLogSegmentMetadata): TransactionIndex = {
+    val txnIdxFile = remoteTransactionIndexFile(dir, metadata)
+    txnIdxFile.createNewFile()
+    val txnIndex = new TransactionIndex(metadata.startOffset(), txnIdxFile)
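+    // append valid aborted-transaction entries so the index file has real content on disk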
+    val abortedTxns = List(
+      new AbortedTxn(0L, 0, 10, 11),
+      new AbortedTxn(1L, 5, 15, 13),
+      new AbortedTxn(2L, 18, 35, 25),
+      new AbortedTxn(3L, 32, 50, 40))
+    abortedTxns.foreach(txnIndex.append)
+    txnIndex.close()
+
+    // open the index with a different starting offset to fake invalid data
+    return new TransactionIndex(100L, txnIdxFile)
+  }
+
   private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex = {
     val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
     new TimeIndex(remoteTimeIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 12)
@@ -616,12 +835,29 @@ class RemoteIndexCacheTest {
     }
   }
 
-  private def createCorruptRemoteIndexCacheOffsetFile(): Unit = {
-    val pw =  new PrintWriter(remoteOffsetIndexFile(new File(tpDir, RemoteIndexCache.DIR_NAME), rlsMetadata))
+  private def createCorruptOffsetIndexFile(dir: File): Unit = {
+    val pw = new PrintWriter(remoteOffsetIndexFile(dir, rlsMetadata))
     pw.write("Hello, world")
     // The size of the string written to the file is 12 bytes,
     // but it should be a multiple of the offset index entry size, which is 8.
     pw.close()
   }
 
+  private def createCorruptTimeIndexOffsetFile(dir: File): Unit = {
+    val pw = new PrintWriter(remoteTimeIndexFile(dir, rlsMetadata))
+    pw.write("Hello, world1")
+    // The size of the string written to the file is 13 bytes,
+    // but it should be a multiple of the time index entry size, which is 12.
+    pw.close()
+  }
+
+  private def createCorruptedIndexFile(indexType: IndexType, dir: File): Unit = {
+    if (indexType == IndexType.OFFSET) {
+      createCorruptOffsetIndexFile(dir)
+    } else if (indexType == IndexType.TIMESTAMP) {
+      createCorruptTimeIndexOffsetFile(dir)
+    } else if (indexType == IndexType.TRANSACTION) {
+      createCorruptTxnIndexForSegmentMetadata(dir, rlsMetadata)
+    }
+  }
 }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index 2bbe9d76ecf..5cdaf77c294 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -166,6 +166,11 @@ public class RemoteIndexCache implements Closeable {
         return internalCache;
     }
 
+    // Visible for testing
+    public File cacheDir() {
+        return cacheDir;
+    }
+
     public void remove(Uuid key) {
         lock.readLock().lock();
         try {
@@ -674,4 +679,4 @@ public class RemoteIndexCache implements Closeable {
         return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + 
LogFileUtils.TXN_INDEX_FILE_SUFFIX;
     }
 
-}
\ No newline at end of file
+}
