This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 77be6f2d744 KAFKA-19053 Remove FetchResponse#of which is not used in production … (#19327)
77be6f2d744 is described below

commit 77be6f2d744166a49a599b898ecf00313e50edf4
Author: Hong-Yi Chen <apala...@gmail.com>
AuthorDate: Mon Jun 2 00:48:53 2025 +0800

    KAFKA-19053 Remove FetchResponse#of which is not used in production … (#19327)
    
    Removed the FetchResponse#of overload that is not used in production code.
    The test cases that originally invoked it have been updated to call the
    other
    [FetchResponse#of](https://github.com/apache/kafka/blob/6af849f8642d93877e64dace272cee1551620862/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java#L232),
    which is currently used by `KafkaApis`, to maintain the integrity of the
    tests.
    
    Reviewers: Jun Rao <jun...@gmail.com>, PoAn Yang <pay...@apache.org>, Chia-Ping Tsai <chia7...@gmail.com>
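
For context, a minimal sketch (not part of the patch) of the test-side migration the diff below applies: the retained overload takes one additional list argument, which the updated call sites pass as an empty `List.of()` in Java or `List.empty.asJava` in Scala. The element type of that list is assumed below and is not spelled out in this email.

```java
import java.util.LinkedHashMap;
import java.util.List;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchResponse;

// Hypothetical helper illustrating the call-site change; the class and method
// names are made up for this sketch and do not appear in the patch.
public class FetchResponseMigrationSketch {

    static FetchResponse emptyFetchResponse(int sessionId) {
        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> responseData =
                new LinkedHashMap<>();

        // Old call against the four-argument overload removed by this commit:
        //   FetchResponse.of(Errors.NONE, 0, sessionId, responseData);

        // New call against the overload that KafkaApis uses; the extra argument
        // is a list the tests leave empty (assumed here to carry endpoint info).
        return FetchResponse.of(Errors.NONE, 0, sessionId, responseData, List.of());
    }
}
```
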
---
 .../kafka/common/requests/FetchResponse.java       |  8 ---
 .../kafka/clients/FetchSessionHandlerTest.java     | 63 ++++++++++++++--------
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  4 +-
 .../internals/FetchRequestManagerTest.java         | 32 +++++------
 .../clients/consumer/internals/FetcherTest.java    | 34 ++++++------
 .../kafka/common/requests/RequestResponseTest.java | 12 ++---
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  2 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala    |  2 +-
 .../server/epoch/util/MockBlockingSender.scala     |  3 +-
 .../kafka/jmh/common/FetchResponseBenchmark.java   |  5 +-
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |  2 +-
 .../jmh/fetchsession/FetchSessionBenchmark.java    |  3 +-
 12 files changed, 92 insertions(+), 78 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 0324f591ef4..5013468095c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -243,14 +243,6 @@ public class FetchResponse extends AbstractResponse {
         return new FetchResponse(data);
     }
 
-    // TODO: remove as a part of KAFKA-12410
-    public static FetchResponse of(Errors error,
-                                   int throttleTimeMs,
-                                   int sessionId,
-                                   LinkedHashMap<TopicIdPartition, 
FetchResponseData.PartitionData> responseData) {
-        return new FetchResponse(toMessage(error, throttleTimeMs, sessionId, 
responseData.entrySet().iterator(), Collections.emptyList()));
-    }
-
     // TODO: remove as a part of KAFKA-12410
     public static FetchResponse of(Errors error,
                                    int throttleTimeMs,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
index 3166206f0a4..1c50666c6af 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
@@ -217,7 +217,8 @@ public class FetchSessionHandlerTest {
 
             FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID,
                 respMap(new RespEntry("foo", 0, fooId, 0, 0),
-                        new RespEntry("foo", 1, fooId, 0, 0)));
+                        new RespEntry("foo", 1, fooId, 0, 0)),
+                    List.of());
             handler.handleResponse(resp, version);
 
             FetchSessionHandler.Builder builder2 = handler.newBuilder();
@@ -258,7 +259,8 @@ public class FetchSessionHandlerTest {
 
             FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
                 respMap(new RespEntry("foo", 0, fooId, 10, 20),
-                        new RespEntry("foo", 1, fooId, 10, 20)));
+                        new RespEntry("foo", 1, fooId, 10, 20)),
+                    List.of());
             handler.handleResponse(resp, version);
 
             // Test an incremental fetch request which adds one partition and 
modifies another.
@@ -280,13 +282,15 @@ public class FetchSessionHandlerTest {
                     data2.toSend());
 
             FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, 123,
-                respMap(new RespEntry("foo", 1, fooId, 20, 20)));
+                respMap(new RespEntry("foo", 1, fooId, 20, 20)),
+                    List.of());
             handler.handleResponse(resp2, version);
 
             // Skip building a new request.  Test that handling an invalid 
fetch session epoch response results
             // in a request which closes the session.
             FetchResponse resp3 = 
FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, INVALID_SESSION_ID,
-                respMap());
+                    respMap(),
+                    List.of());
             handler.handleResponse(resp3, version);
 
             FetchSessionHandler.Builder builder4 = handler.newBuilder();
@@ -346,7 +350,8 @@ public class FetchSessionHandlerTest {
             FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
                 respMap(new RespEntry("foo", 0, fooId, 10, 20),
                         new RespEntry("foo", 1, fooId, 10, 20),
-                        new RespEntry("bar", 0, barId, 10, 20)));
+                        new RespEntry("bar", 0, barId, 10, 20)),
+                    List.of());
             handler.handleResponse(resp, version);
 
             // Test an incremental fetch request which removes two partitions.
@@ -366,8 +371,9 @@ public class FetchSessionHandlerTest {
 
             // A FETCH_SESSION_ID_NOT_FOUND response triggers us to close the 
session.
             // The next request is a session establishing FULL request.
-            FetchResponse resp2 = 
FetchResponse.of(Errors.FETCH_SESSION_ID_NOT_FOUND, 0, INVALID_SESSION_ID,
-                respMap());
+            FetchResponse resp2 = 
FetchResponse.of(Errors.FETCH_SESSION_ID_NOT_FOUND, 0, INVALID_SESSION_ID, 
+                    respMap(), 
+                    List.of());
             handler.handleResponse(resp2, version);
 
             FetchSessionHandler.Builder builder3 = handler.newBuilder();
@@ -399,7 +405,8 @@ public class FetchSessionHandlerTest {
             assertFalse(data.canUseTopicIds());
 
             FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-                    respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+                    respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)),
+                    List.of());
             handler.handleResponse(resp, (short) 12);
 
             // Try to add a topic ID to an already existing topic partition 
(0) or a new partition (1) in the session.
@@ -436,7 +443,8 @@ public class FetchSessionHandlerTest {
             assertTrue(data.canUseTopicIds());
 
             FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-                    respMap(new RespEntry("foo", 0, fooId, 10, 20)));
+                    respMap(new RespEntry("foo", 0, fooId, 10, 20)),
+                    List.of());
             handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
             // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
@@ -475,7 +483,7 @@ public class FetchSessionHandlerTest {
         assertTrue(data.metadata().isFull());
         assertEquals(startsWithTopicIds, data.canUseTopicIds());
 
-        FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, respMap(new 
RespEntry("foo", 0, topicId1, 10, 20)));
+        FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, respMap(new 
RespEntry("foo", 0, topicId1, 10, 20)), List.of());
         short version = startsWithTopicIds ? ApiKeys.FETCH.latestVersion() : 
12;
         handler.handleResponse(resp, version);
 
@@ -548,7 +556,7 @@ public class FetchSessionHandlerTest {
         assertEquals(startsWithTopicIds, data.canUseTopicIds());
 
         FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-                respMap(new RespEntry("foo", 0, fooId, 10, 20)));
+                respMap(new RespEntry("foo", 0, fooId, 10, 20)), List.of());
         handler.handleResponse(resp, responseVersion);
 
         // Re-add the first partition. Then add a partition with opposite ID 
usage.
@@ -583,7 +591,8 @@ public class FetchSessionHandlerTest {
         assertEquals(useTopicIds, data.canUseTopicIds());
 
         FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-                respMap(new RespEntry("foo", 0, topicId, 10, 20)));
+                respMap(new RespEntry("foo", 0, topicId, 10, 20)),
+                List.of());
         handler.handleResponse(resp, responseVersion);
 
         // Remove the topic from the session
@@ -610,7 +619,8 @@ public class FetchSessionHandlerTest {
         assertTrue(data.canUseTopicIds());
 
         FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-                respMap(new RespEntry("foo", 0, topicId, 10, 20)));
+                respMap(new RespEntry("foo", 0, topicId, 10, 20)),
+                List.of());
         handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
         // Remove the partition from the session. Return a session ID as 
though the session is still open.
@@ -619,7 +629,8 @@ public class FetchSessionHandlerTest {
         assertMapsEqual(new LinkedHashMap<>(),
                 data2.toSend(), data2.sessionPartitions());
         FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, 123,
-                new LinkedHashMap<>());
+                new LinkedHashMap<>(),
+                List.of());
         handler.handleResponse(resp2, ApiKeys.FETCH.latestVersion());
 
         // After the topic is removed, add a recreated topic with a new ID.
@@ -651,7 +662,8 @@ public class FetchSessionHandlerTest {
             FetchResponse resp1 = FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID,
                 respMap(new RespEntry("foo", 0, fooId, 10, 20),
                         new RespEntry("foo", 1, fooId, 10, 20),
-                        new RespEntry("bar", 0, barId, 10, 20)));
+                        new RespEntry("bar", 0, barId, 10, 20)),
+                    List.of());
             String issue = 
handler.verifyFullFetchResponsePartitions(resp1.responseData(topicNames, 
version).keySet(),
                     resp1.topicIds(), version);
             assertTrue(issue.contains("extraPartitions="));
@@ -664,13 +676,15 @@ public class FetchSessionHandlerTest {
             FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID,
                 respMap(new RespEntry("foo", 0, fooId, 10, 20),
                         new RespEntry("foo", 1, fooId, 10, 20),
-                        new RespEntry("bar", 0, barId, 10, 20)));
+                        new RespEntry("bar", 0, barId, 10, 20)),
+                    List.of());
             String issue2 = 
handler.verifyFullFetchResponsePartitions(resp2.responseData(topicNames, 
version).keySet(),
                     resp2.topicIds(), version);
             assertNull(issue2);
             FetchResponse resp3 = FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID,
                 respMap(new RespEntry("foo", 0, fooId, 10, 20),
-                        new RespEntry("foo", 1, fooId, 10, 20)));
+                        new RespEntry("foo", 1, fooId, 10, 20)),
+                    List.of());
             String issue3 = 
handler.verifyFullFetchResponsePartitions(resp3.responseData(topicNames, 
version).keySet(),
                     resp3.topicIds(), version);
             assertFalse(issue3.contains("extraPartitions="));
@@ -689,7 +703,8 @@ public class FetchSessionHandlerTest {
         FetchResponse resp1 = FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID,
             respMap(new RespEntry("foo", 0, topicIds.get("foo"), 10, 20),
                     new RespEntry("extra2", 1, topicIds.get("extra2"), 10, 20),
-                    new RespEntry("bar", 0, topicIds.get("bar"), 10, 20)));
+                    new RespEntry("bar", 0, topicIds.get("bar"), 10, 20)),
+                List.of());
         String issue = 
handler.verifyFullFetchResponsePartitions(resp1.responseData(topicNames, 
ApiKeys.FETCH.latestVersion()).keySet(),
                 resp1.topicIds(), ApiKeys.FETCH.latestVersion());
         assertTrue(issue.contains("extraPartitions="));
@@ -703,14 +718,16 @@ public class FetchSessionHandlerTest {
         FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID,
             respMap(new RespEntry("foo", 0, topicIds.get("foo"), 10, 20),
                     new RespEntry("extra2", 1, topicIds.get("extra2"), 10, 20),
-                    new RespEntry("bar", 0, topicIds.get("bar"), 10, 20)));
+                    new RespEntry("bar", 0, topicIds.get("bar"), 10, 20)),
+                List.of());
         String issue2 = 
handler.verifyFullFetchResponsePartitions(resp2.responseData(topicNames, 
ApiKeys.FETCH.latestVersion()).keySet(),
                 resp2.topicIds(), ApiKeys.FETCH.latestVersion());
         assertTrue(issue2.contains("extraPartitions="));
         assertFalse(issue2.contains("omittedPartitions="));
         FetchResponse resp3 = FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID,
             respMap(new RespEntry("foo", 0, topicIds.get("foo"), 10, 20),
-                    new RespEntry("bar", 0, topicIds.get("bar"), 10, 20)));
+                    new RespEntry("bar", 0, topicIds.get("bar"), 10, 20)),
+                List.of());
         String issue3 = 
handler.verifyFullFetchResponsePartitions(resp3.responseData(topicNames, 
ApiKeys.FETCH.latestVersion()).keySet(),
                 resp3.topicIds(), ApiKeys.FETCH.latestVersion());
         assertNull(issue3);
@@ -734,7 +751,8 @@ public class FetchSessionHandlerTest {
 
         FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
             respMap(new RespEntry("foo", 0, topicIds.get("foo"), 10, 20),
-                    new RespEntry("foo", 1, topicIds.get("foo"), 10, 20)));
+                    new RespEntry("foo", 1, topicIds.get("foo"), 10, 20)),
+                List.of());
         handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
         // Test an incremental fetch request which adds an ID unknown to the 
broker.
@@ -749,7 +767,8 @@ public class FetchSessionHandlerTest {
 
         // Return and handle a response with a top level error
         FetchResponse resp2 = FetchResponse.of(Errors.UNKNOWN_TOPIC_ID, 0, 123,
-            respMap(new RespEntry("unknown", 0, Uuid.randomUuid(), 
Errors.UNKNOWN_TOPIC_ID)));
+            respMap(new RespEntry("unknown", 0, Uuid.randomUuid(), 
Errors.UNKNOWN_TOPIC_ID)),
+                List.of());
         assertFalse(handler.handleResponse(resp2, 
ApiKeys.FETCH.latestVersion()));
 
         // Ensure we start with a new epoch. This will close the session in 
the next request.
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index deff7fff656..27a8cf6697e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1893,7 +1893,7 @@ public class KafkaConsumerTest {
         response.put(tp0, Errors.NONE);
         OffsetCommitResponse commitResponse = offsetCommitResponse(response);
         LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(new 
LeaveGroupResponseData().setErrorCode(Errors.NONE.code()));
-        FetchResponse closeResponse = FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>());
+        FetchResponse closeResponse = FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(), List.of());
         consumerCloseTest(groupProtocol, 5000, Arrays.asList(commitResponse, 
leaveGroupResponse, closeResponse), 0, false);
     }
 
@@ -2950,7 +2950,7 @@ public class KafkaConsumerTest {
                     .setLogStartOffset(logStartOffset)
                     .setRecords(records));
         }
-        return FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, 
tpResponses);
+        return FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, 
tpResponses, List.of());
     }
 
     private FetchResponse fetchResponse(TopicPartition partition, long 
fetchOffset, int count) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index 7c8547ddd88..7d0325e8e5d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -1733,7 +1733,7 @@ public class FetchRequestManagerTest {
                 .setPartitionIndex(tp0.partition())
                 .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code())
                 .setHighWatermark(100));
-        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions)));
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()));
         networkClientDelegate.poll(time.timer(0));
 
         List<ConsumerRecord<byte[], byte[]>> allFetchedRecords = new 
ArrayList<>();
@@ -1794,7 +1794,7 @@ public class FetchRequestManagerTest {
                 .setLastStableOffset(4)
                 .setLogStartOffset(0)
                 .setRecords(partialRecords));
-        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions)));
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()));
         networkClientDelegate.poll(time.timer(0));
 
         List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new 
ArrayList<>();
@@ -1865,7 +1865,7 @@ public class FetchRequestManagerTest {
                 .setPartitionIndex(tp1.partition())
                 .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code())
                 .setHighWatermark(100));
-        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions)));
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()));
         networkClientDelegate.poll(time.timer(0));
         assertEquals(1, fetchRecords().get(tp0).size());
 
@@ -2113,7 +2113,7 @@ public class FetchRequestManagerTest {
         }
 
         assertEquals(1, sendFetches());
-        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, fetchPartitionData));
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, fetchPartitionData, List.of()));
         networkClientDelegate.poll(time.timer(0));
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
fetchedRecords = fetchRecords();
@@ -2185,7 +2185,7 @@ public class FetchRequestManagerTest {
                 .setLogStartOffset(0));
 
         assertEquals(1, sendFetches());
-        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions)));
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()));
         networkClientDelegate.poll(time.timer(0));
         collectFetch();
 
@@ -2231,7 +2231,7 @@ public class FetchRequestManagerTest {
                 .setLogStartOffset(0)
                 .setRecords(MemoryRecords.withRecords(Compression.NONE, new 
SimpleRecord("val".getBytes()))));
 
-        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions)));
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()));
         networkClientDelegate.poll(time.timer(0));
         collectFetch();
 
@@ -2758,7 +2758,7 @@ public class FetchRequestManagerTest {
                 .setHighWatermark(100)
                 .setLogStartOffset(0)
                 .setRecords(emptyRecords));
-        FetchResponse resp1 = FetchResponse.of(Errors.NONE, 0, 123, 
partitions1);
+        FetchResponse resp1 = FetchResponse.of(Errors.NONE, 0, 123, 
partitions1, List.of());
         client.prepareResponse(resp1);
         assertEquals(1, sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
@@ -2784,7 +2784,7 @@ public class FetchRequestManagerTest {
 
         // The second response contains no new records.
         LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> 
partitions2 = new LinkedHashMap<>();
-        FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, 123, 
partitions2);
+        FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, 123, 
partitions2, List.of());
         client.prepareResponse(resp2);
         assertEquals(1, sendFetches());
         networkClientDelegate.poll(time.timer(0));
@@ -2801,7 +2801,7 @@ public class FetchRequestManagerTest {
                 .setLastStableOffset(4)
                 .setLogStartOffset(0)
                 .setRecords(nextRecords));
-        FetchResponse resp3 = FetchResponse.of(Errors.NONE, 0, 123, 
partitions3);
+        FetchResponse resp3 = FetchResponse.of(Errors.NONE, 0, 123, 
partitions3, List.of());
         client.prepareResponse(resp3);
         assertEquals(1, sendFetches());
         networkClientDelegate.poll(time.timer(0));
@@ -3246,7 +3246,7 @@ public class FetchRequestManagerTest {
                 .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
                 .setLogStartOffset(0)
                 .setRecords(nextRecords));
-        client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, partitions), nodeId0);
+        client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, partitions, List.of()), nodeId0);
         networkClientDelegate.poll(time.timer(0));
         partitionRecords = fetchRecords();
         assertFalse(partitionRecords.containsKey(tp0));
@@ -3851,7 +3851,7 @@ public class FetchRequestManagerTest {
         });
 
         client.prepareResponseFrom(
-            FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, 
partitionDataMap),
+            FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, 
partitionDataMap, List.of()),
             node
         );
     }
@@ -3906,7 +3906,7 @@ public class FetchRequestManagerTest {
                         .setPartitionIndex(tp.topicPartition().partition())
                         .setErrorCode(error.code())
                         
.setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK));
-        return FetchResponse.of(error, throttleTime, INVALID_SESSION_ID, new 
LinkedHashMap<>(partitions));
+        return FetchResponse.of(error, throttleTime, INVALID_SESSION_ID, new 
LinkedHashMap<>(partitions), List.of());
     }
 
     private FetchResponse 
fullFetchResponseWithAbortedTransactions(MemoryRecords records,
@@ -3924,7 +3924,7 @@ public class FetchRequestManagerTest {
                         .setLogStartOffset(0)
                         .setAbortedTransactions(abortedTransactions)
                         .setRecords(records));
-        return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, 
new LinkedHashMap<>(partitions));
+        return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, 
new LinkedHashMap<>(partitions), List.of());
     }
 
     private FetchResponse fullFetchResponse(int sessionId, TopicIdPartition 
tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
@@ -3950,7 +3950,7 @@ public class FetchRequestManagerTest {
                         .setLastStableOffset(lastStableOffset)
                         .setLogStartOffset(0)
                         .setRecords(records));
-        return FetchResponse.of(Errors.NONE, throttleTime, sessionId, new 
LinkedHashMap<>(partitions));
+        return FetchResponse.of(Errors.NONE, throttleTime, sessionId, new 
LinkedHashMap<>(partitions), List.of());
     }
 
     private FetchResponse fullFetchResponse(TopicIdPartition tp, MemoryRecords 
records, Errors error, long hw,
@@ -3964,7 +3964,7 @@ public class FetchRequestManagerTest {
                         .setLogStartOffset(0)
                         .setRecords(records)
                         
.setPreferredReadReplica(preferredReplicaId.orElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID)));
-        return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, 
new LinkedHashMap<>(partitions));
+        return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, 
new LinkedHashMap<>(partitions), List.of());
     }
 
     private FetchResponse fetchResponse(TopicIdPartition tp, MemoryRecords 
records, Errors error, long hw,
@@ -3977,7 +3977,7 @@ public class FetchRequestManagerTest {
                         .setLastStableOffset(lastStableOffset)
                         .setLogStartOffset(logStartOffset)
                         .setRecords(records));
-        return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, 
new LinkedHashMap<>(partitions));
+        return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, 
new LinkedHashMap<>(partitions), List.of());
     }
 
     /**
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 45a747e04da..a09024fb144 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1720,7 +1720,7 @@ public class FetcherTest {
                 .setPartitionIndex(tp0.partition())
                 .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code())
                 .setHighWatermark(100));
-        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions)));
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()));
         consumerClient.poll(time.timer(0));
 
         List<ConsumerRecord<byte[], byte[]>> allFetchedRecords = new 
ArrayList<>();
@@ -1781,7 +1781,7 @@ public class FetcherTest {
                 .setLastStableOffset(4)
                 .setLogStartOffset(0)
                 .setRecords(partialRecords));
-        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions)));
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()));
         consumerClient.poll(time.timer(0));
 
         List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new 
ArrayList<>();
@@ -1852,7 +1852,7 @@ public class FetcherTest {
                         .setPartitionIndex(tp1.partition())
                         .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code())
                         .setHighWatermark(100));
-        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions)));
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()));
         consumerClient.poll(time.timer(0));
         assertEquals(1, fetchRecords().get(tp0).size());
 
@@ -2100,7 +2100,7 @@ public class FetcherTest {
         }
 
         assertEquals(1, sendFetches());
-        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, fetchPartitionData));
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, fetchPartitionData, List.of()));
         consumerClient.poll(time.timer(0));
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
fetchedRecords = fetchRecords();
@@ -2172,7 +2172,7 @@ public class FetcherTest {
                 .setLogStartOffset(0));
 
         assertEquals(1, sendFetches());
-        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions)));
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()));
         consumerClient.poll(time.timer(0));
         collectFetch();
 
@@ -2218,7 +2218,7 @@ public class FetcherTest {
                 .setLogStartOffset(0)
                 .setRecords(MemoryRecords.withRecords(Compression.NONE, new 
SimpleRecord("val".getBytes()))));
 
-        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions)));
+        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()));
         consumerClient.poll(time.timer(0));
         collectFetch();
 
@@ -2745,7 +2745,7 @@ public class FetcherTest {
                 .setHighWatermark(100)
                 .setLogStartOffset(0)
                 .setRecords(emptyRecords));
-        FetchResponse resp1 = FetchResponse.of(Errors.NONE, 0, 123, 
partitions1);
+        FetchResponse resp1 = FetchResponse.of(Errors.NONE, 0, 123, 
partitions1, List.of());
         client.prepareResponse(resp1);
         assertEquals(1, sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
@@ -2771,7 +2771,7 @@ public class FetcherTest {
 
         // The second response contains no new records.
         LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> 
partitions2 = new LinkedHashMap<>();
-        FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, 123, 
partitions2);
+        FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, 123, 
partitions2, List.of());
         client.prepareResponse(resp2);
         assertEquals(1, sendFetches());
         consumerClient.poll(time.timer(0));
@@ -2788,7 +2788,7 @@ public class FetcherTest {
                 .setLastStableOffset(4)
                 .setLogStartOffset(0)
                 .setRecords(nextRecords));
-        FetchResponse resp3 = FetchResponse.of(Errors.NONE, 0, 123, 
partitions3);
+        FetchResponse resp3 = FetchResponse.of(Errors.NONE, 0, 123, 
partitions3, List.of());
         client.prepareResponse(resp3);
         assertEquals(1, sendFetches());
         consumerClient.poll(time.timer(0));
@@ -2922,7 +2922,7 @@ public class FetcherTest {
                                     .setLogStartOffset(0)
                                     .setRecords(buildRecords(offset, 2, 
offset)));
                         }
-                        client.respondToRequest(request, 
FetchResponse.of(Errors.NONE, 0, 123, responseMap));
+                        client.respondToRequest(request, 
FetchResponse.of(Errors.NONE, 0, 123, responseMap, List.of()));
                         consumerClient.poll(time.timer(0));
                     }
                 }
@@ -2985,7 +2985,7 @@ public class FetcherTest {
                                 .setLogStartOffset(0)
                                 .setRecords(buildRecords(nextOffset, 2, 
nextOffset)));
                         nextOffset += 2;
-                        client.respondToRequest(request, 
FetchResponse.of(Errors.NONE, 0, 123, responseMap));
+                        client.respondToRequest(request, 
FetchResponse.of(Errors.NONE, 0, 123, responseMap, List.of()));
                         consumerClient.poll(time.timer(0));
                     }
                 }
@@ -3523,7 +3523,7 @@ public class FetcherTest {
                 .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
                 .setLogStartOffset(0)
                 .setRecords(nextRecords));
-        client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, partitions), nodeId0);
+        client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, partitions, List.of()), nodeId0);
         consumerClient.poll(time.timer(0));
         partitionRecords = fetchRecords();
         assertFalse(partitionRecords.containsKey(tp0));
@@ -3689,7 +3689,7 @@ public class FetcherTest {
                         .setPartitionIndex(tp.topicPartition().partition())
                         .setErrorCode(error.code())
                         
.setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK));
-        return FetchResponse.of(error, throttleTime, INVALID_SESSION_ID, new 
LinkedHashMap<>(partitions));
+        return FetchResponse.of(error, throttleTime, INVALID_SESSION_ID, new 
LinkedHashMap<>(partitions), List.of());
     }
 
     private FetchResponse 
fullFetchResponseWithAbortedTransactions(MemoryRecords records,
@@ -3707,7 +3707,7 @@ public class FetcherTest {
                         .setLogStartOffset(0)
                         .setAbortedTransactions(abortedTransactions)
                         .setRecords(records));
-        return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, 
new LinkedHashMap<>(partitions));
+        return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, 
new LinkedHashMap<>(partitions), List.of());
     }
 
     private FetchResponse fullFetchResponse(int sessionId, TopicIdPartition 
tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
@@ -3733,7 +3733,7 @@ public class FetcherTest {
                         .setLastStableOffset(lastStableOffset)
                         .setLogStartOffset(0)
                         .setRecords(records));
-        return FetchResponse.of(Errors.NONE, throttleTime, sessionId, new 
LinkedHashMap<>(partitions));
+        return FetchResponse.of(Errors.NONE, throttleTime, sessionId, new 
LinkedHashMap<>(partitions), List.of());
     }
 
     private FetchResponse fullFetchResponse(TopicIdPartition tp, MemoryRecords 
records, Errors error, long hw,
@@ -3747,7 +3747,7 @@ public class FetcherTest {
                         .setLogStartOffset(0)
                         .setRecords(records)
                         
.setPreferredReadReplica(preferredReplicaId.orElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID)));
-        return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, 
new LinkedHashMap<>(partitions));
+        return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, 
new LinkedHashMap<>(partitions), List.of());
     }
 
     private FetchResponse fetchResponse(TopicIdPartition tp, MemoryRecords 
records, Errors error, long hw,
@@ -3760,7 +3760,7 @@ public class FetcherTest {
                         .setLastStableOffset(lastStableOffset)
                         .setLogStartOffset(logStartOffset)
                         .setRecords(records));
-        return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, 
new LinkedHashMap<>(partitions));
+        return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, 
new LinkedHashMap<>(partitions), List.of());
     }
 
     /**
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 7a3be68ff78..09433a39239 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -545,7 +545,7 @@ public class RequestResponseTest {
                         .setHighWatermark(1000000)
                         .setLogStartOffset(-1)
                         .setRecords(records));
-        FetchResponse idTestResponse = FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, idResponseData);
+        FetchResponse idTestResponse = FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, idResponseData, List.of());
         FetchResponse v12Deserialized = 
FetchResponse.parse(idTestResponse.serialize((short) 12), (short) 12);
         FetchResponse newestDeserialized = 
FetchResponse.parse(idTestResponse.serialize(FETCH.latestVersion()), 
FETCH.latestVersion());
         assertTrue(v12Deserialized.topicIds().isEmpty());
@@ -586,7 +586,7 @@ public class RequestResponseTest {
                         .setLastStableOffset(6)
                         .setRecords(records));
 
-        FetchResponse response = FetchResponse.of(Errors.NONE, 10, 
INVALID_SESSION_ID, responseData);
+        FetchResponse response = FetchResponse.of(Errors.NONE, 10, 
INVALID_SESSION_ID, responseData, List.of());
         FetchResponse deserialized = 
FetchResponse.parse(response.serialize((short) 4), (short) 4);
         
assertEquals(responseData.entrySet().stream().collect(Collectors.toMap(e -> 
e.getKey().topicPartition(), Map.Entry::getValue)),
                 deserialized.responseData(topicNames, (short) 4));
@@ -613,7 +613,7 @@ public class RequestResponseTest {
 
         TopicIdPartition topicIdPartition = new TopicIdPartition(id, new 
TopicPartition("test", 0));
         LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> 
tpToData = new LinkedHashMap<>(Map.of(topicIdPartition, partitionData));
-        fetchResponse = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, 
tpToData);
+        fetchResponse = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, 
tpToData, List.of());
         validateNoNullRecords(fetchResponse);
     }
 
@@ -2025,7 +2025,7 @@ public class RequestResponseTest {
 
     private FetchResponse createFetchResponse(Errors error, int sessionId) {
         return FetchResponse.parse(
-            FetchResponse.of(error, 25, sessionId, new 
LinkedHashMap<>()).serialize(FETCH.latestVersion()), FETCH.latestVersion());
+            FetchResponse.of(error, 25, sessionId, new LinkedHashMap<>(), 
List.of()).serialize(FETCH.latestVersion()), FETCH.latestVersion());
     }
 
     private FetchResponse createFetchResponse(int sessionId) {
@@ -2047,7 +2047,7 @@ public class RequestResponseTest {
                         .setAbortedTransactions(abortedTransactions)
                         .setRecords(MemoryRecords.EMPTY));
         return FetchResponse.parse(FetchResponse.of(Errors.NONE, 25, sessionId,
-            responseData).serialize(FETCH.latestVersion()), 
FETCH.latestVersion());
+            responseData, List.of()).serialize(FETCH.latestVersion()), 
FETCH.latestVersion());
     }
 
     private FetchResponse createFetchResponse(boolean includeAborted) {
@@ -2072,7 +2072,7 @@ public class RequestResponseTest {
                         .setAbortedTransactions(abortedTransactions)
                         .setRecords(MemoryRecords.EMPTY));
         return FetchResponse.parse(FetchResponse.of(Errors.NONE, 25, 
INVALID_SESSION_ID,
-            responseData).serialize(FETCH.latestVersion()), 
FETCH.latestVersion());
+            responseData, List.of()).serialize(FETCH.latestVersion()), 
FETCH.latestVersion());
     }
 
     private FetchResponse createFetchResponse(short version) {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 4c8ae299448..cd0dd12d30e 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -9658,7 +9658,7 @@ class KafkaApisTest extends Logging {
         topicIds.put(tp.topicPartition.topic, tp.topicId)
         topicNames.put(tp.topicId, tp.topicPartition.topic)
       }
-      FetchResponse.of(Errors.NONE, 100, 100, responseData)
+      FetchResponse.of(Errors.NONE, 100, 100, responseData, List.empty.asJava)
     }
 
     val throttledPartition = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("throttledData", 0))
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 2bbea6747c9..91aa1d5c978 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -646,7 +646,7 @@ class ReplicaFetcherThreadTest {
     responseData.put(tid1p0, new FetchResponseData.PartitionData())
     responseData.put(tid1p1, new FetchResponseData.PartitionData())
     responseData.put(tid2p1, new FetchResponseData.PartitionData())
-    val fetchResponse = FetchResponse.of(Errors.NONE, 0, 123, responseData)
+    val fetchResponse = FetchResponse.of(Errors.NONE, 0, 123, responseData,  
List.empty.asJava)
 
     leader.fetchSessionHandler.handleResponse(fetchResponse, 
ApiKeys.FETCH.latestVersion())
 
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/util/MockBlockingSender.scala 
b/core/src/test/scala/unit/kafka/server/epoch/util/MockBlockingSender.scala
index 47b33e0dd7b..297348ed790 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/util/MockBlockingSender.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/util/MockBlockingSender.scala
@@ -116,7 +116,8 @@ class MockBlockingSender(offsets: 
java.util.Map[TopicPartition, EpochEndOffset],
         topicIds = Map.empty
         FetchResponse.of(Errors.NONE, 0,
           if (partitionData.isEmpty) JFetchMetadata.INVALID_SESSION_ID else 1,
-          partitionData)
+          partitionData,  List.empty.asJava
+        )
 
       case ApiKeys.LIST_OFFSETS =>
         listOffsetsCount += 1
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
index adbb8a5b0d0..f433368e8a4 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java
@@ -48,6 +48,7 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -103,13 +104,13 @@ public class FetchResponseBenchmark {
         }
 
         this.header = new ResponseHeader(100, 
ApiKeys.FETCH.responseHeaderVersion(ApiKeys.FETCH.latestVersion()));
-        this.fetchResponse = FetchResponse.of(Errors.NONE, 0, 0, responseData);
+        this.fetchResponse = FetchResponse.of(Errors.NONE, 0, 0, responseData, 
List.of());
         this.fetchResponseData = this.fetchResponse.data();
     }
 
     @Benchmark
     public int testConstructFetchResponse() {
-        FetchResponse fetchResponse = FetchResponse.of(Errors.NONE, 0, 0, 
responseData);
+        FetchResponse fetchResponse = FetchResponse.of(Errors.NONE, 0, 0, 
responseData, List.of());
         return fetchResponse.data().responses().size();
     }
 
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index ae7a44a4401..c1650690851 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -220,7 +220,7 @@ public class ReplicaFetcherThreadBenchmark {
         // so that we do not measure this time as part of the steady state work
         fetcher.doWork();
         // handle response to engage the incremental fetch session handler
-        ((RemoteLeaderEndPoint) 
fetcher.leader()).fetchSessionHandler().handleResponse(FetchResponse.of(Errors.NONE,
 0, 999, initialFetched), ApiKeys.FETCH.latestVersion());
+        ((RemoteLeaderEndPoint) 
fetcher.leader()).fetchSessionHandler().handleResponse(FetchResponse.of(Errors.NONE,
 0, 999, initialFetched, List.of()), ApiKeys.FETCH.latestVersion());
     }
 
     @TearDown(Level.Trial)
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java
index 11d584446f4..38da51f699c 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java
@@ -44,6 +44,7 @@ import org.openjdk.jmh.annotations.Warmup;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -93,7 +94,7 @@ public class FetchSessionBenchmark {
         }
         builder.build();
         // build and handle an initial response so that the next fetch will be 
incremental
-        handler.handleResponse(FetchResponse.of(Errors.NONE, 0, 1, respMap), 
ApiKeys.FETCH.latestVersion());
+        handler.handleResponse(FetchResponse.of(Errors.NONE, 0, 1, respMap, 
List.of()), ApiKeys.FETCH.latestVersion());
 
         int counter = 0;
         for (TopicPartition topicPartition: new ArrayList<>(fetches.keySet())) 
{

