This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new bee6c41ff87 KAFKA-15353: make sure AlterPartitionRequest.build() is
idempotent (#14236)
new b6da42319b1 Merge branch '3.6' of https://github.com/apache/kafka into
3.6
bee6c41ff87 is described below
commit bee6c41ff875c0a59028bab169cc0a2a9190fa7b
Author: Calvin Liu <[email protected]>
AuthorDate: Mon Aug 28 02:59:48 2023 -0700
KAFKA-15353: make sure AlterPartitionRequest.build() is idempotent (#14236)
As described in https://issues.apache.org/jira/browse/KAFKA-15353
When the AlterPartitionRequest version is < 3 and its builder.build is
called multiple times, newIsrWithEpochs and newIsr will both be empty. This
can happen if the sender retries on errors.
Reviewers: Luke Chen <[email protected]>
---
.../kafka/common/requests/AlterPartitionRequest.java | 16 ++++++++++------
.../kafka/common/requests/AlterPartitionRequestTest.java | 13 +++++++++++++
2 files changed, 23 insertions(+), 6 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
index 2b150508292..45f85a6d367 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
@@ -85,12 +85,16 @@ public class AlterPartitionRequest extends AbstractRequest {
if (version < 3) {
data.topics().forEach(topicData -> {
topicData.partitions().forEach(partitionData -> {
- List<Integer> newIsr = new
ArrayList<>(partitionData.newIsrWithEpochs().size());
- partitionData.newIsrWithEpochs().forEach(brokerState
-> {
- newIsr.add(brokerState.brokerId());
- });
- partitionData.setNewIsr(newIsr);
-
partitionData.setNewIsrWithEpochs(Collections.emptyList());
+ // The newIsrWithEpochs will be empty after build.
Then we can skip the conversion if the build
+ // is called again.
+ if (partitionData.newIsrWithEpochs().size() > 0) {
+ List<Integer> newIsr = new
ArrayList<>(partitionData.newIsrWithEpochs().size());
+
partitionData.newIsrWithEpochs().forEach(brokerState -> {
+ newIsr.add(brokerState.brokerId());
+ });
+ partitionData.setNewIsr(newIsr);
+
partitionData.setNewIsrWithEpochs(Collections.emptyList());
+ }
});
});
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
index 5b0231ca882..8b346eea735 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
@@ -72,5 +72,18 @@ class AlterPartitionRequestTest {
assertEquals(newIsrWithBrokerEpoch,
partitionData.newIsrWithEpochs());
assertTrue(partitionData.newIsr().isEmpty());
}
+
+ // Build the request again to make sure build() is idempotent.
+ alterPartitionRequest = builder.build(version);
+ assertEquals(1, alterPartitionRequest.data().topics().size());
+ assertEquals(1,
alterPartitionRequest.data().topics().get(0).partitions().size());
+ alterPartitionRequest.data().topics().get(0).partitions().get(0);
+ if (version < 3) {
+ assertEquals(Arrays.asList(1, 2, 3), partitionData.newIsr());
+ assertTrue(partitionData.newIsrWithEpochs().isEmpty());
+ } else {
+ assertEquals(newIsrWithBrokerEpoch,
partitionData.newIsrWithEpochs());
+ assertTrue(partitionData.newIsr().isEmpty());
+ }
}
}