-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33056
-----------------------------------------------------------


In all, I would vote for marking the deletion as failed when some brokers are 
unavailable, but even with this I still feel the delete-topic logic is pretty 
complicated in how it interleaves with other operations. Since in most 
operational scenarios there should be just one topic operation (adding 
partitions, partition reassignment, leader election, delete topic, create 
topic) in flight at any one time, I think it might be better to just execute 
these operations sequentially, ordered by the time they are triggered by the 
admin and captured by the controller. Some details of the proposal:

1) Besides the callback thread from zk-fired events, we can use just one more 
thread to periodically check if any operations are needed (partition 
reassignment, delete topics, etc.) and, if so, put them in the working queue, 
and another thread that keeps processing tasks from the queue sequentially. The 
only synchronization we need then is between this background worker thread and 
the controller ZK callback thread.

2) As for the non-atomicity of some of the operations, since broker failure 
during these operations should be rare, I think it is affordable to simply 
abort and retry the operation, hoping that the failed broker's replicas have 
already been moved to offline and hence no longer block the execution. When 
these brokers restart, the zk-fired callback thread would decide whether the 
offline replicas on these brokers need to be moved to online or non-existent 
based on the current topic-partition assignment info, and send the requests 
accordingly.

3) For controller failover it is much the same thing: the current in-progress 
operation would fail, and the new controller would retry this operation 
eventually, although maybe not as the first task since the ordering in the 
queue can change.

4) One last thing we need to make sure is that all the controller requests are 
idempotent to the brokers, since on controller failover or operation-retry they 
can be sent multiple times. I think this is already guaranteed today but we 
just need to double check.
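
To make points 1) to 4) concrete, here is a minimal sketch of the sequential 
queue with abort-and-retry. It is written in Python only for brevity and is not 
taken from the patch: SequentialOperationWorker, TopicOperation, 
BrokerUnavailable and max_attempts are all hypothetical names, and the 
operations are stand-ins for the real controller logic.

```python
import queue
from dataclasses import dataclass
from typing import Callable, List


class BrokerUnavailable(Exception):
    """Hypothetical error: an operation cannot proceed right now."""


@dataclass
class TopicOperation:
    name: str                # e.g. "delete-topic:foo"
    run: Callable[[], None]  # must be idempotent, since it may run more than once
    attempts: int = 0


class SequentialOperationWorker:
    """Runs queued topic operations one at a time (hypothetical sketch)."""

    def __init__(self, max_attempts: int = 3) -> None:
        self.pending = queue.Queue()
        self.max_attempts = max_attempts
        self.completed: List[str] = []
        self.failed: List[str] = []

    def submit(self, op: TopicOperation) -> None:
        self.pending.put(op)

    def drain(self) -> None:
        """Process operations sequentially until the queue is empty."""
        while True:
            try:
                op = self.pending.get_nowait()
            except queue.Empty:
                return
            op.attempts += 1
            try:
                op.run()
                self.completed.append(op.name)
            except BrokerUnavailable:
                if op.attempts < self.max_attempts:
                    # Abort and retry later, behind whatever else is queued.
                    self.pending.put(op)
                else:
                    self.failed.append(op.name)


# Simulated cluster state: the first delete attempt hits a dead broker.
state = {"broker_up": False, "deleted": set()}


def delete_topic_foo() -> None:
    if not state["broker_up"]:
        state["broker_up"] = True  # pretend the broker is back before the retry
        raise BrokerUnavailable("replica broker is down")
    state["deleted"].add("foo")    # adding to a set is idempotent


worker = SequentialOperationWorker()
worker.submit(TopicOperation("delete-topic:foo", delete_topic_foo))
worker.submit(TopicOperation("reassign:bar", lambda: None))
worker.drain()
```

In this toy run the delete is aborted once, re-queued behind the reassignment, 
and completes on the second attempt, which is the ordering change mentioned in 
3). Only one thread touches the queue contents here; a real controller would 
still need the synchronization with the ZK callback thread described in 1).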


core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/17460/#comment62273>

    We do not need to clear the map here since if they are empty an error will 
have been thrown earlier. This seems to be a revert of a previously committed 
patch.



core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/17460/#comment62280>

    The callback parameter is not used in this function. If that is on purpose, 
we need to note it in the comments.



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment62283>

    This also seems to be a revert of a previously committed patch.



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment62287>

    Deleted topics should not be in partitionsBeingReassigned and 
partitionsUndergoingPreferredReplicaElection; if they are, it should be treated 
as an error.
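
A check along these lines could look like the sketch below. It is in Python 
purely for illustration; assert_deleted_topics_idle and IllegalControllerState 
are hypothetical names, and partitions are modelled as (topic, partition) 
tuples rather than the controller's real types.

```python
from typing import Iterable, Tuple

TopicPartition = Tuple[str, int]


class IllegalControllerState(Exception):
    """Hypothetical error for controller state that should never occur."""


def assert_deleted_topics_idle(
    deleted_topics: Iterable[str],
    partitions_being_reassigned: Iterable[TopicPartition],
    partitions_undergoing_election: Iterable[TopicPartition],
) -> None:
    """Raise if a deleted topic still has a reassignment or election pending."""
    deleted = set(deleted_topics)
    in_flight = set(partitions_being_reassigned) | set(partitions_undergoing_election)
    conflicts = sorted(tp for tp in in_flight if tp[0] in deleted)
    if conflicts:
        raise IllegalControllerState(
            f"deleted topics still have operations in flight: {conflicts}"
        )
```

Calling it with a deleted topic "foo" and a pending reassignment of ("foo", 1) 
would raise, while unrelated partitions pass silently.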



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment62289>

    Do we need one background thread for each of these three tasks, or can we 
arrange them into one thread? The benefit of doing so is that we would no 
longer need to synchronize between these threads.



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment62300>

    Would this lead to a deadlock, meaning the delete-topic will not complete 
until partition-reassignment finishes, while partition-reassignment will not 
resume since the topic is being deleted?



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment62297>

    Is this comment mistakenly put here?


- Guozhang Wang


On Jan. 28, 2014, 9:25 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Jan. 28, 2014, 9:25 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Delete topic is a pretty tricky feature and there are multiple ways to solve 
> it. I will list the various approaches with the tradeoffs here. A few things 
> to think about that make delete topic tricky -
> 
> 1. How do you handle resuming delete topics during controller failover?
> 2. How do you handle re-creating topics if brokers that host a subset of the 
> replicas are down?
> 3. If a broker fails during delete topic, how does it know which version of 
> the topic it has logs for, when it restarts? This is relevant if we allow 
> re-creating topics while a broker is down
> 
> Will address these one by one. 
> 
> #1 is pretty straightforward to handle and can be achieved in a way similar 
> to partition reassignment (through an admin path in zookeeper indicating a 
> topic deletion that has not finished)
> 
> #2 is an important policy decision that can affect the complexity of the 
> design for this feature. If you allow topics to be deleted while brokers are 
> down, the broker needs a way to know that its version of the topic is too 
> old. This is mainly an issue since a topic can be re-created and written to, 
> while a broker is down. We need to ensure that a broker does not join the 
> quorum with an older version of the log. There are 2 ways to solve this 
> problem that I could think of -
>    1. Do not allow topic deletion to succeed if a broker hosting a replica is 
> down. Here, the controller keeps track of the state of each replica during 
> topic deletion    (TopicDeletionStarted, TopicDeletionSuccessful, 
> TopicDeletionFailed) and only marks the topic as deleted if all replicas for 
> all partitions of that topic are successfully deleted. 
>    2. Allow a topic to be deleted while a broker is down and keep track of 
> the "generation" of the topic in a fault tolerant, highly available and 
> consistent log. This log can either be zookeeper or a Kafka topic. The main 
> issue here is how many generations we would have to keep track of for a 
> topic. In other words, can this "generation" information ever be garbage 
> collected? There isn't a good bound on this since it is unclear when the 
> failed broker will come back online and when a topic will be re-created. That 
> would mean keeping this generation information for potentially a very long 
> time and incurring overhead during recovery or bootstrap of generation 
> information during controller or broker fail overs. This is especially a 
> problem for use cases or tests that keep creating and deleting a lot of short 
> lived topics. Essentially, this solution is not scalable unless we figure out 
> an intuitive way to garbage collect this topic metadata. It would require us 
> to introduce a config for controlling when a topic's generation metadata can 
> be garbage collected. Note that this config is different from the topic TTL 
> feature which controls when a topic, that is currently not in use, can be 
> deleted. Overall, this alternative is unnecessarily complex for the benefit 
> of deleting topics while a broker is down.
> 
> #3 is related to the policy decision made about #2. If a topic is not marked 
> deleted successfully while a broker is down, the controller will 
> automatically resume topic deletion when a broker restarts. 
> 
> This patch follows the former approach of not calling a topic deletion 
> successful until all replicas have confirmed the deletion of local state for 
> that topic. This requires the following changes -
> 1. TopicCommand issues topic deletion by creating a new admin path 
> /admin/delete_topics/<topic>
> 
> 2. The controller listens for child changes on /admin/delete_topics and starts 
> topic deletion for the respective topics
> 
> 3. The controller has a background thread that handles topic deletion. The 
> purpose of having this background thread is to accommodate the TTL feature, 
> when we have it. This thread is signaled whenever deletion for a topic needs 
> to be started or resumed. Currently, a topic's deletion can be started only 
> by the onPartitionDeletion callback on the controller. In the future, it can 
> be triggered based on the configured TTL for the topic. A topic's deletion 
> will be halted in the following scenarios -
> * broker hosting one of the replicas for that topic goes down
> * partition reassignment for partitions of that topic is in progress
> * preferred replica election for partitions of that topic is in progress 
> (though this is not strictly required since it holds the controller lock for 
> the entire duration from start to end)
> 
> 4. Topic deletion is resumed when -
> * broker hosting one of the replicas for that topic is started
> * preferred replica election for partitions of that topic completes
> * partition reassignment for partitions of that topic completes 
> 
> 5. Every replica for a topic being deleted is in one of 3 states - 
> * TopicDeletionStarted (Replica enters TopicDeletionStarted phase when the 
> onPartitionDeletion callback is invoked. This happens when the child change 
> watch for /admin/delete_topics fires on the controller. As part of this state 
> change, the controller sends StopReplicaRequests to all replicas. It 
> registers a callback for the StopReplicaResponse when deletePartition=true 
> thereby invoking a callback when a response for delete replica is received 
> from every replica)
> * TopicDeletionSuccessful (deleteTopicStopReplicaCallback() moves replicas 
> from TopicDeletionStarted->TopicDeletionSuccessful depending on the error 
> codes in StopReplicaResponse)
> * TopicDeletionFailed (deleteTopicStopReplicaCallback() moves replicas from 
> TopicDeletionStarted->TopicDeletionFailed depending on the error codes in 
> StopReplicaResponse. In general, if a broker dies and if it hosted replicas 
> for topics being deleted, the controller marks the respective replicas in 
> TopicDeletionFailed state in the onBrokerFailure callback. The reason is that 
> if a broker fails before the request is sent and after the replica is in 
> TopicDeletionStarted state, it is possible that the replica will mistakenly 
> remain in TopicDeletionStarted state and topic deletion will not be retried 
> when the broker comes back up.)
> 
> 6. The delete topic thread marks a topic successfully deleted only if all 
> replicas are in TopicDeletionSuccessful state and it starts the topic 
> deletion teardown mode where it deletes all topic state from the 
> controllerContext as well as from zookeeper. This is the only time the 
> /brokers/topics/<topic> path gets deleted. 
> On the other hand, if no replica is in TopicDeletionStarted state and at 
> least one replica is in TopicDeletionFailed state, then it marks the topic 
> for deletion retry. 
> 
> 7. I've introduced callbacks for controller-broker communication. Ideally, 
> every callback should be of the following format (RequestOrResponse) => Unit. 
> BUT since StopReplicaResponse doesn't carry the replica id, this is handled 
> in a somewhat hacky manner in the patch. The purpose is to fix the approach 
> of upgrading controller-broker protocols in a reasonable way before having 
> delete topic upgrade StopReplica request in a one-off way. Will file a JIRA 
> for that.
> 
> 8. UpdateMetadataRequest now includes all topics that currently exist in the 
> cluster and is used as a mechanism to notify brokers of the list of topics 
> that are deleted. As such, the first step of topic deletion is to send an 
> UpdateMetadataRequest to all brokers notifying them of the list of all topics 
> minus the ones being deleted. Once the brokers receive the 
> UpdateMetadataRequest, they start rejecting client requests 
> (produce/fetch/offset) for topics that are being deleted.
> 
> 9. One thing to note about topic deletion is that once it is initiated, it 
> always completes. In other words, topic deletion can be initiated when other 
> state changes for the respective partitions/replicas are in progress. Deletion 
> is halted until the other state changes that were initiated before the 
> deletion complete. For this, we need to add a status option for delete topic 
> to the admin command. Will do this in a follow up patch.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala 
> a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 
> 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 
> 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala 
> d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala 
> a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala 
> fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
> ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 
> 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
> 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala 
> b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 
> 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala 
> b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala 
> b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 
> 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
> 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
> c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> d88b6c3e8fd8098d540998b6a82a65cec8a1dcb0 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>
