This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7c77519f590 MINOR: changed the test 
testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers to remove 
ambiguity (#19997)
7c77519f590 is described below

commit 7c77519f590d40ca20bc047657190f83f06c86f8
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Sat Jun 21 13:06:31 2025 +0530

    MINOR: changed the test 
testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers to remove 
ambiguity (#19997)
    
    The test testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers
    was recently found to be flaky. This commit makes a small change that
    could potentially resolve the issue. Earlier, 10000 records were being
    produced and then 3 consecutive share fetch requests were being sent. At
    the end, assertions were done to make sure each share consumer receives
    some records, and that none of them consume the same record. Since the
    motive for the test is to see if multiple consumers can share the same
    subscription and not consume the same record, a better way would be to
    produce a record, consume that and repeat it 3 times with the 3
    consumers. This ensures that every consumer consumes a record, and a
    previously consumed record is not consumed again by the subsequent share
    fetches.
    
    Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../server/ShareFetchAcknowledgeRequestTest.scala  | 61 ++++++++++------------
 1 file changed, 28 insertions(+), 33 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index 44e57c5518a..7d571ae0336 100644
--- 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -1368,55 +1368,50 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     sendFirstShareFetchRequest(memberId1, groupId, send, socket1)
 
     initProducer()
-    // Producing 10000 records to the topic created above
-    produceData(topicIdPartition, 10000)
+    // Producing 1 record to the topic created above
+    produceData(topicIdPartition, 1)
 
-    // Sending 3 share Fetch Requests with same groupId to the same 
topicPartition but with different memberIds,
-    // mocking the behaviour of multiple share consumers from the same share 
group
+    // Sending a share Fetch Request
     val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1, 
ShareRequestMetadata.INITIAL_EPOCH)
     val acknowledgementsMap1: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     val shareFetchRequest1 = createShareFetchRequest(groupId, metadata1, send, 
Seq.empty, acknowledgementsMap1, minBytes = 100, maxBytes = 1500)
+    val shareFetchResponse1 = 
IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1, 
socket1)
+    val shareFetchResponseData1 = shareFetchResponse1.data()
+    val partitionData1 = 
shareFetchResponseData1.responses().stream().findFirst().get().partitions().get(0)
+
+    // Producing 1 record to the topic created above
+    produceData(topicIdPartition, 1)
 
+    // Sending another share Fetch Request with same groupId to the same 
topicPartition but with different memberId,
+    // mocking the behaviour of multiple share consumers from the same share 
group
     val metadata2: ShareRequestMetadata = new ShareRequestMetadata(memberId2, 
ShareRequestMetadata.INITIAL_EPOCH)
     val acknowledgementsMap2: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     val shareFetchRequest2 = createShareFetchRequest(groupId, metadata2, send, 
Seq.empty, acknowledgementsMap2, minBytes = 100, maxBytes = 1500)
+    val shareFetchResponse2 = 
IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2, 
socket2)
+    val shareFetchResponseData2 = shareFetchResponse2.data()
+    val partitionData2 = 
shareFetchResponseData2.responses().stream().findFirst().get().partitions().get(0)
+
+    // Producing 1 record to the topic created above
+    produceData(topicIdPartition, 1)
 
+    // Sending another share Fetch Request with same groupId to the same 
topicPartition but with different memberId,
+    // mocking the behaviour of multiple share consumers from the same share 
group
     val metadata3: ShareRequestMetadata = new ShareRequestMetadata(memberId3, 
ShareRequestMetadata.INITIAL_EPOCH)
     val acknowledgementsMap3: Map[TopicIdPartition, 
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
     val shareFetchRequest3 = createShareFetchRequest(groupId, metadata3, send, 
Seq.empty, acknowledgementsMap3, minBytes = 100, maxBytes = 1500)
-
-    val shareFetchResponse1 = 
IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1, 
socket1)
-    val shareFetchResponse2 = 
IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2, 
socket2)
     val shareFetchResponse3 = 
IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest3, 
socket3)
-
-
-    val shareFetchResponseData1 = shareFetchResponse1.data()
-    assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode)
-    assertEquals(1, shareFetchResponseData1.responses().size())
-    assertEquals(topicId, 
shareFetchResponseData1.responses().stream().findFirst().get().topicId())
-    assertEquals(1, 
shareFetchResponseData1.responses().stream().findFirst().get().partitions().size())
-
-    val partitionData1 = 
shareFetchResponseData1.responses().stream().findFirst().get().partitions().get(0)
-
-    val shareFetchResponseData2 = shareFetchResponse2.data()
-    assertEquals(Errors.NONE.code, shareFetchResponseData2.errorCode)
-    assertEquals(1, shareFetchResponseData2.responses().size())
-    assertEquals(topicId, 
shareFetchResponseData2.responses().stream().findFirst().get().topicId())
-    assertEquals(1, 
shareFetchResponseData2.responses().stream().findFirst().get().partitions().size())
-
-    val partitionData2 = 
shareFetchResponseData2.responses().stream().findFirst().get().partitions().get(0)
-
     val shareFetchResponseData3 = shareFetchResponse3.data()
-    assertEquals(Errors.NONE.code, shareFetchResponseData3.errorCode)
-    assertEquals(1, shareFetchResponseData3.responses().size())
-    assertEquals(topicId, 
shareFetchResponseData3.responses().stream().findFirst().get().topicId())
-    assertEquals(1, 
shareFetchResponseData3.responses().stream().findFirst().get().partitions().size())
-
     val partitionData3 = 
shareFetchResponseData3.responses().stream().findFirst().get().partitions().get(0)
 
-    // There should be no common records between the 3 consumers as they are 
part of the same group
-    assertTrue(partitionData1.acquiredRecords().get(0).lastOffset() < 
partitionData2.acquiredRecords().get(0).firstOffset())
-    assertTrue(partitionData2.acquiredRecords().get(0).lastOffset() < 
partitionData3.acquiredRecords().get(0).firstOffset())
+    // Each consumer should have received 1 record and any record should only 
be consumed by 1 consumer
+    assertEquals(partitionData1.acquiredRecords().get(0).firstOffset(), 
partitionData1.acquiredRecords().get(0).lastOffset())
+    assertEquals(partitionData1.acquiredRecords().get(0).firstOffset(), 0)
+
+    assertEquals(partitionData2.acquiredRecords().get(0).firstOffset(), 
partitionData2.acquiredRecords().get(0).lastOffset())
+    assertEquals(partitionData2.acquiredRecords().get(0).firstOffset(), 1)
+
+    assertEquals(partitionData3.acquiredRecords().get(0).firstOffset(), 
partitionData3.acquiredRecords().get(0).lastOffset())
+    assertEquals(partitionData3.acquiredRecords().get(0).firstOffset(), 2)
   }
 
   @ClusterTests(

Reply via email to