This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cfd18132e8c KAFKA-19328: SharePartitionManagerTest
testMultipleConcurrentShareFetches doAnswer chaining needs verification (#19872)
cfd18132e8c is described below
commit cfd18132e8c4ba89dbf0f20963d4747dd80900e1
Author: Ji-Seung Ryu <[email protected]>
AuthorDate: Thu Jun 5 01:15:18 2025 +0900
KAFKA-19328: SharePartitionManagerTest testMultipleConcurrentShareFetches
doAnswer chaining needs verification (#19872)
Hi, I've created pull request.
jira: [19328](https://issues.apache.org/jira/browse/KAFKA-19328)
problem:
1. doAnswer chaining works as intended only when calls are made
sequentially. In a multithreaded environment, its behavior is
unpredictable.
2. errors in a thread can be swallowed, not seen in main thread.
3. 5 doAnswer chain is not enough for 100 threads. The last chain is
returned for most cases.
4. nextFetchOffset seems to be called before doAnswer chain, so the last
values (25, 5, 26, 16) always was found in doAsnwer chain.
solution:
Delete doAnswer chain so that above four problems disappear.
Reviewers: Abhinav Dixit <[email protected]>, Apoorv Mittal
<[email protected]>, Andrew Schofield <[email protected]>
---
.../server/share/SharePartitionManagerTest.java | 143 ---------------------
1 file changed, 143 deletions(-)
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index b4925633de0..4368d9656d2 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -75,7 +75,6 @@ import
org.apache.kafka.server.share.session.ShareSessionCache;
import org.apache.kafka.server.share.session.ShareSessionKey;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
-import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.MockTimer;
@@ -110,9 +109,6 @@ import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import scala.Tuple2;
@@ -120,7 +116,6 @@ import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;
import static
kafka.server.share.DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes;
-import static
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
import static
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.validateRotatedListEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -135,8 +130,6 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -1187,142 +1180,6 @@ public class SharePartitionManagerTest {
);
}
- @Test
- public void testMultipleConcurrentShareFetches() throws
InterruptedException {
- String groupId = "grp";
- Uuid memberId1 = Uuid.randomUuid();
- Uuid fooId = Uuid.randomUuid();
- Uuid barId = Uuid.randomUuid();
- TopicIdPartition tp0 = new TopicIdPartition(fooId, new
TopicPartition("foo", 0));
- TopicIdPartition tp1 = new TopicIdPartition(fooId, new
TopicPartition("foo", 1));
- TopicIdPartition tp2 = new TopicIdPartition(barId, new
TopicPartition("bar", 0));
- TopicIdPartition tp3 = new TopicIdPartition(barId, new
TopicPartition("bar", 1));
- List<TopicIdPartition> topicIdPartitions = List.of(tp0, tp1, tp2, tp3);
-
- mockFetchOffsetForTimestamp(mockReplicaManager);
-
- Timer mockTimer = systemTimerReaper();
- DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
- "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
- DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
- mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
- mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager,
tp0, 1);
- mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager,
tp1, 1);
- mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager,
tp2, 1);
- mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager,
tp3, 1);
-
- SharePartition sp0 = mock(SharePartition.class);
- SharePartition sp1 = mock(SharePartition.class);
- SharePartition sp2 = mock(SharePartition.class);
- SharePartition sp3 = mock(SharePartition.class);
-
- // Mock the share partitions corresponding to the topic partitions.
- SharePartitionCache partitionCache = new SharePartitionCache();
- partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
- partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
- partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
- partitionCache.put(new SharePartitionKey(groupId, tp3), sp3);
- // Mock the share partitions to get initialized instantaneously
without any error.
-
when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
-
when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
-
when(sp2.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
-
when(sp3.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
- // Required mocks so that the share partitions can acquire record.
- when(sp0.maybeAcquireFetchLock(any())).thenReturn(true);
- when(sp1.maybeAcquireFetchLock(any())).thenReturn(true);
- when(sp2.maybeAcquireFetchLock(any())).thenReturn(true);
- when(sp3.maybeAcquireFetchLock(any())).thenReturn(true);
- when(sp0.canAcquireRecords()).thenReturn(true);
- when(sp1.canAcquireRecords()).thenReturn(true);
- when(sp2.canAcquireRecords()).thenReturn(true);
- when(sp3.canAcquireRecords()).thenReturn(true);
- when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
- when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
- when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
- when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
- // Mocks to have fetch offset metadata match for share partitions to
avoid any extra calls to replicaManager.readFromLog.
-
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
-
when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
-
when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
-
when(sp3.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
-
- // Mock nextFetchOffset() functionality for share partitions to
reflect the moving fetch of share partitions.
- when(sp0.nextFetchOffset()).thenReturn((long) 1, (long) 15, (long) 6,
(long) 30, (long) 25);
- when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 1, (long) 18,
(long) 5);
- when(sp2.nextFetchOffset()).thenReturn((long) 10, (long) 25, (long)
26);
- when(sp3.nextFetchOffset()).thenReturn((long) 20, (long) 15, (long)
23, (long) 16);
-
- sharePartitionManager = SharePartitionManagerBuilder.builder()
- .withReplicaManager(mockReplicaManager)
- .withTimer(mockTimer)
- .withBrokerTopicStats(brokerTopicStats)
- .withPartitionCache(partitionCache)
- .build();
-
- doAnswer(invocation -> {
- assertEquals(1, sp0.nextFetchOffset());
- assertEquals(4, sp1.nextFetchOffset());
- assertEquals(10, sp2.nextFetchOffset());
- assertEquals(20, sp3.nextFetchOffset());
- return buildLogReadResult(topicIdPartitions);
- }).doAnswer(invocation -> {
- assertEquals(15, sp0.nextFetchOffset());
- assertEquals(1, sp1.nextFetchOffset());
- assertEquals(25, sp2.nextFetchOffset());
- assertEquals(15, sp3.nextFetchOffset());
- return buildLogReadResult(topicIdPartitions);
- }).doAnswer(invocation -> {
- assertEquals(6, sp0.nextFetchOffset());
- assertEquals(18, sp1.nextFetchOffset());
- assertEquals(26, sp2.nextFetchOffset());
- assertEquals(23, sp3.nextFetchOffset());
- return buildLogReadResult(topicIdPartitions);
- }).doAnswer(invocation -> {
- assertEquals(30, sp0.nextFetchOffset());
- assertEquals(5, sp1.nextFetchOffset());
- assertEquals(26, sp2.nextFetchOffset());
- assertEquals(16, sp3.nextFetchOffset());
- return buildLogReadResult(topicIdPartitions);
- }).doAnswer(invocation -> {
- assertEquals(25, sp0.nextFetchOffset());
- assertEquals(5, sp1.nextFetchOffset());
- assertEquals(26, sp2.nextFetchOffset());
- assertEquals(16, sp3.nextFetchOffset());
- return buildLogReadResult(topicIdPartitions);
- }).when(mockReplicaManager).readFromLog(any(), any(),
any(ReplicaQuota.class), anyBoolean());
-
- int threadCount = 100;
- ExecutorService executorService =
Executors.newFixedThreadPool(threadCount);
-
- FetchParams fetchParams = new FetchParams(
- FetchRequest.ORDINARY_CONSUMER_ID, -1, 200,
- 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty(),
true);
-
- try {
- for (int i = 0; i != threadCount; ++i) {
- executorService.submit(() -> {
- sharePartitionManager.fetchMessages(groupId,
memberId1.toString(), fetchParams, 0,
- MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
- });
- // We are blocking the main thread at an interval of 10
threads so that the currently running executorService threads can complete.
- if (i % 10 == 0)
- executorService.awaitTermination(50,
TimeUnit.MILLISECONDS);
- }
- } finally {
- if (!executorService.awaitTermination(50, TimeUnit.MILLISECONDS))
- executorService.shutdown();
- }
- // We are checking the number of replicaManager readFromLog() calls
- Mockito.verify(mockReplicaManager, atMost(100)).readFromLog(
- any(), any(), any(ReplicaQuota.class), anyBoolean());
- Mockito.verify(mockReplicaManager, atLeast(10)).readFromLog(
- any(), any(), any(ReplicaQuota.class), anyBoolean());
- }
-
@Test
public void testReplicaManagerFetchShouldNotProceed() {
String groupId = "grp";