Repository: kafka
Updated Branches:
  refs/heads/trunk d5fbba633 -> f5684366e


kafka-1952; High CPU Usage in 0.8.2 release; patched by Jun Rao; reviewed by 
Guozhang Wang, Ewen Cheslack-Postava and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f5684366
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f5684366
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f5684366

Branch: refs/heads/trunk
Commit: f5684366ef60125c4d799121a6c0adca4744e8ab
Parents: d5fbba6
Author: Jun Rao <jun...@gmail.com>
Authored: Wed Feb 18 13:39:05 2015 -0800
Committer: Jun Rao <jun...@gmail.com>
Committed: Wed Feb 18 13:39:05 2015 -0800

----------------------------------------------------------------------
 .../scala/kafka/server/DelayedOperation.scala   | 34 +++++++++++++-------
 1 file changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f5684366/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index fc06b01..1d11099 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -128,25 +128,37 @@ class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeI
    * @return true iff the delayed operations can be completed by the caller
    */
   def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
+    assert(watchKeys.size > 0, "The watch key list can't be empty")
+
+    // The cost of tryComplete() is typically proportional to the number of keys. Calling
+    // tryComplete() for each key is going to be expensive if there are many keys. Instead,
+    // we do the check in the following way. Call tryComplete(). If the operation is not completed,
+    // we just add the operation to all keys. Then we call tryComplete() again. At this time, if
+    // the operation is still not completed, we are guaranteed that it won't miss any future triggering
+    // event since the operation is already on the watcher list for all keys. This does mean that
+    // if the operation is completed (by another thread) between the two tryComplete() calls, the
+    // operation is unnecessarily added for watch. However, this is a less severe issue since the
+    // expire reaper will clean it up periodically.
+
+    var isCompletedByMe = operation synchronized operation.tryComplete()
+    if (isCompletedByMe)
+      return true
+
     for(key <- watchKeys) {
-      // if the operation is already completed, stopping adding it to
-      // any further lists and return false
+      // If the operation is already completed, stop adding it to the rest of the watcher list.
       if (operation.isCompleted())
         return false
       val watchers = watchersFor(key)
-      // if the operation can by completed by myself, stop adding it to
-      // any further lists and return true immediately
-      if(operation synchronized operation.tryComplete()) {
-        return true
-      } else {
-        watchers.watch(operation)
-      }
+      watchers.watch(operation)
     }
 
+    isCompletedByMe = operation synchronized operation.tryComplete()
+    if (isCompletedByMe)
+      return true
+
     // if it cannot be completed by now and hence is watched, add to the expire queue also
-    if (! operation.isCompleted()) {
+    if (! operation.isCompleted())
       expirationReaper.enqueue(operation)
-    }
 
     false
   }

Reply via email to