-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review51353
-----------------------------------------------------------

Thanks for the patch. Overall, this is a really nice cleanup patch. The logic is much clearer now. Some comments below.

A minor issue is that currently we never remove any watched keys in Purgatory, even when the watch list is already empty. We probably don't want to keep removing and then adding the same watch key. However, if a key hasn't been used for some time, perhaps it's useful to remove it. We can probably file a followup jira to think this through.


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/24676/#comment89613>

    This comment is not very accurate. For example, a follower's LEO change can also cause HW to increase.


core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/24676/#comment89614>

    Should these two classes be in ReplicaManager since they are only used there?


core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment89600>

    fetchInfo.offset seems to be redundant with startOffsetMetadata.messageOffset. Instead of keeping fetchInfo, could we just keep fetchSize?


core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment89599>

    Should we use mapValues()?


core/src/main/scala/kafka/server/DelayedProduce.scala
<https://reviews.apache.org/r/24676/#comment89601>

    "if every partition it produces to is satisfied by one of the following."


core/src/main/scala/kafka/server/DelayedProduce.scala
<https://reviews.apache.org/r/24676/#comment89602>

    Case B.2 seems to be in lines 97-98.


core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment89608>

    Do we need the warning log here?


core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment89609>

    Do we need the warning log here?


core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/24676/#comment89612>

    Could we call this responseCallback?


core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/24676/#comment89610>

    Could this just be Map(offsetTopicPartition -> new ByteBufferMessageSet(...))?


core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/24676/#comment89611>

    Could this be just responseStatus(offsetTopicPartition)?


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment89603>

    To be consistent, should this be named tryCompleteDelayedFetch?


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment89604>

    I think it's better to explicitly use the return clause unless this is the last statement in the method. For example, if someone later adds a return value at the end of the method, this value will no longer be returned.


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment89605>

    Could this be merged with the first if test?


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment89606>

    Would it be better to rename this to fetchOnlyFromLeader throughout?


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment89607>

    This seems to be an existing problem. However, we need to make sure these stats are recorded only once, when sending out the fetch response, not every time we read from a local log.


core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89598>

    It would be good to explain which methods typically need to be redefined in the subclasses and which ones do not, and group the methods accordingly.


core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89596>

    Should we just let expirationReaper call complete() and get rid of expire()?
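
A rough sketch of what that last suggestion could look like, in case it helps the discussion. DelayedOperationSketch, onComplete and CountingOperation are hypothetical names made up for this example and are not taken from the patch. The assumption is that a compare-and-set on the completed flag lets complete() run its logic at most once, so the expiration reaper and the normal completion path can share one entry point and a separate expire() may not be needed.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for DelayedRequest; the names here are illustrative
// and are not taken from the patch.
abstract class DelayedOperationSketch {
  private val completed = new AtomicBoolean(false)

  // Completion logic supplied by the subclass (e.g. sending the response).
  protected def onComplete(): Unit

  def isCompleted: Boolean = completed.get()

  // Flips the flag atomically, so only the first caller runs onComplete()
  // and gets true back; every later caller gets false.
  def complete(): Boolean =
    if (completed.compareAndSet(false, true)) {
      onComplete()
      true
    } else false
}

// Toy subclass that counts how often the completion logic actually ran.
class CountingOperation extends DelayedOperationSketch {
  var calls = 0
  protected def onComplete(): Unit = calls += 1
}
```

With this shape, the reaper thread and a request-handling thread could both call complete() on the same operation; whichever arrives second gets false back and does nothing.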


core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89582>

    request => operation


core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89583>

    request => operation


core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89584>

    Edited the comment a bit as below.

    Note that a delayed operation can be watched on multiple keys. It is possible that an operation is completed after it has been added to the watch list for some, but not all of the keys. In this case, the operation is considered completed and won't be added to the watch list of the remaining keys. The expiration reaper thread will remove this operation from any watcher list in which the operation exists.


core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89585>

    This seems redundant given the check in line 120.


core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89594>

    Would it be better to call this tryCompleteWatched()?


core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89595>

    Do we need to synchronize on curr?


core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89597>

    Would it be better to encapsulate this as a function forceComplete() in DelayedRequest? forceComplete() will set the internal completed flag to true and, if successful, will call complete(). Then we don't need to return curr back and we can just get rid of DelayedRequest.expire(). We can also make the completed flag private.


- Jun Rao


On Aug. 21, 2014, 6:33 p.m., Guozhang Wang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail.
> To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
>
> (Updated Aug. 21, 2014, 6:33 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Changes include:
>
> 1. Remove Fetch/ProduceRequestPurgatory classes and move their checking satisfaction logic into the delayedFetch/Produce.
> 2. The base RequestPurgatory's API change tryCompleteElseWatch and checkAndComplete
> 3. The base DelayedRequest API change complete, expire, tryComplete (which will call complete upon success) and isCompleted.
> 4. Move the updatingReplicaLEO function from ReplicaManager into Partition.
> 5. Move the appendMessageToLocalLog and readMessages from KafkaApis to ReplicaManager.
> 6. OffsetManager still used a nested callback for putting offsets into cache while interacting with ReplicaManager.
>
> Would like reviews on:
>
> 1. RequestPurgatory and DelayedRequest API and implementation.
> 2. DelayedFetch/Produce API and implementation.
> 3. ReplicaManager's readMessages/appendMessages API.
> 4. OffsetManager's storeOffsets API.
> 5. KafkaApis.
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/api/FetchRequest.scala
>     51cdccf7f90eb530cc62b094ed822b8469d50b12
>   core/src/main/scala/kafka/api/FetchResponse.scala
>     af9308737bf7832eca018c2b3ede703f7d1209f1
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala
>     861a6cf11dc6b6431fcbbe9de00c74a122f204bd
>   core/src/main/scala/kafka/api/ProducerRequest.scala
>     b2366e7eedcac17f657271d5293ff0bef6f3cbe6
>   core/src/main/scala/kafka/api/ProducerResponse.scala
>     a286272c834b6f40164999ff8b7f8998875f2cfe
>   core/src/main/scala/kafka/cluster/Partition.scala
>     ff106b47e6ee194cea1cf589474fef975b9dd7e2
>   core/src/main/scala/kafka/common/ErrorMapping.scala
>     3fae7910e4ce17bc8325887a046f383e0c151d44
>   core/src/main/scala/kafka/log/Log.scala
>     0ddf97bd30311b6039e19abade41d2fbbad2f59b
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
>     a624359fb2059340bb8dc1619c5b5f226e26eb9b
>   core/src/main/scala/kafka/server/DelayedFetch.scala
>     e0f14e25af03e6d4344386dcabc1457ee784d345
>   core/src/main/scala/kafka/server/DelayedProduce.scala
>     9481508fc2d6140b36829840c337e557f3d090da
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
>     ed1318891253556cdf4d908033b704495acd5724
>   core/src/main/scala/kafka/server/KafkaApis.scala
>     c584b559416b3ee4bcbec5966be4891e0a03eefb
>   core/src/main/scala/kafka/server/OffsetManager.scala
>     43eb2a35bb54d32c66cdb94772df657b3a104d1a
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
>     d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
>   core/src/main/scala/kafka/server/ReplicaManager.scala
>     68758e35d496a4659819960ae8e809d6e215568e
>   core/src/main/scala/kafka/server/RequestPurgatory.scala
>     ce06d2c381348deef8559374869fcaed923da1d1
>   core/src/main/scala/kafka/utils/DelayedItem.scala
>     d7276494072f14f1cdf7d23f755ac32678c5675c
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
>     168712de241125982d556c188c76514fceb93779
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
>     09ed8f5a7a414ae139803bf82d336c2d80bf4ac5
>
> Diff: https://reviews.apache.org/r/24676/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Guozhang Wang
>
>