This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b41b2dfcf2f KAFKA-15353: make sure AlterPartitionRequest.build() is 
idempotent (#14236)
b41b2dfcf2f is described below

commit b41b2dfcf2f0f9e458374fb9b0842bcc8739f130
Author: Calvin Liu <[email protected]>
AuthorDate: Mon Aug 28 02:59:48 2023 -0700

    KAFKA-15353: make sure AlterPartitionRequest.build() is idempotent (#14236)
    
    As described in https://issues.apache.org/jira/browse/KAFKA-15353
    When the AlterPartitionRequest version is < 3 and its builder.build is 
called multiple times, both newIsrWithEpochs and newIsr will be empty. This 
can happen if the sender retries on errors.
    
    Reviewers: Luke Chen <[email protected]>
---
 .../kafka/common/requests/AlterPartitionRequest.java     | 16 ++++++++++------
 .../kafka/common/requests/AlterPartitionRequestTest.java | 13 +++++++++++++
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
index 2b150508292..45f85a6d367 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
@@ -85,12 +85,16 @@ public class AlterPartitionRequest extends AbstractRequest {
             if (version < 3) {
                 data.topics().forEach(topicData -> {
                     topicData.partitions().forEach(partitionData -> {
-                        List<Integer> newIsr = new 
ArrayList<>(partitionData.newIsrWithEpochs().size());
-                        partitionData.newIsrWithEpochs().forEach(brokerState 
-> {
-                            newIsr.add(brokerState.brokerId());
-                        });
-                        partitionData.setNewIsr(newIsr);
-                        
partitionData.setNewIsrWithEpochs(Collections.emptyList());
+                        // The newIsrWithEpochs will be empty after build. 
Then we can skip the conversion if the build
+                        // is called again.
+                        if (partitionData.newIsrWithEpochs().size() > 0) {
+                            List<Integer> newIsr = new 
ArrayList<>(partitionData.newIsrWithEpochs().size());
+                            
partitionData.newIsrWithEpochs().forEach(brokerState -> {
+                                newIsr.add(brokerState.brokerId());
+                            });
+                            partitionData.setNewIsr(newIsr);
+                            
partitionData.setNewIsrWithEpochs(Collections.emptyList());
+                        }
                     });
                 });
             }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
index 5b0231ca882..8b346eea735 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
@@ -72,5 +72,18 @@ class AlterPartitionRequestTest {
             assertEquals(newIsrWithBrokerEpoch, 
partitionData.newIsrWithEpochs());
             assertTrue(partitionData.newIsr().isEmpty());
         }
+
+        // Build the request again to make sure build() is idempotent.
+        alterPartitionRequest = builder.build(version);
+        assertEquals(1, alterPartitionRequest.data().topics().size());
+        assertEquals(1, 
alterPartitionRequest.data().topics().get(0).partitions().size());
+        alterPartitionRequest.data().topics().get(0).partitions().get(0);
+        if (version < 3) {
+            assertEquals(Arrays.asList(1, 2, 3), partitionData.newIsr());
+            assertTrue(partitionData.newIsrWithEpochs().isEmpty());
+        } else {
+            assertEquals(newIsrWithBrokerEpoch, 
partitionData.newIsrWithEpochs());
+            assertTrue(partitionData.newIsr().isEmpty());
+        }
     }
 }

Reply via email to