Repository: kafka Updated Branches: refs/heads/trunk d5fbba633 -> f5684366e
kafka-1952; High CPU Usage in 0.8.2 release; patched by Jun Rao; reviewed by Guozhang Wang, Ewen Cheslack-Postava and Neha Narkhede Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f5684366 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f5684366 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f5684366 Branch: refs/heads/trunk Commit: f5684366ef60125c4d799121a6c0adca4744e8ab Parents: d5fbba6 Author: Jun Rao <jun...@gmail.com> Authored: Wed Feb 18 13:39:05 2015 -0800 Committer: Jun Rao <jun...@gmail.com> Committed: Wed Feb 18 13:39:05 2015 -0800 ---------------------------------------------------------------------- .../scala/kafka/server/DelayedOperation.scala | 34 +++++++++++++------- 1 file changed, 23 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f5684366/core/src/main/scala/kafka/server/DelayedOperation.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index fc06b01..1d11099 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -128,25 +128,37 @@ class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeI * @return true iff the delayed operations can be completed by the caller */ def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = { + assert(watchKeys.size > 0, "The watch key list can't be empty") + + // The cost of tryComplete() is typically proportional to the number of keys. Calling + // tryComplete() for each key is going to be expensive if there are many keys. Instead, + // we do the check in the following way. Call tryComplete(). If the operation is not completed, + // we just add the operation to all keys. Then we call tryComplete() again. At this time, if + // the operation is still not completed, we are guaranteed that it won't miss any future triggering + // event since the operation is already on the watcher list for all keys. This does mean that + // if the operation is completed (by another thread) between the two tryComplete() calls, the + // operation is unnecessarily added for watch. However, this is a less severe issue since the + // expire reaper will clean it up periodically. + + var isCompletedByMe = operation synchronized operation.tryComplete() + if (isCompletedByMe) + return true + for(key <- watchKeys) { - // if the operation is already completed, stopping adding it to - // any further lists and return false + // If the operation is already completed, stop adding it to the rest of the watcher list. if (operation.isCompleted()) return false val watchers = watchersFor(key) - // if the operation can by completed by myself, stop adding it to - // any further lists and return true immediately - if(operation synchronized operation.tryComplete()) { - return true - } else { - watchers.watch(operation) - } + watchers.watch(operation) } + isCompletedByMe = operation synchronized operation.tryComplete() + if (isCompletedByMe) + return true + // if it cannot be completed by now and hence is watched, add to the expire queue also - if (! operation.isCompleted()) { + if (! operation.isCompleted()) expirationReaper.enqueue(operation) - } false }