This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 77cc8ffbab4 KAFKA-17948: Potential issue during tryComplete and 
onComplete simultaneous calls to access global variables (#17739)
77cc8ffbab4 is described below

commit 77cc8ffbab40c81e7df62e77fcbd46dbad96aac9
Author: Abhinav Dixit <[email protected]>
AuthorDate: Fri Nov 15 23:56:54 2024 +0530

    KAFKA-17948: Potential issue during tryComplete and onComplete simultaneous 
calls to access global variables (#17739)
    
    Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal 
<[email protected]>, Jun Rao <[email protected]>
---
 .../java/kafka/server/share/DelayedShareFetch.java | 88 ++++++++++++----------
 .../kafka/server/share/DelayedShareFetchTest.java  | 36 +++++++--
 .../server/share/SharePartitionManagerTest.java    |  8 ++
 .../kafka/server/purgatory/DelayedOperation.java   |  4 +-
 4 files changed, 91 insertions(+), 45 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java 
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index a5cd79ee358..9f1ad3ef651 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -37,11 +37,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 
 import scala.Tuple2;
@@ -58,13 +58,12 @@ public class DelayedShareFetch extends DelayedOperation {
 
     private final ShareFetch shareFetch;
     private final ReplicaManager replicaManager;
-
-    private Map<TopicIdPartition, FetchRequest.PartitionData> 
partitionsAcquired;
-    private Map<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
     private final SharePartitionManager sharePartitionManager;
     // The topic partitions that need to be completed for the share fetch 
request are given by sharePartitions.
     // sharePartitions is a subset of shareFetchData. The order of 
insertion/deletion of entries in sharePartitions is important.
     private final LinkedHashMap<TopicIdPartition, SharePartition> 
sharePartitions;
+    private LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
partitionsAcquired;
+    private LinkedHashMap<TopicIdPartition, LogReadResult> 
partitionsAlreadyFetched;
 
     DelayedShareFetch(
             ShareFetch shareFetch,
@@ -91,31 +90,39 @@ public class DelayedShareFetch extends DelayedOperation {
      */
     @Override
     public void onComplete() {
+        // We are utilizing lock so that onComplete doesn't do a dirty read 
for global variables -
+        // partitionsAcquired and partitionsAlreadyFetched, since these 
variables can get updated in a different tryComplete thread.
+        lock.lock();
         log.trace("Completing the delayed share fetch request for group {}, 
member {}, "
             + "topic partitions {}", shareFetch.groupId(), 
shareFetch.memberId(),
             partitionsAcquired.keySet());
 
-        if (shareFetch.isCompleted())
-            return;
+        try {
+            LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData;
+            // tryComplete did not invoke forceComplete, so we need to check 
if we have any partitions to fetch.
+            if (partitionsAcquired.isEmpty())
+                topicPartitionData = acquirablePartitions();
+            // tryComplete invoked forceComplete, so we can use the data from 
tryComplete.
+            else
+                topicPartitionData = partitionsAcquired;
 
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
-        // tryComplete did not invoke forceComplete, so we need to check if we 
have any partitions to fetch.
-        if (partitionsAcquired.isEmpty())
-            topicPartitionData = acquirablePartitions();
-        // tryComplete invoked forceComplete, so we can use the data from 
tryComplete.
-        else
-            topicPartitionData = partitionsAcquired;
+            if (topicPartitionData.isEmpty()) {
+                // No locks for share partitions could be acquired, so we 
complete the request with an empty response.
+                shareFetch.maybeComplete(Collections.emptyMap());
+                return;
+            }
+            log.trace("Fetchable share partitions data: {} with groupId: {} 
fetch params: {}",
+                topicPartitionData, shareFetch.groupId(), 
shareFetch.fetchParams());
 
-        if (topicPartitionData.isEmpty()) {
-            // No locks for share partitions could be acquired, so we complete 
the request with an empty response.
-            shareFetch.maybeComplete(Collections.emptyMap());
-            return;
+            completeShareFetchRequest(topicPartitionData);
+        } finally {
+            lock.unlock();
         }
-        log.trace("Fetchable share partitions data: {} with groupId: {} fetch 
params: {}",
-            topicPartitionData, shareFetch.groupId(), 
shareFetch.fetchParams());
+    }
 
+    private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, 
FetchRequest.PartitionData> topicPartitionData) {
         try {
-            Map<TopicIdPartition, LogReadResult> responseData;
+            LinkedHashMap<TopicIdPartition, LogReadResult> responseData;
             if (partitionsAlreadyFetched.isEmpty())
                 responseData = readFromLog(topicPartitionData);
             else
@@ -123,7 +130,7 @@ public class DelayedShareFetch extends DelayedOperation {
                 // updated in a different tryComplete thread.
                 responseData = combineLogReadResponse(topicPartitionData, 
partitionsAlreadyFetched);
 
-            Map<TopicIdPartition, FetchPartitionData> fetchPartitionsData = 
new LinkedHashMap<>();
+            LinkedHashMap<TopicIdPartition, FetchPartitionData> 
fetchPartitionsData = new LinkedHashMap<>();
             for (Map.Entry<TopicIdPartition, LogReadResult> entry : 
responseData.entrySet())
                 fetchPartitionsData.put(entry.getKey(), 
entry.getValue().toFetchPartitionData(false));
 
@@ -150,14 +157,14 @@ public class DelayedShareFetch extends DelayedOperation {
      */
     @Override
     public boolean tryComplete() {
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
acquirablePartitions();
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData = acquirablePartitions();
 
         try {
             if (!topicPartitionData.isEmpty()) {
                 // In case, fetch offset metadata doesn't exist for one or 
more topic partitions, we do a
                 // replicaManager.readFromLog to populate the offset metadata 
and update the fetch offset metadata for
                 // those topic partitions.
-                Map<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
+                LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
                 maybeUpdateFetchOffsetMetadata(replicaManagerReadResponse);
                 if (anyPartitionHasLogReadError(replicaManagerReadResponse) || 
isMinBytesSatisfied(topicPartitionData)) {
                     partitionsAcquired = topicPartitionData;
@@ -194,9 +201,9 @@ public class DelayedShareFetch extends DelayedOperation {
      * Prepare fetch request structure for partitions in the share fetch 
request for which we can acquire records.
      */
     // Visible for testing
-    Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
+    LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
acquirablePartitions() {
         // Initialize the topic partitions for which the fetch should be 
attempted.
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
new LinkedHashMap<>();
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData = new LinkedHashMap<>();
 
         sharePartitions.forEach((topicIdPartition, sharePartition) -> {
             int partitionMaxBytes = 
shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
@@ -231,8 +238,8 @@ public class DelayedShareFetch extends DelayedOperation {
         return topicPartitionData;
     }
 
-    private Map<TopicIdPartition, LogReadResult> 
maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData) {
-        Map<TopicIdPartition, FetchRequest.PartitionData> 
partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
+    private LinkedHashMap<TopicIdPartition, LogReadResult> 
maybeReadFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData) {
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
         topicPartitionData.forEach((topicIdPartition, partitionData) -> {
             SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
             if (sharePartition.fetchOffsetMetadata().isEmpty()) {
@@ -240,14 +247,14 @@ public class DelayedShareFetch extends DelayedOperation {
             }
         });
         if (partitionsMissingFetchOffsetMetadata.isEmpty()) {
-            return Collections.emptyMap();
+            return new LinkedHashMap<>();
         }
         // We fetch data from replica manager corresponding to the topic 
partitions that have missing fetch offset metadata.
         return readFromLog(partitionsMissingFetchOffsetMetadata);
     }
 
     private void maybeUpdateFetchOffsetMetadata(
-        Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponseData) {
         for (Map.Entry<TopicIdPartition, LogReadResult> entry : 
replicaManagerReadResponseData.entrySet()) {
             TopicIdPartition topicIdPartition = entry.getKey();
             SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
@@ -262,7 +269,7 @@ public class DelayedShareFetch extends DelayedOperation {
     }
 
-    // minBytes estimation currently assumes the common case where all fetched 
data is acquirable.
-    private boolean isMinBytesSatisfied(Map<TopicIdPartition, 
FetchRequest.PartitionData> topicPartitionData) {
+    private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, 
FetchRequest.PartitionData> topicPartitionData) {
         long accumulatedSize = 0;
         for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : 
topicPartitionData.entrySet()) {
             TopicIdPartition topicIdPartition = entry.getKey();
@@ -324,11 +331,11 @@ public class DelayedShareFetch extends DelayedOperation {
 
     }
 
-    private Map<TopicIdPartition, LogReadResult> 
readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData) {
+    private LinkedHashMap<TopicIdPartition, LogReadResult> 
readFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData) {
         // Filter if there already exists any erroneous topic partition.
         Set<TopicIdPartition> partitionsToFetch = 
shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet());
         if (partitionsToFetch.isEmpty()) {
-            return Collections.emptyMap();
+            return new LinkedHashMap<>();
         }
 
         Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
@@ -340,7 +347,7 @@ public class DelayedShareFetch extends DelayedOperation {
             QuotaFactory.UNBOUNDED_QUOTA,
             true);
 
-        Map<TopicIdPartition, LogReadResult> responseData = new HashMap<>();
+        LinkedHashMap<TopicIdPartition, LogReadResult> responseData = new 
LinkedHashMap<>();
         responseLogResult.foreach(tpLogResult -> {
             responseData.put(tpLogResult._1(), tpLogResult._2());
             return BoxedUnit.UNIT;
@@ -350,7 +357,7 @@ public class DelayedShareFetch extends DelayedOperation {
         return responseData;
     }
 
-    private boolean anyPartitionHasLogReadError(Map<TopicIdPartition, 
LogReadResult> replicaManagerReadResponse) {
+    private boolean 
anyPartitionHasLogReadError(LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse) {
         return replicaManagerReadResponse.values().stream()
             .anyMatch(logReadResult -> logReadResult.error().code() != 
Errors.NONE.code());
     }
@@ -379,9 +386,9 @@ public class DelayedShareFetch extends DelayedOperation {
     }
 
     // Visible for testing.
-    Map<TopicIdPartition, LogReadResult> 
combineLogReadResponse(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData,
-                                                                
Map<TopicIdPartition, LogReadResult> existingFetchedData) {
-        Map<TopicIdPartition, FetchRequest.PartitionData> 
missingLogReadTopicPartitions = new LinkedHashMap<>();
+    LinkedHashMap<TopicIdPartition, LogReadResult> 
combineLogReadResponse(LinkedHashMap<TopicIdPartition, 
FetchRequest.PartitionData> topicPartitionData,
+                                                                
LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
missingLogReadTopicPartitions = new LinkedHashMap<>();
         topicPartitionData.forEach((topicIdPartition, partitionData) -> {
             if (!existingFetchedData.containsKey(topicIdPartition)) {
                 missingLogReadTopicPartitions.put(topicIdPartition, 
partitionData);
@@ -390,7 +397,7 @@ public class DelayedShareFetch extends DelayedOperation {
         if (missingLogReadTopicPartitions.isEmpty()) {
             return existingFetchedData;
         }
-        Map<TopicIdPartition, LogReadResult> 
missingTopicPartitionsLogReadResponse = 
readFromLog(missingLogReadTopicPartitions);
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
missingTopicPartitionsLogReadResponse = 
readFromLog(missingLogReadTopicPartitions);
         missingTopicPartitionsLogReadResponse.putAll(existingFetchedData);
         return missingTopicPartitionsLogReadResponse;
     }
@@ -402,4 +409,9 @@ public class DelayedShareFetch extends DelayedOperation {
             sharePartition.releaseFetchLock();
         });
     }
+
+    // Visible for testing.
+    Lock lock() {
+        return lock;
+    }
 }
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java 
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index f1f708f9dd9..12b89cffd37 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -127,6 +127,8 @@ public class DelayedShareFetchTest {
         assertFalse(delayedShareFetch.tryComplete());
         assertFalse(delayedShareFetch.isCompleted());
         Mockito.verify(delayedShareFetch, 
times(0)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -182,6 +184,8 @@ public class DelayedShareFetchTest {
         assertFalse(delayedShareFetch.tryComplete());
         assertFalse(delayedShareFetch.isCompleted());
         Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -233,6 +237,8 @@ public class DelayedShareFetchTest {
         assertFalse(delayedShareFetch.tryComplete());
         assertFalse(delayedShareFetch.isCompleted());
         Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -278,6 +284,8 @@ public class DelayedShareFetchTest {
         assertTrue(delayedShareFetch.tryComplete());
         assertTrue(delayedShareFetch.isCompleted());
         Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -321,6 +329,8 @@ public class DelayedShareFetchTest {
                 any(), any(), any(ReplicaQuota.class), anyBoolean());
         assertTrue(delayedShareFetch.isCompleted());
         Mockito.verify(delayedShareFetch, 
times(0)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -368,6 +378,8 @@ public class DelayedShareFetchTest {
         assertTrue(delayedShareFetch.isCompleted());
         assertTrue(shareFetch.isCompleted());
         Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -404,6 +416,8 @@ public class DelayedShareFetchTest {
         // Verifying that the first forceComplete calls acquirablePartitions 
method in DelayedShareFetch.
         Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
         assertEquals(0, future.join().size());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
 
         // Force completing the share fetch request for the second time should 
hit the future completion check and not
         // proceed ahead in the function.
@@ -412,6 +426,8 @@ public class DelayedShareFetchTest {
         // Verifying that the second forceComplete does not call 
acquirablePartitions method in DelayedShareFetch.
         Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
         Mockito.verify(delayedShareFetch, 
times(0)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -462,6 +478,8 @@ public class DelayedShareFetchTest {
 
         assertEquals(2, delayedShareFetchPurgatory.watched());
         assertFalse(shareFetch1.isCompleted());
+        assertTrue(delayedShareFetch1.lock().tryLock());
+        delayedShareFetch1.lock().unlock();
 
         Map<TopicIdPartition, Integer> partitionMaxBytes2 = new HashMap<>();
         partitionMaxBytes2.put(tp1, PARTITION_MAX_BYTES);
@@ -499,6 +517,8 @@ public class DelayedShareFetchTest {
         Mockito.verify(replicaManager, times(1)).addToActionQueue(any());
         Mockito.verify(replicaManager, times(0)).tryCompleteActions();
         Mockito.verify(delayedShareFetch2, 
times(1)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch2.lock().tryLock());
+        delayedShareFetch2.lock().unlock();
     }
 
     @Test
@@ -530,21 +550,21 @@ public class DelayedShareFetchTest {
             .withSharePartitions(sharePartitions)
             .build();
 
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
new HashMap<>();
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData = new LinkedHashMap<>();
         topicPartitionData.put(tp0, mock(FetchRequest.PartitionData.class));
         topicPartitionData.put(tp1, mock(FetchRequest.PartitionData.class));
 
         // Case 1 - logReadResponse contains tp0.
-        Map<TopicIdPartition, LogReadResult> logReadResponse = 
Collections.singletonMap(
-            tp0, mock(LogReadResult.class));
+        LinkedHashMap<TopicIdPartition, LogReadResult> logReadResponse = new 
LinkedHashMap<>();
+        logReadResponse.put(tp0, mock(LogReadResult.class));
 
         doAnswer(invocation -> 
buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(),
 any(), any(ReplicaQuota.class), anyBoolean());
-        Map<TopicIdPartition, LogReadResult> combinedLogReadResponse = 
delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
+        LinkedHashMap<TopicIdPartition, LogReadResult> combinedLogReadResponse 
= delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
         assertEquals(topicPartitionData.keySet(), 
combinedLogReadResponse.keySet());
         assertEquals(combinedLogReadResponse.get(tp0), 
logReadResponse.get(tp0));
 
         // Case 2 - logReadResponse contains tp0 and tp1.
-        logReadResponse = new HashMap<>();
+        logReadResponse = new LinkedHashMap<>();
         logReadResponse.put(tp0, mock(LogReadResult.class));
         logReadResponse.put(tp1, mock(LogReadResult.class));
         combinedLogReadResponse = 
delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
@@ -613,6 +633,8 @@ public class DelayedShareFetchTest {
         Mockito.verify(replicaManager, times(1)).readFromLog(
             any(), any(), any(ReplicaQuota.class), anyBoolean());
         Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -645,6 +667,8 @@ public class DelayedShareFetchTest {
 
         assertFalse(spy.tryComplete());
         Mockito.verify(sp0, times(1)).releaseFetchLock();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -669,6 +693,8 @@ public class DelayedShareFetchTest {
 
         assertFalse(delayedShareFetch.tryComplete());
         Mockito.verify(sp0, times(1)).releaseFetchLock();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager 
replicaManager, TopicIdPartition topicIdPartition, int minBytes) {
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index fbbccadff8b..06bb123f550 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -1727,6 +1727,8 @@ public class SharePartitionManagerTest {
 
         Mockito.verify(sp1, times(1)).nextFetchOffset();
         Mockito.verify(sp2, times(0)).nextFetchOffset();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -1825,6 +1827,8 @@ public class SharePartitionManagerTest {
 
         Mockito.verify(sp1, times(0)).nextFetchOffset();
         Mockito.verify(sp2, times(0)).nextFetchOffset();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -1923,6 +1927,8 @@ public class SharePartitionManagerTest {
 
         Mockito.verify(sp1, times(1)).nextFetchOffset();
         Mockito.verify(sp2, times(0)).nextFetchOffset();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -2025,6 +2031,8 @@ public class SharePartitionManagerTest {
 
         Mockito.verify(sp1, times(0)).nextFetchOffset();
         Mockito.verify(sp2, times(0)).nextFetchOffset();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
 
b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
index 0ad638240c8..f3c818cb9c6 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
@@ -42,8 +42,8 @@ import java.util.concurrent.locks.ReentrantLock;
 public abstract class DelayedOperation extends TimerTask {
 
     private final AtomicBoolean completed = new AtomicBoolean(false);
-    // Visible for testing
-    final Lock lock;
+
+    protected final Lock lock;
 
     public DelayedOperation(long delayMs, Optional<Lock> lockOpt) {
         this(delayMs, lockOpt.orElse(new ReentrantLock()));

Reply via email to