This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b41b2dfcf2f KAFKA-15353: make sure AlterPartitionRequest.build() is
idempotent (#14236)
b41b2dfcf2f is described below
commit b41b2dfcf2f0f9e458374fb9b0842bcc8739f130
Author: Calvin Liu <[email protected]>
AuthorDate: Mon Aug 28 02:59:48 2023 -0700
KAFKA-15353: make sure AlterPartitionRequest.build() is idempotent (#14236)
As described in https://issues.apache.org/jira/browse/KAFKA-15353
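When the AlterPartitionRequest version is < 3 and its builder.build() is
called multiple times, both newIsrWithEpochs and newIsr end up empty: the
first call copies the broker ids into newIsr and clears newIsrWithEpochs, and
the next call then overwrites newIsr from the now-empty newIsrWithEpochs. This
can happen if the sender retries on errors.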
Reviewers: Luke Chen <[email protected]>
---
.../kafka/common/requests/AlterPartitionRequest.java | 16 ++++++++++------
.../kafka/common/requests/AlterPartitionRequestTest.java | 13 +++++++++++++
2 files changed, 23 insertions(+), 6 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
index 2b150508292..45f85a6d367 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
@@ -85,12 +85,16 @@ public class AlterPartitionRequest extends AbstractRequest {
if (version < 3) {
data.topics().forEach(topicData -> {
topicData.partitions().forEach(partitionData -> {
- List<Integer> newIsr = new
ArrayList<>(partitionData.newIsrWithEpochs().size());
- partitionData.newIsrWithEpochs().forEach(brokerState
-> {
- newIsr.add(brokerState.brokerId());
- });
- partitionData.setNewIsr(newIsr);
-
partitionData.setNewIsrWithEpochs(Collections.emptyList());
+ // The newIsrWithEpochs will be empty after build.
Then we can skip the conversion if the build
+ // is called again.
+ if (partitionData.newIsrWithEpochs().size() > 0) {
+ List<Integer> newIsr = new
ArrayList<>(partitionData.newIsrWithEpochs().size());
+
partitionData.newIsrWithEpochs().forEach(brokerState -> {
+ newIsr.add(brokerState.brokerId());
+ });
+ partitionData.setNewIsr(newIsr);
+
partitionData.setNewIsrWithEpochs(Collections.emptyList());
+ }
});
});
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
index 5b0231ca882..8b346eea735 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
@@ -72,5 +72,18 @@ class AlterPartitionRequestTest {
assertEquals(newIsrWithBrokerEpoch,
partitionData.newIsrWithEpochs());
assertTrue(partitionData.newIsr().isEmpty());
}
+
+ // Build the request again to make sure build() is idempotent.
+ alterPartitionRequest = builder.build(version);
+ assertEquals(1, alterPartitionRequest.data().topics().size());
+ assertEquals(1,
alterPartitionRequest.data().topics().get(0).partitions().size());
+ partitionData =
alterPartitionRequest.data().topics().get(0).partitions().get(0);
+ if (version < 3) {
+ assertEquals(Arrays.asList(1, 2, 3), partitionData.newIsr());
+ assertTrue(partitionData.newIsrWithEpochs().isEmpty());
+ } else {
+ assertEquals(newIsrWithBrokerEpoch,
partitionData.newIsrWithEpochs());
+ assertTrue(partitionData.newIsr().isEmpty());
+ }
}
}