This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new bdf2446 KAFKA-9089; Reassignment should be resilient to unexpected
errors (#7562)
bdf2446 is described below
commit bdf2446ccce592f3c000290f11de88520327aa19
Author: Jason Gustafson <[email protected]>
AuthorDate: Thu Oct 24 19:33:16 2019 -0700
KAFKA-9089; Reassignment should be resilient to unexpected errors (#7562)
The purpose of this patch is to make the reassignment algorithm simpler and
more resilient to unexpected errors. Specifically, it has the following
improvements:
1. Remove `ReassignedPartitionContext`. We no longer need to track the
previous reassignment through the context and we now use the assignment state
as the single source of truth for the target replicas in a reassignment.
2. Remove the intermediate assignment state when overriding a previous
reassignment. Instead, an overriding reassignment directly updates the
assignment state and shuts down any unneeded replicas. Reassignments are
_always_ persisted in Zookeeper before being updated in the controller context.
3. To fix race conditions with concurrent submissions, reassignment
completion for a partition always checks for a zk partition reassignment to be
removed. This means the controller no longer needs to track the source of the
reassignment.
4. Api reassignments explicitly remove reassignment state from zk prior to
beginning the new reassignment. This fixes an inconsistency in precedence. Upon
controller failover, zookeeper reassignments always take precedence over any
active reassignment. So if we do not have the logic to remove the zk
reassignment when an api reassignment is triggered, then we can revert to the
older zk reassignment.
Reviewers: Viktor Somogyi <[email protected]>, Stanislav Kozlovski
<[email protected]>, Jun Rao <[email protected]>
---
.../kafka/admin/ReassignPartitionsCommand.scala | 2 +-
.../controller/ControllerChannelManager.scala | 2 +-
.../scala/kafka/controller/ControllerContext.scala | 66 +-
.../src/main/scala/kafka/controller/Election.scala | 8 +-
.../scala/kafka/controller/KafkaController.scala | 714 +++++++++------------
core/src/main/scala/kafka/server/KafkaApis.scala | 4 +-
core/src/main/scala/kafka/zk/AdminZkClient.scala | 11 +-
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 22 +-
core/src/main/scala/kafka/zk/ZkData.scala | 8 +-
.../scala/unit/kafka/admin/AddPartitionsTest.scala | 12 +-
.../scala/unit/kafka/admin/DeleteTopicTest.scala | 4 +-
.../admin/ReassignPartitionsClusterTest.scala | 37 +-
.../admin/ReassignPartitionsCommandTest.scala | 4 +-
.../controller/ControllerChannelManagerTest.scala | 2 +-
.../kafka/controller/ControllerContextTest.scala | 34 +-
.../controller/ControllerIntegrationTest.scala | 14 +-
.../controller/PartitionStateMachineTest.scala | 2 +-
.../kafka/controller/ReplicaStateMachineTest.scala | 2 +-
.../kafka/security/auth/ZkAuthorizationTest.scala | 4 +-
.../scala/unit/kafka/zk/AdminZkClientTest.scala | 4 +-
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 8 +-
21 files changed, 426 insertions(+), 538 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 83d2e00..934191a 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -649,7 +649,7 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
}
} catch {
case _: NodeExistsException =>
- val partitionsBeingReassigned = zkClient.getPartitionReassignment
+ val partitionsBeingReassigned = zkClient.getPartitionReassignment()
throw new AdminCommandFailedException("Partition reassignment
currently in " +
"progress for %s. Aborting
operation".format(partitionsBeingReassigned))
}
diff --git
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index c60130b..375f0d3 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -377,7 +377,7 @@ abstract class AbstractControllerBrokerRequestBatch(config:
KafkaConfig,
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int],
topicPartition: TopicPartition,
leaderIsrAndControllerEpoch:
LeaderIsrAndControllerEpoch,
- replicaAssignment:
PartitionReplicaAssignment,
+ replicaAssignment: ReplicaAssignment,
isNew: Boolean): Unit = {
brokerIds.filter(_ >= 0).foreach { brokerId =>
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala
b/core/src/main/scala/kafka/controller/ControllerContext.scala
index be7f54a..93b0b4d 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -22,43 +22,40 @@ import org.apache.kafka.common.TopicPartition
import scala.collection.{Map, Seq, Set, mutable}
-object PartitionReplicaAssignment {
- def fromOldAndNewReplicas(oldReplicas: Seq[Int], newReplicas: Seq[Int]):
PartitionReplicaAssignment = {
+object ReplicaAssignment {
+ def fromOldAndNewReplicas(oldReplicas: Seq[Int], newReplicas: Seq[Int]):
ReplicaAssignment = {
val fullReplicaSet = (newReplicas ++ oldReplicas).distinct
- PartitionReplicaAssignment(
+ ReplicaAssignment(
fullReplicaSet,
- fullReplicaSet.filterNot(oldReplicas.contains(_)),
- fullReplicaSet.filterNot(newReplicas.contains(_))
+ fullReplicaSet.diff(oldReplicas),
+ fullReplicaSet.diff(newReplicas)
)
}
+
+ def apply(replicas: Seq[Int]): ReplicaAssignment = {
+ apply(replicas, Seq.empty, Seq.empty)
+ }
}
-case class PartitionReplicaAssignment(replicas: Seq[Int], addingReplicas:
Seq[Int], removingReplicas: Seq[Int]) {
+case class ReplicaAssignment(replicas: Seq[Int],
+ addingReplicas: Seq[Int],
+ removingReplicas: Seq[Int]) {
+
+ lazy val originReplicas: Seq[Int] = replicas.diff(addingReplicas)
+ lazy val targetReplicas: Seq[Int] = replicas.diff(removingReplicas)
+
def isBeingReassigned: Boolean = {
addingReplicas.nonEmpty || removingReplicas.nonEmpty
}
- /**
- * Returns the partition replica assignment previous to this one.
- * It is different than this one only when the partition is undergoing
reassignment
- * Note that this will not preserve the original ordering
- */
- def previousAssignment: PartitionReplicaAssignment = {
- PartitionReplicaAssignment(
- replicas.filterNot(addingReplicas.contains(_)),
- Seq(),
- Seq()
- )
+ def reassignTo(newReplicas: Seq[Int]): ReplicaAssignment = {
+ ReplicaAssignment.fromOldAndNewReplicas(originReplicas, newReplicas)
}
- /**
- * Returns the target replica assignment for this partition.
- * This is different than the `replicas` variable only when there is a
reassignment going on
- */
- def targetReplicas: Seq[Int] =
replicas.filterNot(removingReplicas.contains(_))
-
- override def toString: String = s"PartitionReplicaAssignment(replicas:
${replicas.mkString(",")}, " +
- s"addingReplicas: ${addingReplicas.mkString(",")}, removingReplicas:
${removingReplicas.mkString(",")})"
+ override def toString: String = s"ReplicaAssignment(" +
+ s"replicas=${replicas.mkString(",")}, " +
+ s"addingReplicas=${addingReplicas.mkString(",")}, " +
+ s"removingReplicas=${removingReplicas.mkString(",")})"
}
class ControllerContext {
@@ -71,9 +68,9 @@ class ControllerContext {
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion
var allTopics: Set[String] = Set.empty
- val partitionAssignments = mutable.Map.empty[String, mutable.Map[Int,
PartitionReplicaAssignment]]
+ val partitionAssignments = mutable.Map.empty[String, mutable.Map[Int,
ReplicaAssignment]]
val partitionLeadershipInfo = mutable.Map.empty[TopicPartition,
LeaderIsrAndControllerEpoch]
- val partitionsBeingReassigned = mutable.Map.empty[TopicPartition,
ReassignedPartitionsContext]
+ val partitionsBeingReassigned = mutable.Set.empty[TopicPartition]
val partitionStates = mutable.Map.empty[TopicPartition, PartitionState]
val replicaStates = mutable.Map.empty[PartitionAndReplica, ReplicaState]
val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicPartition]] =
mutable.Map.empty
@@ -121,11 +118,10 @@ class ControllerContext {
}
}
- def partitionFullReplicaAssignment(topicPartition: TopicPartition):
PartitionReplicaAssignment = {
- partitionAssignments.getOrElse(topicPartition.topic, mutable.Map.empty)
- .get(topicPartition.partition) match {
+ def partitionFullReplicaAssignment(topicPartition: TopicPartition):
ReplicaAssignment = {
+ partitionAssignments.getOrElse(topicPartition.topic,
mutable.Map.empty).get(topicPartition.partition) match {
case Some(partitionAssignment) => partitionAssignment
- case None => PartitionReplicaAssignment(Seq(), Seq(), Seq())
+ case None => ReplicaAssignment(Seq(), Seq(), Seq())
}
}
@@ -133,13 +129,13 @@ class ControllerContext {
val assignments =
partitionAssignments.getOrElseUpdate(topicPartition.topic, mutable.Map.empty)
val newAssignment = assignments.get(topicPartition.partition) match {
case Some(partitionAssignment) =>
- PartitionReplicaAssignment(
+ ReplicaAssignment(
newReplicas,
partitionAssignment.addingReplicas,
partitionAssignment.removingReplicas
)
case None =>
- PartitionReplicaAssignment(
+ ReplicaAssignment(
newReplicas,
Seq.empty,
Seq.empty
@@ -148,7 +144,7 @@ class ControllerContext {
updatePartitionFullReplicaAssignment(topicPartition, newAssignment)
}
- def updatePartitionFullReplicaAssignment(topicPartition: TopicPartition,
newAssignment: PartitionReplicaAssignment): Unit = {
+ def updatePartitionFullReplicaAssignment(topicPartition: TopicPartition,
newAssignment: ReplicaAssignment): Unit = {
val assignments =
partitionAssignments.getOrElseUpdate(topicPartition.topic, mutable.Map.empty)
assignments.put(topicPartition.partition, newAssignment)
}
@@ -159,7 +155,7 @@ class ControllerContext {
}.toMap
}
- def partitionFullReplicaAssignmentForTopic(topic : String):
Map[TopicPartition, PartitionReplicaAssignment] = {
+ def partitionFullReplicaAssignmentForTopic(topic : String):
Map[TopicPartition, ReplicaAssignment] = {
partitionAssignments.getOrElse(topic, Map.empty).map {
case (partition, assignment) => (new TopicPartition(topic, partition),
assignment)
}.toMap
diff --git a/core/src/main/scala/kafka/controller/Election.scala
b/core/src/main/scala/kafka/controller/Election.scala
index 163f916..dffa888 100644
--- a/core/src/main/scala/kafka/controller/Election.scala
+++ b/core/src/main/scala/kafka/controller/Election.scala
@@ -72,12 +72,12 @@ object Election {
private def leaderForReassign(partition: TopicPartition,
leaderAndIsr: LeaderAndIsr,
controllerContext: ControllerContext):
ElectionResult = {
- val reassignment =
controllerContext.partitionsBeingReassigned(partition).newReplicas
- val liveReplicas = reassignment.filter(replica =>
controllerContext.isReplicaOnline(replica, partition))
+ val targetReplicas =
controllerContext.partitionFullReplicaAssignment(partition).targetReplicas
+ val liveReplicas = targetReplicas.filter(replica =>
controllerContext.isReplicaOnline(replica, partition))
val isr = leaderAndIsr.isr
- val leaderOpt =
PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(reassignment,
isr, liveReplicas.toSet)
+ val leaderOpt =
PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(targetReplicas,
isr, liveReplicas.toSet)
val newLeaderAndIsrOpt = leaderOpt.map(leader =>
leaderAndIsr.newLeader(leader))
- ElectionResult(partition, newLeaderAndIsrOpt, reassignment)
+ ElectionResult(partition, newLeaderAndIsrOpt, targetReplicas)
}
/**
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index d45fea8..5456c62 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -55,7 +55,7 @@ object KafkaController extends Logging {
val InitialControllerEpochZkVersion = 0
type ElectLeadersCallback = Map[TopicPartition, Either[ApiError, Int]] =>
Unit
- type ListReassignmentsCallback = Either[Map[TopicPartition,
PartitionReplicaAssignment], ApiError] => Unit
+ type ListReassignmentsCallback = Either[Map[TopicPartition,
ReplicaAssignment], ApiError] => Unit
type AlterReassignmentsCallback = Either[Map[TopicPartition, ApiError],
ApiError] => Unit
}
@@ -306,7 +306,8 @@ class KafkaController(val config: KafkaConfig,
partitionStateMachine.startup()
info(s"Ready to serve as the new controller with epoch $epoch")
-
maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet)
+
+ initializePartitionReassignments()
topicDeletionManager.tryTopicDeletion()
val pendingPreferredReplicaElections =
fetchPendingPreferredReplicaElections()
onReplicaElection(pendingPreferredReplicaElections,
ElectionType.PREFERRED, ZkTriggered)
@@ -423,10 +424,9 @@ class KafkaController(val config: KafkaConfig,
// to see if these brokers can become leaders for some/all of those
partitionStateMachine.triggerOnlinePartitionStateChange()
// check if reassignment of some partitions need to be restarted
- val partitionsWithReplicasOnNewBrokers =
controllerContext.partitionsBeingReassigned.filter {
- case (_, reassignmentContext) =>
reassignmentContext.newReplicas.exists(newBrokersSet.contains)
+ maybeResumeReassignments { (_, assignment) =>
+ assignment.targetReplicas.exists(newBrokersSet.contains)
}
- partitionsWithReplicasOnNewBrokers.foreach { case (tp, context) =>
onPartitionReassignment(tp, context) }
// check if topic deletion needs to be resumed. If at least one replica
that belongs to the topic being deleted exists
// on the newly restarted brokers, there is a chance that topic deletion
can resume
val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p =>
topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
@@ -439,6 +439,14 @@ class KafkaController(val config: KafkaConfig,
registerBrokerModificationsHandler(newBrokers)
}
+ private def maybeResumeReassignments(shouldResume: (TopicPartition,
ReplicaAssignment) => Boolean): Unit = {
+ controllerContext.partitionsBeingReassigned.foreach { tp =>
+ val currentAssignment =
controllerContext.partitionFullReplicaAssignment(tp)
+ if (shouldResume(tp, currentAssignment))
+ onPartitionReassignment(tp, currentAssignment)
+ }
+ }
+
private def registerBrokerModificationsHandler(brokerIds: Iterable[Int]):
Unit = {
debug(s"Register BrokerModifications handler for $brokerIds")
brokerIds.foreach { brokerId =>
@@ -544,7 +552,6 @@ class KafkaController(val config: KafkaConfig,
* 4. Whenever a new broker comes up which is part of an ongoing reassignment
* 5. On controller startup/failover
*
- *
* Reassigning replicas for a partition goes through a few steps listed in
the code.
* RS = current assigned replica set
* ORS = Original replica set for partition
@@ -553,33 +560,27 @@ class KafkaController(val config: KafkaConfig,
* RR = The replicas we are removing as part of this reassignment
*
* A reassignment may have up to three phases, each with its own steps:
+
+ * Phase U (Assignment update): Regardless of the trigger, the first step
in the reassignment process
+ * is to update the existing assignment state. We always update the state in
Zookeeper before
+ * we update memory so that it can be resumed upon controller fail-over.
+ *
+ * U1. Update ZK with RS = ORS + TRS, AR = TRS - ORS, RR = ORS - TRS.
+ * U2. Update memory with RS = ORS + TRS, AR = TRS - ORS and RR = ORS - TRS
+ * U3. If we are cancelling or replacing an existing reassignment, send
StopReplica to all members
+ * of AR in the original reassignment if they are not in TRS from the
new assignment
*
+ * To complete the reassignment, we need to bring the new replicas into
sync, so depending on the state
+ * of the ISR, we will execute one of the following steps.
*
- * Cleanup Phase: In the cases where this reassignment has to override an
ongoing reassignment.
- * . The ongoing reassignment is in phase A
- * . ORS denotes the original replica set, prior to the ongoing
reassignment
- * . URS denotes the unnecessary replicas, ones which are currently part
of the AR of the ongoing reassignment but will not be part of the new one
- * . OVRS denotes the overlapping replica set - replicas which are part of
the AR of the ongoing reassignment and will be part of the overriding
reassignment
- * (it is essentially (RS - ORS) - URS)
+ * Phase A (when TRS != ISR): The reassignment is not yet complete
*
- * 1 Set RS = ORS + OVRS, AR = OVRS, RR = [] in memory
- * 2 Send LeaderAndIsr request with RS = ORS + OVRS, AR = OVRS, RR = [] to
all brokers in ORS + OVRS
- * (because the ongoing reassignment is in phase A, we know we wouldn't
have a leader in URS
- * unless a preferred leader election was triggered while the
reassignment was happening)
- * 3 Replicas in URS -> Offline (force those replicas out of ISR)
- * 4 Replicas in URS -> NonExistentReplica (force those replicas to be
deleted)
+ * A1. Bump the leader epoch for the partition and send LeaderAndIsr
updates to RS.
+ * A2. Start new replicas AR by moving replicas in AR to NewReplica state.
*
- * Phase A: Initial trigger (when TRS != ISR)
- * A1. Update ZK with RS = ORS + TRS,
- * AR = TRS - ORS and
- * RR = ORS - TRS.
- * A2. Update memory with RS = ORS + TRS, AR = TRS - ORS and RR = ORS - TRS
- * A3. Send LeaderAndIsr request to every replica in ORS + TRS (with the
new RS, AR and RR).
- * We do this by forcing an update of the leader epoch in zookeeper.
- * A4. Start new replicas AR by moving replicas in AR to NewReplica state.
+ * Phase B (when TRS = ISR): The reassignment is complete
*
- * Phase B: All of TRS have caught up with the leaders and are in ISR
- * B1. Move all replicas in TRS to OnlineReplica state.
+ * B1. Move all replicas in AR to OnlineReplica state.
* B2. Set RS = TRS, AR = [], RR = [] in memory.
* B3. Send a LeaderAndIsr request with RS = TRS. This will prevent the
leader from adding any replica in TRS - ORS back in the isr.
* If the current leader is not in TRS or isn't alive, we move the
leader to a new replica in TRS.
@@ -611,51 +612,36 @@ class KafkaController(val config: KafkaConfig,
* Note that we have to update RS in ZK with TRS last since it's the only
place where we store ORS persistently.
* This way, if the controller crashes before that step, we can still
recover.
*/
- private def onPartitionReassignment(topicPartition: TopicPartition,
reassignedPartitionContext: ReassignedPartitionsContext): Unit = {
- val originalAssignmentOpt = maybeRevertOngoingReassignment(topicPartition,
reassignedPartitionContext)
- val oldReplicas = originalAssignmentOpt match {
- case Some(originalReplicas) => originalReplicas
- case None =>
controllerContext.partitionFullReplicaAssignment(topicPartition).previousAssignment.replicas
- }
- // RS = ORS + TRS, AR = TRS - ORS and RR = ORS - TRS
- val partitionAssignment = PartitionReplicaAssignment.fromOldAndNewReplicas(
- oldReplicas = oldReplicas,
- newReplicas = reassignedPartitionContext.newReplicas)
- assert(reassignedPartitionContext.newReplicas ==
partitionAssignment.targetReplicas,
- s"newReplicas ${reassignedPartitionContext.newReplicas} were not equal
to the expected targetReplicas ${partitionAssignment.targetReplicas}")
- val targetReplicas = partitionAssignment.targetReplicas
-
- if (!areReplicasInIsr(topicPartition, targetReplicas)) {
- info(s"New replicas ${targetReplicas.mkString(",")} for partition
$topicPartition being reassigned not yet " +
- "caught up with the leader")
-
- // A1. Update ZK with RS = ORS + TRS, AR = TRS - ORS and RR = ORS - TRS.
- updateReplicaAssignmentForPartition(topicPartition, partitionAssignment)
- // A2. Update memory with RS = ORS + TRS, AR = TRS - ORS and RR = ORS -
TRS
- controllerContext.updatePartitionFullReplicaAssignment(topicPartition,
partitionAssignment)
- // A3. Send LeaderAndIsr request to every replica in ORS + TRS (with the
new RS, AR and RR).
- val updatedAssignment =
controllerContext.partitionFullReplicaAssignment(topicPartition)
- updateLeaderEpochAndSendRequest(topicPartition,
updatedAssignment.replicas, updatedAssignment)
- // A4. replicas in AR -> NewReplica
- startNewReplicasForReassignedPartition(topicPartition,
updatedAssignment.addingReplicas)
- info(s"Waiting for new replicas
${updatedAssignment.addingReplicas.mkString(",")} for partition $topicPartition
being " +
- s"reassigned to catch up with the leader (target replicas
${updatedAssignment.targetReplicas})")
+ private def onPartitionReassignment(topicPartition: TopicPartition,
reassignment: ReplicaAssignment): Unit = {
+ // While a reassignment is in progress, deletion is not allowed
+
topicDeletionManager.markTopicIneligibleForDeletion(Set(topicPartition.topic),
reason = "topic reassignment in progress")
+
+ updateCurrentReassignment(topicPartition, reassignment)
+
+ val addingReplicas = reassignment.addingReplicas
+ val removingReplicas = reassignment.removingReplicas
+
+ if (!isReassignmentComplete(topicPartition, reassignment)) {
+ // A1. Send LeaderAndIsr request to every replica in ORS + TRS (with the
new RS, AR and RR).
+ updateLeaderEpochAndSendRequest(topicPartition, reassignment)
+ // A2. replicas in AR -> NewReplica
+ startNewReplicasForReassignedPartition(topicPartition, addingReplicas)
} else {
- // B1. replicas in TRS -> OnlineReplica
- targetReplicas.foreach { replica =>
-
replicaStateMachine.handleStateChanges(Seq(PartitionAndReplica(topicPartition,
replica)), OnlineReplica)
- }
+ // B1. replicas in AR -> OnlineReplica
+
replicaStateMachine.handleStateChanges(addingReplicas.map(PartitionAndReplica(topicPartition,
_)), OnlineReplica)
// B2. Set RS = TRS, AR = [], RR = [] in memory.
+ val completedReassignment =
ReplicaAssignment(reassignment.targetReplicas)
+ controllerContext.updatePartitionFullReplicaAssignment(topicPartition,
completedReassignment)
// B3. Send LeaderAndIsr request with a potential new leader (if current
leader not in TRS) and
// a new RS (using TRS) and same isr to every broker in ORS + TRS or
TRS
- moveReassignedPartitionLeaderIfRequired(topicPartition,
reassignedPartitionContext, partitionAssignment)
+ moveReassignedPartitionLeaderIfRequired(topicPartition,
completedReassignment)
// B4. replicas in RR -> Offline (force those replicas out of isr)
// B5. replicas in RR -> NonExistentReplica (force those replicas to be
deleted)
- stopRemovedReplicasOfReassignedPartition(topicPartition,
partitionAssignment.removingReplicas)
+ stopRemovedReplicasOfReassignedPartition(topicPartition,
removingReplicas)
// B6. Update ZK with RS = TRS, AR = [], RR = [].
- updateReplicaAssignmentForPartition(topicPartition,
controllerContext.partitionFullReplicaAssignment(topicPartition))
+ updateReplicaAssignmentForPartition(topicPartition,
completedReassignment)
// B7. Remove the ISR reassign listener and maybe update the
/admin/reassign_partitions path in ZK to remove this partition from it.
- removePartitionFromReassignedPartitions(topicPartition)
+ removePartitionFromReassigningPartitions(topicPartition,
completedReassignment)
// B8. After electing a leader in B3, the replicas and isr information
changes, so resend the update metadata request to every broker
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
Set(topicPartition))
// signal delete topic thread if reassignment for some partitions
belonging to topics being deleted just completed
@@ -664,117 +650,88 @@ class KafkaController(val config: KafkaConfig,
}
/**
- * This is called in case we need to override/revert an ongoing
reassignment.
- * Note that due to the way we compute the original replica set, we have no
guarantee that a revert would put it in the same order.
- * @return An option of the original replicas prior to the ongoing
reassignment. None if there is no ongoing reassignment
- */
- private def maybeRevertOngoingReassignment(topicPartition: TopicPartition,
reassignedPartitionContext: ReassignedPartitionsContext): Option[Seq[Int]] = {
- reassignedPartitionContext.ongoingReassignmentOpt match {
- case Some(ongoingAssignment) =>
- val originalAssignment = ongoingAssignment.previousAssignment
- assert(ongoingAssignment.isBeingReassigned)
- assert(!originalAssignment.isBeingReassigned)
+ * Update the current assignment state in Zookeeper and in memory. If a
reassignment is already in
+ * progress, then the new reassignment will supplant it and some replicas
will be shut down.
+ *
+ * Note that due to the way we compute the original replica set, we cannot
guarantee that a
+ * cancellation will restore the original replica order. Target replicas are
always listed
+ * first in the replica set in the desired order, which means we have no way
to get to the
+ * original order if the reassignment overlaps with the current assignment.
For example,
+ * with an initial assignment of [1, 2, 3] and a reassignment of [3, 4, 2],
then the replicas
+ * will be encoded as [3, 4, 2, 1] while the reassignment is in progress. If
the reassignment
+ * is cancelled, there is no way to restore the original order.
+ *
+ * @param topicPartition The reassigning partition
+ * @param reassignment The new reassignment
+ *
+ * @return Nothing; the updated assignment state is stored in Zookeeper and in the controller context
+ */
+ private def updateCurrentReassignment(topicPartition: TopicPartition,
reassignment: ReplicaAssignment): Unit = {
+ val currentAssignment =
controllerContext.partitionFullReplicaAssignment(topicPartition)
- val unnecessaryReplicas =
ongoingAssignment.replicas.filterNot(originalAssignment.replicas.contains(_))
- val (overlappingReplicas, replicasToRemove) =
unnecessaryReplicas.partition(reassignedPartitionContext.newReplicas.contains(_))
- // RS = ORS + OVRS, AR = OVRS, RR = []
- val intermediateReplicaAssignment =
PartitionReplicaAssignment(originalAssignment.replicas ++ overlappingReplicas,
overlappingReplicas, Seq())
+ if (currentAssignment != reassignment) {
+ debug(s"Updating assignment of partition $topicPartition from
$currentAssignment to $reassignment")
- if (isDebugEnabled)
- debug(s"Reverting previous reassignment $originalAssignment (we were
in the " +
- s"process of an ongoing reassignment with target replicas
${ongoingAssignment.targetReplicas.mkString(",")} (" +
- s"Overlapping replicas: ${overlappingReplicas.mkString(",")},
Replicas to remove: ${replicasToRemove.mkString(",")})")
+ // U1. Update assignment state in zookeeper
+ updateReplicaAssignmentForPartition(topicPartition, reassignment)
+ // U2. Update assignment state in memory
+ controllerContext.updatePartitionFullReplicaAssignment(topicPartition,
reassignment)
- // Set RS = ORS + OVRS, AR = OVRS, RR = [] in memory.
- controllerContext.updatePartitionFullReplicaAssignment(topicPartition,
intermediateReplicaAssignment)
+ // If there is a reassignment already in progress, then some of the
currently adding replicas
+ // may be eligible for immediate removal, in which case we need to stop
the replicas.
+ val unneededReplicas =
currentAssignment.replicas.diff(reassignment.replicas)
+ if (unneededReplicas.nonEmpty)
+ stopRemovedReplicasOfReassignedPartition(topicPartition,
unneededReplicas)
+ }
- // Increment leader epoch and send LeaderAndIsr with new RS (using old
replicas and overlapping replicas) and same ISR to every broker in ORS + OVRS
- // This will prevent the leader from adding any replica outside RS
back in the ISR
- updateLeaderEpochAndSendRequest(topicPartition,
intermediateReplicaAssignment.replicas, intermediateReplicaAssignment)
+ val reassignIsrChangeHandler = new
PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
+ zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
- // replicas in URS -> Offline (force those replicas out of isr)
- // replicas in URS -> NonExistentReplica (force those replicas to be
deleted)
- stopRemovedReplicasOfReassignedPartition(topicPartition,
replicasToRemove)
- reassignedPartitionContext.ongoingReassignmentOpt = None
- Some(originalAssignment.replicas)
- case None => None // nothing to revert
- }
+ controllerContext.partitionsBeingReassigned.add(topicPartition)
}
/**
- * Trigger partition reassignment for the provided partitions if the
assigned replicas are not the same as the
- * reassigned replicas (as defined in
`ControllerContext.partitionsBeingReassigned`) and if the topic has not been
- * deleted.
+ * Trigger a partition reassignment provided that the topic exists and is
not being deleted.
*
- * Called when:
- * 1. zNode is first created
- * 2. Controller fail over
- * 3. AlterPartitionReassignments API is called
+ * This is called when a reassignment is initially received either through
Zookeeper or through the
+ * AlterPartitionReassignments API
*
- * `partitionsBeingReassigned` must be populated with all partitions being
reassigned before this method is invoked
- * as explained in the method documentation of
`removePartitionFromReassignedPartitions` (which is invoked by this
- * method).
+ * The `partitionsBeingReassigned` field in the controller context will be
updated by this
+ * call after the reassignment completes validation and is successfully
stored in the topic
+ * assignment zNode.
*
- * @throws IllegalStateException if a partition is not in
`partitionsBeingReassigned`
+ * @param reassignments The reassignments to begin processing
+ * @return A map of any errors in the reassignment. If the error is NONE for
a given partition,
+ * then the reassignment was submitted successfully.
*/
- private def maybeTriggerPartitionReassignment(topicPartitions:
Set[TopicPartition]): Map[TopicPartition, ApiError] = {
- val reassignmentResults: mutable.Map[TopicPartition, ApiError] =
mutable.Map.empty
- val partitionsToBeRemovedFromReassignment =
scala.collection.mutable.Set.empty[TopicPartition]
+ private def maybeTriggerPartitionReassignment(reassignments:
Map[TopicPartition, ReplicaAssignment]): Map[TopicPartition, ApiError] = {
+ reassignments.map { case (tp, reassignment) =>
+ val topic = tp.topic
- topicPartitions.foreach { tp =>
- if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
+ val apiError = if
(topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
info(s"Skipping reassignment of $tp since the topic is currently being
deleted")
- partitionsToBeRemovedFromReassignment.add(tp)
- reassignmentResults.put(tp, new
ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist."))
+ new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does
not exist.")
} else {
- val reassignedPartitionContext =
controllerContext.partitionsBeingReassigned.get(tp).getOrElse {
- throw new IllegalStateException(s"Initiating reassign replicas for
partition $tp not present in " +
- s"partitionsBeingReassigned:
${controllerContext.partitionsBeingReassigned.mkString(", ")}")
- }
- val newReplicas = reassignedPartitionContext.newReplicas
- val topic = tp.topic
val assignedReplicas = controllerContext.partitionReplicaAssignment(tp)
if (assignedReplicas.nonEmpty) {
- if (assignedReplicas == newReplicas) {
- info(s"Partition $tp to be reassigned is already assigned to
replicas " +
- s"${newReplicas.mkString(",")}. Ignoring request for partition
reassignment.")
- partitionsToBeRemovedFromReassignment.add(tp)
- reassignmentResults.put(tp, ApiError.NONE)
- } else {
- try {
- info(s"Handling reassignment of partition $tp from current
replicas ${assignedReplicas.mkString(",")}" +
- s"to new replicas ${newReplicas.mkString(",")}")
- // first register ISR change listener
-
reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)
- // mark topic ineligible for deletion for the partitions being
reassigned
- topicDeletionManager.markTopicIneligibleForDeletion(Set(topic),
- reason = "topic reassignment in progress")
- onPartitionReassignment(tp, reassignedPartitionContext)
- reassignmentResults.put(tp, ApiError.NONE)
- } catch {
- case e: ControllerMovedException =>
- error(s"Error completing reassignment of partition $tp because
controller has moved to another broker", e)
- throw e
- case e: Throwable =>
- error(s"Error completing reassignment of partition $tp", e)
- partitionsToBeRemovedFromReassignment.add(tp)
-
zkClient.getFullReplicaAssignmentForTopics(immutable.Set(tp.topic())).find(_._1
== tp) match {
- case Some(persistedAssignment) =>
- controllerContext.updatePartitionFullReplicaAssignment(tp,
persistedAssignment._2)
- case None =>
- }
- reassignmentResults.put(tp, new
ApiError(Errors.UNKNOWN_SERVER_ERROR))
- }
+ try {
+ onPartitionReassignment(tp, reassignment)
+ ApiError.NONE
+ } catch {
+ case e: ControllerMovedException =>
+ info(s"Failed completing reassignment of partition $tp because
controller has moved to another broker")
+ throw e
+ case e: Throwable =>
+ error(s"Error completing reassignment of partition $tp", e)
+ new ApiError(Errors.UNKNOWN_SERVER_ERROR)
}
} else {
- error(s"Ignoring request to reassign partition $tp that doesn't
exist.")
- partitionsToBeRemovedFromReassignment.add(tp)
- reassignmentResults.put(tp, new
ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist."))
+ new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition
does not exist.")
}
}
+
+ tp -> apiError
}
-
removePartitionsFromReassignedPartitions(partitionsToBeRemovedFromReassignment)
- reassignmentResults
}
/**
@@ -832,16 +789,13 @@ class KafkaController(val config: KafkaConfig,
val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
controllerContext.setLiveBrokerAndEpochs(curBrokerAndEpochs)
info(s"Initialized broker epochs cache:
${controllerContext.liveBrokerIdAndEpochs}")
- controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet
+ controllerContext.allTopics = zkClient.getAllTopicsInCluster
registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
zkClient.getFullReplicaAssignmentForTopics(controllerContext.allTopics.toSet).foreach
{
case (topicPartition, replicaAssignment) =>
controllerContext.updatePartitionFullReplicaAssignment(topicPartition,
replicaAssignment)
- if (replicaAssignment.isBeingReassigned) {
- val reassignIsrChangeHandler = new
PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
- controllerContext.partitionsBeingReassigned.put(topicPartition,
ReassignedPartitionsContext(replicaAssignment.targetReplicas,
- reassignIsrChangeHandler, persistedInZk = false,
ongoingReassignmentOpt = None))
- }
+ if (replicaAssignment.isBeingReassigned)
+ controllerContext.partitionsBeingReassigned.add(topicPartition)
}
controllerContext.partitionLeadershipInfo.clear()
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
@@ -851,7 +805,6 @@ class KafkaController(val config: KafkaConfig,
updateLeaderAndIsrCache()
// start the channel manager
controllerChannelManager.startup()
- initializePartitionReassignment()
info(s"Currently active brokers in the cluster:
${controllerContext.liveBrokerIds}")
info(s"Currently shutting brokers in the cluster:
${controllerContext.shuttingDownBrokerIds}")
info(s"Current list of topics in the cluster:
${controllerContext.allTopics}")
@@ -878,28 +831,15 @@ class KafkaController(val config: KafkaConfig,
}
/**
- * Initializes the partitions being reassigned by reading them from the
/admin/reassign_partitions znode
- * This will overwrite any reassignments that were set by the
AlterPartitionReassignments API
- */
- private def initializePartitionReassignment(): Unit = {
- val partitionsBeingReassigned = zkClient.getPartitionReassignment
- if (partitionsBeingReassigned.nonEmpty) {
- info(s"DEPRECATED: Partitions being reassigned through ZooKeeper:
$partitionsBeingReassigned")
-
- partitionsBeingReassigned.foreach {
- case (tp, newReplicas) =>
- val reassignIsrChangeHandler = new
PartitionReassignmentIsrChangeHandler(eventManager, tp)
- val assignment = controllerContext.partitionFullReplicaAssignment(tp)
- val ongoingReassignmentOption = if (assignment.isBeingReassigned)
- Some(assignment)
- else
- None
-
- controllerContext.partitionsBeingReassigned += (
- tp -> ReassignedPartitionsContext(newReplicas,
reassignIsrChangeHandler,
- persistedInZk = true,
- ongoingReassignmentOpt = ongoingReassignmentOption))
- }
+ * Initialize pending reassignments. This includes reassignments sent
through /admin/reassign_partitions,
+ * which will supplant any API reassignments already in progress.
+ */
+ private def initializePartitionReassignments(): Unit = {
+ // New reassignments may have been submitted through Zookeeper while the
controller was failing over
+ val zkPartitionsResumed = processZkPartitionReassignment()
+ // We may also have some API-based reassignments that need to be restarted
+ maybeResumeReassignments { (tp, _) =>
+ !zkPartitionsResumed.contains(tp)
}
}
@@ -909,7 +849,7 @@ class KafkaController(val config: KafkaConfig,
val replicasForTopic = controllerContext.replicasForTopic(topic)
replicasForTopic.exists(r =>
!controllerContext.isReplicaOnline(r.replica, r.topicPartition))
}}
- val topicsForWhichPartitionReassignmentIsInProgress =
controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
+ val topicsForWhichPartitionReassignmentIsInProgress =
controllerContext.partitionsBeingReassigned.map(_.topic)
val topicsIneligibleForDeletion = topicsWithOfflineReplicas |
topicsForWhichPartitionReassignmentIsInProgress
info(s"List of topics to be deleted: ${topicsToBeDeleted.mkString(",")}")
info(s"List of topics ineligible for deletion:
${topicsIneligibleForDeletion.mkString(",")}")
@@ -923,43 +863,37 @@ class KafkaController(val config: KafkaConfig,
}
}
- private def areReplicasInIsr(partition: TopicPartition, replicas: Seq[Int]):
Boolean = {
- zkClient.getTopicPartitionStates(Seq(partition)).get(partition).exists {
leaderIsrAndControllerEpoch =>
- replicas.forall(leaderIsrAndControllerEpoch.leaderAndIsr.isr.contains)
+ private def isReassignmentComplete(partition: TopicPartition, assignment:
ReplicaAssignment): Boolean = {
+ if (!assignment.isBeingReassigned) {
+ true
+ } else {
+ zkClient.getTopicPartitionStates(Seq(partition)).get(partition).exists {
leaderIsrAndControllerEpoch =>
+ val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr.toSet
+ val targetReplicas = assignment.targetReplicas.toSet
+ targetReplicas.subsetOf(isr)
+ }
}
}
private def moveReassignedPartitionLeaderIfRequired(topicPartition:
TopicPartition,
-
reassignedPartitionContext: ReassignedPartitionsContext,
- currentAssignment:
PartitionReplicaAssignment): Unit = {
- val reassignedReplicas = reassignedPartitionContext.newReplicas
+ newAssignment:
ReplicaAssignment): Unit = {
+ val reassignedReplicas = newAssignment.replicas
val currentLeader =
controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader
- // change the assigned replica list to just the reassigned replicas in the
cache so it gets sent out on the LeaderAndIsr
- // request to the current or new leader. This will prevent it from adding
the old replicas to the ISR
- val newAssignment = PartitionReplicaAssignment(replicas =
reassignedReplicas, addingReplicas = Seq(), removingReplicas = Seq())
- controllerContext.updatePartitionFullReplicaAssignment(
- topicPartition,
- newAssignment
- )
-
- if (!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
+ if (!reassignedReplicas.contains(currentLeader)) {
info(s"Leader $currentLeader for partition $topicPartition being
reassigned, " +
s"is not in the new list of replicas
${reassignedReplicas.mkString(",")}. Re-electing leader")
// move the leader to one of the alive and caught up new replicas
partitionStateMachine.handleStateChanges(Seq(topicPartition),
OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))
+ } else if (controllerContext.isReplicaOnline(currentLeader,
topicPartition)) {
+ info(s"Leader $currentLeader for partition $topicPartition being
reassigned, " +
+ s"is already in the new list of replicas
${reassignedReplicas.mkString(",")} and is alive")
+ // shrink replication factor and update the leader epoch in zookeeper to
use on the next LeaderAndIsrRequest
+ updateLeaderEpochAndSendRequest(topicPartition, newAssignment)
} else {
- // check if the leader is alive or not
- if (controllerContext.isReplicaOnline(currentLeader, topicPartition)) {
- info(s"Leader $currentLeader for partition $topicPartition being
reassigned, " +
- s"is already in the new list of replicas
${reassignedReplicas.mkString(",")} and is alive")
- // shrink replication factor and update the leader epoch in zookeeper
to use on the next LeaderAndIsrRequest
- updateLeaderEpochAndSendRequest(topicPartition,
newAssignment.replicas, newAssignment)
- } else {
- info(s"Leader $currentLeader for partition $topicPartition being
reassigned, " +
- s"is already in the new list of replicas
${reassignedReplicas.mkString(",")} but is dead")
- partitionStateMachine.handleStateChanges(Seq(topicPartition),
OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))
- }
+ info(s"Leader $currentLeader for partition $topicPartition being
reassigned, " +
+ s"is already in the new list of replicas
${reassignedReplicas.mkString(",")} but is dead")
+ partitionStateMachine.handleStateChanges(Seq(topicPartition),
OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))
}
}
@@ -975,17 +909,17 @@ class KafkaController(val config: KafkaConfig,
replicaStateMachine.handleStateChanges(replicasToBeDeleted,
NonExistentReplica)
}
- private def updateReplicaAssignmentForPartition(partition: TopicPartition,
- assignment:
PartitionReplicaAssignment): Unit = {
- var topicAssignment =
controllerContext.partitionFullReplicaAssignmentForTopic(partition.topic)
- topicAssignment += partition -> assignment
+ private def updateReplicaAssignmentForPartition(topicPartition:
TopicPartition, assignment: ReplicaAssignment): Unit = {
+ var topicAssignment =
controllerContext.partitionFullReplicaAssignmentForTopic(topicPartition.topic)
+ topicAssignment += topicPartition -> assignment
- val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic,
topicAssignment, controllerContext.epochZkVersion)
+ val setDataResponse = zkClient.setTopicAssignmentRaw(topicPartition.topic,
topicAssignment, controllerContext.epochZkVersion)
setDataResponse.resultCode match {
case Code.OK =>
- info(s"Updated assigned replicas for partition $partition being
reassigned to ${assignment.targetReplicas.mkString(",")}" +
- s" (addingReplicas: ${assignment.addingReplicas.mkString(",")},
removingReplicas: ${assignment.removingReplicas.mkString(",")})")
- case Code.NONODE => throw new IllegalStateException(s"Topic
${partition.topic} doesn't exist")
+ info(s"Successfully updated assignment of partition $topicPartition to
$assignment")
+ case Code.NONODE =>
+ throw new IllegalStateException(s"Failed to update assignment for
$topicPartition since the topic " +
+ "has no current assignment")
case _ => throw new KafkaException(setDataResponse.resultException.get)
}
}
@@ -998,28 +932,27 @@ class KafkaController(val config: KafkaConfig,
}
}
- private def updateLeaderEpochAndSendRequest(partition: TopicPartition,
replicasToReceiveRequest: Seq[Int],
- newAssignedReplicas:
PartitionReplicaAssignment): Unit = {
+ private def updateLeaderEpochAndSendRequest(topicPartition: TopicPartition,
+ assignment: ReplicaAssignment):
Unit = {
val stateChangeLog =
stateChangeLogger.withControllerEpoch(controllerContext.epoch)
- val replicaSetStr = s"replica set
${newAssignedReplicas.replicas.mkString(",")} " +
- s"(addingReplicas: ${newAssignedReplicas.addingReplicas.mkString(",")},
removingReplicas: ${newAssignedReplicas.removingReplicas.mkString(",")})"
- updateLeaderEpoch(partition) match {
+ updateLeaderEpoch(topicPartition) match {
case Some(updatedLeaderIsrAndControllerEpoch) =>
try {
brokerRequestBatch.newBatch()
-
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest,
partition,
- updatedLeaderIsrAndControllerEpoch, newAssignedReplicas, isNew =
false)
+
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(assignment.replicas,
topicPartition,
+ updatedLeaderIsrAndControllerEpoch, assignment, isNew = false)
brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
} catch {
case e: IllegalStateException =>
handleIllegalState(e)
}
- stateChangeLog.trace(s"Sent LeaderAndIsr request
$updatedLeaderIsrAndControllerEpoch with new assigned $replicaSetStr" +
- s"to leader
${updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader} " +
- s"for partition being reassigned $partition")
+ stateChangeLog.trace(s"Sent LeaderAndIsr request
$updatedLeaderIsrAndControllerEpoch with " +
+ s"new replica assignment $assignment to leader
${updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader} " +
+ s"for partition being reassigned $topicPartition")
+
case None => // fail the reassignment
- stateChangeLog.error(s"Failed to send LeaderAndIsr request with new
assigned $replicaSetStr " +
- s"to leader for partition being reassigned $partition")
+ stateChangeLog.error(s"Failed to send LeaderAndIsr request with new
replica assignment " +
+ s"$assignment to leader for partition being reassigned
$topicPartition")
}
}
@@ -1038,62 +971,49 @@ class KafkaController(val config: KafkaConfig,
}
private def unregisterPartitionReassignmentIsrChangeHandlers(): Unit = {
-
controllerContext.partitionsBeingReassigned.values.foreach(_.unregisterReassignIsrChangeHandler(zkClient))
- }
-
- /**
- * Remove partition from partitions being reassigned in ZooKeeper and
ControllerContext. If the partition reassignment
- * is complete (i.e. there is no other partition with a reassignment in
progress), the reassign_partitions znode
- * is deleted.
- *
- * `ControllerContext.partitionsBeingReassigned` must be populated with all
the zk-persisted partition reassignments before this
- * method is invoked to avoid premature deletion of the
`reassign_partitions` znode.
- */
- private def removePartitionsFromReassignedPartitions(partitionsToBeRemoved:
Set[TopicPartition]): Unit = {
-
partitionsToBeRemoved.map(controllerContext.partitionsBeingReassigned).foreach
{ reassignContext =>
- reassignContext.unregisterReassignIsrChangeHandler(zkClient)
+ controllerContext.partitionsBeingReassigned.foreach { tp =>
+ val path = TopicPartitionStateZNode.path(tp)
+ zkClient.unregisterZNodeChangeHandler(path)
}
-
- removePartitionsFromZkReassignment(partitionsToBeRemoved)
-
- controllerContext.partitionsBeingReassigned --= partitionsToBeRemoved
}
- private def removePartitionFromReassignedPartitions(partitionToBeRemoved:
TopicPartition) {
- controllerContext.partitionsBeingReassigned.get(partitionToBeRemoved)
match {
- case Some(reassignContext) =>
- reassignContext.unregisterReassignIsrChangeHandler(zkClient)
-
- if (reassignContext.persistedInZk) {
- removePartitionsFromZkReassignment(Set(partitionToBeRemoved))
- }
-
-
controllerContext.partitionsBeingReassigned.remove(partitionToBeRemoved)
- case None =>
- throw new IllegalStateException("Cannot remove a reassigning partition
because it is not present in memory")
+ private def removePartitionFromReassigningPartitions(topicPartition:
TopicPartition,
+ assignment:
ReplicaAssignment): Unit = {
+ if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
+ val path = TopicPartitionStateZNode.path(topicPartition)
+ zkClient.unregisterZNodeChangeHandler(path)
+ maybeRemoveFromZkReassignment((tp, replicas) => tp == topicPartition &&
replicas == assignment.replicas)
+ controllerContext.partitionsBeingReassigned.remove(topicPartition)
+ } else {
+ throw new IllegalStateException("Cannot remove a reassigning partition
because it is not present in memory")
}
}
- private def removePartitionsFromZkReassignment(partitionsToBeRemoved:
Set[TopicPartition]): Unit = {
- if (!zkClient.reassignPartitionsInProgress()) {
- debug(s"Cannot remove partitions $partitionsToBeRemoved from ZooKeeper
because there is no ZooKeeper reassignment present")
+ /**
+ * Remove partitions from an active zk-based reassignment (if one exists).
+ *
+ * @param shouldRemoveReassignment Predicate indicating which partition
reassignments should be removed
+ */
+ private def maybeRemoveFromZkReassignment(shouldRemoveReassignment:
(TopicPartition, Seq[Int]) => Boolean): Unit = {
+ if (!zkClient.reassignPartitionsInProgress())
return
- }
-
- val updatedPartitionsBeingReassigned =
controllerContext.partitionsBeingReassigned.filter(_._2.persistedInZk).toMap --
partitionsToBeRemoved
- info(s"Removing partitions $partitionsToBeRemoved from the list of
reassigned partitions in zookeeper")
+ val reassigningPartitions = zkClient.getPartitionReassignment()
+ val (removingPartitions, updatedPartitionsBeingReassigned) =
reassigningPartitions.partition { case (tp, replicas) =>
+ shouldRemoveReassignment(tp, replicas)
+ }
+ info(s"Removing partitions $removingPartitions from the list of reassigned
partitions in zookeeper")
// write the new list to zookeeper
if (updatedPartitionsBeingReassigned.isEmpty) {
info(s"No more partitions need to be reassigned. Deleting zk path
${ReassignPartitionsZNode.path}")
zkClient.deletePartitionReassignment(controllerContext.epochZkVersion)
// Ensure we detect future reassignments
- eventManager.put(PartitionReassignment(None, None))
+ eventManager.put(ZkPartitionReassignment)
} else {
- val reassignment = updatedPartitionsBeingReassigned.map { case (k, v) =>
k -> v.newReplicas }
- try zkClient.setOrCreatePartitionReassignment(reassignment,
controllerContext.epochZkVersion)
- catch {
+ try {
+
zkClient.setOrCreatePartitionReassignment(updatedPartitionsBeingReassigned,
controllerContext.epochZkVersion)
+ } catch {
case e: KeeperException => throw new AdminOperationException(e)
}
}
@@ -1377,7 +1297,7 @@ class KafkaController(val config: KafkaConfig,
0
} else {
controllerContext.allPartitions.count { topicPartition =>
- val replicaAssignment: PartitionReplicaAssignment =
controllerContext.partitionFullReplicaAssignment(topicPartition)
+ val replicaAssignment: ReplicaAssignment =
controllerContext.partitionFullReplicaAssignment(topicPartition)
val replicas = replicaAssignment.replicas
val preferredReplica = replicas.head
@@ -1553,7 +1473,7 @@ class KafkaController(val config: KafkaConfig,
private def processTopicChange(): Unit = {
if (!isActive) return
- val topics = zkClient.getAllTopicsInCluster.toSet
+ val topics = zkClient.getAllTopicsInCluster
val newTopics = topics -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- topics
controllerContext.allTopics = topics
@@ -1640,7 +1560,7 @@ class KafkaController(val config: KafkaConfig,
// mark topic ineligible for deletion if other state changes are in
progress
topicsToBeDeleted.foreach { topic =>
val partitionReassignmentInProgress =
-
controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
+
controllerContext.partitionsBeingReassigned.map(_.topic).contains(topic)
if (partitionReassignmentInProgress)
topicDeletionManager.markTopicIneligibleForDeletion(Set(topic),
reason = "topic reassignment in progress")
@@ -1655,156 +1575,112 @@ class KafkaController(val config: KafkaConfig,
}
}
+ private def processZkPartitionReassignment(): Set[TopicPartition] = {
+ // We need to register the watcher if the path doesn't exist in order to
detect future
+ // reassignments and we get the `path exists` check for free
+ if (isActive &&
zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler))
{
+ val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
+ val partitionsToReassign = mutable.Map.empty[TopicPartition,
ReplicaAssignment]
+
+ zkClient.getPartitionReassignment().foreach { case (tp, targetReplicas)
=>
+ maybeBuildReassignment(tp, Some(targetReplicas)) match {
+ case Some(context) => partitionsToReassign.put(tp, context)
+ case None => reassignmentResults.put(tp, new
ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
+ }
+ }
+
+ reassignmentResults ++=
maybeTriggerPartitionReassignment(partitionsToReassign)
+ val (partitionsReassigned, partitionsFailed) =
reassignmentResults.partition(_._2.error == Errors.NONE)
+ if (partitionsFailed.nonEmpty) {
+ warn(s"Failed reassignment through zk with the following errors:
$partitionsFailed")
+ maybeRemoveFromZkReassignment((tp, _) => partitionsFailed.contains(tp))
+ }
+ partitionsReassigned.keySet
+ } else {
+ Set.empty
+ }
+ }
+
/**
- * Process a partition reassignment.
- * A partition reassignment can be triggered through the
AlterPartitionReassignment API (in which case reassignmentsOpt is present)
- * or through the /admin/reassign_partitions znode (in which case
reassignmentsOpt is None).
- * In both cases, we need to populate `partitionsBeingReassigned` with all
partitions being reassigned
- * before invoking `maybeTriggerPartitionReassignment` (see method
documentation for the reason)
+ * Process a partition reassignment from the AlterPartitionReassignments API.
If there are
+ * existing reassignments through zookeeper for any of the requested
partitions, they will be
+ * cancelled prior to beginning the new reassignment. Any zk-based
reassignment for partitions
+ * which are NOT included in this call will not be affected.
*
- * @param reassignmentsOpt - optional map of reassignments, expected when an
API reassignment is issued
- * The map consists of topic partitions to an
optional sequence of target replicas.
- * An empty target replicas option denotes a
revert of an on-going reassignment.
- * @param callback - optional callback, expected when an API reassignment is
issued
+ * @param reassignments Map of reassignments passed through the
AlterReassignments API. A `None` value
+ * means that we should cancel an in-progress
reassignment.
+ * @param callback Callback to send AlterReassignments response
*/
- private def processPartitionReassignment(reassignmentsOpt:
Option[Map[TopicPartition, Option[Seq[Int]]]],
- callback:
Option[AlterReassignmentsCallback]): Unit = {
+ private def processApiPartitionReassignment(reassignments:
Map[TopicPartition, Option[Seq[Int]]],
+ callback:
AlterReassignmentsCallback): Unit = {
if (!isActive) {
- callback match {
- case Some(cb) => cb(Right(new ApiError(Errors.NOT_CONTROLLER)))
- case None =>
- }
- return
- }
-
- val reassignmentResults: mutable.Map[TopicPartition, ApiError] =
mutable.Map.empty
- val partitionsToBeReassigned = reassignmentsOpt match {
- case Some(reassignments) => // API triggered
- val (savedReassignments, _) = reassignments.partition { case (tp,
targetReplicas) =>
- if (replicasAreValid(targetReplicas)) {
- savePartitionReassignment(reassignmentResults, tp, targetReplicas,
zkTriggered = false)
- } else {
- reassignmentResults.put(tp, new
ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, "The partition assignment is not
valid."))
- false
- }
- }
- savedReassignments.keySet
-
- case None => // ZK triggered
- // We need to register the watcher if the path doesn't exist in order
to detect future reassignments and we get
- // the `path exists` check for free
- if
(zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler))
{
- val partitionReassignment = zkClient.getPartitionReassignment
- val (savedReassignments, _) = partitionReassignment.partition { case
(tp, targetReplicas) =>
- savePartitionReassignment(reassignmentResults, tp,
Some(targetReplicas), zkTriggered = true)
+ callback(Right(new ApiError(Errors.NOT_CONTROLLER)))
+ } else {
+ val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
+ val partitionsToReassign = mutable.Map.empty[TopicPartition,
ReplicaAssignment]
+
+ reassignments.foreach { case (tp, targetReplicas) =>
+ if (replicasAreValid(tp, targetReplicas)) {
+ maybeBuildReassignment(tp, targetReplicas) match {
+ case Some(context) => partitionsToReassign.put(tp, context)
+ case None => reassignmentResults.put(tp, new
ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
}
- savedReassignments.keySet
} else {
- Set.empty[TopicPartition]
+ reassignmentResults.put(tp, new
ApiError(Errors.INVALID_REPLICA_ASSIGNMENT))
}
- }
+ }
- reassignmentResults ++=
maybeTriggerPartitionReassignment(partitionsToBeReassigned)
- callback match {
- case Some(cb) => cb(Left(reassignmentResults))
- case None =>
+ // The latest reassignment (whether by API or through zk) always takes
precedence,
+ // so remove from active zk reassignment (if one exists)
+ maybeRemoveFromZkReassignment((tp, _) =>
partitionsToReassign.contains(tp))
+
+ reassignmentResults ++=
maybeTriggerPartitionReassignment(partitionsToReassign)
+ callback(Left(reassignmentResults))
}
}
- private def replicasAreValid(replicasOpt: Option[Seq[Int]]): Boolean = {
+ private def replicasAreValid(topicPartition: TopicPartition, replicasOpt:
Option[Seq[Int]]): Boolean = {
replicasOpt match {
case Some(replicas) =>
val replicaSet = replicas.toSet
-
if (replicas.isEmpty || replicas.size != replicaSet.size)
false
else if (replicas.exists(_ < 0))
false
- else
- replicaSet.subsetOf(controllerContext.liveBrokerIds)
+ else {
+ // Ensure that any new replicas are among the live brokers
+ val currentAssignment =
controllerContext.partitionFullReplicaAssignment(topicPartition)
+ val newAssignment = currentAssignment.reassignTo(replicas)
+
newAssignment.addingReplicas.toSet.subsetOf(controllerContext.liveBrokerIds)
+ }
+
case None => true
}
}
- private def savePartitionReassignment(reassignmentResults:
mutable.Map[TopicPartition, ApiError], partition: TopicPartition,
- targetReplicasOpt: Option[Seq[Int]],
zkTriggered: Boolean): Boolean = {
- val reassignIsrChangeHandler = new
PartitionReassignmentIsrChangeHandler(eventManager, partition)
- val replicaAssignment =
controllerContext.partitionFullReplicaAssignment(partition)
- val reassignmentIsInProgress =
controllerContext.partitionsBeingReassigned.contains(partition)
-
- val newContextOpt = targetReplicasOpt match {
- case Some(targetReplicas) =>
- if (reassignmentIsInProgress) {
- info(s"Overriding old reassignment for partition $partition " +
- s"(with target replicas
${replicaAssignment.targetReplicas.mkString(",")}) " +
- s"to new target replicas (${targetReplicas.mkString(",")})")
- assert(replicaAssignment.isBeingReassigned)
-
- val oldContext =
controllerContext.partitionsBeingReassigned(partition)
- oldContext.unregisterReassignIsrChangeHandler(zkClient)
-
- Some(ReassignedPartitionsContext(targetReplicas,
reassignIsrChangeHandler,
- persistedInZk = zkTriggered || oldContext.persistedInZk,
- ongoingReassignmentOpt = Some(replicaAssignment))
- )
- } else {
- Some(ReassignedPartitionsContext(targetReplicas,
reassignIsrChangeHandler,
- persistedInZk = zkTriggered,
- ongoingReassignmentOpt = None)
- )
- }
- case None => // revert
- if (reassignmentIsInProgress) {
- val originalAssignment = replicaAssignment.previousAssignment
- info(s"Reverting old reassignment for partition $partition " +
- s"(with target replicas
${replicaAssignment.targetReplicas.mkString(",")}) " +
- s"to original replicas
(${originalAssignment.replicas.mkString(",")})")
- assert(replicaAssignment.isBeingReassigned)
-
- val oldContext =
controllerContext.partitionsBeingReassigned(partition)
- oldContext.unregisterReassignIsrChangeHandler(zkClient)
-
- Some(ReassignedPartitionsContext(originalAssignment.replicas,
reassignIsrChangeHandler,
- persistedInZk = oldContext.persistedInZk,
- ongoingReassignmentOpt = Some(replicaAssignment)
- ))
- } else {
- reassignmentResults.put(partition, new
ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
- None
- }
- }
-
- newContextOpt match {
- case Some(newContext) =>
- controllerContext.partitionsBeingReassigned.put(partition, newContext)
- true
- case None => false
+ private def maybeBuildReassignment(topicPartition: TopicPartition,
+ targetReplicasOpt: Option[Seq[Int]]):
Option[ReplicaAssignment] = {
+ val replicaAssignment =
controllerContext.partitionFullReplicaAssignment(topicPartition)
+ if (replicaAssignment.isBeingReassigned) {
+ val targetReplicas =
targetReplicasOpt.getOrElse(replicaAssignment.originReplicas)
+ Some(replicaAssignment.reassignTo(targetReplicas))
+ } else {
+ targetReplicasOpt.map { targetReplicas =>
+ replicaAssignment.reassignTo(targetReplicas)
+ }
}
}
-
- private def processPartitionReassignmentIsrChange(partition:
TopicPartition): Unit = {
+ private def processPartitionReassignmentIsrChange(topicPartition:
TopicPartition): Unit = {
if (!isActive) return
- // check if this partition is still being reassigned or not
- controllerContext.partitionsBeingReassigned.get(partition).foreach {
reassignedPartitionContext =>
- val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet
- zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
- case Some(leaderIsrAndControllerEpoch) => // check if new replicas
have joined ISR
- val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
- val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
- if (caughtUpReplicas == reassignedReplicas) {
- // resume the partition reassignment process
- info(s"${caughtUpReplicas.size}/${reassignedReplicas.size}
replicas have caught up with the leader for " +
- s"partition $partition being reassigned. Resuming partition
reassignment")
- onPartitionReassignment(partition, reassignedPartitionContext)
- }
- else {
- info(s"${caughtUpReplicas.size}/${reassignedReplicas.size}
replicas have caught up with the leader for " +
- s"partition $partition being reassigned. Replica(s) " +
- s"${(reassignedReplicas --
leaderAndIsr.isr.toSet).mkString(",")} still need to catch up")
- }
- case None => error(s"Error handling reassignment of partition
$partition to replicas " +
- s"${reassignedReplicas.mkString(",")} as it was never created")
+
+ if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
+ val reassignment =
controllerContext.partitionFullReplicaAssignment(topicPartition)
+ if (isReassignmentComplete(topicPartition, reassignment)) {
+ // resume the partition reassignment process
+ info(s"Target replicas ${reassignment.targetReplicas} have all caught
up with the leader for " +
+ s"reassigning partition $topicPartition")
+ onPartitionReassignment(topicPartition, reassignment)
}
}
}
@@ -1813,10 +1689,10 @@ class KafkaController(val config: KafkaConfig,
if (!isActive) {
callback(Right(new ApiError(Errors.NOT_CONTROLLER)))
} else {
- val results: mutable.Map[TopicPartition, PartitionReplicaAssignment] =
mutable.Map.empty
+ val results: mutable.Map[TopicPartition, ReplicaAssignment] =
mutable.Map.empty
val partitionsToList = partitionsOpt match {
case Some(partitions) => partitions
- case None => controllerContext.partitionsBeingReassigned.keys
+ case None => controllerContext.partitionsBeingReassigned
}
partitionsToList.foreach { tp =>
@@ -1866,7 +1742,7 @@ class KafkaController(val config: KafkaConfig,
def alterPartitionReassignments(partitions: Map[TopicPartition,
Option[Seq[Int]]],
callback: AlterReassignmentsCallback): Unit
= {
- eventManager.put(PartitionReassignment(Some(partitions), Some(callback)))
+ eventManager.put(ApiPartitionReassignment(partitions, callback))
}
private def preemptReplicaLeaderElection(
@@ -2016,8 +1892,10 @@ class KafkaController(val config: KafkaConfig,
processPartitionModifications(topic)
case TopicDeletion =>
processTopicDeletion()
- case PartitionReassignment(reassignments, callback) =>
- processPartitionReassignment(reassignments, callback)
+ case ApiPartitionReassignment(reassignments, callback) =>
+ processApiPartitionReassignment(reassignments, callback)
+ case ZkPartitionReassignment =>
+ processZkPartitionReassignment()
case ListPartitionReassignments(partitions, callback) =>
processListPartitionReassignments(partitions, callback)
case PartitionReassignmentIsrChange(partition) =>
@@ -2099,7 +1977,7 @@ class PartitionReassignmentHandler(eventManager:
ControllerEventManager) extends
// Note that the event is also enqueued when the znode is deleted, but we do
it explicitly instead of relying on
// handleDeletion(). This approach is more robust as it doesn't depend on
the watcher being re-registered after
// it's consumed during data changes (we ensure re-registration when the
znode is deleted).
- override def handleCreation(): Unit =
eventManager.put(PartitionReassignment(None, None))
+ override def handleCreation(): Unit =
eventManager.put(ZkPartitionReassignment)
}
class PartitionReassignmentIsrChangeHandler(eventManager:
ControllerEventManager, partition: TopicPartition) extends ZNodeChangeHandler {
@@ -2132,26 +2010,6 @@ class ControllerChangeHandler(eventManager:
ControllerEventManager) extends ZNod
override def handleDataChange(): Unit = eventManager.put(ControllerChange)
}
-/**
- * @param newReplicas - the target replicas for this partition
- * @param reassignIsrChangeHandler - a handler for tracking ISR changes in
this partition
- * @param persistedInZk - a boolean indicating whether this partition is
stored in the /admin/reassign_partitions znode
- * it is needed to ensure that an API reassignment
that overrides a znode reassignment still cleans up after itself
- * @param ongoingReassignmentOpt - the ongoing reassignment for this
partition, if one is present -- it will be reverted.
- */
-case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
- reassignIsrChangeHandler:
PartitionReassignmentIsrChangeHandler,
- persistedInZk: Boolean,
- var ongoingReassignmentOpt:
Option[PartitionReplicaAssignment]) {
-
- def registerReassignIsrChangeHandler(zkClient: KafkaZkClient): Unit =
- zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
-
- def unregisterReassignIsrChangeHandler(zkClient: KafkaZkClient): Unit =
- zkClient.unregisterZNodeChangeHandler(reassignIsrChangeHandler.path)
-
-}
-
case class PartitionAndReplica(topicPartition: TopicPartition, replica: Int) {
def topic: String = topicPartition.topic
def partition: Int = topicPartition.partition
@@ -2261,8 +2119,12 @@ case object TopicDeletion extends ControllerEvent {
override def state: ControllerState = ControllerState.TopicDeletion
}
-case class PartitionReassignment(reassignments: Option[Map[TopicPartition,
Option[Seq[Int]]]],
- callback: Option[AlterReassignmentsCallback])
extends ControllerEvent {
+case object ZkPartitionReassignment extends ControllerEvent {
+ override def state: ControllerState =
ControllerState.AlterPartitionReassignment
+}
+
+case class ApiPartitionReassignment(reassignments: Map[TopicPartition,
Option[Seq[Int]]],
+ callback: AlterReassignmentsCallback)
extends ControllerEvent {
override def state: ControllerState =
ControllerState.AlterPartitionReassignment
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9a0ebd7..f194485 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -30,7 +30,7 @@ import kafka.api.ElectLeadersRequestOps
import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0}
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
-import kafka.controller.{KafkaController, PartitionReplicaAssignment}
+import kafka.controller.{KafkaController, ReplicaAssignment}
import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult,
LeaveGroupResult, SyncGroupResult}
import kafka.coordinator.transaction.{InitProducerIdResult,
TransactionCoordinator}
import kafka.message.ZStdCompressionCodec
@@ -2350,7 +2350,7 @@ class KafkaApis(val requestChannel: RequestChannel,
authorizeClusterOperation(request, DESCRIBE)
val listPartitionReassignmentsRequest =
request.body[ListPartitionReassignmentsRequest]
- def sendResponseCallback(result: Either[Map[TopicPartition,
PartitionReplicaAssignment], ApiError]): Unit = {
+ def sendResponseCallback(result: Either[Map[TopicPartition,
ReplicaAssignment], ApiError]): Unit = {
val responseData = result match {
case Right(error) => new
ListPartitionReassignmentsResponseData().setErrorMessage(error.message()).setErrorCode(error.error().code())
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala
b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index de6337b..63e2614 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -20,7 +20,7 @@ import java.util.Properties
import kafka.admin.{AdminOperationException, AdminUtils, BrokerMetadata,
RackAwareMode}
import kafka.common.TopicAlreadyMarkedForDeletionException
-import kafka.controller.PartitionReplicaAssignment
+import kafka.controller.ReplicaAssignment
import kafka.log.LogConfig
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
import kafka.utils._
@@ -93,8 +93,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
// create the partition assignment
- writeTopicPartitionAssignment(topic,
partitionReplicaAssignment.mapValues(PartitionReplicaAssignment(_, List(),
List())).toMap,
- isUpdate = false)
+ writeTopicPartitionAssignment(topic,
partitionReplicaAssignment.mapValues(ReplicaAssignment(_)).toMap, isUpdate =
false)
}
/**
@@ -136,7 +135,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends
Logging {
LogConfig.validate(config)
}
- private def writeTopicPartitionAssignment(topic: String, replicaAssignment:
Map[Int, PartitionReplicaAssignment], isUpdate: Boolean): Unit = {
+ private def writeTopicPartitionAssignment(topic: String, replicaAssignment:
Map[Int, ReplicaAssignment], isUpdate: Boolean): Unit = {
try {
val assignment = replicaAssignment.map { case (partitionId, replicas) =>
(new TopicPartition(topic,partitionId), replicas) }.toMap
@@ -182,7 +181,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends
Logging {
* @return the updated replica assignment
*/
def addPartitions(topic: String,
- existingAssignment: Map[Int, PartitionReplicaAssignment],
+ existingAssignment: Map[Int, ReplicaAssignment],
allBrokers: Seq[BrokerMetadata],
numPartitions: Int = 1,
replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
@@ -211,7 +210,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends
Logging {
}
val proposedAssignment = existingAssignment ++
proposedAssignmentForNewPartitions.map { case (tp, replicas) =>
- tp -> PartitionReplicaAssignment(replicas, List(), List())
+ tp -> ReplicaAssignment(replicas, List(), List())
}
if (!validateOnly) {
info(s"Creating $partitionsToAdd partitions for '$topic' with the
following replica assignment: " +
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 864dcab..4b16860 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -21,7 +21,7 @@ import java.util.Properties
import com.yammer.metrics.core.MetricName
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
-import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch,
PartitionReplicaAssignment}
+import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch,
ReplicaAssignment}
import kafka.log.LogConfig
import kafka.metrics.KafkaMetricsGroup
import kafka.security.authorizer.AclAuthorizer.{NoAcls, VersionedAcls}
@@ -484,7 +484,9 @@ class KafkaZkClient private[zk] (zooKeeperClient:
ZooKeeperClient, isSecure: Boo
* @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
* @return SetDataResponse
*/
- def setTopicAssignmentRaw(topic: String, assignment:
collection.Map[TopicPartition, PartitionReplicaAssignment],
expectedControllerEpochZkVersion: Int): SetDataResponse = {
+ def setTopicAssignmentRaw(topic: String,
+ assignment: collection.Map[TopicPartition,
ReplicaAssignment],
+ expectedControllerEpochZkVersion: Int):
SetDataResponse = {
val setDataRequest = SetDataRequest(TopicZNode.path(topic),
TopicZNode.encode(assignment), ZkVersion.MatchAnyVersion)
retryRequestUntilConnected(setDataRequest,
expectedControllerEpochZkVersion)
}
@@ -496,7 +498,9 @@ class KafkaZkClient private[zk] (zooKeeperClient:
ZooKeeperClient, isSecure: Boo
* @param expectedControllerEpochZkVersion expected controller epoch
zkVersion.
* @throws KeeperException if there is an error while setting assignment
*/
- def setTopicAssignment(topic: String, assignment: Map[TopicPartition,
PartitionReplicaAssignment], expectedControllerEpochZkVersion: Int =
ZkVersion.MatchAnyVersion) = {
+ def setTopicAssignment(topic: String,
+ assignment: Map[TopicPartition, ReplicaAssignment],
+ expectedControllerEpochZkVersion: Int =
ZkVersion.MatchAnyVersion) = {
val setDataResponse = setTopicAssignmentRaw(topic, assignment,
expectedControllerEpochZkVersion)
setDataResponse.maybeThrow
}
@@ -508,7 +512,7 @@ class KafkaZkClient private[zk] (zooKeeperClient:
ZooKeeperClient, isSecure: Boo
* @throws KeeperException if there is an error while creating assignment
*/
def createTopicAssignment(topic: String, assignment: Map[TopicPartition,
Seq[Int]]) = {
- val persistedAssignments =
assignment.mapValues(PartitionReplicaAssignment(_, List(), List())).toMap
+ val persistedAssignments = assignment.mapValues(ReplicaAssignment(_)).toMap
createRecursive(TopicZNode.path(topic),
TopicZNode.encode(persistedAssignments))
}
@@ -584,14 +588,14 @@ class KafkaZkClient private[zk] (zooKeeperClient:
ZooKeeperClient, isSecure: Boo
* @param topics the topics whose partitions we wish to get the assignments
for.
* @return the full replica assignment for each partition from the given
topics.
*/
- def getFullReplicaAssignmentForTopics(topics: Set[String]):
Map[TopicPartition, PartitionReplicaAssignment] = {
+ def getFullReplicaAssignmentForTopics(topics: Set[String]):
Map[TopicPartition, ReplicaAssignment] = {
val getDataRequests = topics.map(topic =>
GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
getDataResponses.flatMap { getDataResponse =>
val topic = getDataResponse.ctx.get.asInstanceOf[String]
getDataResponse.resultCode match {
case Code.OK => TopicZNode.decode(topic, getDataResponse.data)
- case Code.NONODE => Map.empty[TopicPartition,
PartitionReplicaAssignment]
+ case Code.NONODE => Map.empty[TopicPartition, ReplicaAssignment]
case _ => throw getDataResponse.resultException.get
}
}.toMap
@@ -602,7 +606,7 @@ class KafkaZkClient private[zk] (zooKeeperClient:
ZooKeeperClient, isSecure: Boo
* @param topics the topics whose partitions we wish to get the assignments
for.
* @return the partition assignment for each partition from the given topics.
*/
- def getPartitionAssignmentForTopics(topics: Set[String]): Map[String,
Map[Int, PartitionReplicaAssignment]] = {
+ def getPartitionAssignmentForTopics(topics: Set[String]): Map[String,
Map[Int, ReplicaAssignment]] = {
val getDataRequests = topics.map(topic =>
GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
getDataResponses.flatMap { getDataResponse =>
@@ -611,7 +615,7 @@ class KafkaZkClient private[zk] (zooKeeperClient:
ZooKeeperClient, isSecure: Boo
val partitionMap = TopicZNode.decode(topic, getDataResponse.data).map
{ case (k, v) => (k.partition, v) }
Map(topic -> partitionMap)
} else if (getDataResponse.resultCode == Code.NONODE) {
- Map.empty[String, Map[Int, PartitionReplicaAssignment]]
+ Map.empty[String, Map[Int, ReplicaAssignment]]
} else {
throw getDataResponse.resultException.get
}
@@ -805,7 +809,7 @@ class KafkaZkClient private[zk] (zooKeeperClient:
ZooKeeperClient, isSecure: Boo
* @deprecated Use the PartitionReassignment Kafka API instead
*/
@Deprecated
- def getPartitionReassignment: collection.Map[TopicPartition, Seq[Int]] = {
+ def getPartitionReassignment(): collection.Map[TopicPartition, Seq[Int]] = {
val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path)
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.resultCode match {
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala
b/core/src/main/scala/kafka/zk/ZkData.scala
index 4a81e83..0d1d525 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -25,7 +25,7 @@ import com.fasterxml.jackson.core.JsonProcessingException
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
-import kafka.controller.{IsrChangeNotificationHandler,
LeaderIsrAndControllerEpoch, PartitionReplicaAssignment}
+import kafka.controller.{IsrChangeNotificationHandler,
LeaderIsrAndControllerEpoch, ReplicaAssignment}
import kafka.security.auth.Resource.Separator
import kafka.security.authorizer.AclAuthorizer.VersionedAcls
import kafka.security.auth.{Acl, Resource, ResourceType}
@@ -240,7 +240,7 @@ object TopicsZNode {
object TopicZNode {
def path(topic: String) = s"${TopicsZNode.path}/$topic"
- def encode(assignment: collection.Map[TopicPartition,
PartitionReplicaAssignment]): Array[Byte] = {
+ def encode(assignment: collection.Map[TopicPartition, ReplicaAssignment]):
Array[Byte] = {
val replicaAssignmentJson = mutable.Map[String, util.List[Int]]()
val addingReplicasAssignmentJson = mutable.Map[String, util.List[Int]]()
val removingReplicasAssignmentJson = mutable.Map[String, util.List[Int]]()
@@ -260,7 +260,7 @@ object TopicZNode {
"removing_replicas" -> removingReplicasAssignmentJson.asJava
).asJava)
}
- def decode(topic: String, bytes: Array[Byte]): Map[TopicPartition,
PartitionReplicaAssignment] = {
+ def decode(topic: String, bytes: Array[Byte]): Map[TopicPartition,
ReplicaAssignment] = {
def getReplicas(replicasJsonOpt: Option[JsonObject], partition: String):
Seq[Int] = {
replicasJsonOpt match {
case Some(replicasJson) => replicasJson.get(partition) match {
@@ -278,7 +278,7 @@ object TopicZNode {
val removingReplicasJsonOpt =
assignmentJson.get("removing_replicas").map(_.asJsonObject)
partitionsJsonOpt.map { partitionsJson =>
partitionsJson.iterator.map { case (partition, replicas) =>
- new TopicPartition(topic, partition.toInt) ->
PartitionReplicaAssignment(
+ new TopicPartition(topic, partition.toInt) -> ReplicaAssignment(
replicas.to[Seq[Int]],
getReplicas(addingReplicasJsonOpt, partition),
getReplicas(removingReplicasJsonOpt, partition)
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 13216e8..115f091 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -17,7 +17,7 @@
package kafka.admin
-import kafka.controller.PartitionReplicaAssignment
+import kafka.controller.ReplicaAssignment
import kafka.network.SocketServer
import org.junit.Assert._
import kafka.utils.TestUtils._
@@ -39,15 +39,15 @@ class AddPartitionsTest extends BaseRequestTest {
val partitionId = 0
val topic1 = "new-topic1"
- val topic1Assignment = Map(0 -> PartitionReplicaAssignment(Seq(0,1), List(),
List()))
+ val topic1Assignment = Map(0 -> ReplicaAssignment(Seq(0,1), List(), List()))
val topic2 = "new-topic2"
- val topic2Assignment = Map(0 -> PartitionReplicaAssignment(Seq(1,2), List(),
List()))
+ val topic2Assignment = Map(0 -> ReplicaAssignment(Seq(1,2), List(), List()))
val topic3 = "new-topic3"
- val topic3Assignment = Map(0 -> PartitionReplicaAssignment(Seq(2,3,0,1),
List(), List()))
+ val topic3Assignment = Map(0 -> ReplicaAssignment(Seq(2,3,0,1), List(),
List()))
val topic4 = "new-topic4"
- val topic4Assignment = Map(0 -> PartitionReplicaAssignment(Seq(0,3), List(),
List()))
+ val topic4Assignment = Map(0 -> ReplicaAssignment(Seq(0,3), List(), List()))
val topic5 = "new-topic5"
- val topic5Assignment = Map(1 -> PartitionReplicaAssignment(Seq(0,1), List(),
List()))
+ val topic5Assignment = Map(1 -> ReplicaAssignment(Seq(0,1), List(), List()))
@Before
override def setUp(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 34c8c85..eb61dd2 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -29,7 +29,7 @@ import org.junit.{After, Test}
import kafka.admin.TopicCommand.ZookeeperTopicService
import kafka.common.TopicAlreadyMarkedForDeletionException
-import kafka.controller.{OfflineReplica, PartitionAndReplica,
PartitionReplicaAssignment, ReplicaDeletionSuccessful}
+import kafka.controller.{OfflineReplica, PartitionAndReplica,
ReplicaAssignment, ReplicaDeletionSuccessful}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.scalatest.Assertions.fail
@@ -39,7 +39,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
var servers: Seq[KafkaServer] = Seq()
val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
- val expectedReplicaFullAssignment =
expectedReplicaAssignment.mapValues(PartitionReplicaAssignment(_, List(),
List())).toMap
+ val expectedReplicaFullAssignment =
expectedReplicaAssignment.mapValues(ReplicaAssignment(_, List(), List())).toMap
@After
override def tearDown(): Unit = {
diff --git
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index a84c611..6ee2f53 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -31,7 +31,7 @@ import scala.collection.{Map, Seq}
import scala.util.Random
import java.io.File
-import kafka.controller.PartitionReplicaAssignment
+import kafka.controller.ReplicaAssignment
import kafka.log.LogConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.config.ConfigResource
@@ -141,6 +141,25 @@ class ReassignPartitionsClusterTest extends
ZooKeeperTestHarness with Logging {
}
@Test
+ def testReassignmentMatchesCurrentAssignment(): Unit = {
+ // Given a single replica on server 100
+ startBrokers(Seq(100))
+ adminClient = createAdminClient(servers)
+ createTopic(zkClient, topicName, Map(tp0.partition() -> Seq(100)), servers
= servers)
+
+ // Execute no-op reassignment
+ val topicJson = executeAssignmentJson(Seq(
+ PartitionAssignmentJson(tp0, replicas = Seq(100), logDirectories = None)
+ ))
+ ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient),
topicJson, NoThrottle)
+ waitForZkReassignmentToComplete()
+
+ // The replica should remain on 100
+ val partitionAssignment =
zkClient.getPartitionAssignmentForTopics(Set(topicName))(topicName)(tp0.partition)
+ assertMoveForPartitionOccurred(Seq(100), partitionAssignment)
+ }
+
+ @Test
def shouldMoveSinglePartitionWithinBroker(): Unit = {
// Given a single replica on server 100
startBrokers(Seq(100, 101))
@@ -657,11 +676,11 @@ class ReassignPartitionsClusterTest extends
ZooKeeperTestHarness with Logging {
adminClient.close()
zkClient.setTopicAssignment("orders", Map(
- new TopicPartition("orders", 0) -> PartitionReplicaAssignment(List(0,
1), List(2), List(0)), // should be overwritten
- new TopicPartition("orders", 1) -> PartitionReplicaAssignment(List(1,
2), List(3), List(1)), // should be overwritten
+ new TopicPartition("orders", 0) -> ReplicaAssignment(List(0, 1),
List(2), List(0)), // should be overwritten
+ new TopicPartition("orders", 1) -> ReplicaAssignment(List(1, 2),
List(3), List(1)), // should be overwritten
// should be overwritten (so we know to remove it from ZK) even though we do the exact same move
- sameMoveTp -> PartitionReplicaAssignment(List(0, 1, 2), List(2),
List(0)),
- new TopicPartition("orders", 3) -> PartitionReplicaAssignment(List(0, 1,
2), List(2), List(0)) // moves
+ sameMoveTp -> ReplicaAssignment(List(0, 1, 2), List(2), List(0)),
+ new TopicPartition("orders", 3) -> ReplicaAssignment(List(0, 1, 2),
List(2), List(0)) // moves
))
val move = Map(
new TopicPartition("orders", 0) -> Seq(2, 1), // moves
@@ -916,9 +935,7 @@ class ReassignPartitionsClusterTest extends
ZooKeeperTestHarness with Logging {
waitForZkReassignmentToComplete()
// 4. Ensure the API reassignment not part of the znode is still in progress
val leftoverReassignments =
adminClient.listPartitionReassignments(Set(tpA0, tpA1,
tpB0).asJava).reassignments().get()
- assertEquals(1, leftoverReassignments.size())
- val tpB0LeftoverReassignment = leftoverReassignments.get(tpB0)
- assertIsReassigning(from = Seq(100), to = Seq(102),
tpB0LeftoverReassignment)
+ assertTrue(leftoverReassignments.keySet().asScala.subsetOf(Set(tpA1,
tpB0)))
resetBrokersThrottle()
waitForAllReassignmentsToComplete()
@@ -1116,7 +1133,7 @@ class ReassignPartitionsClusterTest extends
ZooKeeperTestHarness with Logging {
* Asserts that a topic's reassignments completed and span across the expected replicas
*/
def assertMoveForTopicOccurred(expectedReplicas: Seq[Int],
- partitionAssignments: Map[Int,
PartitionReplicaAssignment]): Unit = {
+ partitionAssignments: Map[Int,
ReplicaAssignment]): Unit = {
assertEquals(expectedReplicas,
partitionAssignments.values.flatMap(_.replicas).toSeq.distinct.sorted)
assertTrue(partitionAssignments.values.flatMap(_.addingReplicas).isEmpty)
assertTrue(partitionAssignments.values.flatMap(_.removingReplicas).isEmpty)
@@ -1126,7 +1143,7 @@ class ReassignPartitionsClusterTest extends
ZooKeeperTestHarness with Logging {
* Asserts that a partition moved to the exact expected replicas in the specific order
*/
def assertMoveForPartitionOccurred(expectedReplicas: Seq[Int],
- partitionAssignment:
PartitionReplicaAssignment): Unit = {
+ partitionAssignment: ReplicaAssignment):
Unit = {
assertEquals(expectedReplicas, partitionAssignment.replicas)
assertTrue(partitionAssignment.addingReplicas.isEmpty)
assertTrue(partitionAssignment.removingReplicas.isEmpty)
diff --git
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index ac1bc8d..790e3d8 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -527,10 +527,10 @@ class ReassignPartitionsCommandTest extends
ZooKeeperTestHarness with Logging {
@Test
def testResumePartitionReassignmentThatWasCompleted(): Unit = {
- val expectedReplicaAssignment = Map(0 -> List(0, 1))
+ val initialAssignment = Map(0 -> List(0, 2))
val topic = "test"
// create the topic
- adminZkClient.createTopicWithAssignment(topic, config = new Properties,
expectedReplicaAssignment)
+ adminZkClient.createTopicWithAssignment(topic, config = new Properties,
initialAssignment)
// put the partition in the reassigned path as well
// reassign partition 0
val newReplicas = Seq(0, 1)
diff --git
a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
index 88607f2..d286967 100644
---
a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
@@ -673,7 +673,7 @@ class ControllerChannelManagerTest {
KafkaConfig.fromProps(props)
}
- private def replicaAssignment(replicas: Seq[Int]):
PartitionReplicaAssignment = PartitionReplicaAssignment(replicas, Seq(), Seq())
+ private def replicaAssignment(replicas: Seq[Int]): ReplicaAssignment =
ReplicaAssignment(replicas, Seq(), Seq())
private def initContext(brokers: Seq[Int],
topics: Set[String],
diff --git
a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
index eeddad0..a0121e2 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
@@ -18,7 +18,7 @@
package unit.kafka.controller
import kafka.cluster.{Broker, EndPoint}
-import kafka.controller.{ControllerContext, PartitionReplicaAssignment}
+import kafka.controller.{ControllerContext, ReplicaAssignment}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -77,7 +77,7 @@ class ControllerContextTest {
@Test
def
testUpdatePartitionReplicaAssignmentUpdatesReplicaAssignmentOnlyAndDoesNotOverwrite():
Unit = {
val expectedReplicas = Seq(4)
- val expectedFullAssignment = PartitionReplicaAssignment(Seq(3), Seq(1),
Seq(2))
+ val expectedFullAssignment = ReplicaAssignment(Seq(3), Seq(1), Seq(2))
context.updatePartitionFullReplicaAssignment(tp1, expectedFullAssignment)
context.updatePartitionReplicaAssignment(tp1, expectedReplicas) // update
only the replicas
@@ -100,7 +100,7 @@ class ControllerContextTest {
assertEquals(Seq(), fullAssignment.addingReplicas)
assertEquals(Seq(), fullAssignment.removingReplicas)
- val expectedFullAssignment = PartitionReplicaAssignment(Seq(3), Seq(1),
Seq(2))
+ val expectedFullAssignment = ReplicaAssignment(Seq(3), Seq(1), Seq(2))
context.updatePartitionFullReplicaAssignment(tp1, expectedFullAssignment)
val updatedFullAssignment = context.partitionFullReplicaAssignment(tp1)
assertEquals(expectedFullAssignment.replicas,
updatedFullAssignment.replicas)
@@ -118,7 +118,7 @@ class ControllerContextTest {
@Test
def
testPartitionFullReplicaAssignmentReturnsEmptyAssignmentIfTopicOrPartitionDoesNotExist():
Unit = {
- val expectedEmptyAssignment = PartitionReplicaAssignment(Seq.empty,
Seq.empty, Seq.empty)
+ val expectedEmptyAssignment = ReplicaAssignment(Seq.empty, Seq.empty,
Seq.empty)
val noTopicAssignment = context.partitionFullReplicaAssignment(new
TopicPartition("NONEXISTENT", 0))
assertEquals(expectedEmptyAssignment, noTopicAssignment)
@@ -143,23 +143,23 @@ class ControllerContextTest {
@Test
def testPartitionReplicaAssignment(): Unit = {
- val reassigningPartition = PartitionReplicaAssignment(List(1, 2, 3, 4, 5,
6), List(2, 3, 4), List(1, 5, 6))
+ val reassigningPartition = ReplicaAssignment(List(1, 2, 3, 4, 5, 6),
List(2, 3, 4), List(1, 5, 6))
assertTrue(reassigningPartition.isBeingReassigned)
assertEquals(List(2, 3, 4), reassigningPartition.targetReplicas)
- val reassigningPartition2 = PartitionReplicaAssignment(List(1, 2, 3, 4),
List(), List(1, 4))
+ val reassigningPartition2 = ReplicaAssignment(List(1, 2, 3, 4), List(),
List(1, 4))
assertTrue(reassigningPartition2.isBeingReassigned)
assertEquals(List(2, 3), reassigningPartition2.targetReplicas)
- val reassigningPartition3 = PartitionReplicaAssignment(List(1, 2, 3, 4),
List(4), List(2))
+ val reassigningPartition3 = ReplicaAssignment(List(1, 2, 3, 4), List(4),
List(2))
assertTrue(reassigningPartition3.isBeingReassigned)
assertEquals(List(1, 3, 4), reassigningPartition3.targetReplicas)
- val partition = PartitionReplicaAssignment(List(1, 2, 3, 4, 5, 6), List(),
List())
+ val partition = ReplicaAssignment(List(1, 2, 3, 4, 5, 6), List(), List())
assertFalse(partition.isBeingReassigned)
assertEquals(List(1, 2, 3, 4, 5, 6), partition.targetReplicas)
- val reassigningPartition4 =
PartitionReplicaAssignment.fromOldAndNewReplicas(
+ val reassigningPartition4 = ReplicaAssignment.fromOldAndNewReplicas(
List(1, 2, 3, 4), List(4, 2, 5, 3)
)
assertEquals(List(4, 2, 5, 3, 1), reassigningPartition4.replicas)
@@ -168,7 +168,7 @@ class ControllerContextTest {
assertEquals(List(1), reassigningPartition4.removingReplicas)
assertTrue(reassigningPartition4.isBeingReassigned)
- val reassigningPartition5 =
PartitionReplicaAssignment.fromOldAndNewReplicas(
+ val reassigningPartition5 = ReplicaAssignment.fromOldAndNewReplicas(
List(1, 2, 3), List(4, 5, 6)
)
assertEquals(List(4, 5, 6, 1, 2, 3), reassigningPartition5.replicas)
@@ -177,7 +177,7 @@ class ControllerContextTest {
assertEquals(List(1, 2, 3), reassigningPartition5.removingReplicas)
assertTrue(reassigningPartition5.isBeingReassigned)
- val nonReassigningPartition =
PartitionReplicaAssignment.fromOldAndNewReplicas(
+ val nonReassigningPartition = ReplicaAssignment.fromOldAndNewReplicas(
List(1, 2, 3), List(3, 1, 2)
)
assertEquals(List(3, 1, 2), nonReassigningPartition.replicas)
@@ -186,4 +186,16 @@ class ControllerContextTest {
assertEquals(List(), nonReassigningPartition.removingReplicas)
assertFalse(nonReassigningPartition.isBeingReassigned)
}
+
+ @Test
+ def testReassignTo(): Unit = {
+ val assignment = ReplicaAssignment(Seq(1, 2, 3))
+ val firstReassign = assignment.reassignTo(Seq(4, 5, 6))
+
+ assertEquals(ReplicaAssignment(Seq(4, 5, 6, 1, 2, 3), Seq(4, 5, 6), Seq(1,
2, 3)), firstReassign)
+ assertEquals(ReplicaAssignment(Seq(7, 8, 9, 1, 2, 3), Seq(7, 8, 9), Seq(1,
2, 3)), firstReassign.reassignTo(Seq(7, 8, 9)))
+ assertEquals(ReplicaAssignment(Seq(7, 8, 9, 1, 2, 3), Seq(7, 8, 9), Seq(1,
2, 3)), assignment.reassignTo(Seq(7, 8, 9)))
+ assertEquals(assignment, firstReassign.reassignTo(Seq(1,2,3)))
+ }
+
}
diff --git
a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index dc09d73..018a0bb 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -250,8 +250,8 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
val tp1 = new TopicPartition("t", 1)
val assignment = Map(tp0.partition -> Seq(0))
val expandedAssignment = Map(
- tp0 -> PartitionReplicaAssignment(Seq(0), Seq(), Seq()),
- tp1 -> PartitionReplicaAssignment(Seq(0), Seq(), Seq()))
+ tp0 -> ReplicaAssignment(Seq(0), Seq(), Seq()),
+ tp1 -> ReplicaAssignment(Seq(0), Seq(), Seq()))
TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment =
assignment, servers = servers)
zkClient.setTopicAssignment(tp0.topic, expandedAssignment,
firstControllerEpochZkVersion)
waitForPartitionState(tp1, firstControllerEpoch, 0,
LeaderAndIsr.initialLeaderEpoch,
@@ -268,8 +268,8 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
val tp1 = new TopicPartition("t", 1)
val assignment = Map(tp0.partition -> Seq(otherBrokerId, controllerId))
val expandedAssignment = Map(
- tp0 -> PartitionReplicaAssignment(Seq(otherBrokerId, controllerId),
Seq(), Seq()),
- tp1 -> PartitionReplicaAssignment(Seq(otherBrokerId, controllerId),
Seq(), Seq()))
+ tp0 -> ReplicaAssignment(Seq(otherBrokerId, controllerId), Seq(), Seq()),
+ tp1 -> ReplicaAssignment(Seq(otherBrokerId, controllerId), Seq(), Seq()))
TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment =
assignment, servers = servers)
servers(otherBrokerId).shutdown()
servers(otherBrokerId).awaitShutdown()
@@ -290,7 +290,7 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
val otherBrokerId = servers.map(_.config.brokerId).filter(_ !=
controllerId).head
val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(controllerId))
- val reassignment = Map(tp ->
PartitionReplicaAssignment(Seq(otherBrokerId), List(), List()))
+ val reassignment = Map(tp -> ReplicaAssignment(Seq(otherBrokerId), List(),
List()))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers)
zkClient.createPartitionReassignment(reassignment.mapValues(_.replicas).toMap)
waitForPartitionState(tp, firstControllerEpoch, otherBrokerId,
LeaderAndIsr.initialLeaderEpoch + 3,
@@ -330,7 +330,7 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
val otherBrokerId = servers.map(_.config.brokerId).filter(_ !=
controllerId).head
val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(controllerId))
- val reassignment = Map(tp ->
PartitionReplicaAssignment(Seq(otherBrokerId), List(), List()))
+ val reassignment = Map(tp -> ReplicaAssignment(Seq(otherBrokerId), List(),
List()))
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers)
servers(otherBrokerId).shutdown()
servers(otherBrokerId).awaitShutdown()
@@ -628,8 +628,6 @@ class ControllerIntegrationTest extends
ZooKeeperTestHarness {
TestUtils.waitUntilTrue(() => !controller.isActive, "Controller fails to
resign")
// Expect to capture the ControllerMovedException in the log of ControllerEventThread
- println(appender.getMessages.find(e => e.getLevel == Level.INFO
- && e.getThrowableInformation != null))
val event = appender.getMessages.find(e => e.getLevel == Level.INFO
&& e.getThrowableInformation != null
&&
e.getThrowableInformation.getThrowable.getClass.getName.equals(classOf[ControllerMovedException].getName))
diff --git
a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 99dc6ed..d00e1bb 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -517,6 +517,6 @@ class PartitionStateMachineTest {
assertEquals(s"There should be no offline partition(s)", 0,
controllerContext.offlinePartitionCount)
}
- private def replicaAssignment(replicas: Seq[Int]):
PartitionReplicaAssignment = PartitionReplicaAssignment(replicas, Seq(), Seq())
+ private def replicaAssignment(replicas: Seq[Int]): ReplicaAssignment =
ReplicaAssignment(replicas, Seq(), Seq())
}
diff --git
a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index c43fd09..7a76496 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -409,6 +409,6 @@ class ReplicaStateMachineTest {
assertEquals(fromState, replicaState(replica))
}
- private def replicaAssignment(replicas: Seq[Int]):
PartitionReplicaAssignment = PartitionReplicaAssignment(replicas, Seq(), Seq())
+ private def replicaAssignment(replicas: Seq[Int]): ReplicaAssignment =
ReplicaAssignment(replicas, Seq(), Seq())
}
diff --git
a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index f1647ce..6780d98 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -32,7 +32,7 @@ import scala.util.{Failure, Success, Try}
import javax.security.auth.login.Configuration
import kafka.api.ApiVersion
import kafka.cluster.{Broker, EndPoint}
-import kafka.controller.PartitionReplicaAssignment
+import kafka.controller.ReplicaAssignment
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Time
@@ -131,7 +131,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with
Logging {
// Test that can update persistent nodes
val updatedAssignment = assignment - new TopicPartition(topic1, 2)
- zkClient.setTopicAssignment(topic1, updatedAssignment.mapValues { case (v)
=> PartitionReplicaAssignment(v, List(), List()) }.toMap)
+ zkClient.setTopicAssignment(topic1, updatedAssignment.mapValues { case (v)
=> ReplicaAssignment(v, List(), List()) }.toMap)
assertEquals(updatedAssignment.size,
zkClient.getTopicPartitionCount(topic1).get)
}
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index 07405f7..e81e032 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -19,7 +19,7 @@ package kafka.admin
import java.util
import java.util.Properties
-import kafka.controller.PartitionReplicaAssignment
+import kafka.controller.ReplicaAssignment
import kafka.log._
import kafka.server.DynamicConfig.Broker._
import kafka.server.KafkaConfig._
@@ -88,7 +88,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with
Logging with RackAware
1 -> List(1, 2, 3))
adminZkClient.createTopicWithAssignment("test", topicConfig, assignment)
val found = zkClient.getPartitionAssignmentForTopics(Set("test"))
- assertEquals(assignment.mapValues(PartitionReplicaAssignment(_, List(),
List())).toMap, found("test"))
+ assertEquals(assignment.mapValues(ReplicaAssignment(_, List(),
List())).toMap, found("test"))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 135092d..f11b9eb 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -40,7 +40,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Seq, mutable}
import scala.util.Random
-import kafka.controller.{LeaderIsrAndControllerEpoch,
PartitionReplicaAssignment}
+import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zookeeper._
import org.apache.kafka.common.errors.ControllerMovedException
@@ -169,7 +169,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
val expectedAssignment = assignment map { topicAssignment =>
val partition = topicAssignment._1.partition
val assignment = topicAssignment._2
- partition -> PartitionReplicaAssignment(assignment, List(), List())
+ partition -> ReplicaAssignment(assignment, List(), List())
}
assertEquals(assignment.size, zkClient.getTopicPartitionCount(topic1).get)
@@ -179,7 +179,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
val updatedAssignment = assignment - new TopicPartition(topic1, 2)
- zkClient.setTopicAssignment(topic1, updatedAssignment.mapValues { case v
=> PartitionReplicaAssignment(v, List(), List()) }.toMap)
+ zkClient.setTopicAssignment(topic1, updatedAssignment.mapValues { case v
=> ReplicaAssignment(v, List(), List()) }.toMap)
assertEquals(updatedAssignment.size,
zkClient.getTopicPartitionCount(topic1).get)
// add second topic
@@ -817,7 +817,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
zkClient.createTopicAssignment(topicPartition.topic(),
Map(topicPartition -> Seq()))
- val expectedAssignment = PartitionReplicaAssignment(Seq(1,2,3), Seq(1),
Seq(3))
+ val expectedAssignment = ReplicaAssignment(Seq(1,2,3), Seq(1), Seq(3))
val response = zkClient.setTopicAssignmentRaw(topicPartition.topic(),
Map(topicPartition -> expectedAssignment), controllerEpochZkVersion)
assertEquals(Code.OK, response.resultCode)