This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 77be6f2d744 KAFKA-19053 Remove FetchResponse#of which is not used in production … (#19327) 77be6f2d744 is described below commit 77be6f2d744166a49a599b898ecf00313e50edf4 Author: Hong-Yi Chen <apala...@gmail.com> AuthorDate: Mon Jun 2 00:48:53 2025 +0800 KAFKA-19053 Remove FetchResponse#of which is not used in production … (#19327) Removed the FetchResponse#of overload that is not used in production. The test cases that originally invoked this method have been updated to call the other [FetchResponse#of](https://github.com/apache/kafka/blob/6af849f8642d93877e64dace272cee1551620862/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java#L232), which is currently used by ```KafkaApis```, to maintain the integrity of the tests. Reviewers: Jun Rao <jun...@gmail.com>, PoAn Yang <pay...@apache.org>, Chia-Ping Tsai <chia7...@gmail.com> --- .../kafka/common/requests/FetchResponse.java | 8 --- .../kafka/clients/FetchSessionHandlerTest.java | 63 ++++++++++++++-------- .../kafka/clients/consumer/KafkaConsumerTest.java | 4 +- .../internals/FetchRequestManagerTest.java | 32 +++++------ .../clients/consumer/internals/FetcherTest.java | 34 ++++++------ .../kafka/common/requests/RequestResponseTest.java | 12 ++--- .../scala/unit/kafka/server/KafkaApisTest.scala | 2 +- .../kafka/server/ReplicaFetcherThreadTest.scala | 2 +- .../server/epoch/util/MockBlockingSender.scala | 3 +- .../kafka/jmh/common/FetchResponseBenchmark.java | 5 +- .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 2 +- .../jmh/fetchsession/FetchSessionBenchmark.java | 3 +- 12 files changed, 92 insertions(+), 78 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index 0324f591ef4..5013468095c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -243,14 +243,6 @@ public class FetchResponse extends AbstractResponse { return new FetchResponse(data); } - // TODO: remove as a part of KAFKA-12410 - public static FetchResponse of(Errors error, - int throttleTimeMs, - int sessionId, - LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> responseData) { - return new FetchResponse(toMessage(error, throttleTimeMs, sessionId, responseData.entrySet().iterator(), Collections.emptyList())); - } - // TODO: remove as a part of KAFKA-12410 public static FetchResponse of(Errors error, int throttleTimeMs, diff --git a/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java index 3166206f0a4..1c50666c6af 100644 --- a/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java @@ -217,7 +217,8 @@ public class FetchSessionHandlerTest { FetchResponse resp = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, respMap(new RespEntry("foo", 0, fooId, 0, 0), - new RespEntry("foo", 1, fooId, 0, 0))); + new RespEntry("foo", 1, fooId, 0, 0)), + List.of()); handler.handleResponse(resp, version); FetchSessionHandler.Builder builder2 = handler.newBuilder(); @@ -258,7 +259,8 @@ public class FetchSessionHandlerTest { FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, respMap(new RespEntry("foo", 0, fooId, 10, 20), - new RespEntry("foo", 1, fooId, 10, 20))); + new RespEntry("foo", 1, fooId, 10, 20)), + List.of()); handler.handleResponse(resp, version); // Test an incremental fetch request which adds one partition and modifies another. 
@@ -280,13 +282,15 @@ public class FetchSessionHandlerTest { data2.toSend()); FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 1, fooId, 20, 20))); + respMap(new RespEntry("foo", 1, fooId, 20, 20)), + List.of()); handler.handleResponse(resp2, version); // Skip building a new request. Test that handling an invalid fetch session epoch response results // in a request which closes the session. FetchResponse resp3 = FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, INVALID_SESSION_ID, - respMap()); + respMap(), + List.of()); handler.handleResponse(resp3, version); FetchSessionHandler.Builder builder4 = handler.newBuilder(); @@ -346,7 +350,8 @@ public class FetchSessionHandlerTest { FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, respMap(new RespEntry("foo", 0, fooId, 10, 20), new RespEntry("foo", 1, fooId, 10, 20), - new RespEntry("bar", 0, barId, 10, 20))); + new RespEntry("bar", 0, barId, 10, 20)), + List.of()); handler.handleResponse(resp, version); // Test an incremental fetch request which removes two partitions. @@ -366,8 +371,9 @@ public class FetchSessionHandlerTest { // A FETCH_SESSION_ID_NOT_FOUND response triggers us to close the session. // The next request is a session establishing FULL request. 
- FetchResponse resp2 = FetchResponse.of(Errors.FETCH_SESSION_ID_NOT_FOUND, 0, INVALID_SESSION_ID, - respMap()); + FetchResponse resp2 = FetchResponse.of(Errors.FETCH_SESSION_ID_NOT_FOUND, 0, INVALID_SESSION_ID, + respMap(), + List.of()); handler.handleResponse(resp2, version); FetchSessionHandler.Builder builder3 = handler.newBuilder(); @@ -399,7 +405,8 @@ public class FetchSessionHandlerTest { assertFalse(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); + respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)), + List.of()); handler.handleResponse(resp, (short) 12); // Try to add a topic ID to an already existing topic partition (0) or a new partition (1) in the session. @@ -436,7 +443,8 @@ public class FetchSessionHandlerTest { assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, fooId, 10, 20))); + respMap(new RespEntry("foo", 0, fooId, 10, 20)), + List.of()); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. @@ -475,7 +483,7 @@ public class FetchSessionHandlerTest { assertTrue(data.metadata().isFull()); assertEquals(startsWithTopicIds, data.canUseTopicIds()); - FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, respMap(new RespEntry("foo", 0, topicId1, 10, 20))); + FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, respMap(new RespEntry("foo", 0, topicId1, 10, 20)), List.of()); short version = startsWithTopicIds ? 
ApiKeys.FETCH.latestVersion() : 12; handler.handleResponse(resp, version); @@ -548,7 +556,7 @@ public class FetchSessionHandlerTest { assertEquals(startsWithTopicIds, data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, fooId, 10, 20))); + respMap(new RespEntry("foo", 0, fooId, 10, 20)), List.of()); handler.handleResponse(resp, responseVersion); // Re-add the first partition. Then add a partition with opposite ID usage. @@ -583,7 +591,8 @@ public class FetchSessionHandlerTest { assertEquals(useTopicIds, data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, topicId, 10, 20))); + respMap(new RespEntry("foo", 0, topicId, 10, 20)), + List.of()); handler.handleResponse(resp, responseVersion); // Remove the topic from the session @@ -610,7 +619,8 @@ public class FetchSessionHandlerTest { assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, topicId, 10, 20))); + respMap(new RespEntry("foo", 0, topicId, 10, 20)), + List.of()); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Remove the partition from the session. Return a session ID as though the session is still open. @@ -619,7 +629,8 @@ public class FetchSessionHandlerTest { assertMapsEqual(new LinkedHashMap<>(), data2.toSend(), data2.sessionPartitions()); FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, 123, - new LinkedHashMap<>()); + new LinkedHashMap<>(), + List.of()); handler.handleResponse(resp2, ApiKeys.FETCH.latestVersion()); // After the topic is removed, add a recreated topic with a new ID. 
@@ -651,7 +662,8 @@ public class FetchSessionHandlerTest { FetchResponse resp1 = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, respMap(new RespEntry("foo", 0, fooId, 10, 20), new RespEntry("foo", 1, fooId, 10, 20), - new RespEntry("bar", 0, barId, 10, 20))); + new RespEntry("bar", 0, barId, 10, 20)), + List.of()); String issue = handler.verifyFullFetchResponsePartitions(resp1.responseData(topicNames, version).keySet(), resp1.topicIds(), version); assertTrue(issue.contains("extraPartitions=")); @@ -664,13 +676,15 @@ public class FetchSessionHandlerTest { FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, respMap(new RespEntry("foo", 0, fooId, 10, 20), new RespEntry("foo", 1, fooId, 10, 20), - new RespEntry("bar", 0, barId, 10, 20))); + new RespEntry("bar", 0, barId, 10, 20)), + List.of()); String issue2 = handler.verifyFullFetchResponsePartitions(resp2.responseData(topicNames, version).keySet(), resp2.topicIds(), version); assertNull(issue2); FetchResponse resp3 = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, respMap(new RespEntry("foo", 0, fooId, 10, 20), - new RespEntry("foo", 1, fooId, 10, 20))); + new RespEntry("foo", 1, fooId, 10, 20)), + List.of()); String issue3 = handler.verifyFullFetchResponsePartitions(resp3.responseData(topicNames, version).keySet(), resp3.topicIds(), version); assertFalse(issue3.contains("extraPartitions=")); @@ -689,7 +703,8 @@ public class FetchSessionHandlerTest { FetchResponse resp1 = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, respMap(new RespEntry("foo", 0, topicIds.get("foo"), 10, 20), new RespEntry("extra2", 1, topicIds.get("extra2"), 10, 20), - new RespEntry("bar", 0, topicIds.get("bar"), 10, 20))); + new RespEntry("bar", 0, topicIds.get("bar"), 10, 20)), + List.of()); String issue = handler.verifyFullFetchResponsePartitions(resp1.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet(), resp1.topicIds(), ApiKeys.FETCH.latestVersion()); 
assertTrue(issue.contains("extraPartitions=")); @@ -703,14 +718,16 @@ public class FetchSessionHandlerTest { FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, respMap(new RespEntry("foo", 0, topicIds.get("foo"), 10, 20), new RespEntry("extra2", 1, topicIds.get("extra2"), 10, 20), - new RespEntry("bar", 0, topicIds.get("bar"), 10, 20))); + new RespEntry("bar", 0, topicIds.get("bar"), 10, 20)), + List.of()); String issue2 = handler.verifyFullFetchResponsePartitions(resp2.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet(), resp2.topicIds(), ApiKeys.FETCH.latestVersion()); assertTrue(issue2.contains("extraPartitions=")); assertFalse(issue2.contains("omittedPartitions=")); FetchResponse resp3 = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, respMap(new RespEntry("foo", 0, topicIds.get("foo"), 10, 20), - new RespEntry("bar", 0, topicIds.get("bar"), 10, 20))); + new RespEntry("bar", 0, topicIds.get("bar"), 10, 20)), + List.of()); String issue3 = handler.verifyFullFetchResponsePartitions(resp3.responseData(topicNames, ApiKeys.FETCH.latestVersion()).keySet(), resp3.topicIds(), ApiKeys.FETCH.latestVersion()); assertNull(issue3); @@ -734,7 +751,8 @@ public class FetchSessionHandlerTest { FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, respMap(new RespEntry("foo", 0, topicIds.get("foo"), 10, 20), - new RespEntry("foo", 1, topicIds.get("foo"), 10, 20))); + new RespEntry("foo", 1, topicIds.get("foo"), 10, 20)), + List.of()); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Test an incremental fetch request which adds an ID unknown to the broker. 
@@ -749,7 +767,8 @@ public class FetchSessionHandlerTest { // Return and handle a response with a top level error FetchResponse resp2 = FetchResponse.of(Errors.UNKNOWN_TOPIC_ID, 0, 123, - respMap(new RespEntry("unknown", 0, Uuid.randomUuid(), Errors.UNKNOWN_TOPIC_ID))); + respMap(new RespEntry("unknown", 0, Uuid.randomUuid(), Errors.UNKNOWN_TOPIC_ID)), + List.of()); assertFalse(handler.handleResponse(resp2, ApiKeys.FETCH.latestVersion())); // Ensure we start with a new epoch. This will close the session in the next request. diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index deff7fff656..27a8cf6697e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1893,7 +1893,7 @@ public class KafkaConsumerTest { response.put(tp0, Errors.NONE); OffsetCommitResponse commitResponse = offsetCommitResponse(response); LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())); - FetchResponse closeResponse = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>()); + FetchResponse closeResponse = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(), List.of()); consumerCloseTest(groupProtocol, 5000, Arrays.asList(commitResponse, leaveGroupResponse, closeResponse), 0, false); } @@ -2950,7 +2950,7 @@ public class KafkaConsumerTest { .setLogStartOffset(logStartOffset) .setRecords(records)); } - return FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, tpResponses); + return FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, tpResponses, List.of()); } private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, int count) { diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index 7c8547ddd88..7d0325e8e5d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -1733,7 +1733,7 @@ public class FetchRequestManagerTest { .setPartitionIndex(tp0.partition()) .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code()) .setHighWatermark(100)); - client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions))); + client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of())); networkClientDelegate.poll(time.timer(0)); List<ConsumerRecord<byte[], byte[]>> allFetchedRecords = new ArrayList<>(); @@ -1794,7 +1794,7 @@ public class FetchRequestManagerTest { .setLastStableOffset(4) .setLogStartOffset(0) .setRecords(partialRecords)); - client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions))); + client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of())); networkClientDelegate.poll(time.timer(0)); List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new ArrayList<>(); @@ -1865,7 +1865,7 @@ public class FetchRequestManagerTest { .setPartitionIndex(tp1.partition()) .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code()) .setHighWatermark(100)); - client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions))); + client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of())); networkClientDelegate.poll(time.timer(0)); assertEquals(1, fetchRecords().get(tp0).size()); @@ -2113,7 +2113,7 @@ public class 
FetchRequestManagerTest { } assertEquals(1, sendFetches()); - client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, fetchPartitionData)); + client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, fetchPartitionData, List.of())); networkClientDelegate.poll(time.timer(0)); Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchRecords(); @@ -2185,7 +2185,7 @@ public class FetchRequestManagerTest { .setLogStartOffset(0)); assertEquals(1, sendFetches()); - client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions))); + client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of())); networkClientDelegate.poll(time.timer(0)); collectFetch(); @@ -2231,7 +2231,7 @@ public class FetchRequestManagerTest { .setLogStartOffset(0) .setRecords(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("val".getBytes())))); - client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions))); + client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of())); networkClientDelegate.poll(time.timer(0)); collectFetch(); @@ -2758,7 +2758,7 @@ public class FetchRequestManagerTest { .setHighWatermark(100) .setLogStartOffset(0) .setRecords(emptyRecords)); - FetchResponse resp1 = FetchResponse.of(Errors.NONE, 0, 123, partitions1); + FetchResponse resp1 = FetchResponse.of(Errors.NONE, 0, 123, partitions1, List.of()); client.prepareResponse(resp1); assertEquals(1, sendFetches()); assertFalse(fetcher.hasCompletedFetches()); @@ -2784,7 +2784,7 @@ public class FetchRequestManagerTest { // The second response contains no new records. 
LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions2 = new LinkedHashMap<>(); - FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, 123, partitions2); + FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, 123, partitions2, List.of()); client.prepareResponse(resp2); assertEquals(1, sendFetches()); networkClientDelegate.poll(time.timer(0)); @@ -2801,7 +2801,7 @@ public class FetchRequestManagerTest { .setLastStableOffset(4) .setLogStartOffset(0) .setRecords(nextRecords)); - FetchResponse resp3 = FetchResponse.of(Errors.NONE, 0, 123, partitions3); + FetchResponse resp3 = FetchResponse.of(Errors.NONE, 0, 123, partitions3, List.of()); client.prepareResponse(resp3); assertEquals(1, sendFetches()); networkClientDelegate.poll(time.timer(0)); @@ -3246,7 +3246,7 @@ public class FetchRequestManagerTest { .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET) .setLogStartOffset(0) .setRecords(nextRecords)); - client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, partitions), nodeId0); + client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, partitions, List.of()), nodeId0); networkClientDelegate.poll(time.timer(0)); partitionRecords = fetchRecords(); assertFalse(partitionRecords.containsKey(tp0)); @@ -3851,7 +3851,7 @@ public class FetchRequestManagerTest { }); client.prepareResponseFrom( - FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, partitionDataMap), + FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, partitionDataMap, List.of()), node ); } @@ -3906,7 +3906,7 @@ public class FetchRequestManagerTest { .setPartitionIndex(tp.topicPartition().partition()) .setErrorCode(error.code()) .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)); - return FetchResponse.of(error, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions)); + return FetchResponse.of(error, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()); } private FetchResponse 
fullFetchResponseWithAbortedTransactions(MemoryRecords records, @@ -3924,7 +3924,7 @@ public class FetchRequestManagerTest { .setLogStartOffset(0) .setAbortedTransactions(abortedTransactions) .setRecords(records)); - return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions)); + return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()); } private FetchResponse fullFetchResponse(int sessionId, TopicIdPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) { @@ -3950,7 +3950,7 @@ public class FetchRequestManagerTest { .setLastStableOffset(lastStableOffset) .setLogStartOffset(0) .setRecords(records)); - return FetchResponse.of(Errors.NONE, throttleTime, sessionId, new LinkedHashMap<>(partitions)); + return FetchResponse.of(Errors.NONE, throttleTime, sessionId, new LinkedHashMap<>(partitions), List.of()); } private FetchResponse fullFetchResponse(TopicIdPartition tp, MemoryRecords records, Errors error, long hw, @@ -3964,7 +3964,7 @@ public class FetchRequestManagerTest { .setLogStartOffset(0) .setRecords(records) .setPreferredReadReplica(preferredReplicaId.orElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))); - return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions)); + return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()); } private FetchResponse fetchResponse(TopicIdPartition tp, MemoryRecords records, Errors error, long hw, @@ -3977,7 +3977,7 @@ public class FetchRequestManagerTest { .setLastStableOffset(lastStableOffset) .setLogStartOffset(logStartOffset) .setRecords(records)); - return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions)); + return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()); } /** diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 45a747e04da..a09024fb144 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -1720,7 +1720,7 @@ public class FetcherTest { .setPartitionIndex(tp0.partition()) .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code()) .setHighWatermark(100)); - client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions))); + client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of())); consumerClient.poll(time.timer(0)); List<ConsumerRecord<byte[], byte[]>> allFetchedRecords = new ArrayList<>(); @@ -1781,7 +1781,7 @@ public class FetcherTest { .setLastStableOffset(4) .setLogStartOffset(0) .setRecords(partialRecords)); - client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions))); + client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of())); consumerClient.poll(time.timer(0)); List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new ArrayList<>(); @@ -1852,7 +1852,7 @@ public class FetcherTest { .setPartitionIndex(tp1.partition()) .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code()) .setHighWatermark(100)); - client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions))); + client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of())); consumerClient.poll(time.timer(0)); assertEquals(1, fetchRecords().get(tp0).size()); @@ -2100,7 +2100,7 @@ public class FetcherTest { } assertEquals(1, sendFetches()); - client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 
INVALID_SESSION_ID, fetchPartitionData)); + client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, fetchPartitionData, List.of())); consumerClient.poll(time.timer(0)); Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchRecords(); @@ -2172,7 +2172,7 @@ public class FetcherTest { .setLogStartOffset(0)); assertEquals(1, sendFetches()); - client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions))); + client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of())); consumerClient.poll(time.timer(0)); collectFetch(); @@ -2218,7 +2218,7 @@ public class FetcherTest { .setLogStartOffset(0) .setRecords(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("val".getBytes())))); - client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions))); + client.prepareResponse(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of())); consumerClient.poll(time.timer(0)); collectFetch(); @@ -2745,7 +2745,7 @@ public class FetcherTest { .setHighWatermark(100) .setLogStartOffset(0) .setRecords(emptyRecords)); - FetchResponse resp1 = FetchResponse.of(Errors.NONE, 0, 123, partitions1); + FetchResponse resp1 = FetchResponse.of(Errors.NONE, 0, 123, partitions1, List.of()); client.prepareResponse(resp1); assertEquals(1, sendFetches()); assertFalse(fetcher.hasCompletedFetches()); @@ -2771,7 +2771,7 @@ public class FetcherTest { // The second response contains no new records. 
LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions2 = new LinkedHashMap<>(); - FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, 123, partitions2); + FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, 123, partitions2, List.of()); client.prepareResponse(resp2); assertEquals(1, sendFetches()); consumerClient.poll(time.timer(0)); @@ -2788,7 +2788,7 @@ public class FetcherTest { .setLastStableOffset(4) .setLogStartOffset(0) .setRecords(nextRecords)); - FetchResponse resp3 = FetchResponse.of(Errors.NONE, 0, 123, partitions3); + FetchResponse resp3 = FetchResponse.of(Errors.NONE, 0, 123, partitions3, List.of()); client.prepareResponse(resp3); assertEquals(1, sendFetches()); consumerClient.poll(time.timer(0)); @@ -2922,7 +2922,7 @@ public class FetcherTest { .setLogStartOffset(0) .setRecords(buildRecords(offset, 2, offset))); } - client.respondToRequest(request, FetchResponse.of(Errors.NONE, 0, 123, responseMap)); + client.respondToRequest(request, FetchResponse.of(Errors.NONE, 0, 123, responseMap, List.of())); consumerClient.poll(time.timer(0)); } } @@ -2985,7 +2985,7 @@ public class FetcherTest { .setLogStartOffset(0) .setRecords(buildRecords(nextOffset, 2, nextOffset))); nextOffset += 2; - client.respondToRequest(request, FetchResponse.of(Errors.NONE, 0, 123, responseMap)); + client.respondToRequest(request, FetchResponse.of(Errors.NONE, 0, 123, responseMap, List.of())); consumerClient.poll(time.timer(0)); } } @@ -3523,7 +3523,7 @@ public class FetcherTest { .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET) .setLogStartOffset(0) .setRecords(nextRecords)); - client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, partitions), nodeId0); + client.prepareResponseFrom(FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, partitions, List.of()), nodeId0); consumerClient.poll(time.timer(0)); partitionRecords = fetchRecords(); assertFalse(partitionRecords.containsKey(tp0)); @@ -3689,7 +3689,7 @@ public 
class FetcherTest { .setPartitionIndex(tp.topicPartition().partition()) .setErrorCode(error.code()) .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)); - return FetchResponse.of(error, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions)); + return FetchResponse.of(error, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()); } private FetchResponse fullFetchResponseWithAbortedTransactions(MemoryRecords records, @@ -3707,7 +3707,7 @@ public class FetcherTest { .setLogStartOffset(0) .setAbortedTransactions(abortedTransactions) .setRecords(records)); - return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions)); + return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()); } private FetchResponse fullFetchResponse(int sessionId, TopicIdPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) { @@ -3733,7 +3733,7 @@ public class FetcherTest { .setLastStableOffset(lastStableOffset) .setLogStartOffset(0) .setRecords(records)); - return FetchResponse.of(Errors.NONE, throttleTime, sessionId, new LinkedHashMap<>(partitions)); + return FetchResponse.of(Errors.NONE, throttleTime, sessionId, new LinkedHashMap<>(partitions), List.of()); } private FetchResponse fullFetchResponse(TopicIdPartition tp, MemoryRecords records, Errors error, long hw, @@ -3747,7 +3747,7 @@ public class FetcherTest { .setLogStartOffset(0) .setRecords(records) .setPreferredReadReplica(preferredReplicaId.orElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))); - return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions)); + return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()); } private FetchResponse fetchResponse(TopicIdPartition tp, MemoryRecords records, Errors error, long hw, @@ -3760,7 +3760,7 @@ public class FetcherTest { 
.setLastStableOffset(lastStableOffset) .setLogStartOffset(logStartOffset) .setRecords(records)); - return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions)); + return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions), List.of()); } /** diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 7a3be68ff78..09433a39239 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -545,7 +545,7 @@ public class RequestResponseTest { .setHighWatermark(1000000) .setLogStartOffset(-1) .setRecords(records)); - FetchResponse idTestResponse = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, idResponseData); + FetchResponse idTestResponse = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, idResponseData, List.of()); FetchResponse v12Deserialized = FetchResponse.parse(idTestResponse.serialize((short) 12), (short) 12); FetchResponse newestDeserialized = FetchResponse.parse(idTestResponse.serialize(FETCH.latestVersion()), FETCH.latestVersion()); assertTrue(v12Deserialized.topicIds().isEmpty()); @@ -586,7 +586,7 @@ public class RequestResponseTest { .setLastStableOffset(6) .setRecords(records)); - FetchResponse response = FetchResponse.of(Errors.NONE, 10, INVALID_SESSION_ID, responseData); + FetchResponse response = FetchResponse.of(Errors.NONE, 10, INVALID_SESSION_ID, responseData, List.of()); FetchResponse deserialized = FetchResponse.parse(response.serialize((short) 4), (short) 4); assertEquals(responseData.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().topicPartition(), Map.Entry::getValue)), deserialized.responseData(topicNames, (short) 4)); @@ -613,7 +613,7 @@ public class RequestResponseTest { TopicIdPartition 
topicIdPartition = new TopicIdPartition(id, new TopicPartition("test", 0)); LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> tpToData = new LinkedHashMap<>(Map.of(topicIdPartition, partitionData)); - fetchResponse = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, tpToData); + fetchResponse = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, tpToData, List.of()); validateNoNullRecords(fetchResponse); } @@ -2025,7 +2025,7 @@ public class RequestResponseTest { private FetchResponse createFetchResponse(Errors error, int sessionId) { return FetchResponse.parse( - FetchResponse.of(error, 25, sessionId, new LinkedHashMap<>()).serialize(FETCH.latestVersion()), FETCH.latestVersion()); + FetchResponse.of(error, 25, sessionId, new LinkedHashMap<>(), List.of()).serialize(FETCH.latestVersion()), FETCH.latestVersion()); } private FetchResponse createFetchResponse(int sessionId) { @@ -2047,7 +2047,7 @@ public class RequestResponseTest { .setAbortedTransactions(abortedTransactions) .setRecords(MemoryRecords.EMPTY)); return FetchResponse.parse(FetchResponse.of(Errors.NONE, 25, sessionId, - responseData).serialize(FETCH.latestVersion()), FETCH.latestVersion()); + responseData, List.of()).serialize(FETCH.latestVersion()), FETCH.latestVersion()); } private FetchResponse createFetchResponse(boolean includeAborted) { @@ -2072,7 +2072,7 @@ public class RequestResponseTest { .setAbortedTransactions(abortedTransactions) .setRecords(MemoryRecords.EMPTY)); return FetchResponse.parse(FetchResponse.of(Errors.NONE, 25, INVALID_SESSION_ID, - responseData).serialize(FETCH.latestVersion()), FETCH.latestVersion()); + responseData, List.of()).serialize(FETCH.latestVersion()), FETCH.latestVersion()); } private FetchResponse createFetchResponse(short version) { diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 4c8ae299448..cd0dd12d30e 100644 --- 
a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -9658,7 +9658,7 @@ class KafkaApisTest extends Logging { topicIds.put(tp.topicPartition.topic, tp.topicId) topicNames.put(tp.topicId, tp.topicPartition.topic) } - FetchResponse.of(Errors.NONE, 100, 100, responseData) + FetchResponse.of(Errors.NONE, 100, 100, responseData, List.empty.asJava) } val throttledPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("throttledData", 0)) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index 2bbea6747c9..91aa1d5c978 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -646,7 +646,7 @@ class ReplicaFetcherThreadTest { responseData.put(tid1p0, new FetchResponseData.PartitionData()) responseData.put(tid1p1, new FetchResponseData.PartitionData()) responseData.put(tid2p1, new FetchResponseData.PartitionData()) - val fetchResponse = FetchResponse.of(Errors.NONE, 0, 123, responseData) + val fetchResponse = FetchResponse.of(Errors.NONE, 0, 123, responseData, List.empty.asJava) leader.fetchSessionHandler.handleResponse(fetchResponse, ApiKeys.FETCH.latestVersion()) diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/MockBlockingSender.scala b/core/src/test/scala/unit/kafka/server/epoch/util/MockBlockingSender.scala index 47b33e0dd7b..297348ed790 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/util/MockBlockingSender.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/util/MockBlockingSender.scala @@ -116,7 +116,8 @@ class MockBlockingSender(offsets: java.util.Map[TopicPartition, EpochEndOffset], topicIds = Map.empty FetchResponse.of(Errors.NONE, 0, if (partitionData.isEmpty) JFetchMetadata.INVALID_SESSION_ID else 1, - partitionData) + partitionData, 
List.empty.asJava + ) case ApiKeys.LIST_OFFSETS => listOffsetsCount += 1 diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java index adbb8a5b0d0..f433368e8a4 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchResponseBenchmark.java @@ -48,6 +48,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -103,13 +104,13 @@ public class FetchResponseBenchmark { } this.header = new ResponseHeader(100, ApiKeys.FETCH.responseHeaderVersion(ApiKeys.FETCH.latestVersion())); - this.fetchResponse = FetchResponse.of(Errors.NONE, 0, 0, responseData); + this.fetchResponse = FetchResponse.of(Errors.NONE, 0, 0, responseData, List.of()); this.fetchResponseData = this.fetchResponse.data(); } @Benchmark public int testConstructFetchResponse() { - FetchResponse fetchResponse = FetchResponse.of(Errors.NONE, 0, 0, responseData); + FetchResponse fetchResponse = FetchResponse.of(Errors.NONE, 0, 0, responseData, List.of()); return fetchResponse.data().responses().size(); } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index ae7a44a4401..c1650690851 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -220,7 +220,7 @@ public class ReplicaFetcherThreadBenchmark { // so that we do not measure this time as part of the steady state work fetcher.doWork(); // handle 
response to engage the incremental fetch session handler - ((RemoteLeaderEndPoint) fetcher.leader()).fetchSessionHandler().handleResponse(FetchResponse.of(Errors.NONE, 0, 999, initialFetched), ApiKeys.FETCH.latestVersion()); + ((RemoteLeaderEndPoint) fetcher.leader()).fetchSessionHandler().handleResponse(FetchResponse.of(Errors.NONE, 0, 999, initialFetched, List.of()), ApiKeys.FETCH.latestVersion()); } @TearDown(Level.Trial) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java index 11d584446f4..38da51f699c 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java @@ -44,6 +44,7 @@ import org.openjdk.jmh.annotations.Warmup; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -93,7 +94,7 @@ public class FetchSessionBenchmark { } builder.build(); // build and handle an initial response so that the next fetch will be incremental - handler.handleResponse(FetchResponse.of(Errors.NONE, 0, 1, respMap), ApiKeys.FETCH.latestVersion()); + handler.handleResponse(FetchResponse.of(Errors.NONE, 0, 1, respMap, List.of()), ApiKeys.FETCH.latestVersion()); int counter = 0; for (TopicPartition topicPartition: new ArrayList<>(fetches.keySet())) {