-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33046
-----------------------------------------------------------

core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/17460/#comment62258>

    thanks for this suggestion, Jun!


- Neha Narkhede


On Jan. 28, 2014, 7:15 p.m., Neha Narkhede wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
>
> (Updated Jan. 28, 2014, 7:15 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Delete topic is a pretty tricky feature and there are multiple ways to solve
> it. I will list the various approaches with their tradeoffs here. A few
> things to think about that make delete topic tricky:
>
> 1. How do you handle resuming delete topics during controller failover?
> 2. How do you handle re-creating topics if brokers that host a subset of the
> replicas are down?
> 3. If a broker fails during delete topic, how does it know which version of
> the topic it has logs for when it restarts? This is relevant if we allow
> re-creating topics while a broker is down.
>
> I will address these one by one.
>
> #1 is pretty straightforward to handle and can be achieved in a way similar
> to partition reassignment (through an admin path in zookeeper indicating a
> topic deletion that has not finished).
>
> #2 is an important policy decision that can affect the complexity of the
> design for this feature. If you allow topics to be deleted while brokers are
> down, the broker needs a way to know that its version of the topic is too
> old. This is mainly an issue since a topic can be re-created and written to
> while a broker is down. We need to ensure that a broker does not join the
> quorum with an older version of the log. There are 2 ways to solve this
> problem that I could think of:
> 1. Do not allow topic deletion to succeed if a broker hosting a replica is
> down. Here, the controller keeps track of the state of each replica during
> topic deletion (TopicDeletionStarted, TopicDeletionSuccessful,
> TopicDeletionFailed) and only marks the topic as deleted if all replicas for
> all partitions of that topic are successfully deleted.
> 2. Allow a topic to be deleted while a broker is down and keep track of the
> "generation" of the topic in a fault tolerant, highly available and
> consistent log. This log can either be zookeeper or a Kafka topic. The main
> issue here is how many generations we would have to keep track of for a
> topic. In other words, can this "generation" information ever be garbage
> collected? There isn't a good bound on this since it is unclear when the
> failed broker will come back online and when a topic will be re-created.
> That would mean keeping this generation information for potentially a very
> long time and incurring overhead during recovery or bootstrap of generation
> information during controller or broker failovers. This is especially a
> problem for use cases or tests that keep creating and deleting a lot of
> short-lived topics. Essentially, this solution is not scalable unless we
> figure out an intuitive way to garbage collect this topic metadata. It would
> require us to introduce a config for controlling when a topic's generation
> metadata can be garbage collected. Note that this config is different from
> the topic TTL feature, which controls when a topic that is currently not in
> use can be deleted. Overall, this alternative is unnecessarily complex for
> the benefit of deleting topics while a broker is down.
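
To make alternative 1 concrete, the controller-side bookkeeping can be viewed
as a per-replica state map keyed by (topic, partition, replica). The sketch
below only illustrates that idea: the three state names come from the
description above, while ReplicaDeletionTracker and its methods are
hypothetical and not part of the patch.

    import scala.collection.mutable

    // Sketch of alternative 1: the controller tracks a deletion state per replica
    // and declares the topic deleted only when every replica has succeeded.
    sealed trait ReplicaDeletionState
    case object TopicDeletionStarted extends ReplicaDeletionState
    case object TopicDeletionSuccessful extends ReplicaDeletionState
    case object TopicDeletionFailed extends ReplicaDeletionState

    class ReplicaDeletionTracker {
      // (topic, partition, replicaId) -> current deletion state
      private val states = mutable.Map.empty[(String, Int, Int), ReplicaDeletionState]

      def markStarted(topic: String, partition: Int, replica: Int): Unit =
        states((topic, partition, replica)) = TopicDeletionStarted

      // Move a replica out of TopicDeletionStarted based on the StopReplica error code
      def markCompleted(topic: String, partition: Int, replica: Int, errorCode: Short): Unit =
        states((topic, partition, replica)) =
          if (errorCode == 0) TopicDeletionSuccessful else TopicDeletionFailed

      // The topic counts as deleted only if every tracked replica succeeded
      def isTopicDeleted(topic: String): Boolean = {
        val replicaStates = states.collect { case ((t, _, _), s) if t == topic => s }
        replicaStates.nonEmpty && replicaStates.forall(_ == TopicDeletionSuccessful)
      }
    }
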
>
> #3 is related to the policy decision made about #2. If a topic is not marked
> successfully deleted while a broker is down, the controller will
> automatically resume topic deletion when the broker restarts.
>
> This patch follows the first approach of not calling a topic deletion
> successful until all replicas have confirmed the deletion of local state for
> that topic. This requires the following changes:
>
> 1. TopicCommand issues topic deletion by creating a new admin path
> /admin/delete_topics/<topic>
>
> 2. The controller listens for child changes on /admin/delete_topics and
> starts topic deletion for the respective topics
>
> 3. The controller has a background thread that handles topic deletion. The
> purpose of having this background thread is to accommodate the TTL feature,
> when we have it. This thread is signaled whenever deletion for a topic needs
> to be started or resumed. Currently, a topic's deletion can be started only
> by the onPartitionDeletion callback on the controller. In the future, it can
> be triggered based on the configured TTL for the topic. A topic's deletion
> will be halted in the following scenarios:
> * a broker hosting one of the replicas for that topic goes down
> * partition reassignment for partitions of that topic is in progress
> * preferred replica election for partitions of that topic is in progress
> (though this is not strictly required since it holds the controller lock for
> the entire duration from start to end)
>
> 4. Topic deletion is resumed when:
> * a broker hosting one of the replicas for that topic is started
> * preferred replica election for partitions of that topic completes
> * partition reassignment for partitions of that topic completes
>
> 5. Every replica for a topic being deleted is in one of 3 states:
> * TopicDeletionStarted (a replica enters the TopicDeletionStarted phase when
> the onPartitionDeletion callback is invoked. This happens when the child
> change watch for /admin/delete_topics fires on the controller. As part of
> this state change, the controller sends StopReplicaRequests to all replicas.
> It registers a callback for the StopReplicaResponse when
> deletePartition=true, thereby invoking a callback when a response for delete
> replica is received from every replica)
> * TopicDeletionSuccessful (deleteTopicStopReplicaCallback() moves replicas
> from TopicDeletionStarted -> TopicDeletionSuccessful depending on the error
> codes in StopReplicaResponse)
> * TopicDeletionFailed (deleteTopicStopReplicaCallback() moves replicas from
> TopicDeletionStarted -> TopicDeletionFailed depending on the error codes in
> StopReplicaResponse. In general, if a broker dies and it hosted replicas for
> topics being deleted, the controller marks the respective replicas as
> TopicDeletionFailed in the onBrokerFailure callback. The reason is that if a
> broker fails after the replica is in TopicDeletionStarted state but before
> the request is sent, it is possible that the replica will mistakenly remain
> in TopicDeletionStarted state and topic deletion will not be retried when
> the broker comes back up.)
>
> 6. The delete topic thread marks a topic successfully deleted only if all
> replicas are in TopicDeletionSuccessful state, and it then starts the topic
> deletion teardown mode where it deletes all topic state from the
> controllerContext as well as from zookeeper. This is the only time the
> /brokers/topics/<topic> path gets deleted. On the other hand, if no replica
> is in TopicDeletionStarted state and at least one replica is in
> TopicDeletionFailed state, then it marks the topic for deletion retry.
>
> 7. I've introduced callbacks for controller-broker communication. Ideally,
> every callback should be of the following format: (RequestOrResponse) =>
> Unit. But since StopReplicaResponse doesn't carry the replica id, this is
> handled in a somewhat hacky manner in the patch. The purpose is to fix the
> approach of upgrading controller-broker protocols in a reasonable way before
> having delete topic upgrade the StopReplica request in a one-off way. Will
> file a JIRA for that.
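
As a rough sketch of the callback shape mentioned in #7, and of one way the
missing replica id in StopReplicaResponse could be threaded through by
capturing it at registration time, consider the following. RequestOrResponse
is a stand-in trait here and the helper names are made up; this is not the
actual wiring in the patch.

    object ControllerCallbackSketch {
      // stand-in for kafka.api.RequestOrResponse
      trait RequestOrResponse

      // the ideal uniform callback shape: (RequestOrResponse) => Unit
      type ControllerCallback = RequestOrResponse => Unit

      // StopReplicaResponse does not carry the replica id, so one option is to
      // close over the id when the callback is registered for a given replica.
      def stopReplicaCallbackFor(replicaId: Int)
                                (handle: (Int, RequestOrResponse) => Unit): ControllerCallback =
        (response: RequestOrResponse) => handle(replicaId, response)

      // Example: register one such callback per destination replica.
      val callbackForReplica3: ControllerCallback =
        stopReplicaCallbackFor(replicaId = 3) { (id, response) =>
          println(s"delete-replica response from replica $id: $response")
        }
    }
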
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala d88b6c3e8fd8098d540998b6a82a65cec8a1dcb0
>
> Diff: https://reviews.apache.org/r/17460/diff/
>
>
> Testing
> -------
>
> Several integration tests added to test:
>
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
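
As a usage-level illustration of the flow these tests exercise, a test could
trigger deletion through the admin path from change #1 and poll until the
controller finishes the teardown described in change #6. The sketch below runs
against a raw ZkClient rather than the actual DeleteTopicTest;
deleteTopicAndAwait and timeoutMs are hypothetical names, and only the znode
paths come from the description above.

    import org.I0Itec.zkclient.ZkClient

    object DeleteTopicFlowSketch {
      // Issue a delete via /admin/delete_topics/<topic> and wait until
      // /brokers/topics/<topic> disappears, i.e. the controller completed teardown.
      def deleteTopicAndAwait(zkClient: ZkClient, topic: String, timeoutMs: Long = 30000L): Boolean = {
        zkClient.createPersistent("/admin/delete_topics/" + topic, true) // marks the topic for deletion
        val deadline = System.currentTimeMillis() + timeoutMs
        while (zkClient.exists("/brokers/topics/" + topic) && System.currentTimeMillis() < deadline)
          Thread.sleep(100)                                              // poll for controller teardown
        !zkClient.exists("/brokers/topics/" + topic)
      }
    }
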
>
>
> Thanks,
>
> Neha Narkhede
>
>