This is an automated email from the ASF dual-hosted git repository.
squah-confluent pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 38227ab7682 KAFKA-20551: Remove unnecessary generics from
TxnOffsetCommitResponse, OffsetCommitResponse, and OffsetDeleteResponse (#22210)
38227ab7682 is described below
commit 38227ab768282c7ed3f3986e5e2acf46219d1642
Author: Jiayao Sun <[email protected]>
AuthorDate: Wed May 6 18:35:14 2026 +1200
KAFKA-20551: Remove unnecessary generics from TxnOffsetCommitResponse,
OffsetCommitResponse, and OffsetDeleteResponse (#22210)
Simplify the `addPartitions` builder methods by replacing the generic
type parameter `<P>` with explicit request partition classes (e.g.,
`OffsetCommitRequestPartition`) in the `List` and `Function` parameter
types. This removes unnecessary abstraction that was only useful in test
cases, and streamlines API usage across both production and test code.
Reviewers: Ken Huang <[email protected]>, Sean Quah
<[email protected]>
---
.../kafka/common/requests/OffsetCommitResponse.java | 7 ++++---
.../kafka/common/requests/OffsetDeleteResponse.java | 7 ++++---
.../common/requests/TxnOffsetCommitResponse.java | 7 ++++---
.../requests/TxnOffsetCommitResponseTest.java | 3 ++-
core/src/main/scala/kafka/server/KafkaApis.scala | 21 +++++++--------------
5 files changed, 21 insertions(+), 24 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 521ffa1c2fd..483640b95c4 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
import
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
@@ -165,11 +166,11 @@ public class OffsetCommitResponse extends
AbstractResponse {
return this;
}
- public <P> Builder addPartitions(
+ public Builder addPartitions(
Uuid topicId,
String topicName,
- List<P> partitions,
- Function<P, Integer> partitionIndex,
+ List<OffsetCommitRequestData.OffsetCommitRequestPartition>
partitions,
+ Function<OffsetCommitRequestData.OffsetCommitRequestPartition,
Integer> partitionIndex,
Errors error
) {
final OffsetCommitResponseTopic topicResponse =
getOrCreate(topicId, topicName);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java
index 0f3655d62c6..c51b547dd58 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartition;
import
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
@@ -75,10 +76,10 @@ public class OffsetDeleteResponse extends AbstractResponse {
return this;
}
- public <P> Builder addPartitions(
+ public Builder addPartitions(
String topicName,
- List<P> partitions,
- Function<P, Integer> partitionIndex,
+ List<OffsetDeleteRequestData.OffsetDeleteRequestPartition>
partitions,
+ Function<OffsetDeleteRequestData.OffsetDeleteRequestPartition,
Integer> partitionIndex,
Errors error
) {
final OffsetDeleteResponseTopic topicResponse =
getOrCreateTopic(topicName);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index 5e556ad8de2..a01b5779b4e 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
import
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic;
@@ -80,10 +81,10 @@ public class TxnOffsetCommitResponse extends
AbstractResponse {
return this;
}
- public <P> Builder addPartitions(
+ public Builder addPartitions(
String topicName,
- List<P> partitions,
- Function<P, Integer> partitionIndex,
+ List<TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition>
partitions,
+
Function<TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition, Integer>
partitionIndex,
Errors error
) {
final TxnOffsetCommitResponseTopic topicResponse =
getOrCreate(topicName);
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
index f3c0318de3c..6dd950a5539 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
@@ -96,7 +96,8 @@ public class TxnOffsetCommitResponseTest extends
OffsetCommitResponseTest {
@Test
public void testBuilderAddPartitions() {
TxnOffsetCommitResponse.Builder builder =
TxnOffsetCommitResponse.newBuilder();
- builder.addPartitions(topicOne, List.of(partitionOne, partitionTwo), p
-> p, errorOne);
+ builder.addPartition(topicOne, partitionOne, errorOne);
+ builder.addPartition(topicOne, partitionTwo, errorOne);
TxnOffsetCommitResponseData expected = new
TxnOffsetCommitResponseData()
.setTopics(List.of(
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index d38c8921dbd..93fe6449687 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -303,13 +303,11 @@ class KafkaApis(val requestChannel: RequestChannel,
if (useTopicIds && topic.name.isEmpty) {
// If the topic name is undefined, it means that the topic id is
unknown so we add
// the topic and all its partitions to the response with
UNKNOWN_TOPIC_ID.
-
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
- topic.topicId, topic.name, topic.partitions, _.partitionIndex,
Errors.UNKNOWN_TOPIC_ID)
+ responseBuilder.addPartitions(topic.topicId, topic.name,
topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_ID)
} else if (!authorizedTopics.contains(topic.name)) {
// If the topic is not authorized, we add the topic and all its
partitions
// to the response with TOPIC_AUTHORIZATION_FAILED.
-
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
- topic.topicId, topic.name, topic.partitions, _.partitionIndex,
Errors.TOPIC_AUTHORIZATION_FAILED)
+ responseBuilder.addPartitions(topic.topicId, topic.name,
topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
} else {
// For lower API versions, the topic id may not be included in the
request.
// In this case, we resolve the topic id from metadata cache to
ensure that the topic exists.
@@ -321,8 +319,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (topic.topicId == Uuid.ZERO_UUID) {
// If the topic is unknown, we add the topic and all its partitions
// to the response with UNKNOWN_TOPIC_OR_PARTITION.
-
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
- Uuid.ZERO_UUID, topic.name, topic.partitions, _.partitionIndex,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ responseBuilder.addPartitions(Uuid.ZERO_UUID, topic.name,
topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
} else {
// Otherwise, we check all partitions to ensure that they all
exist.
val topicWithValidPartitions = new
OffsetCommitRequestData.OffsetCommitRequestTopic()
@@ -2063,13 +2060,11 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!authorizedTopics.contains(topic.name)) {
// If the topic is not authorized, we add the topic and all its
partitions
// to the response with TOPIC_AUTHORIZATION_FAILED.
-
responseBuilder.addPartitions[TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition](
- topic.name, topic.partitions, _.partitionIndex,
Errors.TOPIC_AUTHORIZATION_FAILED)
+ responseBuilder.addPartitions(topic.name, topic.partitions,
_.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
} else if (!metadataCache.contains(topic.name)) {
// If the topic is unknown, we add the topic and all its partitions
// to the response with UNKNOWN_TOPIC_OR_PARTITION.
-
responseBuilder.addPartitions[TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition](
- topic.name, topic.partitions, _.partitionIndex,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ responseBuilder.addPartitions(topic.name, topic.partitions,
_.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
} else {
// Otherwise, we check all partitions to ensure that they all exist.
val topicWithValidPartitions = new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic().setName(topic.name)
@@ -2384,13 +2379,11 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!authorizedTopics.contains(topic.name)) {
// If the topic is not authorized, we add the topic and all its
partitions
// to the response with TOPIC_AUTHORIZATION_FAILED.
-
responseBuilder.addPartitions[OffsetDeleteRequestData.OffsetDeleteRequestPartition](
- topic.name, topic.partitions, _.partitionIndex,
Errors.TOPIC_AUTHORIZATION_FAILED)
+ responseBuilder.addPartitions(topic.name, topic.partitions,
_.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
} else if (!metadataCache.contains(topic.name)) {
// If the topic is unknown, we add the topic and all its partitions
// to the response with UNKNOWN_TOPIC_OR_PARTITION.
-
responseBuilder.addPartitions[OffsetDeleteRequestData.OffsetDeleteRequestPartition](
- topic.name, topic.partitions, _.partitionIndex,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ responseBuilder.addPartitions(topic.name, topic.partitions,
_.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
} else {
// Otherwise, we check all partitions to ensure that they all exist.
val topicWithValidPartitions = new
OffsetDeleteRequestData.OffsetDeleteRequestTopic().setName(topic.name)