This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c9afd2d MINOR: refactor FetchResponse#toMessage to avoid creating
unnecessary collections (#9818)
c9afd2d is described below
commit c9afd2db01415b49d572f2f257bb83a243d249e3
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Tue Jan 5 10:32:04 2021 +0800
MINOR: refactor FetchResponse#toMessage to avoid creating unnecessary
collections (#9818)
Reviewers: Ismael Juma <[email protected]>
---
.../apache/kafka/common/requests/FetchRequest.java | 25 --------------
.../kafka/common/requests/FetchResponse.java | 39 ++++++++++++----------
2 files changed, 21 insertions(+), 43 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 950b03e..591c6e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -144,30 +143,6 @@ public class FetchRequest extends AbstractRequest {
return result;
}
- static final class TopicAndPartitionData<T> {
- public final String topic;
- public final LinkedHashMap<Integer, T> partitions;
-
- public TopicAndPartitionData(String topic) {
- this.topic = topic;
- this.partitions = new LinkedHashMap<>();
- }
-
- public static <T> List<TopicAndPartitionData<T>> batchByTopic(Iterator<Map.Entry<TopicPartition, T>> iter) {
- List<TopicAndPartitionData<T>> topics = new ArrayList<>();
- while (iter.hasNext()) {
- Map.Entry<TopicPartition, T> topicEntry = iter.next();
- String topic = topicEntry.getKey().topic();
- int partition = topicEntry.getKey().partition();
- T partitionData = topicEntry.getValue();
- if (topics.isEmpty() || !topics.get(topics.size() - 1).topic.equals(topic))
- topics.add(new TopicAndPartitionData<T>(topic));
- topics.get(topics.size() - 1).partitions.put(partition, partitionData);
- }
- return topics;
- }
- }
-
public static class Builder extends AbstractRequest.Builder<FetchRequest> {
private final int maxWait;
private final int minBytes;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 30b462b..4e8dce5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -331,28 +331,31 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
private static <T extends BaseRecords> FetchResponseData toMessage(int
throttleTimeMs, Errors error,
Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator,
int
sessionId) {
- FetchResponseData message = new FetchResponseData();
- message.setThrottleTimeMs(throttleTimeMs);
- message.setErrorCode(error.code());
- message.setSessionId(sessionId);
-
List<FetchResponseData.FetchableTopicResponse> topicResponseList = new
ArrayList<>();
- List<FetchRequest.TopicAndPartitionData<PartitionData<T>>> topicsData =
- FetchRequest.TopicAndPartitionData.batchByTopic(partIterator);
- topicsData.forEach(partitionDataTopicAndPartitionData -> {
- List<FetchResponseData.FetchablePartitionResponse>
partitionResponses = new ArrayList<>();
-
partitionDataTopicAndPartitionData.partitions.forEach((partitionId,
partitionData) -> {
- // Since PartitionData alone doesn't know the partition ID, we
set it here
- partitionData.partitionResponse.setPartition(partitionId);
+ partIterator.forEachRemaining(entry -> {
+ PartitionData<T> partitionData = entry.getValue();
+ // Since PartitionData alone doesn't know the partition ID, we set it here
+ partitionData.partitionResponse.setPartition(entry.getKey().partition());
+ // We have to keep the order of input topic-partition. Hence, we batch the partitions only if the last
+ // batch is in the same topic group.
+ FetchResponseData.FetchableTopicResponse previousTopic = topicResponseList.isEmpty() ? null
+ : topicResponseList.get(topicResponseList.size() - 1);
+ if (previousTopic != null && previousTopic.topic().equals(entry.getKey().topic()))
+ previousTopic.partitionResponses().add(partitionData.partitionResponse);
+ else {
+ List<FetchResponseData.FetchablePartitionResponse>
partitionResponses = new ArrayList<>();
partitionResponses.add(partitionData.partitionResponse);
- });
- topicResponseList.add(new
FetchResponseData.FetchableTopicResponse()
- .setTopic(partitionDataTopicAndPartitionData.topic)
- .setPartitionResponses(partitionResponses));
+ topicResponseList.add(new
FetchResponseData.FetchableTopicResponse()
+ .setTopic(entry.getKey().topic())
+ .setPartitionResponses(partitionResponses));
+ }
});
- message.setResponses(topicResponseList);
- return message;
+ return new FetchResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setErrorCode(error.code())
+ .setSessionId(sessionId)
+ .setResponses(topicResponseList);
}
/**