This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dd1f2b8aab5 KAFKA-18653: Fix mocks and potential thread leak issues 
causing silent RejectedExecutionException in share group broker tests (#18725)
dd1f2b8aab5 is described below

commit dd1f2b8aab56393eaec5c0a1c76e3eb4a75ca792
Author: Abhinav Dixit <[email protected]>
AuthorDate: Wed Jan 29 21:54:30 2025 +0530

    KAFKA-18653: Fix mocks and potential thread leak issues causing silent 
RejectedExecutionException in share group broker tests (#18725)
    
    Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield 
<[email protected]>
---
 .../java/kafka/server/share/DelayedShareFetch.java |  9 ++-
 .../java/kafka/server/share/SharePartition.java    |  2 +-
 .../kafka/server/share/DelayedShareFetchTest.java  | 40 ++++++++++---
 .../server/share/SharePartitionManagerTest.java    | 36 ++++++-----
 .../unit/kafka/server/ReplicaManagerTest.scala     | 70 +++++++++++++++++++++-
 5 files changed, 125 insertions(+), 32 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java 
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 1422e08524a..2298df9b85e 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -68,7 +68,14 @@ public class DelayedShareFetch extends DelayedOperation {
     private LinkedHashMap<TopicIdPartition, Long> partitionsAcquired;
     private LinkedHashMap<TopicIdPartition, LogReadResult> 
partitionsAlreadyFetched;
 
-    DelayedShareFetch(
+    /**
+     * This function constructs an instance of delayed share fetch operation 
for completing share fetch requests instantaneously or with delay.
+     * @param shareFetch - The share fetch parameters of the share fetch 
request.
+     * @param replicaManager - The replica manager instance used to read from 
log/complete the request.
+     * @param exceptionHandler - The handler to complete share fetch requests 
with exception.
+     * @param sharePartitions - The share partitions referenced in the share 
fetch request.
+     */
+    public DelayedShareFetch(
             ShareFetch shareFetch,
             ReplicaManager replicaManager,
             BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index a56c55b1b74..65dd7374720 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1107,7 +1107,7 @@ public class SharePartition {
      *
      * @return A boolean which indicates whether the fetch lock is acquired.
      */
-    boolean maybeAcquireFetchLock() {
+    public boolean maybeAcquireFetchLock() {
         if (stateNotActive()) {
             return false;
         }
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java 
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 6068fbb769d..363ae26b6c3 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -188,12 +188,14 @@ public class DelayedShareFetchTest {
         doAnswer(invocation -> 
buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(),
 any(), any(ReplicaQuota.class), anyBoolean());
         BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
 
+        PartitionMaxBytesStrategy partitionMaxBytesStrategy = 
mockPartitionMaxBytes(Collections.singleton(tp0));
+
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
             .withReplicaManager(replicaManager)
             .withExceptionHandler(exceptionHandler)
-            
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
+            .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
             .build());
         assertFalse(delayedShareFetch.isCompleted());
 
@@ -292,11 +294,14 @@ public class DelayedShareFetchTest {
 
         when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(0, 1, 0)));
         mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 
1);
+
+        PartitionMaxBytesStrategy partitionMaxBytesStrategy = 
mockPartitionMaxBytes(Collections.singleton(tp0));
+
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
             .withReplicaManager(replicaManager)
-            
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
+            .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
             .build());
         assertFalse(delayedShareFetch.isCompleted());
 
@@ -337,7 +342,6 @@ public class DelayedShareFetchTest {
             .withShareFetchData(shareFetch)
             .withReplicaManager(replicaManager)
             .withSharePartitions(sharePartitions)
-            
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
             .build());
         assertFalse(delayedShareFetch.isCompleted());
         delayedShareFetch.forceComplete();
@@ -380,11 +384,14 @@ public class DelayedShareFetchTest {
         when(sp0.acquire(anyString(), anyInt(), anyInt(), 
any(FetchPartitionData.class))).thenReturn(
             ShareAcquiredRecords.fromAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
         doAnswer(invocation -> 
buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(),
 any(), any(ReplicaQuota.class), anyBoolean());
+
+        PartitionMaxBytesStrategy partitionMaxBytesStrategy = 
mockPartitionMaxBytes(Collections.singleton(tp0));
+
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
             .withReplicaManager(replicaManager)
             .withSharePartitions(sharePartitions)
-            
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
+            .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
             .build());
         assertFalse(delayedShareFetch.isCompleted());
         delayedShareFetch.forceComplete();
@@ -477,7 +484,7 @@ public class DelayedShareFetchTest {
 
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
-            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
         mockReplicaManagerDelayedShareFetch(replicaManager, 
delayedShareFetchPurgatory);
 
         List<DelayedOperationKey> delayedShareFetchWatchKeys = new 
ArrayList<>();
@@ -505,6 +512,8 @@ public class DelayedShareFetchTest {
 
         doAnswer(invocation -> 
buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(),
 any(), any(ReplicaQuota.class), anyBoolean());
 
+        PartitionMaxBytesStrategy partitionMaxBytesStrategy = 
mockPartitionMaxBytes(Collections.singleton(tp1));
+
         LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions2 = new 
LinkedHashMap<>();
         sharePartitions2.put(tp0, sp0);
         sharePartitions2.put(tp1, sp1);
@@ -514,7 +523,7 @@ public class DelayedShareFetchTest {
             .withShareFetchData(shareFetch2)
             .withReplicaManager(replicaManager)
             .withSharePartitions(sharePartitions2)
-            
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
+            .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
             .build());
 
         // sp1 can be acquired now
@@ -559,11 +568,13 @@ public class DelayedShareFetchTest {
                 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
             future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS, 
BROKER_TOPIC_STATS);
 
+        PartitionMaxBytesStrategy partitionMaxBytesStrategy = 
mockPartitionMaxBytes(Collections.singleton(tp1));
+
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
             .withReplicaManager(replicaManager)
             .withSharePartitions(sharePartitions)
-            
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
+            .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
             .build();
 
         LinkedHashMap<TopicIdPartition, Long> topicPartitionData = new 
LinkedHashMap<>();
@@ -625,13 +636,15 @@ public class DelayedShareFetchTest {
         
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(partition);
         when(partition.fetchOffsetSnapshot(any(), anyBoolean())).thenThrow(new 
RuntimeException("Exception thrown"));
 
+        PartitionMaxBytesStrategy partitionMaxBytesStrategy = 
mockPartitionMaxBytes(Collections.singleton(tp0));
+
         BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
             .withReplicaManager(replicaManager)
             .withExceptionHandler(exceptionHandler)
-            
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
+            .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
             .build());
 
         // Try complete should return false as the share partition has errored 
out.
@@ -680,10 +693,13 @@ public class DelayedShareFetchTest {
             new CompletableFuture<>(), orderedMap(PARTITION_MAX_BYTES, tp0), 
BATCH_SIZE, MAX_FETCH_RECORDS,
             BROKER_TOPIC_STATS);
 
+        PartitionMaxBytesStrategy partitionMaxBytesStrategy = 
mockPartitionMaxBytes(Collections.singleton(tp0));
+
         DelayedShareFetch delayedShareFetch = 
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions1)
             .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
             .build();
 
         DelayedShareFetch spy = spy(delayedShareFetch);
@@ -1048,6 +1064,14 @@ public class DelayedShareFetchTest {
         
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition())).thenReturn(partition);
     }
 
+    private PartitionMaxBytesStrategy 
mockPartitionMaxBytes(Set<TopicIdPartition> partitions) {
+        PartitionMaxBytesStrategy partitionMaxBytesStrategy = 
mock(PartitionMaxBytesStrategy.class);
+        LinkedHashMap<TopicIdPartition, Integer> maxBytes = new 
LinkedHashMap<>();
+        partitions.forEach(partition -> maxBytes.put(partition, 1));
+        when(partitionMaxBytesStrategy.maxBytes(anyInt(), any(), 
anyInt())).thenReturn(maxBytes);
+        return partitionMaxBytesStrategy;
+    }
+
     @SuppressWarnings("unchecked")
     private static BiConsumer<SharePartitionKey, Throwable> 
mockExceptionHandler() {
         return mock(BiConsumer.class);
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 6786e192ea5..30600b681f0 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -1060,7 +1060,7 @@ public class SharePartitionManagerTest {
         Metrics metrics = new Metrics();
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
-            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
delayedShareFetchPurgatory);
         mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, 
tp0, 1);
         mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, 
tp1, 1);
@@ -1140,7 +1140,7 @@ public class SharePartitionManagerTest {
 
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
-            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
delayedShareFetchPurgatory);
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
delayedShareFetchPurgatory);
         mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, 
tp0, 1);
@@ -1238,7 +1238,7 @@ public class SharePartitionManagerTest {
 
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
                 "TestShareFetch", mockTimer, 
mockReplicaManager.localBrokerId(),
-                DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+                DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
delayedShareFetchPurgatory);
 
         SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
@@ -1276,7 +1276,7 @@ public class SharePartitionManagerTest {
 
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
-            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
delayedShareFetchPurgatory);
         mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, 
tp0, 1);
 
@@ -1759,7 +1759,7 @@ public class SharePartitionManagerTest {
 
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
                 "TestShareFetch", mockTimer, 
mockReplicaManager.localBrokerId(),
-                DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+                DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
delayedShareFetchPurgatory);
         when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(0, 1, 0)));
         mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, 
