iit2009060 commented on code in PR #14483:
URL: https://github.com/apache/kafka/pull/14483#discussion_r1359185319


##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -592,16 +593,75 @@ class RemoteIndexCacheTest {
     verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+    // Create a spy Cache Entry
+    val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset,
+      time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
+
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+
+    val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
+    cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+    assertCacheSize(1)
+
+    var entry: RemoteIndexCache.Entry = null
+
+    val latchForCacheRead = new CountDownLatch(1)
+    val latchForCacheRemove = new CountDownLatch(1)
+    val latchForTestWait = new CountDownLatch(1)
+
+    doAnswer((invocation: InvocationOnMock) => {
+      // Signal the CacheRead to unblock itself
+      latchForCacheRead.countDown()
+      // Wait for signal to start renaming the files
+      latchForCacheRemove.await()
+      // Calling the markForCleanup() actual method to start renaming the files
+      invocation.callRealMethod()

Review Comment:
   Why are we calling the actual file rename via `callRealMethod()`? Correct me if my understanding is wrong here.
   Two threads are defined:
   1. removalCache - executes the remove function of the cache.
   2. readCache - reads data from the cache while `spyEntry.markForCleanup()` is executing.
   =====
   
   Operations:
   1. removeCache is triggered.
   2. When `spyEntry.markForCleanup()` executes, the files are already renamed with the `.deleted` suffix.
   3. readCache executes and finishes, because no lock is pending on the remove operation.
   4. It creates new files for the entry again (fetched from remote storage rather than served from the cache). We should validate the number of calls to RSM here.
   5. After `latchForCacheRemove.await()`, why are we explicitly calling `markForCleanup()` again?
   
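The interleaving described in the numbered steps above can be sketched as a standalone Java program. This is a hypothetical, simplified stand-in for the test's latch choreography (the thread bodies here only simulate `markForCleanup()` and `getIndexEntry()`, which are assumptions carried over from the test, not the real Kafka code): the remove thread signals the reader from inside the mocked cleanup, blocks until the reader has run, and only then completes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class LatchChoreography {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latchForCacheRead = new CountDownLatch(1);
        CountDownLatch latchForCacheRemove = new CountDownLatch(1);
        AtomicReference<String> readResult = new AtomicReference<>();

        Thread removeThread = new Thread(() -> {
            // Stand-in for the mocked spyEntry.markForCleanup():
            // signal the reader, then block until the test lets removal proceed.
            latchForCacheRead.countDown();
            try {
                latchForCacheRemove.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // invocation.callRealMethod() (the real rename) would run here.
        });

        Thread readThread = new Thread(() -> {
            try {
                latchForCacheRead.await();             // wait until remove is mid-flight
                readResult.set("read-while-removing"); // stand-in for a cache read
                latchForCacheRemove.countDown();       // allow the removal to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        removeThread.start();
        readThread.start();
        removeThread.join();
        readThread.join();
        System.out.println(readResult.get()); // prints "read-while-removing"
    }
}
```

The ordering is deterministic: the read always observes the entry while the remove is parked between the two latches, which is exactly the race window the test is trying to exercise.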



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -193,7 +192,16 @@ public File cacheDir() {
     public void remove(Uuid key) {
         lock.readLock().lock();
         try {
-            internalCache.invalidate(key);
+            internalCache.asMap().computeIfPresent(key, (k, v) -> {
+                try {
+                    v.markForCleanup();
+                    expiredIndexes.put(v);

Review Comment:
   @showuon I was just thinking about this: if this fails, it will leave behind a pile of files with the `.deleted` suffix that never get deleted from disk. Is that behaviour OK?
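To make the concern concrete: if `markForCleanup()` renames the index files but the hand-off to `expiredIndexes` fails, the renamed files would linger on disk. A recovery sweep on startup could reclaim them. The sketch below is purely illustrative (the class name, directory handling, and the assumption that `.deleted` is the suffix are mine, not the actual Kafka implementation):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class OrphanSweep {
    static final String DELETED_SUFFIX = ".deleted"; // assumed suffix

    // Delete every file in cacheDir that was renamed for cleanup
    // but never actually removed; returns how many were reclaimed.
    public static int sweep(File cacheDir) {
        int removed = 0;
        File[] files = cacheDir.listFiles((dir, name) -> name.endsWith(DELETED_SUFFIX));
        if (files != null) {
            for (File f : files) {
                if (f.delete()) {
                    removed++;
                }
            }
        }
        return removed;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("remote-index-cache");
        Files.createFile(dir.resolve("00000000000000000042.index.deleted"));
        Files.createFile(dir.resolve("00000000000000000042.timeindex"));
        System.out.println(sweep(dir.toFile())); // prints 1: only the .deleted file is removed
    }
}
```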



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -193,7 +192,16 @@ public File cacheDir() {
     public void remove(Uuid key) {
         lock.readLock().lock();
         try {
-            internalCache.invalidate(key);
+            internalCache.asMap().computeIfPresent(key, (k, v) -> {
+                try {
+                    v.markForCleanup();
+                    expiredIndexes.put(v);

Review Comment:
   @showuon @kamalcph @jeel2420 I think we are entering a deadlock state here: during remove we take a readLock, but in `markForCleanup()` we try to take a write lock. Won't that result in a deadlock?
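For reference, the failure mode being asked about only arises if both paths use the *same* `ReentrantReadWriteLock` instance (whether they do in `RemoteIndexCache` is exactly the question above). `java.util.concurrent`'s `ReentrantReadWriteLock` does not support upgrading a read lock to a write lock, so a thread that holds the read lock and then blocks on the write lock can never proceed. The probe below uses `tryLock()` instead of a blocking acquire so it terminates:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockUpgrade {
    public static void main(String[] args) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.readLock().lock();          // analogous to remove() taking the read lock
        try {
            // Analogous to markForCleanup() wanting the write lock on the
            // same lock object; a blocking lock() here would hang forever,
            // so probe with tryLock() instead.
            boolean acquired = lock.writeLock().tryLock();
            System.out.println(acquired); // prints "false": read-to-write upgrade is impossible
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

If the two code paths take locks on different lock objects (e.g. a cache-level lock vs. a per-entry lock), this upgrade scenario does not apply.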



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
