This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 78e7c90 MINOR: Check against empty replicas in
AlterPartitionReassignments (#7574)
78e7c90 is described below
commit 78e7c90e90efa18b2a5b298e49154834d8d5bf67
Author: Stanislav Kozlovski <[email protected]>
AuthorDate: Tue Oct 22 23:26:50 2019 +0100
MINOR: Check against empty replicas in AlterPartitionReassignments (#7574)
Do not allow an empty replica set to be passed into the reassignment API.
Reviewers: Colin P. McCabe <[email protected]>, José Armando García Sancio
<[email protected]>
(cherry picked from commit 2efee34b74d4895b504ab541b716edab72b320d1)
---
core/src/main/scala/kafka/controller/KafkaController.scala | 2 +-
.../integration/kafka/api/AdminClientIntegrationTest.scala | 10 +++++++---
2 files changed, 8 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0dd2ec7..d45fea8 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1716,7 +1716,7 @@ class KafkaController(val config: KafkaConfig,
case Some(replicas) =>
val replicaSet = replicas.toSet
- if (replicas.size != replicaSet.size)
+ if (replicas.isEmpty || replicas.size != replicaSet.size)
false
else if (replicas.exists(_ < 0))
false
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 6ff4f0e..eb15529 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -2018,12 +2018,13 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
val tp1 = new TopicPartition(topic, 0)
val tp2 = new TopicPartition(topic, 1)
val tp3 = new TopicPartition(topic, 2)
- createTopic(topic, numPartitions = 3)
+ val tp4 = new TopicPartition(topic, 3)
+ createTopic(topic, numPartitions = 4)
val validAssignment = new NewPartitionReassignment((0 until
brokerCount).map(_.asInstanceOf[Integer]).asJava)
val nonExistentTp1 = new TopicPartition("topicA", 0)
- val nonExistentTp2 = new TopicPartition(topic, 3)
+ val nonExistentTp2 = new TopicPartition(topic, 4)
val nonExistentPartitionsResult = client.alterPartitionReassignments(Map(
tp1 -> java.util.Optional.of(validAssignment),
tp2 -> java.util.Optional.of(validAssignment),
@@ -2037,14 +2038,17 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
val extraNonExistentReplica = new NewPartitionReassignment((0 until
brokerCount + 1).map(_.asInstanceOf[Integer]).asJava)
val negativeIdReplica = new NewPartitionReassignment(Seq(-3, -2,
-1).map(_.asInstanceOf[Integer]).asJava)
val duplicateReplica = new NewPartitionReassignment(Seq(0, 1,
1).map(_.asInstanceOf[Integer]).asJava)
+ val noReplicas = new
NewPartitionReassignment(Seq().map(_.asInstanceOf[Integer]).asJava)
val invalidReplicaResult = client.alterPartitionReassignments(Map(
tp1 -> java.util.Optional.of(extraNonExistentReplica),
tp2 -> java.util.Optional.of(negativeIdReplica),
- tp3 -> java.util.Optional.of(duplicateReplica)
+ tp3 -> java.util.Optional.of(duplicateReplica),
+ tp4 -> java.util.Optional.of(noReplicas)
).asJava).values()
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp1),
classOf[InvalidReplicaAssignmentException])
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp2),
classOf[InvalidReplicaAssignmentException])
assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3),
classOf[InvalidReplicaAssignmentException])
+ assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp4),
classOf[InvalidReplicaAssignmentException])
}
@Test