This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new e20c228  MINOR: Re-implement NewPartitionReassignment#of() (#7592)
e20c228 is described below

commit e20c228b98f87cfe2eaa34078f53953be587e900
Author: Stanislav Kozlovski <[email protected]>
AuthorDate: Thu Oct 24 23:23:54 2019 +0100

    MINOR: Re-implement NewPartitionReassignment#of() (#7592)
    
    Re-implement NewPartitionReassignment#of.  It now takes a list rather than 
a variable-length list of arguments.
    
    Reviewers: Colin P. McCabe <[email protected]>, Vikas Singh 
<[email protected]>
    (cherry picked from commit 28ef7f1d6d7f4cec7e81b8d0641debebefec9104)
---
 .../clients/admin/NewPartitionReassignment.java    | 12 ++++++++-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 12 +++------
 .../kafka/api/AdminClientIntegrationTest.scala     | 31 +++++++++++-----------
 .../admin/ReassignPartitionsClusterTest.scala      |  2 +-
 4 files changed, 31 insertions(+), 26 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
index 0f7a61c..111514a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * A new partition reassignment, which can be applied via {@link 
AdminClient#alterPartitionReassignments(Map, 
AlterPartitionReassignmentsOptions)}.
@@ -28,7 +29,16 @@ import java.util.Map;
 public class NewPartitionReassignment {
     private final List<Integer> targetReplicas;
 
-    public NewPartitionReassignment(List<Integer> targetReplicas) {
+    /**
+     * @throws IllegalArgumentException if no replicas are supplied
+     */
+    public static Optional<NewPartitionReassignment> of(List<Integer> 
replicas) {
+        if (replicas == null || replicas.size() == 0)
+            throw new IllegalArgumentException("Cannot create a new partition 
reassignment without any replicas");
+        return Optional.of(new NewPartitionReassignment(replicas));
+    }
+
+    private NewPartitionReassignment(List<Integer> targetReplicas) {
         this.targetReplicas = Collections.unmodifiableList(new 
ArrayList<>(targetReplicas));
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index eac7063..cf50c8f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -2011,7 +2011,7 @@ public class KafkaAdminClientTest {
             TopicPartition tp2 = new TopicPartition("B", 0);
             Map<TopicPartition, Optional<NewPartitionReassignment>> 
reassignments = new HashMap<>();
             reassignments.put(tp1, Optional.empty());
-            reassignments.put(tp2, newPartitionReassignment(Arrays.asList(1, 
2, 3)));
+            reassignments.put(tp2, 
NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
 
             // 1. server returns less responses than number of partitions we 
sent
             AlterPartitionReassignmentsResponseData responseData1 = new 
AlterPartitionReassignmentsResponseData();
@@ -2102,9 +2102,9 @@ public class KafkaAdminClientTest {
             TopicPartition invalidTopicTP = new TopicPartition("", 0);
             TopicPartition invalidPartitionTP = new TopicPartition("ABC", -1);
             Map<TopicPartition, Optional<NewPartitionReassignment>> 
invalidTopicReassignments = new HashMap<>();
-            invalidTopicReassignments.put(invalidPartitionTP, 
newPartitionReassignment(Arrays.asList(1, 2, 3)));
-            invalidTopicReassignments.put(invalidTopicTP, 
newPartitionReassignment(Arrays.asList(1, 2, 3)));
-            invalidTopicReassignments.put(tp1, 
newPartitionReassignment(Arrays.asList(1, 2, 3)));
+            invalidTopicReassignments.put(invalidPartitionTP, 
NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
+            invalidTopicReassignments.put(invalidTopicTP, 
NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
+            invalidTopicReassignments.put(tp1, 
NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
 
             AlterPartitionReassignmentsResponseData singlePartResponseData =
                     new AlterPartitionReassignmentsResponseData()
@@ -2271,8 +2271,4 @@ public class KafkaAdminClientTest {
             }
         }
     }
-
-    private static Optional<NewPartitionReassignment> 
newPartitionReassignment(List<Integer> targetReplicas) {
-        return Optional.of(new NewPartitionReassignment(targetReplicas));
-    }
 }
diff --git 
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index eb15529..bed6900 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -2018,37 +2018,36 @@ class AdminClientIntegrationTest extends 
IntegrationTestHarness with Logging {
     val tp1 = new TopicPartition(topic, 0)
     val tp2 = new TopicPartition(topic, 1)
     val tp3 = new TopicPartition(topic, 2)
-    val tp4 = new TopicPartition(topic, 3)
     createTopic(topic, numPartitions = 4)
 
-    val validAssignment = new NewPartitionReassignment((0 until 
brokerCount).map(_.asInstanceOf[Integer]).asJava)
+
+    val validAssignment = NewPartitionReassignment.of(
+      (0 until brokerCount).map(_.asInstanceOf[Integer]).asJava
+    )
 
     val nonExistentTp1 = new TopicPartition("topicA", 0)
     val nonExistentTp2 = new TopicPartition(topic, 4)
     val nonExistentPartitionsResult = client.alterPartitionReassignments(Map(
-      tp1 -> java.util.Optional.of(validAssignment),
-      tp2 -> java.util.Optional.of(validAssignment),
-      tp3 -> java.util.Optional.of(validAssignment),
-      nonExistentTp1 -> java.util.Optional.of(validAssignment),
-      nonExistentTp2 -> java.util.Optional.of(validAssignment)
+      tp1 -> validAssignment,
+      tp2 -> validAssignment,
+      tp3 -> validAssignment,
+      nonExistentTp1 -> validAssignment,
+      nonExistentTp2 -> validAssignment
     ).asJava).values()
     
assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp1),
 classOf[UnknownTopicOrPartitionException])
     
assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp2),
 classOf[UnknownTopicOrPartitionException])
 
-    val extraNonExistentReplica = new NewPartitionReassignment((0 until 
brokerCount + 1).map(_.asInstanceOf[Integer]).asJava)
-    val negativeIdReplica = new NewPartitionReassignment(Seq(-3, -2, 
-1).map(_.asInstanceOf[Integer]).asJava)
-    val duplicateReplica = new NewPartitionReassignment(Seq(0, 1, 
1).map(_.asInstanceOf[Integer]).asJava)
-    val noReplicas = new 
NewPartitionReassignment(Seq().map(_.asInstanceOf[Integer]).asJava)
+    val extraNonExistentReplica = NewPartitionReassignment.of((0 until 
brokerCount + 1).map(_.asInstanceOf[Integer]).asJava)
+    val negativeIdReplica = NewPartitionReassignment.of(Seq(-3, -2, 
-1).map(_.asInstanceOf[Integer]).asJava)
+    val duplicateReplica = NewPartitionReassignment.of(Seq(0, 1, 
1).map(_.asInstanceOf[Integer]).asJava)
     val invalidReplicaResult = client.alterPartitionReassignments(Map(
-      tp1 -> java.util.Optional.of(extraNonExistentReplica),
-      tp2 -> java.util.Optional.of(negativeIdReplica),
-      tp3 -> java.util.Optional.of(duplicateReplica),
-      tp4 -> java.util.Optional.of(noReplicas)
+      tp1 -> extraNonExistentReplica,
+      tp2 -> negativeIdReplica,
+      tp3 -> duplicateReplica
     ).asJava).values()
     assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp1), 
classOf[InvalidReplicaAssignmentException])
     assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp2), 
classOf[InvalidReplicaAssignmentException])
     assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3), 
classOf[InvalidReplicaAssignmentException])
-    assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp4), 
classOf[InvalidReplicaAssignmentException])
   }
 
   @Test
diff --git 
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala 
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 10ed6bb..a84c611 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -1182,7 +1182,7 @@ class ReassignPartitionsClusterTest extends 
ZooKeeperTestHarness with Logging {
     }.mkString(",")
 
   def reassignmentEntry(tp: TopicPartition, replicas: Seq[Int]): 
(TopicPartition, java.util.Optional[NewPartitionReassignment]) =
-    tp -> java.util.Optional.of(new 
NewPartitionReassignment(replicas.map(_.asInstanceOf[Integer]).asJava))
+    tp -> 
NewPartitionReassignment.of(replicas.map(_.asInstanceOf[Integer]).asJava)
 
   def cancelReassignmentEntry(tp: TopicPartition): (TopicPartition, 
java.util.Optional[NewPartitionReassignment]) =
     tp -> java.util.Optional.empty()

Reply via email to