[
https://issues.apache.org/jira/browse/KAFKA-1558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14165358#comment-14165358
]
Sriharsha Chintalapani commented on KAFKA-1558:
-----------------------------------------------
[~nehanarkhede] [~guozhang] [~junrao] I ran tests for simultaneously running
preferred replica election tool and deleting multiple topics.
I kept running into KAFKA-1305 but by increasing controller.message.queue.size
to 1000 I was able to run these tests successfully.
While testing this couple of things caught my eye.
following code in KafkaController.PreferredReplicaElectionListener
{code}
val partitions = partitionsForPreferredReplicaElection --
controllerContext.partitionsUndergoingPreferredReplicaElection
val partitionsForTopicsToBeDeleted = partitions.filter(p =>
controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
if(partitionsForTopicsToBeDeleted.size > 0) {
error("Skipping preferred replica election for partitions %s since the
respective topics are being deleted"
.format(partitionsForTopicsToBeDeleted))
}
else
controller.onPreferredReplicaElection(partitions --
partitionsForTopicsToBeDeleted)
}
{code}
It doesn't need a else part there since its calling onPreferredReplicaElection
by removing partitionsForTopicsToBeDeleted
In PartitionStateMachine.DeleteTopicListener
{code}
if(topicsToBeDeleted.size > 0) {
info("Starting topic deletion for topics " +
topicsToBeDeleted.mkString(","))
// add topic to deletion list
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
// mark topic ineligible for deletion if other state changes are in
progress
topicsToBeDeleted.foreach { topic =>
val preferredReplicaElectionInProgress =
controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
val partitionReassignmentInProgress =
controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
if(preferredReplicaElectionInProgress ||
partitionReassignmentInProgress)
controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
}
}
{code}
The above code enqueueTopicsForDeletion which calls
resumeTopicDeletionThread() to start the deletion of topics
mark topic ineligible should be before the enqueueTopicsForDeletion. This way
deletion of the topic won't happen if there is preferred replica election or
partitions reassignment going for the said topics. I am testing these changes.
Let me know what you think of these changes. Thanks.
> AdminUtils.deleteTopic does not work
> ------------------------------------
>
> Key: KAFKA-1558
> URL: https://issues.apache.org/jira/browse/KAFKA-1558
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8.1.1
> Reporter: Henning Schmiedehausen
> Assignee: Sriharsha Chintalapani
> Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: kafka-thread-dump.log
>
>
> the AdminUtils:.deleteTopic method is implemented as
> {code}
> def deleteTopic(zkClient: ZkClient, topic: String) {
> ZkUtils.createPersistentPath(zkClient,
> ZkUtils.getDeleteTopicPath(topic))
> }
> {code}
> but the DeleteTopicCommand actually does
> {code}
> zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
> zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
> {code}
> so I guess, that the 'createPersistentPath' above should actually be
> {code}
> def deleteTopic(zkClient: ZkClient, topic: String) {
> ZkUtils.deletePathRecursive(zkClient, ZkUtils.getTopicPath(topic))
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)