Updated Branches: refs/heads/0.8.1 36eae8f63 -> cef51736c

KAFKA-330 Delete topic followup - more tests and Joel's review comments

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cef51736
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cef51736
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cef51736

Branch: refs/heads/0.8.1
Commit: cef51736c70f591baf4b384444e2f0d28a906d5e
Parents: 36eae8f
Author: Neha Narkhede <[email protected]>
Authored: Sat Feb 8 11:12:54 2014 -0800
Committer: Neha Narkhede <[email protected]>
Committed: Sat Feb 8 11:12:54 2014 -0800

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala   | 35 ++++++----
 .../kafka/controller/KafkaController.scala      | 20 +++---
 .../controller/PartitionStateMachine.scala      | 41 ++++++++----
 .../kafka/controller/ReplicaStateMachine.scala  | 62 +++++++++---------
 .../kafka/controller/TopicDeletionManager.scala | 69 ++++++++++----------
 .../src/main/scala/kafka/server/KafkaApis.scala |  5 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      | 63 +++++++++++++++++-
 7 files changed, 193 insertions(+), 102 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/cef51736/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index a1ee5a7..8ab8ab6 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -235,18 +235,29 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
                                          callback: (RequestOrResponse) => Unit = null) {
     val partitionList = controllerContext.partitionLeadershipInfo.keySet.dropWhile(
       p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
-    partitionList.foreach { partition =>
-      val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
-      leaderIsrAndControllerEpochOpt match {
-        case Some(leaderIsrAndControllerEpoch) =>
-          val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
-          val partitionStateInfo = PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
+    if(partitionList.size > 0) {
+      partitionList.foreach { partition =>
+        val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
+        leaderIsrAndControllerEpochOpt match {
+          case Some(leaderIsrAndControllerEpoch) =>
+            val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
+            val partitionStateInfo = PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
+            brokerIds.filter(b => b >= 0).foreach { brokerId =>
+              updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
+              updateMetadataRequestMap(brokerId).put(partition, partitionStateInfo)
+            }
+          case None =>
+            info("Leader not assigned yet for partition %s. Skip sending update metadata request".format(partition))
+        }
+      }
+    } else {
+      if(controllerContext.partitionLeadershipInfo.keySet.size > 0) {
+        // last set of topics are being deleted
+        controllerContext.partitionLeadershipInfo.foreach { case(partition, leaderIsrAndControllerEpoch) =>
           brokerIds.filter(b => b >= 0).foreach { brokerId =>
-            updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
-            updateMetadataRequestMap(brokerId).put(partition, partitionStateInfo)
+            updateMetadataRequestMap.put(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
           }
-        case None =>
-          info("Leader not assigned yet for partition %s. Skip sending update metadata request".format(partition))
+        }
       }
     }
   }
@@ -272,10 +283,10 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
       val broker = m._1
       val partitionStateInfos = m._2.toMap
       val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, correlationId, clientId,
-        partitionStateInfos, controllerContext.liveOrShuttingDownBrokers)
+                                                            partitionStateInfos, controllerContext.liveOrShuttingDownBrokers)
       partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " +
         "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
-        correlationId, broker, p._1)))
+                                                                 correlationId, broker, p._1)))
       controller.sendRequest(broker, updateMetadataRequest, null)
     }
     updateMetadataRequestMap.clear()

http://git-wip-us.apache.org/repos/asf/kafka/blob/cef51736/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index d812cb4..8acd076 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -433,7 +433,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     if(replicasForTopicsToBeDeleted.size > 0) {
       // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
       // deleted when the broker is down. This will prevent the replica from being in TopicDeletionStarted state indefinitely
-      // since topic deletion cannot be retried if at least one replica is in TopicDeletionStarted state
+      // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
       deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
     }
   }
@@ -443,6 +443,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    * and partitions as input. It does the following -
    * 1. Registers partition change listener. This is not required until KAFKA-347
    * 2. Invokes the new partition callback
+   * 3. Sends metadata request with the new topic to all brokers so they allow requests for that topic to be served
    */
   def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
     info("New topic creation callback for %s".format(newPartitions.mkString(",")))
@@ -581,8 +582,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
           // first register ISR change listener
           watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
           controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
-          // halt topic deletion for the partitions being reassigned
-          deleteTopicManager.haltTopicDeletion(Set(topic))
+          // mark topic ineligible for deletion for the partitions being reassigned
+          deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
           onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
         } else {
           // some replica in RAR is not alive. Fail partition reassignment
@@ -605,7 +606,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
       controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
-      deleteTopicManager.haltTopicDeletion(partitions.map(_.topic))
+      deleteTopicManager.markTopicIneligibleForDeletion(partitions.map(_.topic))
       partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
     } catch {
       case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
@@ -748,17 +749,16 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg

   private def initializeTopicDeletion() {
     val topicsQueuedForDeletion = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.DeleteTopicsPath).toSet
-    val replicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter(r =>
-      r._2.foldLeft(false)((res,r) => res || !controllerContext.liveBrokerIds.contains(r)))
-    val topicsWithReplicasOnDeadBrokers = replicasOnDeadBrokers.map(_._1.topic).toSet
+    val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case(partition, replicas) =>
+      replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic)
     val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic)
     val topicsForWhichPreferredReplicaElectionIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
-    val haltedTopicsForDeletion = topicsWithReplicasOnDeadBrokers | topicsForWhichPartitionReassignmentIsInProgress |
+    val topicsIneligibleForDeletion = topicsWithReplicasOnDeadBrokers | topicsForWhichPartitionReassignmentIsInProgress |
       topicsForWhichPreferredReplicaElectionIsInProgress
     info("List of topics to be deleted: %s".format(topicsQueuedForDeletion.mkString(",")))
-    info("List of topics halted for deletion: %s".format(haltedTopicsForDeletion.mkString(",")))
+    info("List of topics ineligible for deletion: %s".format(topicsIneligibleForDeletion.mkString(",")))
     // initialize the topic deletion manager
-    deleteTopicManager = new TopicDeletionManager(this, topicsQueuedForDeletion, haltedTopicsForDeletion)
+    deleteTopicManager = new TopicDeletionManager(this, topicsQueuedForDeletion, topicsIneligibleForDeletion)
   }

   private def maybeTriggerPartitionReassignment() {
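
(An aside for readers following the rename: "halted" becomes "ineligible" because an ineligible
topic stays queued and deletion resumes automatically once the blocking condition -- a partition
reassignment, a preferred replica election, or a dead replica -- clears. The following is a minimal
standalone sketch of that bookkeeping; the object and method names are illustrative only, and it is
simplified relative to the committed TopicDeletionManager below, which additionally checks that
deletion is not already in progress for the topic.)

    import scala.collection.mutable

    // Hypothetical stand-in for TopicDeletionManager's eligibility bookkeeping; not the committed code.
    object TopicDeletionEligibilitySketch {
      val topicsToBeDeleted = mutable.Set.empty[String]
      val topicsIneligibleForDeletion = mutable.Set.empty[String]

      // Only topics already queued for deletion are tracked; marking a topic
      // ineligible parks it without dropping it from the deletion queue.
      def markTopicIneligibleForDeletion(topics: Set[String]): Unit =
        topicsIneligibleForDeletion ++= (topicsToBeDeleted & topics)

      // When the blocking condition clears, the controller resumes deletion
      // and the delete-topics thread retries.
      def resumeDeletionForTopics(topics: Set[String]): Unit =
        topicsIneligibleForDeletion --= (topics & topicsToBeDeleted)

      def isTopicEligibleForDeletion(topic: String): Boolean =
        topicsToBeDeleted.contains(topic) && !topicsIneligibleForDeletion.contains(topic)

      def main(args: Array[String]): Unit = {
        topicsToBeDeleted += "test"
        markTopicIneligibleForDeletion(Set("test"))
        println(isTopicEligibleForDeletion("test")) // false: parked, not failed
        resumeDeletionForTopics(Set("test"))
        println(isTopicEligibleForDeletion("test")) // true: retry can proceed
      }
    }
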
http://git-wip-us.apache.org/repos/asf/kafka/blob/cef51736/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 57c96b5..c69077e 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -51,6 +51,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
   this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  private var topicChangeListener: TopicChangeListener = null
+  private var deleteTopicsListener: DeleteTopicsListener = null
+  private var addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty

   /**
    * Invoked on successful controller election. First registers a topic change listener since that triggers all
@@ -167,8 +170,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           assignReplicasToPartitions(topic, partition)
           partitionState.put(topicAndPartition, NewPartition)
           val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
-          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from NotExists to New with assigned replicas %s"
-                                    .format(controllerId, controller.epoch, topicAndPartition, assignedReplicas))
+          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
+                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
+                                            assignedReplicas))
           // post: partition has been assigned replicas
         case OnlinePartition =>
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
@@ -184,22 +188,22 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           }
           partitionState.put(topicAndPartition, OnlinePartition)
           val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
-          stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to OnlinePartition with leader %d"
-                                    .format(controllerId, controller.epoch, topicAndPartition, partitionState(topicAndPartition), leader))
+          stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d"
+                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))
           // post: partition has a leader
         case OfflinePartition =>
           // pre: partition should be in New or Online state
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
           // should be called when the leader for a partition is no longer alive
-          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from Online to Offline"
-                                    .format(controllerId, controller.epoch, topicAndPartition))
+          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
+                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
           partitionState.put(topicAndPartition, OfflinePartition)
           // post: partition has no alive leader
         case NonExistentPartition =>
           // pre: partition should be in Offline state
           assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
-          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from Offline to NotExists"
-                                    .format(controllerId, controller.epoch, topicAndPartition))
+          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
+                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
           partitionState.put(topicAndPartition, NonExistentPartition)
           // post: partition state is deleted from all brokers and zookeeper
       }
@@ -358,15 +362,22 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   }

   private def registerTopicChangeListener() = {
-    zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, new TopicChangeListener())
+    topicChangeListener = new TopicChangeListener()
+    zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener)
   }

   def registerPartitionChangeListener(topic: String) = {
-    zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), new AddPartitionsListener(topic))
+    addPartitionsListener.put(topic, new AddPartitionsListener(topic))
+    zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic))
+  }
+
+  def deregisterPartitionChangeListener(topic: String) = {
+    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic))
   }

   private def registerDeleteTopicListener() = {
-    zkClient.subscribeChildChanges(ZkUtils.DeleteTopicsPath, new DeleteTopicsListener())
+    deleteTopicsListener = new DeleteTopicsListener()
+    zkClient.subscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener)
   }

   private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
@@ -438,21 +449,23 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         }
         debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
         val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
-        if(nonExistentTopics.size > 0)
+        if(nonExistentTopics.size > 0) {
           warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
+          nonExistentTopics.foreach(topic => ZkUtils.deletePathRecursive(zkClient, ZkUtils.getDeleteTopicPath(topic)))
+        }
         topicsToBeDeleted --= nonExistentTopics
         if(topicsToBeDeleted.size > 0) {
           info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
           // add topic to deletion list
           controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
-          // halt if other state changes are in progress
+          // mark topic ineligible for deletion if other state changes are in progress
           topicsToBeDeleted.foreach { topic =>
             val preferredReplicaElectionInProgress =
               controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
             val partitionReassignmentInProgress =
               controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
             if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
-              controller.deleteTopicManager.haltTopicDeletion(Set(topic))
+              controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cef51736/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 613aec6..5e016d5 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -40,7 +40,7 @@ import kafka.utils.Utils._
 * 4. ReplicaDeletionStarted: If replica deletion starts, it is moved to this state. Valid previous state is OfflineReplica
 * 5. ReplicaDeletionSuccessful: If replica responds with no error code in response to a delete replica request, it is
 *    moved to this state. Valid previous state is ReplicaDeletionStarted
- * 6. ReplicaDeletionFailed: If replica deletion fails, it is moved to this state. Valid previous state is ReplicaDeletionStarted
+ * 6. ReplicaDeletionIneligible: If replica deletion fails, it is moved to this state. Valid previous state is ReplicaDeletionStarted
 * 7. NonExistentReplica: If a replica is deleted successfully, it is moved to this state. Valid previous state is
 *    ReplicaDeletionSuccessful
 */
@@ -115,7 +115,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the
   *   partition to every live broker
   *
-  * NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionFailed -> OfflineReplica
+  * NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible -> OfflineReplica
   * --send StopReplicaRequest to the replica (w/o deletion)
   * --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and
   *   UpdateMetadata request for the partition to every live broker.
@@ -126,7 +126,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   * ReplicaDeletionStarted -> ReplicaDeletionSuccessful
   * -- mark the state of the replica in the state machine
   *
-  * ReplicaDeletionStarted -> ReplicaDeletionFailed
+  * ReplicaDeletionStarted -> ReplicaDeletionIneligible
   * -- mark the state of the replica in the state machine
   *
   * ReplicaDeletionSuccessful -> NonExistentReplica
@@ -146,8 +146,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
       throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
                                             "to %s failed because replica state machine has not started")
                                              .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
+    val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
     try {
-      replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
       val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
       targetState match {
         case NewReplica =>
@@ -165,45 +165,47 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
             case None => // new leader request will be sent to this replica when one gets elected
           }
           replicaState.put(partitionAndReplica, NewReplica)
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NewReplica"
-                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
+                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
+                                            targetState))
         case ReplicaDeletionStarted =>
           assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
           replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
           // send stop replica command
          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
            callbacks.stopReplicaResponseCallback)
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to ReplicaDeletionStarted"
-                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
-        case ReplicaDeletionFailed =>
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
+                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
+        case ReplicaDeletionIneligible =>
           assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
-          replicaState.put(partitionAndReplica, ReplicaDeletionFailed)
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to ReplicaDeletionFailed"
-                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+          replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
+                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
        case ReplicaDeletionSuccessful =>
          assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
          replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to ReplicaDeletionSuccessful"
-                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
+                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
        case NonExistentReplica =>
          assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
          // remove this replica from the assigned replicas list for its partition
          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
          controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
          replicaState.remove(partitionAndReplica)
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NonExistentReplica"
-                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
+                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
        case OnlineReplica =>
          assertValidPreviousStates(partitionAndReplica,
-            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionFailed), targetState)
+            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
          replicaState(partitionAndReplica) match {
            case NewReplica =>
              // add this replica to the assigned replicas list for its partition
              val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
              if(!currentAssignedReplicas.contains(replicaId))
                controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
-              stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
-                                        .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+              stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
+                                        .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
+                                                targetState))
            case _ =>
              // check if the leader for this partition ever existed
              controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
@@ -211,8 +213,8 @@
                case Some(leaderIsrAndControllerEpoch) =>
                  brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                    replicaAssignment)
                  replicaState.put(partitionAndReplica, OnlineReplica)
-                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
-                                            .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
+                                            .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
                case None => // that means the partition was never in OnlinePartition state, this means the broker never
                  // started a log for that partition and does not have a high watermark value for this partition
              }
@@ -220,7 +222,7 @@
          replicaState.put(partitionAndReplica, OnlineReplica)
        case OfflineReplica =>
          assertValidPreviousStates(partitionAndReplica,
-            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionFailed), targetState)
+            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
          // send stop replica command to the replica so that it stops fetching from the leader
          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
          // As an optimization, the controller removes dead replicas from the ISR
@@ -233,8 +235,8 @@
                brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
                  topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                replicaState.put(partitionAndReplica, OfflineReplica)
-                stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
-                                          .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+                stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
+                                          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
                false
              case None =>
                true
@@ -250,8 +252,8 @@
    }
    catch {
      case t: Throwable =>
-        stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] to %s failed"
-                                  .format(controllerId, controller.epoch, replicaId, topic, partition, targetState), t)
+        stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
+                                  .format(controllerId, controller.epoch, replicaId, topic, partition, currState, targetState), t)
    }
  }
@@ -273,7 +275,7 @@
  }

  def replicasInDeletionStates(topic: String): Set[PartitionAndReplica] = {
-    val deletionStates = Set(ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionFailed)
+    val deletionStates = Set(ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible)
    replicaState.filter(r => r._1.topic.equals(topic) && deletionStates.contains(r._2)).keySet
  }
@@ -304,8 +306,8 @@
        case false =>
          // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
          // This is required during controller failover since during controller failover a broker can go down,
-          // so the replicas on that broker should be moved to ReplicaDeletionFailed to be on the safer side.
-          replicaState.put(partitionAndReplica, ReplicaDeletionFailed)
+          // so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.
+          replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
      }
    }
  }
@@ -356,7 +358,7 @@ case object OnlineReplica extends ReplicaState { val state: Byte = 2 }
 case object OfflineReplica extends ReplicaState { val state: Byte = 3 }
 case object ReplicaDeletionStarted extends ReplicaState { val state: Byte = 4}
 case object ReplicaDeletionSuccessful extends ReplicaState { val state: Byte = 5}
-case object ReplicaDeletionFailed extends ReplicaState { val state: Byte = 6}
+case object ReplicaDeletionIneligible extends ReplicaState { val state: Byte = 6}
 case object NonExistentReplica extends ReplicaState { val state: Byte = 7 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cef51736/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 91a446d..58f1c42 100644
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -30,8 +30,8 @@ import kafka.api.{StopReplicaResponse, RequestOrResponse}
 * 3. The controller has a background thread that handles topic deletion. The purpose of having this background thread
 *    is to accommodate the TTL feature, when we have it. This thread is signaled whenever deletion for a topic needs to
 *    be started or resumed. Currently, a topic's deletion can be started only by the onPartitionDeletion callback on the
- *    controller. In the future, it can be triggered based on the configured TTL for the topic. A topic's deletion will
- *    be halted in the following scenarios -
+ *    controller. In the future, it can be triggered based on the configured TTL for the topic. A topic will be ineligible
+ *    for deletion in the following scenarios -
 *    3.1 broker hosting one of the replicas for that topic goes down
 *    3.2 partition reassignment for partitions of that topic is in progress
 *    3.3 preferred replica election for partitions of that topic is in progress
@@ -62,17 +62,17 @@ import kafka.api.{StopReplicaResponse, RequestOrResponse}
 * it marks the topic for deletion retry.
 * @param controller
 * @param initialTopicsToBeDeleted The topics that are queued up for deletion in zookeeper at the time of controller failover
- * @param initialHaltedTopicsForDeletion The topics for which deletion is halted due to any of the conditions mentioned in #3 above
+ * @param initialTopicsIneligibleForDeletion The topics ineligible for deletion due to any of the conditions mentioned in #3 above
 */
class TopicDeletionManager(controller: KafkaController,
                           initialTopicsToBeDeleted: Set[String] = Set.empty,
-                          initialHaltedTopicsForDeletion: Set[String] = Set.empty) extends Logging {
+                          initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {
  val controllerContext = controller.controllerContext
  val partitionStateMachine = controller.partitionStateMachine
  val replicaStateMachine = controller.replicaStateMachine
  var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
-  var haltedTopicsForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
-    (initialHaltedTopicsForDeletion & initialTopicsToBeDeleted)
+  var topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
+    (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
  val deleteTopicsCond = controllerContext.controllerLock.newCondition()
  var deleteTopicStateChanged: Boolean = false
  var deleteTopicsThread: DeleteTopicsThread = null
@@ -92,7 +92,7 @@ class TopicDeletionManager(controller: KafkaController,
  def shutdown() {
    deleteTopicsThread.shutdown()
    topicsToBeDeleted.clear()
-    haltedTopicsForDeletion.clear()
+    topicsIneligibleForDeletion.clear()
  }

  /**
@@ -117,7 +117,7 @@ class TopicDeletionManager(controller: KafkaController,
  def resumeDeletionForTopics(topics: Set[String] = Set.empty) {
    val topicsToResumeDeletion = topics & topicsToBeDeleted
    if(topicsToResumeDeletion.size > 0) {
-      haltedTopicsForDeletion --= topicsToResumeDeletion
+      topicsIneligibleForDeletion --= topicsToResumeDeletion
      resumeTopicDeletionThread()
    }
  }
@@ -125,8 +125,8 @@ class TopicDeletionManager(controller: KafkaController,
  /**
   * Invoked when a broker that hosts replicas for topics to be deleted goes down. Also invoked when the callback for
   * StopReplicaResponse receives an error code for the replicas of a topic to be deleted. As part of this, the replicas
-   * are moved from ReplicaDeletionStarted to ReplicaDeletionFailed state. Also, the topic is added to the list of topics
-   * for which deletion is halted until further notice. The delete topic thread is notified so it can retry topic deletion
+   * are moved from ReplicaDeletionStarted to ReplicaDeletionIneligible state. Also, the topic is added to the list of topics
+   * ineligible for deletion until further notice. The delete topic thread is notified so it can retry topic deletion
   * if it has received a response for all replicas of a topic to be deleted
   * @param replicas Replicas for which deletion has failed
   */
@@ -136,8 +136,8 @@ class TopicDeletionManager(controller: KafkaController,
      val topics = replicasThatFailedToDelete.map(_.topic)
      debug("Deletion failed for replicas %s. Halting deletion for topics %s"
        .format(replicasThatFailedToDelete.mkString(","), topics))
-      controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionFailed)
-      haltTopicDeletion(topics)
+      controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
+      markTopicIneligibleForDeletion(topics)
      resumeTopicDeletionThread()
    }
  }
@@ -147,17 +147,17 @@ class TopicDeletionManager(controller: KafkaController,
   * 1. replicas being down
   * 2. partition reassignment in progress for some partitions of the topic
   * 3. preferred replica election in progress for some partitions of the topic
-   * @param topics Topics for which deletion should be halted. No op if the topic was not previously queued up for deletion
+   * @param topics Topics that should be marked ineligible for deletion. No op if the topic was not previously queued up for deletion
   */
-  def haltTopicDeletion(topics: Set[String]) {
+  def markTopicIneligibleForDeletion(topics: Set[String]) {
    val newTopicsToHaltDeletion = topicsToBeDeleted & topics
-    haltedTopicsForDeletion ++= newTopicsToHaltDeletion
+    topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
    if(newTopicsToHaltDeletion.size > 0)
      info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
  }

-  def isTopicDeletionHalted(topic: String): Boolean = {
-    haltedTopicsForDeletion.contains(topic)
+  def isTopicIneligibleForDeletion(topic: String): Boolean = {
+    topicsIneligibleForDeletion.contains(topic)
  }

  def isTopicDeletionInProgress(topic: String): Boolean = {
@@ -205,26 +205,29 @@ class TopicDeletionManager(controller: KafkaController,
   * Topic deletion can be retried if -
   * 1. Topic deletion is not already complete
   * 2. Topic deletion is currently not in progress for that topic
-   * 3. Topic deletion is currently halted for that topic
+   * 3. Topic is not currently marked ineligible for deletion
   * @param topic Topic
   * @return Whether or not deletion can be retried for the topic
   */
  private def isTopicEligibleForDeletion(topic: String): Boolean = {
-    topicsToBeDeleted.contains(topic) && (!isTopicDeletionInProgress(topic) && !isTopicDeletionHalted(topic))
+    topicsToBeDeleted.contains(topic) && (!isTopicDeletionInProgress(topic) && !isTopicIneligibleForDeletion(topic))
  }

  /**
   * If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic
-   * To ensure a successful retry, reset states for respective replicas from ReplicaDeletionFailed to OfflineReplica state
+   * To ensure a successful retry, reset states for respective replicas from ReplicaDeletionIneligible to OfflineReplica state
   * @param topic Topic for which deletion should be retried
   */
  private def markTopicForDeletionRetry(topic: String) {
-    // reset replica states from ReplicaDeletionFailed to OfflineReplica
-    val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionFailed)
+    // reset replica states from ReplicaDeletionIneligible to OfflineReplica
+    val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
    controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
  }

  private def completeDeleteTopic(topic: String) {
+    // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
+    // firing before the new topic listener when a deleted topic gets auto created
+    partitionStateMachine.deregisterPartitionChangeListener(topic)
    val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
    // controller will remove this replica from the state machine as well as its partition assignment cache
    replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
@@ -245,6 +248,8 @@ class TopicDeletionManager(controller: KafkaController,
   */
  private def onTopicDeletion(topics: Set[String]) {
    info("Topic deletion callback for %s".format(topics.mkString(",")))
+    // send update metadata so that brokers stop serving data for topics to be deleted
+    controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
    val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
    topics.foreach { topic =>
      onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet)
@@ -257,34 +262,32 @@ class TopicDeletionManager(controller: KafkaController,
   * the topics are added to the in progress list. As long as a topic is in the in progress list, deletion for that topic
   * is never retried. A topic is removed from the in progress list when
   * 1. Either the topic is successfully deleted OR
-   * 2. No replica for the topic is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionFailed state
+   * 2. No replica for the topic is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionIneligible state
   * If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic
   * As part of starting deletion, all replicas are moved to the ReplicaDeletionStarted state where the controller sends
   * the replicas a StopReplicaRequest (delete=true)
   * This callback does the following things -
   * 1. Send metadata request to all brokers excluding the topics to be deleted
-   * 2. Move all dead replicas directly to ReplicaDeletionFailed state. Also halt the deletion of respective topics if
-   *    some replicas are dead since it won't complete successfully anyway
+   * 2. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
+   *    for deletion if some replicas are dead since it won't complete successfully anyway
   * 3. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully
   * @param replicasForTopicsToBeDeleted
   */
  private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
    replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>
-      // send update metadata so that brokers stop serving data
-      controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
      var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
      val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
      val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
      val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
      // move dead replicas directly to failed state
-      replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionFailed)
+      replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
      // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
      replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
      debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
      controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
        new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
      if(deadReplicasForTopic.size > 0)
-        haltTopicDeletion(Set(topic))
+        markTopicIneligibleForDeletion(Set(topic))
    }
  }
@@ -314,7 +317,7 @@ class TopicDeletionManager(controller: KafkaController,
        stopReplicaResponse.responseMap.filter(p => p._2 != ErrorMapping.NoError).map(_._1).toSet
      val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
      inLock(controllerContext.controllerLock) {
-        // move all the failed replicas to ReplicaDeletionFailed
+        // move all the failed replicas to ReplicaDeletionIneligible
        failReplicaDeletion(replicasInError)
        if(replicasInError.size != stopReplicaResponse.responseMap.size) {
          // some replicas could have been successfully deleted
@@ -350,7 +353,7 @@ class TopicDeletionManager(controller: KafkaController,
          // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
          // TopicDeletionSuccessful. That means, there is at least one failed replica, which means topic deletion
          // should be retried
-          val replicasInTopicDeletionFailedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionFailed)
+          val replicasInTopicDeletionFailedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
          // mark topic for deletion retry
          markTopicForDeletionRetry(topic)
          info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
@@ -362,8 +365,8 @@ class TopicDeletionManager(controller: KafkaController,
          info("Deletion of topic %s (re)started".format(topic))
          // topic deletion will be kicked off
          onTopicDeletion(Set(topic))
-        } else if(isTopicDeletionHalted(topic)) {
-          info("Not retrying deletion of topic %s at this time since it is halted".format(topic))
+        } else if(isTopicIneligibleForDeletion(topic)) {
+          info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
        }
      }
    }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cef51736/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c56ad50..ae2df20 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -145,7 +145,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    }
    // remove the topics that don't exist in the UpdateMetadata request since those are the topics that are
    // currently being deleted by the controller
-    val topicsKnownToThisBroker = metadataCache.map{
+    val topicsKnownToThisBroker = metadataCache.map {
      case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet
    val topicsKnownToTheController = updateMetadataRequest.partitionStateInfos.map {
      case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet
@@ -568,6 +568,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    partitionMetadataLock synchronized {
      uniqueTopics.map { topic =>
        if(metadataCache.keySet.map(_.topic).contains(topic)) {
+          debug("Topic %s exists in metadata cache on broker %d".format(topic, config.brokerId))
          val partitionStateInfo = metadataCache.filter(p => p._1.topic.equals(topic))
          val sortedPartitions = partitionStateInfo.toList.sortWith((m1,m2) => m1._1.partition < m2._1.partition)
          val partitionMetadata = sortedPartitions.map { case(topicAndPartition, partitionState) =>
@@ -600,6 +601,7 @@ class KafkaApis(val requestChannel: RequestChannel,
          }
          new TopicMetadata(topic, partitionMetadata)
        } else {
+          debug("Topic %s does not exist in metadata cache on broker %d".format(topic, config.brokerId))
          // topic doesn't exist, send appropriate error code
          new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
        }
@@ -621,6 +623,7 @@ class KafkaApis(val requestChannel: RequestChannel,
            }
            topicsMetadata += new TopicMetadata(topicMetadata.topic, topicMetadata.partitionsMetadata, ErrorMapping.LeaderNotAvailableCode)
          } else {
+            debug("Auto create topic skipped for %s".format(topicMetadata.topic))
            topicsMetadata += topicMetadata
          }
        case _ =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/cef51736/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 974b057..dbe078c 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -296,9 +296,8 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
  def testDeleteTopicDuringAddPartition() {
    val topic = "test"
    val servers = createTestTopicAndCluster(topic)
-    // add partitions to topic
-    val topicAndPartition = TopicAndPartition(topic, 0)
    val newPartition = TopicAndPartition(topic, 1)
+    // add partitions to topic
    AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2")
    // start topic deletion
    AdminUtils.deleteTopic(zkClient, topic)
@@ -366,6 +365,66 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
    servers.foreach(_.shutdown())
  }

+  @Test
+  def testAutoCreateAfterDeleteTopic() {
+    val topicAndPartition = TopicAndPartition("test", 0)
+    val topic = topicAndPartition.topic
+    val servers = createTestTopicAndCluster(topic)
+    // start topic deletion
+    AdminUtils.deleteTopic(zkClient, topic)
+    verifyTopicDeletion(topic, servers)
+    // test if first produce request after topic deletion auto creates the topic
+    val props = new Properties()
+    props.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(","))
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("producer.type", "sync")
+    props.put("request.required.acks", "1")
+    props.put("message.send.max.retries", "1")
+    val producerConfig = new ProducerConfig(props)
+    val producer = new Producer[String, String](producerConfig)
+    try {
+      producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
+    } catch {
+      case e: FailedToSendMessageException => fail("Topic should have been auto created")
+      case oe: Throwable => fail("failed with exception", oe)
+    }
+    // test the topic path exists
+    assertTrue("Topic not auto created", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
+    // wait until leader is elected
+    val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
+    try {
+      producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
+    } catch {
+      case e: FailedToSendMessageException => fail("Topic should have been auto created")
+      case oe: Throwable => fail("failed with exception", oe)
+    } finally {
+      producer.close()
+    }
+    servers.foreach(_.shutdown())
+  }
+
+  @Test
+  def testDeleteNonExistingTopic() {
+    val topicAndPartition = TopicAndPartition("test", 0)
+    val topic = topicAndPartition.topic
+    val servers = createTestTopicAndCluster(topic)
+    // start topic deletion
+    AdminUtils.deleteTopic(zkClient, "test2")
+    // verify delete topic path for test2 is removed from zookeeper
+    verifyTopicDeletion("test2", servers)
+    // verify that topic test is untouched
+    assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
+      res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000))
+    // test the topic path exists
+    assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
+    // topic test should have a leader
+    val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
+    servers.foreach(_.shutdown())
+  }
+
   private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topicAndPartition = TopicAndPartition(topic, 0)
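
----------------------------------------------------------------------

For readers trying the feature out, the client-side flow the new tests exercise boils down to a
few calls. The following is a sketch under stated assumptions (a running 0.8.1 cluster with
ZooKeeper on localhost:2181, the kafka.admin and kafka.utils APIs referenced in the diff, and
kafka.utils.ZKStringSerializer for the ZkClient); note that deletion is asynchronous, so real
callers poll the condition as the tests do with TestUtils.waitUntilTrue rather than checking once:

    import kafka.admin.AdminUtils
    import kafka.utils.{ZkUtils, ZKStringSerializer}
    import org.I0Itec.zkclient.ZkClient

    object DeleteTopicFlowSketch {
      def main(args: Array[String]): Unit = {
        // ZKStringSerializer is needed so paths written by Kafka are readable.
        val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
        val topic = "test"
        // Writes the topic under ZkUtils.DeleteTopicsPath; the controller's
        // DeleteTopicsListener picks it up and enqueues the topic for deletion.
        AdminUtils.deleteTopic(zkClient, topic)
        // Deletion is complete once both the admin path and the topic path are gone
        // (poll this condition in real code; it does not hold immediately).
        val deleted = !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)) &&
                      !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))
        println("topic %s deleted yet: %b".format(topic, deleted))
        zkClient.close()
      }
    }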
