-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33046
-----------------------------------------------------------



core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/17460/#comment62258>

    thanks for this suggestion, Jun!


- Neha Narkhede


On Jan. 28, 2014, 7:15 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Jan. 28, 2014, 7:15 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Delete topic is a pretty tricky feature and there are multiple ways to solve 
> it. I will list the various approaches with the tradeoffs here. Few things to 
> think about that make delete topic tricky -
> 
> 1. How do you handle resuming delete topics during controller failover?
> 2. How do you handle re-creating topics if brokers that host a subset of the 
> replicas are down?
> 3. If a broker fails during delete topic, how does it know which version of 
> the topic it has logs for, when it restarts? This is relevant if we allow 
> re-creating topics while a broker is down
> 
> Will address these one by one. 
> 
> #1 is pretty straightforward to handle and can be achieved in a way similar 
> to partition reassignment (through an admin path in zookeeper indicating a 
> topic deletion that has not finished)
> 
> #2 is an important policy decision that can affect the complexity of the 
> design for this feature. If you allow topics to be deleted while brokers are 
> down, the broker needs a way to know that it's version of the topic is too 
> old. This is mainly an issue since a topic can be re-created and written to, 
> while a broker is down. We need to ensure that a broker does not join the 
> quorum with an older version of the log. There are 2 ways to solve this 
> problem that I could think off -
>    1. Do not allow topic deletion to succeed if a broker hosting a replica is 
> down. Here, the controller keeps track of the state of each replica during 
> topic deletion    (TopicDeletionStarted, TopicDeletionSuccessful, 
> TopicDeletionFailed) and only marks the topic as deleted if all replicas for 
> all partitions of that topic are successfully deleted. 
>    2. Allow a topic to be deleted while a broker is down and keep track of 
> the "generation" of the topic in a fault tolerant, highly available and 
> consistent log. This log can either be zookeeper or a Kafka topic. The main 
> issue here is how many generations would we have to keep track off for a 
> topic. In other words, can this "generation" information ever be garbage 
> collected. There isn't a good bound on this since it is unclear when the 
> failed broker will come back online and when a topic will be re-created. That 
> would mean keeping this generation information for potentially a very long 
> time and incurring overhead during recovery or bootstrap of generation 
> information during controller or broker fail overs. This is especially a 
> problem for use cases or tests that keep creating and deleting a lot of short 
> lived topics. Essentially, this solution is not scalable unless we figure out 
> an intuitive way to garbage collect this topic metadata. It would require us 
> to introduce a config fo
 r controlling when a topic's generation metadata can be garbage collected. 
Note that this config is different from the topic TTL feature which controls 
when a topic, that is currently not in use, can be deleted. Overall, this 
alternative is unnecessarily complex for the benefit of deleting topics while a 
broker is down.
> 
> #3 is related to the policy decision made about #2. If a topic is not marked 
> deleted successfully while a broker is down, the controller will 
> automatically resume topic deletion when a broker restarts. 
> 
> This patch follows the previous approach of not calling a topic deletion 
> successful until all replicas have confirmed the deletion of local state for 
> that topic. This requires the following changes -
> 1. TopicCommand issues topic deletion by creating a new admin path 
> /admin/delete_topics/<topic>
> 
> 2. The controller listens for child changes on /admin/delete_topic and starts 
> topic deletion for the respective topics
> 
> 3. The controller has a background thread that handles topic deletion. The 
> purpose of having this background thread is to accommodate the TTL feature, 
> when we have it. This thread is signaled whenever deletion for a topic needs 
> to be started or resumed. Currently, a topic's deletion can be started only 
> by the onPartitionDeletion callback on the controller. In the future, it can 
> be triggered based on the configured TTL for the topic. A topic's deletion 
> will be halted in the following scenarios -
> * broker hosting one of the replicas for that topic goes down
> * partition reassignment for partitions of that topic is in progress
> * preferred replica election for partitions of that topic is in progress 
> (though this is not strictly required since it holds the controller lock for 
> the entire duration from start to end)
> 
> 4. Topic deletion is resumed when -
> * broker hosting one of the replicas for that topic is started
> * preferred replica election for partitions of that topic completes
> * partition reassignment for partitions of that topic completes 
> 
> 5. Every replica for a topic being deleted is in either of the 3 states - 
> * TopicDeletionStarted (Replica enters TopicDeletionStarted phase when the 
> onPartitionDeletion callback is invoked. This happens when the child change 
> watch for /admin/delete_topics fires on the controller. As part of this state 
> change, the controller sends StopReplicaRequests to all replicas. It 
> registers a callback for the StopReplicaResponse when deletePartition=true 
> thereby invoking a callback when a response for delete replica is received 
> from every replica)
> * TopicDeletionSuccessful (deleteTopicStopReplicaCallback() moves replicas 
> from TopicDeletionStarted->TopicDeletionSuccessful depending on the error 
> codes in StopReplicaResponse)
> * TopicDeletionFailed. (deleteTopicStopReplicaCallback() moves replicas from 
> TopicDeletionStarted->TopicDeletionSuccessful depending on the error codes in 
> StopReplicaResponse. In general, if a broker dies and if it hosted replicas 
> for topics being deleted, the controller marks the respective replicas in 
> TopicDeletionFailed state in the onBrokerFailure callback. The reason is that 
> if a broker fails before the request is sent and after the replica is in 
> TopicDeletionStarted state, it is possible that the replica will mistakenly 
> remain in TopicDeletionStarted state and topic deletion will not be retried 
> when the broker comes back up.)
> 
> 6. The delete topic thread marks a topic successfully deleted only if all 
> replicas are in TopicDeletionSuccessful state and it starts the topic 
> deletion teardown mode where it deletes all topic state from the 
> controllerContext as well as from zookeeper. This is the only time the 
> /brokers/topics/<topic> path gets deleted. 
> On the other hand, if no replica is in TopicDeletionStarted state and at 
> least one replica is in TopicDeletionFailed state, then it marks the topic 
> for deletion retry. 
> 
> 7. I've introduced callbacks for controller-broker communication. Ideally, 
> every callback should be of the following format (RequestOrResponse) => Unit. 
> BUT since StopReplicaResponse doesn't carry the replica id, this is handled 
> in a somewhat hacky manner in the patch. The purpose is to fix the approach 
> of upgrading controller-broker protocols in a reasonable way before having 
> delete topic upgrade StopReplica request in a one-off way. Will file a JIRA 
> for that.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala 
> a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 
> 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 
> 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala 
> d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala 
> a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala 
> fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
> ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 
> 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
> 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala 
> b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 
> 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala 
> b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala 
> b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 
> 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
> 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
> c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> d88b6c3e8fd8098d540998b6a82a65cec8a1dcb0 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>

Reply via email to