This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c717e6e174f MINOR: Clean up OffsetCommit request/response validation
tests in KafkaApisTest (#22250)
c717e6e174f is described below
commit c717e6e174ff4f552548e7cf6ffd39daae232ee3
Author: David Jacot <[email protected]>
AuthorDate: Mon May 11 09:23:47 2026 +0200
MINOR: Clean up OffsetCommit request/response validation tests in
KafkaApisTest (#22250)
- Consolidate
`testHandleOffsetCommitRequestTopicsAndPartitionsValidation`
and
`testHandleOffsetCommitRequestTopicsAndPartitionsValidationWithTopicIds`
into a single `@ParameterizedTest @ApiKeyVersionsSource(apiKey =
ApiKeys.OFFSET_COMMIT)`
method that covers every API version, mirroring the shape of
`testHandleTxnOffsetCommitRequestTopicsAndPartitionsValidation`.
- Drop the unnecessary version gating on `topicId`/`name` in the
source request data of `testHandleOffsetCommitRequest` and
`testHandleOffsetCommitRequestFutureFailed`. Both fields can be set
unconditionally; the wire schema filters at serialization time and
the builder accepts requests that have both fields populated.
Reviewers: Sean Quah <[email protected]>
---
.../scala/unit/kafka/server/KafkaApisTest.scala | 194 +++------------------
1 file changed, 28 insertions(+), 166 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index bcb6edb92a5..3dda1e6e921 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1052,8 +1052,8 @@ class KafkaApisTest extends Logging {
.setMemberId("member")
.setTopics(util.List.of(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
- .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
- .setName(if (version < 10) topicName else "")
+ .setTopicId(topicId)
+ .setName(topicName)
.setPartitions(util.List.of(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
@@ -1113,8 +1113,8 @@ class KafkaApisTest extends Logging {
.setMemberId("member")
.setTopics(util.List.of(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
- .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
- .setName(if (version < 10) topicName else "")
+ .setTopicId(topicId)
+ .setName(topicName)
.setPartitions(util.List.of(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
@@ -1162,13 +1162,15 @@ class KafkaApisTest extends Logging {
assertEquals(expectedOffsetCommitResponse, response.data)
}
- @Test
- def
testHandleOffsetCommitRequestTopicsAndPartitionsValidationWithTopicIds(): Unit
= {
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(version:
Short): Unit = {
val fooId = Uuid.randomUuid()
val barId = Uuid.randomUuid()
val zarId = Uuid.randomUuid()
val fooName = "foo"
val barName = "bar"
+ val zarName = "zar"
addTopicToMetadataCache(fooName, topicId = fooId, numPartitions = 2)
addTopicToMetadataCache(barName, topicId = barId, numPartitions = 2)
@@ -1179,6 +1181,7 @@ class KafkaApisTest extends Logging {
// foo exists but only has 2 partitions.
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setTopicId(fooId)
+ .setName(fooName)
.setPartitions(util.List.of(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
@@ -1192,6 +1195,7 @@ class KafkaApisTest extends Logging {
// bar exists.
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setTopicId(barId)
+ .setName(barName)
.setPartitions(util.List.of(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
@@ -1202,6 +1206,7 @@ class KafkaApisTest extends Logging {
// zar does not exist.
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setTopicId(zarId)
+ .setName(zarName)
.setPartitions(util.List.of(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
@@ -1210,7 +1215,9 @@ class KafkaApisTest extends Logging {
.setPartitionIndex(1)
.setCommittedOffset(70)))))
- val requestChannelRequest =
buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest).build())
+ val requestChannelRequest = buildRequest(
+
OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest).build(version)
+ )
// This is the request expected by the group coordinator.
val expectedOffsetCommitRequest = new OffsetCommitRequestData()
@@ -1255,159 +1262,8 @@ class KafkaApisTest extends Logging {
val offsetCommitResponse = new OffsetCommitResponseData()
.setTopics(util.List.of(
new OffsetCommitResponseData.OffsetCommitResponseTopic()
- .setTopicId(fooId)
- .setName(fooName)
- .setPartitions(util.List.of(
- new OffsetCommitResponseData.OffsetCommitResponsePartition()
- .setPartitionIndex(0)
- .setErrorCode(Errors.NONE.code),
- new OffsetCommitResponseData.OffsetCommitResponsePartition()
- .setPartitionIndex(1)
- .setErrorCode(Errors.NONE.code))),
- new OffsetCommitResponseData.OffsetCommitResponseTopic()
- .setTopicId(barId)
- .setName(barName)
- .setPartitions(util.List.of(
- new OffsetCommitResponseData.OffsetCommitResponsePartition()
- .setPartitionIndex(0)
- .setErrorCode(Errors.NONE.code),
- new OffsetCommitResponseData.OffsetCommitResponsePartition()
- .setPartitionIndex(1)
- .setErrorCode(Errors.NONE.code)))))
-
- val expectedOffsetCommitResponse = new OffsetCommitResponseData()
- .setTopics(util.List.of(
- new OffsetCommitResponseData.OffsetCommitResponseTopic()
- .setTopicId(fooId)
- .setPartitions(util.List.of(
- // foo-2 is first because partitions failing the validation
- // are put in the response first.
- new OffsetCommitResponseData.OffsetCommitResponsePartition()
- .setPartitionIndex(2)
- .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
- new OffsetCommitResponseData.OffsetCommitResponsePartition()
- .setPartitionIndex(0)
- .setErrorCode(Errors.NONE.code),
- new OffsetCommitResponseData.OffsetCommitResponsePartition()
- .setPartitionIndex(1)
- .setErrorCode(Errors.NONE.code))),
- // zar is before bar because topics failing the validation are
- // put in the response first.
- new OffsetCommitResponseData.OffsetCommitResponseTopic()
- .setTopicId(zarId)
- .setPartitions(util.List.of(
- new OffsetCommitResponseData.OffsetCommitResponsePartition()
- .setPartitionIndex(0)
- .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code),
- new OffsetCommitResponseData.OffsetCommitResponsePartition()
- .setPartitionIndex(1)
- .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))),
- new OffsetCommitResponseData.OffsetCommitResponseTopic()
- .setTopicId(barId)
- .setPartitions(util.List.of(
- new OffsetCommitResponseData.OffsetCommitResponsePartition()
- .setPartitionIndex(0)
- .setErrorCode(Errors.NONE.code),
- new OffsetCommitResponseData.OffsetCommitResponsePartition()
- .setPartitionIndex(1)
- .setErrorCode(Errors.NONE.code)))))
-
- future.complete(offsetCommitResponse)
- val response =
verifyNoThrottling[OffsetCommitResponse](requestChannelRequest)
- assertEquals(expectedOffsetCommitResponse, response.data)
- }
-
- @Test
- def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {
- val fooId = Uuid.randomUuid()
- val barId = Uuid.randomUuid()
- addTopicToMetadataCache("foo", numPartitions = 2, topicId = fooId)
- addTopicToMetadataCache("bar", numPartitions = 2, topicId = barId)
-
- val offsetCommitRequest = new OffsetCommitRequestData()
- .setGroupId("group")
- .setMemberId("member")
- .setTopics(util.List.of(
- // foo exists but only has 2 partitions.
- new OffsetCommitRequestData.OffsetCommitRequestTopic()
- .setName("foo")
- .setPartitions(util.List.of(
- new OffsetCommitRequestData.OffsetCommitRequestPartition()
- .setPartitionIndex(0)
- .setCommittedOffset(10),
- new OffsetCommitRequestData.OffsetCommitRequestPartition()
- .setPartitionIndex(1)
- .setCommittedOffset(20),
- new OffsetCommitRequestData.OffsetCommitRequestPartition()
- .setPartitionIndex(2)
- .setCommittedOffset(30))),
- // bar exists.
- new OffsetCommitRequestData.OffsetCommitRequestTopic()
- .setName("bar")
- .setPartitions(util.List.of(
- new OffsetCommitRequestData.OffsetCommitRequestPartition()
- .setPartitionIndex(0)
- .setCommittedOffset(40),
- new OffsetCommitRequestData.OffsetCommitRequestPartition()
- .setPartitionIndex(1)
- .setCommittedOffset(50))),
- // zar does not exist.
- new OffsetCommitRequestData.OffsetCommitRequestTopic()
- .setName("zar")
- .setPartitions(util.List.of(
- new OffsetCommitRequestData.OffsetCommitRequestPartition()
- .setPartitionIndex(0)
- .setCommittedOffset(60),
- new OffsetCommitRequestData.OffsetCommitRequestPartition()
- .setPartitionIndex(1)
- .setCommittedOffset(70)))))
-
- val requestChannelRequest =
buildRequest(OffsetCommitRequest.Builder.forTopicNames(offsetCommitRequest).build())
-
- // This is the request expected by the group coordinator.
- val expectedOffsetCommitRequest = new OffsetCommitRequestData()
- .setGroupId("group")
- .setMemberId("member")
- .setTopics(util.List.of(
- // foo exists but only has 2 partitions.
- new OffsetCommitRequestData.OffsetCommitRequestTopic()
- .setName("foo")
- .setTopicId(fooId)
- .setPartitions(util.List.of(
- new OffsetCommitRequestData.OffsetCommitRequestPartition()
- .setPartitionIndex(0)
- .setCommittedOffset(10),
- new OffsetCommitRequestData.OffsetCommitRequestPartition()
- .setPartitionIndex(1)
- .setCommittedOffset(20))),
- new OffsetCommitRequestData.OffsetCommitRequestTopic()
- .setName("bar")
- .setTopicId(barId)
- .setPartitions(util.List.of(
- new OffsetCommitRequestData.OffsetCommitRequestPartition()
- .setPartitionIndex(0)
- .setCommittedOffset(40),
- new OffsetCommitRequestData.OffsetCommitRequestPartition()
- .setPartitionIndex(1)
- .setCommittedOffset(50)))))
-
- val future = new CompletableFuture[OffsetCommitResponseData]()
- when(groupCoordinator.commitOffsets(
- requestChannelRequest.context,
- expectedOffsetCommitRequest,
- RequestLocal.noCaching.bufferSupplier
- )).thenReturn(future)
- kafkaApis = createKafkaApis()
- kafkaApis.handle(
- requestChannelRequest,
- RequestLocal.noCaching
- )
-
- // This is the response returned by the group coordinator.
- val offsetCommitResponse = new OffsetCommitResponseData()
- .setTopics(util.List.of(
- new OffsetCommitResponseData.OffsetCommitResponseTopic()
- .setName("foo")
+ .setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID)
+ .setName(if (version < 10) fooName else "")
.setPartitions(util.List.of(
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(0)
@@ -1416,7 +1272,8 @@ class KafkaApisTest extends Logging {
.setPartitionIndex(1)
.setErrorCode(Errors.NONE.code))),
new OffsetCommitResponseData.OffsetCommitResponseTopic()
- .setName("bar")
+ .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID)
+ .setName(if (version < 10) barName else "")
.setPartitions(util.List.of(
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(0)
@@ -1425,10 +1282,13 @@ class KafkaApisTest extends Logging {
.setPartitionIndex(1)
.setErrorCode(Errors.NONE.code)))))
+ // For v10+, the unknown topic returns UNKNOWN_TOPIC_ID; for v0-9 it
returns
+ // UNKNOWN_TOPIC_OR_PARTITION.
val expectedOffsetCommitResponse = new OffsetCommitResponseData()
.setTopics(util.List.of(
new OffsetCommitResponseData.OffsetCommitResponseTopic()
- .setName("foo")
+ .setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID)
+ .setName(if (version < 10) fooName else "")
.setPartitions(util.List.of(
// foo-2 is first because partitions failing the validation
// are put in the response first.
@@ -1444,16 +1304,18 @@ class KafkaApisTest extends Logging {
// zar is before bar because topics failing the validation are
// put in the response first.
new OffsetCommitResponseData.OffsetCommitResponseTopic()
- .setName("zar")
+ .setTopicId(if (version >= 10) zarId else Uuid.ZERO_UUID)
+ .setName(if (version < 10) zarName else "")
.setPartitions(util.List.of(
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(0)
- .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
+ .setErrorCode(if (version >= 10) Errors.UNKNOWN_TOPIC_ID.code
else Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(1)
- .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))),
+ .setErrorCode(if (version >= 10) Errors.UNKNOWN_TOPIC_ID.code
else Errors.UNKNOWN_TOPIC_OR_PARTITION.code))),
new OffsetCommitResponseData.OffsetCommitResponseTopic()
- .setName("bar")
+ .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID)
+ .setName(if (version < 10) barName else "")
.setPartitions(util.List.of(
new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(0)