This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c9afd2d  MINOR: refactor FetchResponse#toMessage to avoid creating unnecessary collections (#9818)
c9afd2d is described below

commit c9afd2db01415b49d572f2f257bb83a243d249e3
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Tue Jan 5 10:32:04 2021 +0800

    MINOR: refactor FetchResponse#toMessage to avoid creating unnecessary collections (#9818)
    
    Reviewers: Ismael Juma <[email protected]>
---
 .../apache/kafka/common/requests/FetchRequest.java | 25 --------------
 .../kafka/common/requests/FetchResponse.java       | 39 ++++++++++++----------
 2 files changed, 21 insertions(+), 43 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 950b03e..591c6e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.Utils;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -144,30 +143,6 @@ public class FetchRequest extends AbstractRequest {
         return result;
     }
 
-    static final class TopicAndPartitionData<T> {
-        public final String topic;
-        public final LinkedHashMap<Integer, T> partitions;
-
-        public TopicAndPartitionData(String topic) {
-            this.topic = topic;
-            this.partitions = new LinkedHashMap<>();
-        }
-
-        public static <T> List<TopicAndPartitionData<T>> batchByTopic(Iterator<Map.Entry<TopicPartition, T>> iter) {
-            List<TopicAndPartitionData<T>> topics = new ArrayList<>();
-            while (iter.hasNext()) {
-                Map.Entry<TopicPartition, T> topicEntry = iter.next();
-                String topic = topicEntry.getKey().topic();
-                int partition = topicEntry.getKey().partition();
-                T partitionData = topicEntry.getValue();
-                if (topics.isEmpty() || !topics.get(topics.size() - 1).topic.equals(topic))
-                    topics.add(new TopicAndPartitionData<T>(topic));
-                topics.get(topics.size() - 1).partitions.put(partition, partitionData);
-            }
-            return topics;
-        }
-    }
-
     public static class Builder extends AbstractRequest.Builder<FetchRequest> {
         private final int maxWait;
         private final int minBytes;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 30b462b..4e8dce5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -331,28 +331,31 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
     private static <T extends BaseRecords> FetchResponseData toMessage(int throttleTimeMs, Errors error,
                                                                        Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator,
                                                                        int sessionId) {
-        FetchResponseData message = new FetchResponseData();
-        message.setThrottleTimeMs(throttleTimeMs);
-        message.setErrorCode(error.code());
-        message.setSessionId(sessionId);
-
         List<FetchResponseData.FetchableTopicResponse> topicResponseList = new ArrayList<>();
-        List<FetchRequest.TopicAndPartitionData<PartitionData<T>>> topicsData =
-                FetchRequest.TopicAndPartitionData.batchByTopic(partIterator);
-        topicsData.forEach(partitionDataTopicAndPartitionData -> {
-            List<FetchResponseData.FetchablePartitionResponse> partitionResponses = new ArrayList<>();
-            partitionDataTopicAndPartitionData.partitions.forEach((partitionId, partitionData) -> {
-                // Since PartitionData alone doesn't know the partition ID, we set it here
-                partitionData.partitionResponse.setPartition(partitionId);
+        partIterator.forEachRemaining(entry -> {
+            PartitionData<T> partitionData = entry.getValue();
+            // Since PartitionData alone doesn't know the partition ID, we set it here
+            partitionData.partitionResponse.setPartition(entry.getKey().partition());
+            // We have to keep the order of input topic-partition. Hence, we batch the partitions only if the last
+            // batch is in the same topic group.
+            FetchResponseData.FetchableTopicResponse previousTopic = topicResponseList.isEmpty() ? null
+                    : topicResponseList.get(topicResponseList.size() - 1);
+            if (previousTopic != null && previousTopic.topic().equals(entry.getKey().topic()))
+                previousTopic.partitionResponses().add(partitionData.partitionResponse);
+            else {
+                List<FetchResponseData.FetchablePartitionResponse> partitionResponses = new ArrayList<>();
                 partitionResponses.add(partitionData.partitionResponse);
-            });
-            topicResponseList.add(new FetchResponseData.FetchableTopicResponse()
-                .setTopic(partitionDataTopicAndPartitionData.topic)
-                .setPartitionResponses(partitionResponses));
+                topicResponseList.add(new FetchResponseData.FetchableTopicResponse()
+                        .setTopic(entry.getKey().topic())
+                        .setPartitionResponses(partitionResponses));
+            }
         });
 
-        message.setResponses(topicResponseList);
-        return message;
+        return new FetchResponseData()
+                .setThrottleTimeMs(throttleTimeMs)
+                .setErrorCode(error.code())
+                .setSessionId(sessionId)
+                .setResponses(topicResponseList);
     }
 
     /**

Reply via email to