This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7c77519f590 MINOR: changed the test
testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers to remove
ambiguity (#19997)
7c77519f590 is described below
commit 7c77519f590d40ca20bc047657190f83f06c86f8
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Sat Jun 21 13:06:31 2025 +0530
MINOR: changed the test
testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers to remove
ambiguity (#19997)
The test testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers
was recently found to be flaky. This commit makes a small change that
could potentially resolve the issue. Earlier, 10000 records were being
produced and then 3 consecutive share fetch requests were being sent. At
the end, assertions were done to make sure each share consumer receives
some records, and that none of them consume the same record. Since the
motive for the test is to see if multiple consumers can share the same
subscription and not consume the same record, a better way would be to
produce a record, consume that and repeat it 3 times with the 3
consumers. This ensures that every consumer consumes a record, and a
previously consumed record is not consumed again by the subsequent share
fetches.
Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
<[email protected]>
---
.../server/ShareFetchAcknowledgeRequestTest.scala | 61 ++++++++++------------
1 file changed, 28 insertions(+), 33 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index 44e57c5518a..7d571ae0336 100644
---
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -1368,55 +1368,50 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
sendFirstShareFetchRequest(memberId1, groupId, send, socket1)
initProducer()
- // Producing 10000 records to the topic created above
- produceData(topicIdPartition, 10000)
+ // Producing 1 record to the topic created above
+ produceData(topicIdPartition, 1)
- // Sending 3 share Fetch Requests with same groupId to the same
topicPartition but with different memberIds,
- // mocking the behaviour of multiple share consumers from the same share
group
+ // Sending a share Fetch Request
val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1,
ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap1: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest1 = createShareFetchRequest(groupId, metadata1, send,
Seq.empty, acknowledgementsMap1, minBytes = 100, maxBytes = 1500)
+ val shareFetchResponse1 =
IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1,
socket1)
+ val shareFetchResponseData1 = shareFetchResponse1.data()
+ val partitionData1 =
shareFetchResponseData1.responses().stream().findFirst().get().partitions().get(0)
+
+ // Producing 1 record to the topic created above
+ produceData(topicIdPartition, 1)
+ // Sending another share Fetch Request with same groupId to the same
topicPartition but with different memberId,
+ // mocking the behaviour of multiple share consumers from the same share
group
val metadata2: ShareRequestMetadata = new ShareRequestMetadata(memberId2,
ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap2: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest2 = createShareFetchRequest(groupId, metadata2, send,
Seq.empty, acknowledgementsMap2, minBytes = 100, maxBytes = 1500)
+ val shareFetchResponse2 =
IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2,
socket2)
+ val shareFetchResponseData2 = shareFetchResponse2.data()
+ val partitionData2 =
shareFetchResponseData2.responses().stream().findFirst().get().partitions().get(0)
+
+ // Producing 1 record to the topic created above
+ produceData(topicIdPartition, 1)
+ // Sending another share Fetch Request with same groupId to the same
topicPartition but with different memberId,
+ // mocking the behaviour of multiple share consumers from the same share
group
val metadata3: ShareRequestMetadata = new ShareRequestMetadata(memberId3,
ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap3: Map[TopicIdPartition,
util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest3 = createShareFetchRequest(groupId, metadata3, send,
Seq.empty, acknowledgementsMap3, minBytes = 100, maxBytes = 1500)
-
- val shareFetchResponse1 =
IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1,
socket1)
- val shareFetchResponse2 =
IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2,
socket2)
val shareFetchResponse3 =
IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest3,
socket3)
-
-
- val shareFetchResponseData1 = shareFetchResponse1.data()
- assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode)
- assertEquals(1, shareFetchResponseData1.responses().size())
- assertEquals(topicId,
shareFetchResponseData1.responses().stream().findFirst().get().topicId())
- assertEquals(1,
shareFetchResponseData1.responses().stream().findFirst().get().partitions().size())
-
- val partitionData1 =
shareFetchResponseData1.responses().stream().findFirst().get().partitions().get(0)
-
- val shareFetchResponseData2 = shareFetchResponse2.data()
- assertEquals(Errors.NONE.code, shareFetchResponseData2.errorCode)
- assertEquals(1, shareFetchResponseData2.responses().size())
- assertEquals(topicId,
shareFetchResponseData2.responses().stream().findFirst().get().topicId())
- assertEquals(1,
shareFetchResponseData2.responses().stream().findFirst().get().partitions().size())
-
- val partitionData2 =
shareFetchResponseData2.responses().stream().findFirst().get().partitions().get(0)
-
val shareFetchResponseData3 = shareFetchResponse3.data()
- assertEquals(Errors.NONE.code, shareFetchResponseData3.errorCode)
- assertEquals(1, shareFetchResponseData3.responses().size())
- assertEquals(topicId,
shareFetchResponseData3.responses().stream().findFirst().get().topicId())
- assertEquals(1,
shareFetchResponseData3.responses().stream().findFirst().get().partitions().size())
-
val partitionData3 =
shareFetchResponseData3.responses().stream().findFirst().get().partitions().get(0)
- // There should be no common records between the 3 consumers as they are
part of the same group
- assertTrue(partitionData1.acquiredRecords().get(0).lastOffset() <
partitionData2.acquiredRecords().get(0).firstOffset())
- assertTrue(partitionData2.acquiredRecords().get(0).lastOffset() <
partitionData3.acquiredRecords().get(0).firstOffset())
+ // Each consumer should have received 1 record and any record should only
be consumed by 1 consumer
+ assertEquals(partitionData1.acquiredRecords().get(0).firstOffset(),
partitionData1.acquiredRecords().get(0).lastOffset())
+ assertEquals(partitionData1.acquiredRecords().get(0).firstOffset(), 0)
+
+ assertEquals(partitionData2.acquiredRecords().get(0).firstOffset(),
partitionData2.acquiredRecords().get(0).lastOffset())
+ assertEquals(partitionData2.acquiredRecords().get(0).firstOffset(), 1)
+
+ assertEquals(partitionData3.acquiredRecords().get(0).firstOffset(),
partitionData3.acquiredRecords().get(0).lastOffset())
+ assertEquals(partitionData3.acquiredRecords().get(0).firstOffset(), 2)
}
@ClusterTests(