tp1, 2);
@@ -1869,7 +1869,7 @@ public class SharePartitionManagerTest {
 
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
                 "TestShareFetch", mockTimer, 
mockReplicaManager.localBrokerId(),
-                DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+                DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
delayedShareFetchPurgatory);
 
         // Initially you cannot acquire records for both all 3 share 
partitions.
@@ -1972,7 +1972,7 @@ public class SharePartitionManagerTest {
 
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
                 "TestShareFetch", mockTimer, 
mockReplicaManager.localBrokerId(),
-                DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+                DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
delayedShareFetchPurgatory);
         when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(0, 1, 0)));
         mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, 
tp1, 1);
@@ -2009,13 +2009,11 @@ public class SharePartitionManagerTest {
         // Since acquisition lock for sp1 and sp2 cannot be acquired, we 
should have 2 watched keys.
         assertEquals(2, delayedShareFetchPurgatory.watched());
 
-        doAnswer(invocation -> 
buildLogReadResult(partitionMaxBytes.keySet())).when(mockReplicaManager).readFromLog(any(),
 any(), any(ReplicaQuota.class), anyBoolean());
-
-        assertEquals(2, delayedShareFetchPurgatory.watched());
-
         // The share session for this share group member returns tp1 and tp3, 
tp1 is common in both the delayed fetch request and the share session.
         
when(sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, 
Uuid.fromString(memberId))).thenReturn(Arrays.asList(tp1, tp3));
 
