This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dd1f2b8aab5 KAFKA-18653: Fix mocks and potential thread leak issues
causing silent RejectedExecutionException in share group broker tests (#18725)
dd1f2b8aab5 is described below
commit dd1f2b8aab56393eaec5c0a1c76e3eb4a75ca792
Author: Abhinav Dixit <[email protected]>
AuthorDate: Wed Jan 29 21:54:30 2025 +0530
KAFKA-18653: Fix mocks and potential thread leak issues causing silent
RejectedExecutionException in share group broker tests (#18725)
Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
<[email protected]>
---
.../java/kafka/server/share/DelayedShareFetch.java | 9 ++-
.../java/kafka/server/share/SharePartition.java | 2 +-
.../kafka/server/share/DelayedShareFetchTest.java | 40 ++++++++++---
.../server/share/SharePartitionManagerTest.java | 36 ++++++-----
.../unit/kafka/server/ReplicaManagerTest.scala | 70 +++++++++++++++++++++-
5 files changed, 125 insertions(+), 32 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 1422e08524a..2298df9b85e 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -68,7 +68,14 @@ public class DelayedShareFetch extends DelayedOperation {
private LinkedHashMap<TopicIdPartition, Long> partitionsAcquired;
private LinkedHashMap<TopicIdPartition, LogReadResult>
partitionsAlreadyFetched;
- DelayedShareFetch(
+ /**
+ * This function constructs an instance of delayed share fetch operation
for completing share fetch requests instantaneously or with delay.
+ * @param shareFetch - The share fetch parameters of the share fetch
request.
+ * @param replicaManager - The replica manager instance used to read from
log/complete the request.
+ * @param exceptionHandler - The handler to complete share fetch requests
with exception.
+ * @param sharePartitions - The share partitions referenced in the share
fetch request.
+ */
+ public DelayedShareFetch(
ShareFetch shareFetch,
ReplicaManager replicaManager,
BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index a56c55b1b74..65dd7374720 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1107,7 +1107,7 @@ public class SharePartition {
*
* @return A boolean which indicates whether the fetch lock is acquired.
*/
- boolean maybeAcquireFetchLock() {
+ public boolean maybeAcquireFetchLock() {
if (stateNotActive()) {
return false;
}
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 6068fbb769d..363ae26b6c3 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -188,12 +188,14 @@ public class DelayedShareFetchTest {
doAnswer(invocation ->
buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
BiConsumer<SharePartitionKey, Throwable> exceptionHandler =
mockExceptionHandler();
+ PartitionMaxBytesStrategy partitionMaxBytesStrategy =
mockPartitionMaxBytes(Collections.singleton(tp0));
+
DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
.withReplicaManager(replicaManager)
.withExceptionHandler(exceptionHandler)
-
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
+ .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
.build());
assertFalse(delayedShareFetch.isCompleted());
@@ -292,11 +294,14 @@ public class DelayedShareFetchTest {
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new
LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0,
1);
+
+ PartitionMaxBytesStrategy partitionMaxBytesStrategy =
mockPartitionMaxBytes(Collections.singleton(tp0));
+
DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
.withReplicaManager(replicaManager)
-
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
+ .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
.build());
assertFalse(delayedShareFetch.isCompleted());
@@ -337,7 +342,6 @@ public class DelayedShareFetchTest {
.withShareFetchData(shareFetch)
.withReplicaManager(replicaManager)
.withSharePartitions(sharePartitions)
-
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
.build());
assertFalse(delayedShareFetch.isCompleted());
delayedShareFetch.forceComplete();
@@ -380,11 +384,14 @@ public class DelayedShareFetchTest {
when(sp0.acquire(anyString(), anyInt(), anyInt(),
any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
doAnswer(invocation ->
buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
+
+ PartitionMaxBytesStrategy partitionMaxBytesStrategy =
mockPartitionMaxBytes(Collections.singleton(tp0));
+
DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withReplicaManager(replicaManager)
.withSharePartitions(sharePartitions)
-
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
+ .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
.build());
assertFalse(delayedShareFetch.isCompleted());
delayedShareFetch.forceComplete();
@@ -477,7 +484,7 @@ public class DelayedShareFetchTest {
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
- DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
mockReplicaManagerDelayedShareFetch(replicaManager,
delayedShareFetchPurgatory);
List<DelayedOperationKey> delayedShareFetchWatchKeys = new
ArrayList<>();
@@ -505,6 +512,8 @@ public class DelayedShareFetchTest {
doAnswer(invocation ->
buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
+ PartitionMaxBytesStrategy partitionMaxBytesStrategy =
mockPartitionMaxBytes(Collections.singleton(tp1));
+
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions2 = new
LinkedHashMap<>();
sharePartitions2.put(tp0, sp0);
sharePartitions2.put(tp1, sp1);
@@ -514,7 +523,7 @@ public class DelayedShareFetchTest {
.withShareFetchData(shareFetch2)
.withReplicaManager(replicaManager)
.withSharePartitions(sharePartitions2)
-
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
+ .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
.build());
// sp1 can be acquired now
@@ -559,11 +568,13 @@ public class DelayedShareFetchTest {
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
+ PartitionMaxBytesStrategy partitionMaxBytesStrategy =
mockPartitionMaxBytes(Collections.singleton(tp1));
+
DelayedShareFetch delayedShareFetch =
DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withReplicaManager(replicaManager)
.withSharePartitions(sharePartitions)
-
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
+ .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
.build();
LinkedHashMap<TopicIdPartition, Long> topicPartitionData = new
LinkedHashMap<>();
@@ -625,13 +636,15 @@ public class DelayedShareFetchTest {
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(partition);
when(partition.fetchOffsetSnapshot(any(), anyBoolean())).thenThrow(new
RuntimeException("Exception thrown"));
+ PartitionMaxBytesStrategy partitionMaxBytesStrategy =
mockPartitionMaxBytes(Collections.singleton(tp0));
+
BiConsumer<SharePartitionKey, Throwable> exceptionHandler =
mockExceptionHandler();
DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
.withReplicaManager(replicaManager)
.withExceptionHandler(exceptionHandler)
-
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
+ .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
.build());
// Try complete should return false as the share partition has errored
out.
@@ -680,10 +693,13 @@ public class DelayedShareFetchTest {
new CompletableFuture<>(), orderedMap(PARTITION_MAX_BYTES, tp0),
BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
+ PartitionMaxBytesStrategy partitionMaxBytesStrategy =
mockPartitionMaxBytes(Collections.singleton(tp0));
+
DelayedShareFetch delayedShareFetch =
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions1)
.withReplicaManager(replicaManager)
+ .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
.build();
DelayedShareFetch spy = spy(delayedShareFetch);
@@ -1048,6 +1064,14 @@ public class DelayedShareFetchTest {
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition())).thenReturn(partition);
}
+ private PartitionMaxBytesStrategy
mockPartitionMaxBytes(Set<TopicIdPartition> partitions) {
+ PartitionMaxBytesStrategy partitionMaxBytesStrategy =
mock(PartitionMaxBytesStrategy.class);
+ LinkedHashMap<TopicIdPartition, Integer> maxBytes = new
LinkedHashMap<>();
+ partitions.forEach(partition -> maxBytes.put(partition, 1));
+ when(partitionMaxBytesStrategy.maxBytes(anyInt(), any(),
anyInt())).thenReturn(maxBytes);
+ return partitionMaxBytesStrategy;
+ }
+
@SuppressWarnings("unchecked")
private static BiConsumer<SharePartitionKey, Throwable>
mockExceptionHandler() {
return mock(BiConsumer.class);
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 6786e192ea5..30600b681f0 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -1060,7 +1060,7 @@ public class SharePartitionManagerTest {
Metrics metrics = new Metrics();
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
- DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager,
tp0, 1);
mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager,
tp1, 1);
@@ -1140,7 +1140,7 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
- DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager,
tp0, 1);
@@ -1238,7 +1238,7 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer,
mockReplicaManager.localBrokerId(),
- DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
@@ -1276,7 +1276,7 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
- DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager,
tp0, 1);
@@ -1759,7 +1759,7 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer,
mockReplicaManager.localBrokerId(),
- DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new
LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager,
tp1, 2);
@@ -1869,7 +1869,7 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer,
mockReplicaManager.localBrokerId(),
- DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
// Initially you cannot acquire records for both all 3 share
partitions.
@@ -1972,7 +1972,7 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer,
mockReplicaManager.localBrokerId(),
- DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new
LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager,
tp1, 1);
@@ -2009,13 +2009,11 @@ public class SharePartitionManagerTest {
// Since acquisition lock for sp1 and sp2 cannot be acquired, we
should have 2 watched keys.
assertEquals(2, delayedShareFetchPurgatory.watched());
- doAnswer(invocation ->
buildLogReadResult(partitionMaxBytes.keySet())).when(mockReplicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
-
- assertEquals(2, delayedShareFetchPurgatory.watched());
-
// The share session for this share group member returns tp1 and tp3,
tp1 is common in both the delayed fetch request and the share session.
when(sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId,
Uuid.fromString(memberId))).thenReturn(Arrays.asList(tp1, tp3));
+ doAnswer(invocation ->
buildLogReadResult(Set.of(tp1))).when(mockReplicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
+ when(sp1.acquire(anyString(), anyInt(), anyInt(),
any())).thenReturn(new ShareAcquiredRecords(Collections.emptyList(), 0));
// Release acquired records on session close request for tp1 and tp3.
sharePartitionManager.releaseSession(groupId, memberId);
@@ -2079,7 +2077,7 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer,
mockReplicaManager.localBrokerId(),
- DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
// Initially you cannot acquire records for both all 3 share
partitions.
@@ -2151,7 +2149,7 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer,
mockReplicaManager.localBrokerId(),
- DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
@@ -2186,7 +2184,7 @@ public class SharePartitionManagerTest {
@Flaky("KAFKA-18657")
@Test
- public void testDelayedInitializationShouldCompleteFetchRequest() throws
Exception {
+ public void testDelayedInitializationShouldCompleteFetchRequest() {
String groupId = "grp";
Uuid memberId = Uuid.randomUuid();
Uuid fooId = Uuid.randomUuid();
@@ -2207,7 +2205,7 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch> shareFetchPurgatorySpy =
spy(new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
- DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true));
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true));
mockReplicaManagerDelayedShareFetch(mockReplicaManager,
shareFetchPurgatorySpy);
SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
@@ -2272,7 +2270,7 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer,
mockReplicaManager.localBrokerId(),
- DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
@@ -2510,7 +2508,7 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
- DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
mockReplicaManagerDelayedShareFetch(replicaManager,
delayedShareFetchPurgatory);
when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new
LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp1,
1);
@@ -2568,7 +2566,7 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
- DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
doThrow(new
RuntimeException("Exception")).when(mockReplicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
@@ -2628,7 +2626,7 @@ public class SharePartitionManagerTest {
DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
- DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
// Throw FencedStateEpochException from replica manager fetch which
should evict instance from the cache.
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 86dfba59c9d..12f4710fede 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -26,7 +26,7 @@ import
org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_S
import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics
import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
import kafka.server.epoch.util.MockBlockingSender
-import kafka.server.share.DelayedShareFetch
+import kafka.server.share.{DelayedShareFetch, SharePartition}
import kafka.utils.TestUtils.waitUntilTrue
import kafka.utils.{Pool, TestUtils}
import org.apache.kafka.clients.FetchSessionHandler
@@ -35,7 +35,7 @@ import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.InvalidPidMappingException
import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.message.DeleteRecordsResponseData
+import org.apache.kafka.common.message.{DeleteRecordsResponseData,
ShareFetchResponseData}
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.metadata.{PartitionChangeRecord,
PartitionRecord, RemoveTopicRecord, TopicRecord}
import org.apache.kafka.common.metrics.Metrics
@@ -61,6 +61,8 @@ import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory
+import org.apache.kafka.server.share.SharePartitionKey
+import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey,
DelayedShareFetchKey, ShareFetch}
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
FetchPartitionData}
import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{MockScheduler, MockTime}
@@ -83,7 +85,8 @@ import java.net.InetAddress
import java.nio.file.{Files, Paths}
import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
-import java.util.concurrent.{Callable, ConcurrentHashMap, CountDownLatch,
TimeUnit}
+import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap,
CountDownLatch, TimeUnit}
+import java.util.function.BiConsumer
import java.util.stream.IntStream
import java.util.{Collections, Optional, OptionalLong, Properties}
import scala.collection.{Map, Seq, mutable}
@@ -5977,6 +5980,67 @@ class ReplicaManagerTest {
)
}
+ @Test
+ def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+ val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new
File(_)))
+ val rm = new ReplicaManager(
+ metrics = metrics,
+ config = config,
+ time = time,
+ scheduler = new MockScheduler(time),
+ logManager = mockLogMgr,
+ quotaManagers = quotaManager,
+ metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () =>
KRaftVersion.KRAFT_VERSION_0),
+ logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+ alterPartitionManager = alterPartitionManager)
+
+ val groupId = "grp"
+ val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1",
0))
+ val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+ partitionMaxBytes.put(tp1, 1000)
+
+ val sp1 = mock(classOf[SharePartition])
+ val sharePartitions = new util.LinkedHashMap[TopicIdPartition,
SharePartition]
+ sharePartitions.put(tp1, sp1)
+
+ val future = new CompletableFuture[util.Map[TopicIdPartition,
ShareFetchResponseData.PartitionData]]
+ val shareFetch = new ShareFetch(
+ new FetchParams(ApiKeys.SHARE_FETCH.latestVersion,
FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024,
FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
+ groupId,
+ Uuid.randomUuid.toString,
+ future,
+ partitionMaxBytes,
+ 500,
+ 100,
+ brokerTopicStats)
+
+ val delayedShareFetch = spy(new DelayedShareFetch(
+ shareFetch,
+ rm,
+ mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
+ sharePartitions))
+
+ val delayedShareFetchWatchKeys : util.List[DelayedShareFetchKey] = new
util.ArrayList[DelayedShareFetchKey]
+ partitionMaxBytes.keySet.forEach((topicIdPartition: TopicIdPartition) =>
delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId,
topicIdPartition.topicId, topicIdPartition.partition)))
+
+ // You cannot acquire records for sp1, so request will be stored in
purgatory waiting for timeout.
+ when(sp1.maybeAcquireFetchLock).thenReturn(false)
+
+ rm.addDelayedShareFetchRequest(delayedShareFetch = delayedShareFetch,
delayedShareFetchKeys = delayedShareFetchWatchKeys)
+ verify(delayedShareFetch, times(0)).forceComplete()
+ assertEquals(1, rm.delayedShareFetchPurgatory.watched)
+
+ // Future is not complete initially.
+ assertFalse(future.isDone)
+ // Post timeout, share fetch request will timeout and the future should
complete. The timeout is set at 500ms but
+ // kept a buffer of additional 500ms so the task can always timeout.
+ waitUntilTrue(() => future.isDone, "Processing in delayed share fetch
purgatory never ended.", 1000)
+ verify(delayedShareFetch, times(1)).forceComplete()
+ assertFalse(future.isCompletedExceptionally)
+ // Since no partition could be acquired, the future should be empty.
+ assertEquals(0, future.join.size)
+ }
+
private def readFromLogWithOffsetOutOfRange(tp: TopicPartition):
Seq[(TopicIdPartition, LogReadResult)] = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true,
shouldMockLog = true)
try {