jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1124937664


##########
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java:
##########
@@ -80,45 +105,44 @@ topicName, new 
AddPartitionsToTxnPartitionResultCollection()
         AddPartitionsToTxnTopicResultCollection topicCollection = new 
AddPartitionsToTxnTopicResultCollection();
         for (Map.Entry<String, AddPartitionsToTxnPartitionResultCollection> 
entry : resultMap.entrySet()) {
             topicCollection.add(new AddPartitionsToTxnTopicResult()
-                                    .setName(entry.getKey())
-                                    .setResults(entry.getValue()));
+                .setName(entry.getKey())
+                .setResultsByPartition(entry.getValue()));
         }
-
-        this.data = new AddPartitionsToTxnResponseData()
-                        .setThrottleTimeMs(throttleTimeMs)
-                        .setResults(topicCollection);
+        return topicCollection;
     }
 
-    @Override
-    public int throttleTimeMs() {
-        return data.throttleTimeMs();
+    public static AddPartitionsToTxnResult resultForTransaction(String 
transactionalId, Map<TopicPartition, Errors> errors) {
+        return new 
AddPartitionsToTxnResult().setTransactionalId(transactionalId).setTopicResults(topicCollectionForErrors(errors));
     }
 
-    @Override
-    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
-        data.setThrottleTimeMs(throttleTimeMs);
+    public AddPartitionsToTxnTopicResultCollection 
getTransactionTopicResults(String transactionalId) {
+        return 
data.resultsByTransaction().find(transactionalId).topicResults();
     }
 
-    public Map<TopicPartition, Errors> errors() {
-        if (cachedErrorsMap != null) {
-            return cachedErrorsMap;
-        }
-
-        cachedErrorsMap = new HashMap<>();
-
-        for (AddPartitionsToTxnTopicResult topicResult : this.data.results()) {
-            for (AddPartitionsToTxnPartitionResult partitionResult : 
topicResult.results()) {
-                cachedErrorsMap.put(new TopicPartition(
-                        topicResult.name(), partitionResult.partitionIndex()),
-                    Errors.forCode(partitionResult.errorCode()));
+    public static Map<TopicPartition, Errors> 
errorsForTransaction(AddPartitionsToTxnTopicResultCollection topicCollection) {
+        Map<TopicPartition, Errors> topicResults = new HashMap<>();
+        for (AddPartitionsToTxnTopicResult topicResult : topicCollection) {
+            for (AddPartitionsToTxnPartitionResult partitionResult : 
topicResult.resultsByPartition()) {
+                topicResults.put(
+                    new TopicPartition(topicResult.name(), 
partitionResult.partitionIndex()), 
Errors.forCode(partitionResult.partitionErrorCode()));
             }
         }
-        return cachedErrorsMap;
+        return topicResults;
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(errors().values());
+        List<Errors> allErrors = new ArrayList<>();
+
+        // If we are not using this field, we have request 4 or later
+        if (this.data.resultsByTopicV3AndBelow().size() == 0) {
+            allErrors.add(Errors.forCode(data.errorCode()));

Review Comment:
   I create allErrors because I use addAll for the individual transactions. I 
can place this code after I create error counts, but it doesn't really seem to 
accomplish much. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to