-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/
-----------------------------------------------------------
(Updated Jan. 28, 2014, 7:06 p.m.)
Review request for kafka.
Bugs: KAFKA-330
https://issues.apache.org/jira/browse/KAFKA-330
Repository: kafka
Description (updated)
-------
Delete topic is a pretty tricky feature and there are multiple ways to solve
it. I will list the various approaches and their tradeoffs here. A few things
that make delete topic tricky -
1. How do you handle resuming topic deletion during controller failover?
2. How do you handle re-creating topics if brokers that host a subset of the
replicas are down?
3. If a broker fails during delete topic, how does it know, when it restarts,
which version of the topic its logs belong to? This is relevant if we allow
re-creating topics while a broker is down.
I will address these one by one. #1 is pretty straightforward to handle and can
be achieved in a way similar to partition reassignment (through an admin path
in zookeeper indicating a topic deletion that has not finished).
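A minimal sketch of what resuming could look like on controller failover,
assuming the controller holds a ZkClient handle; the method name and the
resumeDeletion callback are illustrative, not the exact code in the patch -

import scala.collection.JavaConverters._
import org.I0Itec.zkclient.ZkClient

// On controller failover, any topic still listed under /admin/delete_topics
// has a deletion that did not finish and must be resumed. resumeDeletion is a
// stand-in for signaling the delete topic logic for that topic.
def resumeIncompleteTopicDeletions(zkClient: ZkClient,
                                   resumeDeletion: String => Unit): Unit = {
  val deleteTopicsPath = "/admin/delete_topics"
  if (zkClient.exists(deleteTopicsPath)) {
    val pendingTopics = zkClient.getChildren(deleteTopicsPath).asScala
    pendingTopics.foreach(resumeDeletion)
  }
}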
#2 is an important policy decision that can affect the complexity of the design
for this feature. If you allow topics to be deleted while brokers are down, the
broker needs a way to know that its version of the topic is too old. This is
mainly an issue since a topic can be re-created and written to while a broker
is down. We need to ensure that a broker does not join the quorum with an older
version of the log. There are 2 ways to solve this problem that I could think
of -
1. Do not allow topic deletion to succeed if a broker hosting a replica is
down. Here, the controller keeps track of the state of each replica during
topic deletion (TopicDeletionStarted, TopicDeletionSuccessful,
TopicDeletionFailed) and only marks the topic as deleted if all replicas for
all partitions of that topic are successfully deleted. (A small sketch of the
eligibility check this implies follows this list.)
2. Allow a topic to be deleted while a broker is down and keep track of the
"generation" of the topic in a fault tolerant, highly available and consistent
log. This log can either be zookeeper or a Kafka topic. The main issue here is
how many generations we would have to keep track of for a topic. In other
words, can this "generation" information ever be garbage collected? There isn't
a good bound on this since it is unclear when the failed broker will come back
online and when a topic will be re-created. That would mean keeping this
generation information for potentially a very long time and incurring overhead
while recovering or bootstrapping the generation information during controller
or broker failovers. This is especially a problem for use cases or tests that
keep creating and deleting a lot of short lived topics. Essentially, this
solution is not scalable unless we figure out an intuitive way to garbage
collect this topic metadata. It would require us to introduce a config for
controlling when a topic's generation metadata can be garbage collected. Note
that this config is different from the topic TTL feature, which controls when a
topic that is currently not in use can be deleted. Overall, this alternative
is unnecessarily complex for the benefit of deleting topics while a broker is
down.
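To make the first alternative above concrete, here is a minimal sketch of the
eligibility check it implies; the names and the replicaAssignment layout are
illustrative, not the exact code in the patch -

// Approach 1: a topic's deletion can only complete when every broker hosting
// one of its replicas is alive; otherwise deletion is halted and resumed
// later. replicaAssignment maps each partition id to its replica broker ids.
def isTopicDeletionEligible(replicaAssignment: Map[Int, Seq[Int]],
                            liveBrokerIds: Set[Int]): Boolean =
  replicaAssignment.values.flatten.forall(liveBrokerIds.contains)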
#3 is related to the policy decision made for #2. Since a topic cannot be
marked successfully deleted while a broker hosting one of its replicas is down,
the controller automatically resumes topic deletion when that broker restarts.
This patch follows the first approach above: a topic deletion is not marked
successful until all replicas have confirmed the deletion of local state for
that topic. This requires the following changes -
1. TopicCommand issues topic deletion by creating a new admin path
/admin/delete_topics/<topic>
2. The controller listens for child changes on /admin/delete_topics and starts
topic deletion for the respective topics
3. The controller has a background thread that handles topic deletion. The
purpose of having this background thread is to accommodate the TTL feature,
when we have it. This thread is signaled whenever deletion for a topic needs to
be started or resumed. Currently, a topic's deletion can be started only by the
onPartitionDeletion callback on the controller. In the future, it can be
triggered based on the configured TTL for the topic. A topic's deletion will be
halted in the following scenarios -
* a broker hosting one of the replicas for that topic goes down
* partition reassignment for partitions of that topic is in progress
* preferred replica election for partitions of that topic is in progress
(though this is not strictly required since it holds the controller lock for
the entire duration from start to end)
4. Every replica for a topic being deleted is in one of the following 3
states -
* TopicDeletionStarted (a replica enters the TopicDeletionStarted phase when
the onPartitionDeletion callback is invoked. This happens when the child change
watch for /admin/delete_topics fires on the controller. As part of this state
change, the controller sends StopReplicaRequests to all replicas. It registers
a callback for the StopReplicaResponse when deletePartition=true, thereby
invoking a callback when a response for delete replica is received from every
replica)
* TopicDeletionSuccessful (deleteTopicStopReplicaCallback() moves replicas from
TopicDeletionStarted->TopicDeletionSuccessful depending on the error codes in
StopReplicaResponse)
* TopicDeletionFailed (deleteTopicStopReplicaCallback() moves replicas from
TopicDeletionStarted->TopicDeletionFailed depending on the error codes in
StopReplicaResponse)
A sketch of these state transitions and of the checks in point 5 follows this
list.
5. The delete topic thread marks a topic successfully deleted only if all
replicas are in the TopicDeletionSuccessful state. It then starts the topic
deletion teardown mode, where it deletes all topic state from the
controllerContext as well as from zookeeper. This is the only time the
/brokers/topics/<topic> path gets deleted.
On the other hand, if no replica is in the TopicDeletionStarted state and at
least one replica is in the TopicDeletionFailed state, then it marks the topic
for deletion retry.
6. I've introduced callbacks for controller-broker communication. Ideally,
every callback should be of the format (RequestOrResponse) => Unit. But since
StopReplicaResponse doesn't carry the replica id, this is handled in a slightly
hacky manner in the patch. The purpose is to fix the approach for upgrading
controller-broker protocols in a reasonable way before having delete topic
upgrade the StopReplica request in a one-off way.
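To summarize points 4 and 5, here is a minimal, self-contained sketch of the
per-replica state transitions and the completion/retry checks. The names are
illustrative, and error code 0 (NoError) is assumed to indicate a successful
local delete; the actual patch implements this in the controller's replica
state machine and deleteTopicStopReplicaCallback() -

// Per-replica deletion states tracked by the controller.
sealed trait ReplicaDeletionState
case object TopicDeletionStarted extends ReplicaDeletionState
case object TopicDeletionSuccessful extends ReplicaDeletionState
case object TopicDeletionFailed extends ReplicaDeletionState

// Transition for one replica when its StopReplicaResponse (sent with
// deletePartition=true) arrives. Error code 0 is assumed to mean the replica
// deleted its local state; anything else marks the replica as failed.
def onStopReplicaResponse(states: Map[Int, ReplicaDeletionState],
                          replicaId: Int,
                          errorCode: Short): Map[Int, ReplicaDeletionState] = {
  val newState =
    if (errorCode == 0) TopicDeletionSuccessful else TopicDeletionFailed
  states.updated(replicaId, newState)
}

// Point 5: the topic is torn down only when every replica succeeded ...
def isTopicSuccessfullyDeleted(states: Map[Int, ReplicaDeletionState]): Boolean =
  states.nonEmpty && states.values.forall(_ == TopicDeletionSuccessful)

// ... and it is marked for retry when no replica is still in flight but at
// least one replica failed.
def shouldRetryTopicDeletion(states: Map[Int, ReplicaDeletionState]): Boolean =
  !states.values.exists(_ == TopicDeletionStarted) &&
    states.values.exists(_ == TopicDeletionFailed)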
Diffs
-----
core/src/main/scala/kafka/admin/AdminUtils.scala
a167756f0fd358574c8ccb42c5c96aaf13def4f5
core/src/main/scala/kafka/admin/TopicCommand.scala
842c11047cca0531fbc572fdb25523244ba2b626
core/src/main/scala/kafka/api/StopReplicaRequest.scala
820f0f57b00849a588a840358d07f3a4a31772d4
core/src/main/scala/kafka/api/StopReplicaResponse.scala
d7e36308263aec2298e8adff8f22e18212e33fca
core/src/main/scala/kafka/controller/ControllerChannelManager.scala
ea8485b479155b479c575ebc89a4f73086c872cb
core/src/main/scala/kafka/controller/KafkaController.scala
a0267ae2670e8d5f365e49ec0fa5db1f62b815bf
core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
fd9200f3bf941aab54df798bb5899eeb552ea3a3
core/src/main/scala/kafka/controller/PartitionStateMachine.scala
ac4262a403fc73edaecbddf55858703c640b11c0
core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
483559aa64726c51320d18b64a1b48f8fb2905a0
core/src/main/scala/kafka/server/KafkaApis.scala
bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e
core/src/main/scala/kafka/server/KafkaHealthcheck.scala
9dca55c9254948f1196ba17e1d3ebacdcd66be0c
core/src/main/scala/kafka/server/OffsetCheckpoint.scala
b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b
core/src/main/scala/kafka/server/ReplicaManager.scala
f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6
core/src/main/scala/kafka/server/TopicConfigManager.scala
42e98dd66f3269e6e3a8210934dabfd65df2dba9
core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
b189619bdc1b0d2bba8e8f88467fce014be96ccd
core/src/main/scala/kafka/utils/ZkUtils.scala
b42e52b8e5668383b287b2a86385df65e51b5108
core/src/test/scala/unit/kafka/admin/AdminTest.scala
59de1b469fece0b28e1d04dcd7b7015c12576abb
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION
core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
8df0982a1e71e3f50a073c4ae181096d32914f3e
core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
9aea67b140e50c6f9f1868ce2e2aac9e7530fa77
core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
c0475d07a778ff957ad266c08a7a81ea500debd2
core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
03e6266ffdad5891ec81df786bd094066b78b4c0
core/src/test/scala/unit/kafka/utils/TestUtils.scala
d88b6c3e8fd8098d540998b6a82a65cec8a1dcb0
Diff: https://reviews.apache.org/r/17460/diff/
Testing (updated)
-------
Several integration tests added to test -
1. Topic deletion when all replica brokers are alive
2. Halt and resume topic deletion after a follower replica is restarted
3. Halt and resume topic deletion after a controller failover
4. Request handling during topic deletion
5. Topic deletion and partition reassignment in parallel
6. Topic deletion and preferred replica election in parallel
7. Topic deletion and per topic config changes in parallel
Thanks,
Neha Narkhede