This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new e20c228 MINOR: Re-implement NewPartitionReassignment#of() (#7592)
e20c228 is described below
commit e20c228b98f87cfe2eaa34078f53953be587e900
Author: Stanislav Kozlovski <[email protected]>
AuthorDate: Thu Oct 24 23:23:54 2019 +0100
MINOR: Re-implement NewPartitionReassignment#of() (#7592)
Re-implement NewPartitionReassignment#of. It now takes a list rather than
a variable-length list of arguments.
Reviewers: Colin P. McCabe <[email protected]>, Vikas Singh
<[email protected]>
(cherry picked from commit 28ef7f1d6d7f4cec7e81b8d0641debebefec9104)
---
.../clients/admin/NewPartitionReassignment.java | 12 ++++++++-
.../kafka/clients/admin/KafkaAdminClientTest.java | 12 +++------
.../kafka/api/AdminClientIntegrationTest.scala | 31 +++++++++++-----------
.../admin/ReassignPartitionsClusterTest.scala | 2 +-
4 files changed, 31 insertions(+), 26 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
index 0f7a61c..111514a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* A new partition reassignment, which can be applied via {@link
AdminClient#alterPartitionReassignments(Map,
AlterPartitionReassignmentsOptions)}.
@@ -28,7 +29,16 @@ import java.util.Map;
public class NewPartitionReassignment {
private final List<Integer> targetReplicas;
- public NewPartitionReassignment(List<Integer> targetReplicas) {
+ /**
+ * @throws IllegalArgumentException if no replicas are supplied
+ */
+ public static Optional<NewPartitionReassignment> of(List<Integer>
replicas) {
+ if (replicas == null || replicas.size() == 0)
+ throw new IllegalArgumentException("Cannot create a new partition
reassignment without any replicas");
+ return Optional.of(new NewPartitionReassignment(replicas));
+ }
+
+ private NewPartitionReassignment(List<Integer> targetReplicas) {
this.targetReplicas = Collections.unmodifiableList(new
ArrayList<>(targetReplicas));
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index eac7063..cf50c8f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -2011,7 +2011,7 @@ public class KafkaAdminClientTest {
TopicPartition tp2 = new TopicPartition("B", 0);
Map<TopicPartition, Optional<NewPartitionReassignment>>
reassignments = new HashMap<>();
reassignments.put(tp1, Optional.empty());
- reassignments.put(tp2, newPartitionReassignment(Arrays.asList(1,
2, 3)));
+ reassignments.put(tp2,
NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
// 1. server returns less responses than number of partitions we
sent
AlterPartitionReassignmentsResponseData responseData1 = new
AlterPartitionReassignmentsResponseData();
@@ -2102,9 +2102,9 @@ public class KafkaAdminClientTest {
TopicPartition invalidTopicTP = new TopicPartition("", 0);
TopicPartition invalidPartitionTP = new TopicPartition("ABC", -1);
Map<TopicPartition, Optional<NewPartitionReassignment>>
invalidTopicReassignments = new HashMap<>();
- invalidTopicReassignments.put(invalidPartitionTP,
newPartitionReassignment(Arrays.asList(1, 2, 3)));
- invalidTopicReassignments.put(invalidTopicTP,
newPartitionReassignment(Arrays.asList(1, 2, 3)));
- invalidTopicReassignments.put(tp1,
newPartitionReassignment(Arrays.asList(1, 2, 3)));
+ invalidTopicReassignments.put(invalidPartitionTP,
NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
+ invalidTopicReassignments.put(invalidTopicTP,
NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
+ invalidTopicReassignments.put(tp1,
NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
AlterPartitionReassignmentsResponseData singlePartResponseData =
new AlterPartitionReassignmentsResponseData()
@@ -2271,8 +2271,4 @@ public class KafkaAdminClientTest {
}
}
}
-
- private static Optional<NewPartitionReassignment>
newPartitionReassignment(List<Integer> targetReplicas) {
- return Optional.of(new NewPartitionReassignment(targetReplicas));
- }
}
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index eb15529..bed6900 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -2018,37 +2018,36 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
val tp1 = new TopicPartition(topic, 0)
val tp2 = new TopicPartition(topic, 1)
val tp3 = new TopicPartition(topic, 2)
- val tp4 = new TopicPartition(topic, 3)
createTopic(topic, numPartitions = 4)
- val validAssignment = new NewPartitionReassignment((0 until
brokerCount).map(_.asInstanceOf[Integer]).asJava)
+
+ val validAssignment = NewPartitionReassignment.of(
+ (0 until brokerCount).map(_.asInstanceOf[Integer]).asJava
+ )
val nonExistentTp1 = new TopicPartition("topicA", 0)
val nonExistentTp2 = new TopicPartition(topic, 4)
val nonExistentPartitionsResult = client.alterPartitionReassignments(Map(
- tp1 -> java.util.Optional.of(validAssignment),
- tp2 -> java.util.Optional.of(validAssignment),
- tp3 -> java.util.Optional.of(validAssignment),
- nonExistentTp1 -> java.util.Optional.of(validAssignment),
- nonExistentTp2 -> java.util.Optional.of(validAssignment)
+ tp1 -> validAssignment,
+ tp2 -> validAssignment,
+ tp3 -> validAssignment,
+ nonExistentTp1 -> validAssignment,
+ nonExistentTp2 -> validAssignment
).asJava).values()
assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp1),
classOf[UnknownTopicOrPartitionException])
assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp2),
classOf[UnknownTopicOrPartitionException])
- val extraNonExistentReplica = new NewPartitionReassignment((0 until
brokerCount + 1).map(_.asInstanceOf[Integer]).asJava)
- val negativeIdReplica = new NewPartitionReassignment(Seq(-3, -2,
-1).map(_.asInstanceOf[Integer]).asJava)
- val duplicateReplica = new NewPartitionReassignment(Seq(0, 1,
1).map(_.asInstanceOf[Integer]).asJava)
- val noReplicas = new
NewPartitionReassignment(Seq().map(_.asInstanceOf[Integer]).asJava)
+ val extraNonExistentReplica = NewPartitionReassignment.of((0 until
brokerCount + 1).map(_.asInstanceOf[Integer]).asJava)
+ val negativeIdReplica = NewPartitionReassignment.of(Seq(-3, -2,
-1).map(_.asInstanceOf[Integer]).asJava)
+ val duplicateReplica = NewPartitionReassignment.of(Seq(0, 1,
1).map(_.asInstanceOf[Integer]).asJava)
val invalidReplicaResult = client.alterPartitionReassignments(Map(
- tp1 -> java.util.Optional.of(extraNonExistentReplica),
- tp2 -> java.util.Optional.of(negativeIdReplica),
- tp3 -> java.util.Optional.of(duplicateReplica),
- tp4 -> java.util.Optional.of(noReplicas)
+ tp1 -> extraNonExistentReplica,
+ tp2 -> negativeIdReplica,
+ tp3 -> duplicateReplica
).asJava).values()
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp1),
classOf[InvalidReplicaAssignmentException])
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp2),
classOf[InvalidReplicaAssignmentException])
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3),
classOf[InvalidReplicaAssignmentException])
- assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp4),
classOf[InvalidReplicaAssignmentException])
}
@Test
diff --git
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 10ed6bb..a84c611 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -1182,7 +1182,7 @@ class ReassignPartitionsClusterTest extends
ZooKeeperTestHarness with Logging {
}.mkString(",")
def reassignmentEntry(tp: TopicPartition, replicas: Seq[Int]):
(TopicPartition, java.util.Optional[NewPartitionReassignment]) =
- tp -> java.util.Optional.of(new
NewPartitionReassignment(replicas.map(_.asInstanceOf[Integer]).asJava))
+ tp ->
NewPartitionReassignment.of(replicas.map(_.asInstanceOf[Integer]).asJava)
def cancelReassignmentEntry(tp: TopicPartition): (TopicPartition,
java.util.Optional[NewPartitionReassignment]) =
tp -> java.util.Optional.empty()