This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cfd18132e8c KAFKA-19328: SharePartitionManagerTest 
testMultipleConcurrentShareFetches doAnswer chaining needs verification (#19872)
cfd18132e8c is described below

commit cfd18132e8c4ba89dbf0f20963d4747dd80900e1
Author: Ji-Seung Ryu <[email protected]>
AuthorDate: Thu Jun 5 01:15:18 2025 +0900

    KAFKA-19328: SharePartitionManagerTest testMultipleConcurrentShareFetches 
doAnswer chaining needs verification (#19872)
    
    Hi, I've created pull request.
    
    jira: [19328](https://issues.apache.org/jira/browse/KAFKA-19328)
    
    problem:
    
    1. doAnswer chaining works as intended only when calls are made
    sequentially. In a multithreaded environment, its behavior is
    unpredictable.
    2. errors in a thread can be swallowed, not seen in main thread.
    3. 5 doAnswer chain is not enough for 100 threads. The last chain is
    returned for most cases.
    4. nextFetchOffset seems to be called before doAnswer chain, so the last
    values (25, 5,  26, 16) always was found in doAsnwer chain.
    
    solution:
    
    Delete doAnswer chain so that above four problems disappear.
    
    Reviewers: Abhinav Dixit <[email protected]>, Apoorv Mittal
     <[email protected]>, Andrew Schofield <[email protected]>
---
 .../server/share/SharePartitionManagerTest.java    | 143 ---------------------
 1 file changed, 143 deletions(-)

diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index b4925633de0..4368d9656d2 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -75,7 +75,6 @@ import 
org.apache.kafka.server.share.session.ShareSessionCache;
 import org.apache.kafka.server.share.session.ShareSessionKey;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.storage.log.FetchParams;
-import org.apache.kafka.server.storage.log.FetchPartitionData;
 import org.apache.kafka.server.util.FutureUtils;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.server.util.timer.MockTimer;
@@ -110,9 +109,6 @@ import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import scala.Tuple2;
@@ -120,7 +116,6 @@ import scala.collection.Seq;
 import scala.jdk.javaapi.CollectionConverters;
 
 import static 
kafka.server.share.DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes;
-import static 
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
 import static 
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.validateRotatedListEquals;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -135,8 +130,6 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.atMost;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -1187,142 +1180,6 @@ public class SharePartitionManagerTest {
         );
     }
 
-    @Test
-    public void testMultipleConcurrentShareFetches() throws 
InterruptedException {
-        String groupId = "grp";
-        Uuid memberId1 = Uuid.randomUuid();
-        Uuid fooId = Uuid.randomUuid();
-        Uuid barId = Uuid.randomUuid();
-        TopicIdPartition tp0 = new TopicIdPartition(fooId, new 
TopicPartition("foo", 0));
-        TopicIdPartition tp1 = new TopicIdPartition(fooId, new 
TopicPartition("foo", 1));
-        TopicIdPartition tp2 = new TopicIdPartition(barId, new 
TopicPartition("bar", 0));
-        TopicIdPartition tp3 = new TopicIdPartition(barId, new 
TopicPartition("bar", 1));
-        List<TopicIdPartition> topicIdPartitions = List.of(tp0, tp1, tp2, tp3);
-
-        mockFetchOffsetForTimestamp(mockReplicaManager);
-
-        Timer mockTimer = systemTimerReaper();
-        DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
-            "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
-            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
-        mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
delayedShareFetchPurgatory);
-        mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, 
tp0, 1);
-        mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, 
tp1, 1);
-        mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, 
tp2, 1);
-        mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, 
tp3, 1);
-
-        SharePartition sp0 = mock(SharePartition.class);
-        SharePartition sp1 = mock(SharePartition.class);
-        SharePartition sp2 = mock(SharePartition.class);
-        SharePartition sp3 = mock(SharePartition.class);
-
-        // Mock the share partitions corresponding to the topic partitions.
-        SharePartitionCache partitionCache = new SharePartitionCache();
-        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
-        partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
-        partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
-        partitionCache.put(new SharePartitionKey(groupId, tp3), sp3);
-        // Mock the share partitions to get initialized instantaneously 
without any error.
-        
when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
-        
when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
-        
when(sp2.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
-        
when(sp3.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
-        // Required mocks so that the share partitions can acquire record.
-        when(sp0.maybeAcquireFetchLock(any())).thenReturn(true);
-        when(sp1.maybeAcquireFetchLock(any())).thenReturn(true);
-        when(sp2.maybeAcquireFetchLock(any())).thenReturn(true);
-        when(sp3.maybeAcquireFetchLock(any())).thenReturn(true);
-        when(sp0.canAcquireRecords()).thenReturn(true);
-        when(sp1.canAcquireRecords()).thenReturn(true);
-        when(sp2.canAcquireRecords()).thenReturn(true);
-        when(sp3.canAcquireRecords()).thenReturn(true);
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        // Mocks to have fetch offset metadata match for share partitions to 
avoid any extra calls to replicaManager.readFromLog.
-        
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
-        
when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
-        
when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
-        
when(sp3.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
-
-        // Mock nextFetchOffset() functionality for share partitions to 
reflect the moving fetch of share partitions.
-        when(sp0.nextFetchOffset()).thenReturn((long) 1, (long) 15, (long) 6, 
(long) 30, (long) 25);
-        when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 1, (long) 18, 
(long) 5);
-        when(sp2.nextFetchOffset()).thenReturn((long) 10, (long) 25, (long) 
26);
-        when(sp3.nextFetchOffset()).thenReturn((long) 20, (long) 15, (long) 
23, (long) 16);
-
-        sharePartitionManager = SharePartitionManagerBuilder.builder()
-            .withReplicaManager(mockReplicaManager)
-            .withTimer(mockTimer)
-            .withBrokerTopicStats(brokerTopicStats)
-            .withPartitionCache(partitionCache)
-            .build();
-
-        doAnswer(invocation -> {
-            assertEquals(1, sp0.nextFetchOffset());
-            assertEquals(4, sp1.nextFetchOffset());
-            assertEquals(10, sp2.nextFetchOffset());
-            assertEquals(20, sp3.nextFetchOffset());
-            return buildLogReadResult(topicIdPartitions);
-        }).doAnswer(invocation -> {
-            assertEquals(15, sp0.nextFetchOffset());
-            assertEquals(1, sp1.nextFetchOffset());
-            assertEquals(25, sp2.nextFetchOffset());
-            assertEquals(15, sp3.nextFetchOffset());
-            return buildLogReadResult(topicIdPartitions);
-        }).doAnswer(invocation -> {
-            assertEquals(6, sp0.nextFetchOffset());
-            assertEquals(18, sp1.nextFetchOffset());
-            assertEquals(26, sp2.nextFetchOffset());
-            assertEquals(23, sp3.nextFetchOffset());
-            return buildLogReadResult(topicIdPartitions);
-        }).doAnswer(invocation -> {
-            assertEquals(30, sp0.nextFetchOffset());
-            assertEquals(5, sp1.nextFetchOffset());
-            assertEquals(26, sp2.nextFetchOffset());
-            assertEquals(16, sp3.nextFetchOffset());
-            return buildLogReadResult(topicIdPartitions);
-        }).doAnswer(invocation -> {
-            assertEquals(25, sp0.nextFetchOffset());
-            assertEquals(5, sp1.nextFetchOffset());
-            assertEquals(26, sp2.nextFetchOffset());
-            assertEquals(16, sp3.nextFetchOffset());
-            return buildLogReadResult(topicIdPartitions);
-        }).when(mockReplicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
-
-        int threadCount = 100;
-        ExecutorService executorService = 
Executors.newFixedThreadPool(threadCount);
-
-        FetchParams fetchParams = new FetchParams(
-            FetchRequest.ORDINARY_CONSUMER_ID, -1, 200,
-            1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty(), 
true);
-
-        try {
-            for (int i = 0; i != threadCount; ++i) {
-                executorService.submit(() -> {
-                    sharePartitionManager.fetchMessages(groupId, 
memberId1.toString(), fetchParams, 0,
-                        MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
-                });
-                // We are blocking the main thread at an interval of 10 
threads so that the currently running executorService threads can complete.
-                if (i % 10 == 0)
-                    executorService.awaitTermination(50, 
TimeUnit.MILLISECONDS);
-            }
-        } finally {
-            if (!executorService.awaitTermination(50, TimeUnit.MILLISECONDS))
-                executorService.shutdown();
-        }
-        // We are checking the number of replicaManager readFromLog() calls
-        Mockito.verify(mockReplicaManager, atMost(100)).readFromLog(
-            any(), any(), any(ReplicaQuota.class), anyBoolean());
-        Mockito.verify(mockReplicaManager, atLeast(10)).readFromLog(
-            any(), any(), any(ReplicaQuota.class), anyBoolean());
-    }
-
     @Test
     public void testReplicaManagerFetchShouldNotProceed() {
         String groupId = "grp";

Reply via email to