Repository: kafka
Updated Branches:
  refs/heads/trunk d9c0ad685 -> 5c2ca30f2
kafka-2234; Partition reassignment of a nonexistent topic prevents future reassignments; patched by Manikumar Reddy; reviewed by Jun Rao

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5c2ca30f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5c2ca30f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5c2ca30f

Branch: refs/heads/trunk
Commit: 5c2ca30f229c7f39fca65aed6bd45c382aacda77
Parents: d9c0ad6
Author: Manikumar Reddy <manikumar.re...@gmail.com>
Authored: Thu Jun 18 16:37:25 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Thu Jun 18 16:37:25 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/admin/ReassignPartitionsCommand.scala | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c2ca30f/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 912b718..ea34589 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -208,9 +208,14 @@ class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[T
   def reassignPartitions(): Boolean = {
     try {
       val validPartitions = partitions.filter(p => validatePartition(zkClient, p._1.topic, p._1.partition))
-      val jsonReassignmentData = ZkUtils.getPartitionReassignmentZkData(validPartitions)
-      ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
-      true
+      if(validPartitions.isEmpty) {
+        false
+      }
+      else {
+        val jsonReassignmentData = ZkUtils.getPartitionReassignmentZkData(validPartitions)
+        ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
+        true
+      }
     } catch {
       case ze: ZkNodeExistsException =>
         val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)