+        doAnswer(invocation -> 
buildLogReadResult(Set.of(tp1))).when(mockReplicaManager).readFromLog(any(), 
any(), any(ReplicaQuota.class), anyBoolean());
+        when(sp1.acquire(anyString(), anyInt(), anyInt(), 
any())).thenReturn(new ShareAcquiredRecords(Collections.emptyList(), 0));
         // Release acquired records on session close request for tp1 and tp3.
         sharePartitionManager.releaseSession(groupId, memberId);
 
@@ -2079,7 +2077,7 @@ public class SharePartitionManagerTest {
 
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
                 "TestShareFetch", mockTimer, 
mockReplicaManager.localBrokerId(),
-                DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+                DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
delayedShareFetchPurgatory);
 
         // Initially you cannot acquire records for both all 3 share 
partitions.
@@ -2151,7 +2149,7 @@ public class SharePartitionManagerTest {
 
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
                 "TestShareFetch", mockTimer, 
mockReplicaManager.localBrokerId(),
-                DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+                DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
delayedShareFetchPurgatory);
 
         SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
@@ -2186,7 +2184,7 @@ public class SharePartitionManagerTest {
 
     @Flaky("KAFKA-18657")
     @Test
-    public void testDelayedInitializationShouldCompleteFetchRequest() throws 
Exception {
+    public void testDelayedInitializationShouldCompleteFetchRequest() {
         String groupId = "grp";
         Uuid memberId = Uuid.randomUuid();
         Uuid fooId = Uuid.randomUuid();
@@ -2207,7 +2205,7 @@ public class SharePartitionManagerTest {
 
         DelayedOperationPurgatory<DelayedShareFetch> shareFetchPurgatorySpy = 
spy(new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
-            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true));
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true));
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
shareFetchPurgatorySpy);
 
         SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
@@ -2272,7 +2270,7 @@ public class SharePartitionManagerTest {
 
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
                 "TestShareFetch", mockTimer, 
mockReplicaManager.localBrokerId(),
-                DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+                DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
delayedShareFetchPurgatory);
 
         SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
@@ -2510,7 +2508,7 @@ public class SharePartitionManagerTest {
 
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
-            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
         mockReplicaManagerDelayedShareFetch(replicaManager, 
delayedShareFetchPurgatory);
         when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(0, 1, 0)));
         mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp1, 
