> On Feb. 4, 2014, 11:21 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala, line 64
> > <https://reviews.apache.org/r/17537/diff/3/?file=456705#file456705line64>
> >
> > the config object should already have the per topic preference for
> > unclean leader election. So we don't have to read from zookeeper again.
>
> Andrew Olson wrote:
> It doesn't look like this is actually the case. The KafkaConfig is passed
> from the KafkaServer to the KafkaController with no topic context, and the
> controller does not appear to be integrated with the topic log configuration
> logic in the TopicConfigManager/LogManager.
>
> Just to confirm my understanding of the code, I removed this Zookeeper
> read and doing so caused the two TopicOverride integration tests that I added
> to fail. Is there is a simpler or less awkward way to implement this as per
> topic configuration? Reading the config on demand from ZK seems like the
> simplest and least invasive option since this should not be a frequently
> executed code path, but I could be missing something obvious.
>
> Neha Narkhede wrote:
> >> It doesn't look like this is actually the case. The KafkaConfig is
> passed from the KafkaServer to the KafkaController with no topic context, and
> the controller does not appear to be integrated with the topic log
> configuration logic in the TopicConfigManager/LogManager.
>
> To understand how the broker dynamically loads per topic configs, you can
> look at TopicConfigManager. It registers a zookeeper listener on the topic
> config change path and atomically switches the log config object to reflect
> the new per topic config overrides.
>
> I haven't looked at the tests in detail, but are you introducing the per
> topic config overrides the same way as TopicCommand does (by writing to the
> correct zk path)? That is the only way it will be automatically reloaded by
> all the brokers, including the controller
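
For reference, the listener-driven reload pattern described above (as opposed
to reading ZooKeeper on demand) could look roughly like the sketch below. This
is only an illustration under stated assumptions: TopicOverrideCache and
onConfigChange are hypothetical names, not existing Kafka classes or methods,
and the ZooKeeper listener wiring that would invoke the callback is omitted.

```scala
// Hypothetical sketch, not Kafka code: a cache of per-topic overrides that a
// config-change listener could update, so readers never have to hit ZooKeeper.
final class TopicOverrideCache(defaultEnabled: Boolean) {

  // Immutable map behind a volatile reference: readers always see a complete
  // snapshot, and an update replaces the whole map in a single assignment.
  @volatile private var overrides: Map[String, Boolean] = Map.empty

  // Assumed to be called from a change-listener callback.
  // Some(value) sets the override for the topic; None removes it.
  def onConfigChange(topic: String, enabled: Option[Boolean]): Unit =
    synchronized {
      overrides = enabled match {
        case Some(value) => overrides.updated(topic, value)
        case None        => overrides - topic
      }
    }

  // Falls back to the broker-wide default when the topic has no override.
  def uncleanLeaderElectionEnabled(topic: String): Boolean =
    overrides.getOrElse(topic, defaultEnabled)
}
```

With something like this, a leader selector would call
uncleanLeaderElectionEnabled(topic) instead of reading the topic config from
ZooKeeper, at the cost of wiring one more listener-backed component into the
controller.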

>> are you introducing the per topic config overrides the same way as
>> TopicCommand does (by writing to the correct zk path)?

Yes, I'm using AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(...)
in the tests.

>> To understand how the broker dynamically loads per topic configs, you can
>> look at TopicConfigManager.

After reading through the code some more, I think I understand how the
TopicConfigManager works. It is currently only integrated with the
LogManager [1]. The LogManager functionality is isolated from the
KafkaController and ReplicaFetcherThread, which only have access to the base
server KafkaConfig. The KafkaConfig is initialized when the broker starts and
is immutable. The dynamic configuration updates you're referring to are
applied to the LogManager's map of LogConfig instances.

I didn't want to introduce an abstraction leak by passing the LogManager
instance to the controller and replica fetcher threads and making this new
replica configuration part of the LogConfig. I also wasn't sure whether it
was worth the effort and extra complexity to enhance the TopicConfigManager
to also automatically reload replica-related configuration in addition to
log-related configuration (i.e., adding new ReplicaConfig and ReplicaManager
classes similar to LogConfig and LogManager), since there is currently only a
single configuration property, and it is not checked very frequently.

[1] https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaServer.scala#L98


- Andrew


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17537/#review33658
-----------------------------------------------------------


On Jan. 30, 2014, 7:45 p.m., Andrew Olson wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail.
> To reply, visit:
> https://reviews.apache.org/r/17537/
> -----------------------------------------------------------
>
> (Updated Jan. 30, 2014, 7:45 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1028
> https://issues.apache.org/jira/browse/KAFKA-1028
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-1028: per topic configuration of preference for consistency over
> availability
>
>
> Diffs
> -----
>
> core/src/main/scala/kafka/admin/AdminUtils.scala
>   a167756f0fd358574c8ccb42c5c96aaf13def4f5
> core/src/main/scala/kafka/common/NoReplicaOnlineException.scala
>   a1e12794978adf79020936c71259bbdabca8ee68
> core/src/main/scala/kafka/controller/KafkaController.scala
>   a0267ae2670e8d5f365e49ec0fa5db1f62b815bf
> core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
>   fd9200f3bf941aab54df798bb5899eeb552ea3a3
> core/src/main/scala/kafka/log/LogConfig.scala
>   0b32aeeffcd9d4755ac90573448d197d3f729749
> core/src/main/scala/kafka/server/KafkaConfig.scala
>   3c3aafc2b3f06fc8f3168a8a9c1e0b08e944c1ef
> core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
>   73e605eb31bc71642d48b0bb8bd1632fd70b9dca
> core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
>   b585f0ec0b1c402d95a3b34934dab7545dcfcb1f
> core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
>   PRE-CREATION
> core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
>   89c207a3f56c7a7711f8cee6fb277626329882a6
> core/src/test/scala/unit/kafka/utils/TestUtils.scala
>   426b1a7bea1d83a64081f2c6b672c88c928713b7
>
> Diff: https://reviews.apache.org/r/17537/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Andrew Olson
>
>