1);
@@ -2568,7 +2566,7 @@ public class SharePartitionManagerTest {
 
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
-            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
delayedShareFetchPurgatory);
 
         doThrow(new 
RuntimeException("Exception")).when(mockReplicaManager).readFromLog(any(), 
any(), any(ReplicaQuota.class), anyBoolean());
@@ -2628,7 +2626,7 @@ public class SharePartitionManagerTest {
 
         DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
-            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
delayedShareFetchPurgatory);
 
         // Throw FencedStateEpochException from replica manager fetch which 
should evict instance from the cache.
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 86dfba59c9d..12f4710fede 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -26,7 +26,7 @@ import 
org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_S
 import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics
 import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
 import kafka.server.epoch.util.MockBlockingSender
-import kafka.server.share.DelayedShareFetch
+import kafka.server.share.{DelayedShareFetch, SharePartition}
 import kafka.utils.TestUtils.waitUntilTrue
 import kafka.utils.{Pool, TestUtils}
 import org.apache.kafka.clients.FetchSessionHandler
@@ -35,7 +35,7 @@ import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.InvalidPidMappingException
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.message.DeleteRecordsResponseData
+import org.apache.kafka.common.message.{DeleteRecordsResponseData, 
ShareFetchResponseData}
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.metadata.{PartitionChangeRecord, 
PartitionRecord, RemoveTopicRecord, TopicRecord}
 import org.apache.kafka.common.metrics.Metrics
@@ -61,6 +61,8 @@ import org.apache.kafka.server.log.remote.storage._
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.purgatory.DelayedOperationPurgatory
+import org.apache.kafka.server.share.SharePartitionKey
+import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, 
DelayedShareFetchKey, ShareFetch}
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
FetchPartitionData}
 import org.apache.kafka.server.util.timer.MockTimer
 import org.apache.kafka.server.util.{MockScheduler, MockTime}
@@ -83,7 +85,8 @@ import java.net.InetAddress
 import java.nio.file.{Files, Paths}
 import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
-import java.util.concurrent.{Callable, ConcurrentHashMap, CountDownLatch, 
TimeUnit}
+import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, 
CountDownLatch, TimeUnit}
+import java.util.function.BiConsumer
 import java.util.stream.IntStream
 import java.util.{Collections, Optional, OptionalLong, Properties}
 import scala.collection.{Map, Seq, mutable}
@@ -5977,6 +5980,67 @@ class ReplicaManagerTest {
     )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = mockLogMgr,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+
+    val groupId = "grp"
+    val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 
0))
+    val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+    partitionMaxBytes.put(tp1, 1000)
+
+    val sp1 = mock(classOf[SharePartition])
+    val sharePartitions = new util.LinkedHashMap[TopicIdPartition, 
SharePartition]
+    sharePartitions.put(tp1, sp1)
+
+    val future = new CompletableFuture[util.Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData]]
+    val shareFetch = new ShareFetch(
+      new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, 
FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, 
FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
+      groupId,
+      Uuid.randomUuid.toString,
+      future,
+      partitionMaxBytes,
+      500,
+      100,
+      brokerTopicStats)
+
+    val delayedShareFetch = spy(new DelayedShareFetch(
+      shareFetch,
+      rm,
+      mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
+      sharePartitions))
+
+    val delayedShareFetchWatchKeys : util.List[DelayedShareFetchKey] = new 
util.ArrayList[DelayedShareFetchKey]
+    partitionMaxBytes.keySet.forEach((topicIdPartition: TopicIdPartition) => 
delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, 
topicIdPartition.topicId, topicIdPartition.partition)))
+
+    // You cannot acquire records for sp1, so request will be stored in 
purgatory waiting for timeout.
+    when(sp1.maybeAcquireFetchLock).thenReturn(false)
+
+    rm.addDelayedShareFetchRequest(delayedShareFetch = delayedShareFetch, 
delayedShareFetchKeys = delayedShareFetchWatchKeys)
+    verify(delayedShareFetch, times(0)).forceComplete()
+    assertEquals(1, rm.delayedShareFetchPurgatory.watched)
+
+    // Future is not complete initially.
+    assertFalse(future.isDone)
+    // Post timeout, share fetch request will timeout and the future should 
complete. The timeout is set at 500ms but
+    // kept a buffer of additional 500ms so the task can always timeout.
+    waitUntilTrue(() => future.isDone, "Processing in delayed share fetch 
purgatory never ended.", 1000)
+    verify(delayedShareFetch, times(1)).forceComplete()
+    assertFalse(future.isCompletedExceptionally)
+    // Since no partition could be acquired, the future should be empty.
+    assertEquals(0, future.join.size)
+  }
+
   private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): 
Seq[(TopicIdPartition, LogReadResult)] = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog = true)
     try {

Reply via email to