Repository: kafka
Updated Branches:
  refs/heads/trunk d3aa99c54 -> 6880f66c9


kafka-1989; New purgatory design; patched by Yasuhiro Matsuda; reviewed by 
Guozhang Wang and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6880f66c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6880f66c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6880f66c

Branch: refs/heads/trunk
Commit: 6880f66c97f63b2d1f3750f1753ec8b6094cb8a5
Parents: d3aa99c
Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com>
Authored: Wed Apr 8 15:19:59 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Wed Apr 8 15:19:59 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/server/DelayedOperation.scala   | 159 +++++++++---------
 .../scala/kafka/server/ReplicaManager.scala     |   1 -
 .../main/scala/kafka/utils/timer/Timer.scala    |  86 ++++++++++
 .../scala/kafka/utils/timer/TimerTask.scala     |  43 +++++
 .../scala/kafka/utils/timer/TimerTaskList.scala | 132 +++++++++++++++
 .../scala/kafka/utils/timer/TimingWheel.scala   | 160 +++++++++++++++++++
 .../kafka/server/DelayedOperationTest.scala     |  45 +++---
 .../kafka/utils/timer/TimerTaskListTest.scala   |  95 +++++++++++
 .../unit/kafka/utils/timer/TimerTest.scala      | 111 +++++++++++++
 9 files changed, 723 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala 
b/core/src/main/scala/kafka/server/DelayedOperation.scala
index e317676..2ed9b46 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -18,11 +18,15 @@
 package kafka.server
 
 import kafka.utils._
+import kafka.utils.timer._
 import kafka.metrics.KafkaMetricsGroup
 
-import java.util
+import java.util.LinkedList
 import java.util.concurrent._
 import java.util.concurrent.atomic._
+
+import org.apache.kafka.common.utils.Utils
+
 import scala.collection._
 
 import com.yammer.metrics.core.Gauge
@@ -41,7 +45,10 @@ import com.yammer.metrics.core.Gauge
  *
  * A subclass of DelayedOperation needs to provide an implementation of both 
onComplete() and tryComplete().
  */
-abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) {
+abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging {
+
+  override val expirationMs = delayMs + System.currentTimeMillis()
+
   private val completed = new AtomicBoolean(false)
 
   /*
@@ -58,6 +65,8 @@ abstract class DelayedOperation(delayMs: Long) extends 
DelayedItem(delayMs) {
    */
   def forceComplete(): Boolean = {
     if (completed.compareAndSet(false, true)) {
+      // cancel the timeout timer
+      cancel()
       onComplete()
       true
     } else {
@@ -71,7 +80,7 @@ abstract class DelayedOperation(delayMs: Long) extends 
DelayedItem(delayMs) {
   def isCompleted(): Boolean = completed.get()
 
   /**
-   * Call-back to execute when a delayed operation expires, but before 
completion.
+   * Call-back to execute when a delayed operation expires and is hence 
forced to complete.
    */
   def onExpiration(): Unit
 
@@ -89,6 +98,14 @@ abstract class DelayedOperation(delayMs: Long) extends 
DelayedItem(delayMs) {
    * This function needs to be defined in subclasses
    */
   def tryComplete(): Boolean
+
+  /*
+   * run() method defines a task that is executed on timeout
+   */
+  override def run(): Unit = {
+    if (forceComplete())
+      onExpiration()
+  }
 }
 
 /**
@@ -97,11 +114,21 @@ abstract class DelayedOperation(delayMs: Long) extends 
DelayedItem(delayMs) {
 class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, 
brokerId: Int = 0, purgeInterval: Int = 1000)
         extends Logging with KafkaMetricsGroup {
 
+  // timeout timer
+  private[this] val executor = Executors.newFixedThreadPool(1, new 
ThreadFactory() {
+    def newThread(runnable: Runnable): Thread =
+      Utils.newThread("executor-"+purgatoryName, runnable, false)
+  })
+  private[this] val timeoutTimer = new Timer(executor)
+
   /* a list of operation watching keys */
   private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new 
Watchers))
 
+  // the number of estimated total operations in the purgatory
+  private[this] val estimatedTotalOperations = new AtomicInteger(0)
+
   /* background thread expiring operations that have timed out */
-  private val expirationReaper = new ExpiredOperationReaper
+  private val expirationReaper = new ExpiredOperationReaper()
 
   private val metricsTags = Map("delayedOperation" -> purgatoryName)
 
@@ -153,12 +180,18 @@ class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: String, br
     if (isCompletedByMe)
       return true
 
+    var watchCreated = false
     for(key <- watchKeys) {
       // If the operation is already completed, stop adding it to the rest of 
the watcher list.
       if (operation.isCompleted())
         return false
       val watchers = watchersFor(key)
       watchers.watch(operation)
+
+      if (!watchCreated) {
+        watchCreated = true
+        estimatedTotalOperations.incrementAndGet()
+      }
     }
 
     isCompletedByMe = operation synchronized operation.tryComplete()
@@ -166,8 +199,13 @@ class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: String, br
       return true
 
     // if it cannot be completed by now and hence is watched, add to the 
expire queue also
-    if (! operation.isCompleted())
-      expirationReaper.enqueue(operation)
+    if (! operation.isCompleted()) {
+      timeoutTimer.add(operation)
+      if (operation.isCompleted()) {
+        // cancel the timer task
+        operation.cancel()
+      }
+    }
 
     false
   }
@@ -196,7 +234,7 @@ class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: String, br
   /**
    * Return the number of delayed operations in the expiry queue
    */
-  def delayed() = expirationReaper.delayed
+  def delayed() = timeoutTimer.size
 
   /*
    * Return the watch list of the given key
@@ -208,52 +246,51 @@ class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: String, br
    */
   def shutdown() {
     expirationReaper.shutdown()
+    executor.shutdown()
   }
 
   /**
    * A linked list of watched delayed operations based on some key
    */
   private class Watchers {
-    private val operations = new util.LinkedList[T]
 
-    def watched = operations.size()
+    private[this] val operations = new LinkedList[T]()
+
+    def watched(): Int = operations synchronized operations.size
 
     // add the element to watch
     def watch(t: T) {
-      synchronized {
-        operations.add(t)
-      }
+      operations synchronized operations.add(t)
     }
 
     // traverse the list and try to complete some watched elements
     def tryCompleteWatched(): Int = {
-      var completed = 0
-      synchronized {
+
+      operations synchronized {
+        var completed = 0
         val iter = operations.iterator()
-        while(iter.hasNext) {
-          val curr = iter.next
-          if (curr.isCompleted()) {
+        while (iter.hasNext) {
+          val curr = iter.next()
+          if (curr.isCompleted) {
             // another thread has completed this operation, just remove it
             iter.remove()
-          } else {
-            if(curr synchronized curr.tryComplete()) {
-              iter.remove()
-              completed += 1
-            }
+          } else if (curr synchronized curr.tryComplete()) {
+            completed += 1
+            iter.remove()
           }
         }
+        completed
       }
-      completed
     }
 
     // traverse the list and purge elements that are already completed by 
others
     def purgeCompleted(): Int = {
       var purged = 0
-      synchronized {
+      operations synchronized {
         val iter = operations.iterator()
         while (iter.hasNext) {
-          val curr = iter.next
-          if(curr.isCompleted()) {
+          val curr = iter.next()
+          if (curr.isCompleted) {
             iter.remove()
             purged += 1
           }
@@ -270,71 +307,21 @@ class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: String, br
     "ExpirationReaper-%d".format(brokerId),
     false) {
 
-    /* The queue storing all delayed operations */
-    private val delayedQueue = new DelayQueue[T]
-
-    /*
-     * Return the number of delayed operations kept by the reaper
-     */
-    def delayed() = delayedQueue.size()
-
-    /*
-     * Add an operation to be expired
-     */
-    def enqueue(t: T) {
-      delayedQueue.add(t)
-    }
-
-    /**
-     * Try to get the next expired event and force completing it
-     */
-    private def expireNext() {
-      val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS)
-      if (curr != null.asInstanceOf[T]) {
-        // if there is an expired operation, try to force complete it
-        val completedByMe: Boolean = curr synchronized {
-          curr.onExpiration()
-          curr.forceComplete()
-        }
-        if (completedByMe)
-          debug("Force complete expired delayed operation %s".format(curr))
-      }
-    }
-
-    /**
-     * Delete all satisfied events from the delay queue and the watcher lists
-     */
-    private def purgeCompleted(): Int = {
-      var purged = 0
-
-      // purge the delayed queue
-      val iter = delayedQueue.iterator()
-      while (iter.hasNext) {
-        val curr = iter.next()
-        if (curr.isCompleted()) {
-          iter.remove()
-          purged += 1
-        }
-      }
-
-      purged
-    }
-
     override def doWork() {
-      // try to get the next expired operation and force completing it
-      expireNext()
-      // see if we need to purge the watch lists
-      if (DelayedOperationPurgatory.this.watched() >= purgeInterval) {
+      timeoutTimer.advanceClock(200L)
+
+      // Trigger a purge if the number of completed but still being watched 
operations is larger than
+      // the purge threshold. That number is computed as the difference between 
the estimated total number of
+      // operations and the number of pending delayed operations.
+      if (estimatedTotalOperations.get - delayed > purgeInterval) {
+        // now set estimatedTotalOperations to delayed (the number of pending 
operations) since we are going to
+        // clean up watchers. Note that, if more operations are completed 
during the clean up, we may end up with
+        // a little overestimated total number of operations.
+        estimatedTotalOperations.getAndSet(delayed)
         debug("Begin purging watch lists")
         val purged = watchersForKey.values.map(_.purgeCompleted()).sum
         debug("Purged %d elements from watch lists.".format(purged))
       }
-      // see if we need to purge the delayed operation queue
-      if (delayed() >= purgeInterval) {
-        debug("Begin purging delayed queue")
-        val purged = purgeCompleted()
-        debug("Purged %d operations from delayed queue.".format(purged))
-      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b06f00b..8ddd325 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -111,7 +111,6 @@ class ReplicaManager(val config: KafkaConfig,
   val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
     purgatoryName = "Fetch", config.brokerId, 
config.fetchPurgatoryPurgeIntervalRequests)
 
-
   newGauge(
     "LeaderCount",
     new Gauge[Int] {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/main/scala/kafka/utils/timer/Timer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala 
b/core/src/main/scala/kafka/utils/timer/Timer.scala
new file mode 100644
index 0000000..b8cde82
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/timer/Timer.scala
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.utils.timer
+
+import java.util.concurrent.{DelayQueue, ExecutorService, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import kafka.utils.threadsafe
+
+@threadsafe
+class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 
20, startMs: Long = System.currentTimeMillis) {
+
+  private[this] val delayQueue = new DelayQueue[TimerTaskList]()
+  private[this] val taskCounter = new AtomicInteger(0)
+  private[this] val timingWheel = new TimingWheel(
+    tickMs = tickMs,
+    wheelSize = wheelSize,
+    startMs = startMs,
+    taskCounter = taskCounter,
+    delayQueue
+  )
+
+  // Locks used to protect data structures while ticking
+  private[this] val readWriteLock = new ReentrantReadWriteLock()
+  private[this] val readLock = readWriteLock.readLock()
+  private[this] val writeLock = readWriteLock.writeLock()
+
+  def add(timerTask: TimerTask): Unit = {
+    readLock.lock()
+    try {
+      addTimerTaskEntry(new TimerTaskEntry(timerTask))
+    } finally {
+      readLock.unlock()
+    }
+  }
+
+  private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
+    if (!timingWheel.add(timerTaskEntry)) {
+      // already expired
+      taskExecutor.submit(timerTaskEntry.timerTask)
+    }
+  }
+
+  private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => 
addTimerTaskEntry(timerTaskEntry)
+
+  /*
+   * Advances the clock if there is an expired bucket. If there isn't any 
expired bucket when called,
+   * waits up to timeoutMs before giving up.
+   */
+  def advanceClock(timeoutMs: Long): Boolean = {
+    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
+    if (bucket != null) {
+      writeLock.lock()
+      try {
+        while (bucket != null) {
+          timingWheel.advanceClock(bucket.getExpiration())
+          bucket.flush(reinsert)
+          bucket = delayQueue.poll()
+        }
+      } finally {
+        writeLock.unlock()
+      }
+      true
+    } else {
+      false
+    }
+  }
+
+  def size(): Int = taskCounter.get
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/main/scala/kafka/utils/timer/TimerTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimerTask.scala 
b/core/src/main/scala/kafka/utils/timer/TimerTask.scala
new file mode 100644
index 0000000..3407138
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/timer/TimerTask.scala
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.utils.timer
+
+trait TimerTask extends Runnable {
+
+  val expirationMs: Long // timestamp in millisecond
+
+  private[this] var timerTaskEntry: TimerTaskEntry = null
+
+  def cancel(): Unit = {
+    synchronized {
+      if (timerTaskEntry != null) timerTaskEntry.remove()
+      timerTaskEntry = null
+    }
+  }
+
+  private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit = {
+    synchronized {
+      // if this timerTask is already held by an existing timer task entry,
+      // we will remove such an entry first.
+      if (timerTaskEntry != null && timerTaskEntry != entry) {
+        timerTaskEntry.remove()
+      }
+      timerTaskEntry = entry
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala 
b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
new file mode 100644
index 0000000..e7a9657
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.utils.timer
+
+import java.util.concurrent.{TimeUnit, Delayed}
+import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
+
+import kafka.utils.{SystemTime, threadsafe}
+
+import scala.math._
+
+@threadsafe
+private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed 
{
+
+  // TimerTaskList forms a doubly linked cyclic list using a dummy root entry
+  // root.next points to the head
+  // root.prev points to the tail
+  private[this] val root = new TimerTaskEntry(null)
+  root.next = root
+  root.prev = root
+
+  private[this] val expiration = new AtomicLong(-1L)
+
+  // Set the bucket's expiration time
+  // Returns true if the expiration time is changed
+  def setExpiration(expirationMs: Long): Boolean = {
+    expiration.getAndSet(expirationMs) != expirationMs
+  }
+
+  // Get the bucket's expiration time
+  def getExpiration(): Long = {
+    expiration.get()
+  }
+
+  // Apply the supplied function to each of tasks in this list
+  def foreach(f: (TimerTask)=>Unit): Unit = {
+    synchronized {
+      var entry = root.next
+      while (entry ne root) {
+        val nextEntry = entry.next
+        f(entry.timerTask)
+        entry = nextEntry
+      }
+    }
+  }
+
+  // Add a timer task entry to this list
+  def add(timerTaskEntry: TimerTaskEntry): Unit = {
+    synchronized {
+      // put the timer task entry to the end of the list. (root.prev points to 
the tail entry)
+      val tail = root.prev
+      timerTaskEntry.next = root
+      timerTaskEntry.prev = tail
+      timerTaskEntry.list = this
+      tail.next = timerTaskEntry
+      root.prev = timerTaskEntry
+      taskCounter.incrementAndGet()
+    }
+  }
+
+  // Remove the specified timer task entry from this list
+  def remove(timerTaskEntry: TimerTaskEntry): Unit = {
+    synchronized {
+      if (timerTaskEntry.list != null) {
+        timerTaskEntry.next.prev = timerTaskEntry.prev
+        timerTaskEntry.prev.next = timerTaskEntry.next
+        timerTaskEntry.next = null
+        timerTaskEntry.prev = null
+        timerTaskEntry.list = null
+        taskCounter.decrementAndGet()
+      }
+    }
+  }
+
+  // Remove all task entries and apply the supplied function to each of them
+  def flush(f: (TimerTaskEntry)=>Unit): Unit = {
+    synchronized {
+      var head = root.next
+      while (head ne root) {
+        remove(head)
+        f(head)
+        head = root.next
+      }
+      expiration.set(-1L)
+    }
+  }
+
+  def getDelay(unit: TimeUnit): Long = {
+    unit.convert(max(getExpiration - SystemTime.milliseconds, 0), 
TimeUnit.MILLISECONDS)
+  }
+
+  def compareTo(d: Delayed): Int = {
+
+    val other = d.asInstanceOf[TimerTaskList]
+
+    if(getExpiration < other.getExpiration) -1
+    else if(getExpiration > other.getExpiration) 1
+    else 0
+  }
+
+}
+
+private[timer] class TimerTaskEntry(val timerTask: TimerTask) {
+
+  var list: TimerTaskList = null
+  var next: TimerTaskEntry = null
+  var prev: TimerTaskEntry = null
+
+  // if this timerTask is already held by an existing timer task entry,
+  // setTimerTaskEntry will remove it.
+  if (timerTask != null) timerTask.setTimerTaskEntry(this)
+
+  def remove(): Unit = {
+    if (list != null) list.remove(this)
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala 
b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
new file mode 100644
index 0000000..9a36c20
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.utils.timer
+
+import kafka.utils.nonthreadsafe
+
+import java.util.concurrent.DelayQueue
+import java.util.concurrent.atomic.AtomicInteger
+
+/*
+ * Hierarchical Timing Wheels
+ *
+ * A simple timing wheel is a circular list of buckets of timer tasks. Let u 
be the time unit.
+ * A timing wheel with size n has n buckets and can hold timer tasks in n * u 
time interval.
+ * Each bucket holds timer tasks that fall into the corresponding time range. 
At the beginning,
+ * the first bucket holds tasks for [0, u), the second bucket holds tasks for 
[u, 2u), …,
+ * the n-th bucket for [u * (n -1), u * n). Every interval of time unit u, the 
timer ticks and
+ * moves to the next bucket, then expires all timer tasks in it. So, the timer 
never inserts a task
+ * into the bucket for the current time since it is already expired. The timer 
immediately runs
+ * the expired task. The emptied bucket is then available for the next round, 
so if the current
+ * bucket is for the time t, it becomes the bucket for [t + u * n, t + (n + 1) 
* u) after a tick.
+ * A timing wheel has O(1) cost for insert/delete (start-timer/stop-timer) 
whereas priority queue
+ * based timers, such as java.util.concurrent.DelayQueue and java.util.Timer, 
have O(log n)
+ * insert/delete cost.
+ *
+ * A major drawback of a simple timing wheel is that it assumes that a timer 
request is within
+ * the time interval of n * u from the current time. If a timer request is out 
of this interval,
+ * it is an overflow. A hierarchical timing wheel deals with such overflows. 
It is a set of hierarchically
+ * organized timing wheels. The lowest level has the finest time resolution. 
As moving up the
+ * hierarchy, time resolutions become coarser. If the resolution of a wheel at 
one level is u and
+ * the size is n, the resolution of the next level should be n * u. At each 
level overflows are
+ * delegated to the wheel in one level higher. When the wheel in the higher 
level ticks, it reinserts
+ * timer tasks into the lower level. An overflow wheel can be created on-demand. 
When a bucket in an
+ * overflow wheel expires, all tasks in it are reinserted into the timer 
recursively. The tasks
+ * are then moved to the finer grain wheels or are executed. The insert 
(start-timer) cost is O(m)
+ * where m is the number of wheels, which is usually very small compared to 
the number of requests
+ * in the system, and the delete (stop-timer) cost is still O(1).
+ *
+ * Example
+ * Let's say that u is 1 and n is 3. If the start time is c,
+ * then the buckets at different levels are:
+ *
+ * level    buckets
+ * 1        [c,c]   [c+1,c+1]  [c+2,c+2]
+ * 2        [c,c+2] [c+3,c+5]  [c+6,c+8]
+ * 3        [c,c+8] [c+9,c+17] [c+18,c+26]
+ *
+ * The bucket expiration is at the time of bucket beginning.
+ * So at time = c+1, buckets [c,c], [c,c+2] and [c,c+8] are expired.
+ * Level 1's clock moves to c+1, and [c+3,c+3] is created.
+ * Level 2's and level 3's clocks stay at c since their clocks move in units of 3 
and 9, respectively.
+ * So, no new buckets are created in level 2 and 3.
+ *
+ * Note that bucket [c,c+2] in level 2 won't receive any task since that range 
is already covered in level 1.
+ * The same is true for the bucket [c,c+8] in level 3 since its range is 
covered in level 2.
+ * This is a bit wasteful, but simplifies the implementation.
+ *
+ * 1        [c+1,c+1]  [c+2,c+2]  [c+3,c+3]
+ * 2        [c,c+2]    [c+3,c+5]  [c+6,c+8]
+ * 3        [c,c+8]    [c+9,c+17] [c+18,c+26]
+ *
+ * At time = c+2, [c+1,c+1] is newly expired.
+ * Level 1 moves to c+2, and [c+4,c+4] is created.
+ *
+ * 1        [c+2,c+2]  [c+3,c+3]  [c+4,c+4]
+ * 2        [c,c+2]    [c+3,c+5]  [c+6,c+8]
+ * 3        [c,c+8]    [c+9,c+17] [c+18,c+26]
+ *
+ * At time = c+3, [c+2,c+2] is newly expired.
+ * Levels 1 and 2 move to c+3, and [c+5,c+5] and [c+9,c+11] are created.
+ * Level 3 stays at c.
+ *
+ * 1        [c+3,c+3]  [c+4,c+4]  [c+5,c+5]
+ * 2        [c+3,c+5]  [c+6,c+8]  [c+9,c+11]
+ * 3        [c,c+8]    [c+9,c+17] [c+18,c+26]
+ *
+ * Hierarchical timing wheels work especially well when operations are 
completed before they time out.
+ * Even when everything times out, they still have an advantage when there are 
many items in the timer.
+ * Their insert cost (including reinsert) and delete cost are O(m) and O(1), 
respectively, while priority
+ * queue based timers take O(log N) for both insert and delete where N is the 
number of items in the queue.
+ *
+ * This class is not thread-safe. There should not be any add calls while 
advanceClock is executing.
+ * It is the caller's responsibility to enforce it. Simultaneous add calls are 
thread-safe.
+ */
+@nonthreadsafe
+private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, 
taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
+
+  private[this] val interval = tickMs * wheelSize
+  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => 
new TimerTaskList(taskCounter) }
+
+  private[this] var currentTime = startMs - (startMs % tickMs) // rounding 
down to multiple of tickMs
+  private[this] var overflowWheel: TimingWheel = null
+
+  private[this] def addOverflowWheel(): Unit = {
+    synchronized {
+      if (overflowWheel == null) {
+        overflowWheel = new TimingWheel(
+          tickMs = interval,
+          wheelSize = wheelSize,
+          startMs = currentTime,
+          taskCounter = taskCounter,
+          queue
+        )
+      }
+    }
+  }
+
+  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
+    val expiration = timerTaskEntry.timerTask.expirationMs
+
+    if (expiration < currentTime + tickMs) {
+      // Already expired
+      false
+    } else if (expiration < currentTime + interval) {
+      // Put in its own bucket
+      val virtualId = expiration / tickMs
+      val bucket = buckets((virtualId % wheelSize.toLong).toInt)
+      bucket.add(timerTaskEntry)
+
+      // Set the bucket expiration time
+      if (bucket.setExpiration(virtualId * tickMs)) {
+        // The bucket needs to be enqueued because it was an expired bucket
+        // We only need to enqueue the bucket when its expiration time has 
changed, i.e. the wheel has advanced
+        // and the previous bucket gets reused; further calls to set the 
expiration within the same wheel cycle
+        // will pass in the same value and hence return false, thus the bucket 
with the same expiration will not
+        // be enqueued multiple times.
+        queue.offer(bucket)
+      }
+      true
+    } else {
+      // Out of the interval. Put it into the parent timer
+      if (overflowWheel == null) addOverflowWheel()
+      overflowWheel.add(timerTaskEntry)
+    }
+  }
+
+  // Try to advance the clock
+  def advanceClock(timeMs: Long): Unit = {
+    if (timeMs >= currentTime + tickMs) {
+      currentTime = timeMs - (timeMs % tickMs)
+
+      // Try to advance the clock of the overflow wheel if present
+      if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index 7a37617..9186c90 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -20,17 +20,16 @@ package kafka.server
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
 import junit.framework.Assert._
-import kafka.utils.TestUtils
 
 class DelayedOperationTest extends JUnit3Suite {
 
   var purgatory: DelayedOperationPurgatory[MockDelayedOperation] = null
-  
+
   override def setUp() {
     super.setUp()
-    purgatory = new 
DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock", 0, 5)
+    purgatory = new 
DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock")
   }
-  
+
   override def tearDown() {
     purgatory.shutdown()
     super.tearDown()
@@ -72,32 +71,34 @@ class DelayedOperationTest extends JUnit3Suite {
   def testRequestPurge() {
     val r1 = new MockDelayedOperation(100000L)
     val r2 = new MockDelayedOperation(100000L)
+    val r3 = new MockDelayedOperation(100000L)
     purgatory.tryCompleteElseWatch(r1, Array("test1"))
     purgatory.tryCompleteElseWatch(r2, Array("test1", "test2"))
-    purgatory.tryCompleteElseWatch(r1, Array("test2", "test3"))
+    purgatory.tryCompleteElseWatch(r3, Array("test1", "test2", "test3"))
 
-    assertEquals("Purgatory should have 5 watched elements", 5, 
purgatory.watched())
     assertEquals("Purgatory should have 3 total delayed operations", 3, 
purgatory.delayed())
+    assertEquals("Purgatory should have 6 watched elements", 6, 
purgatory.watched())
 
-    // complete one of the operations, it should
-    // eventually be purged from the watch list with purge interval 5
+    // complete an operation; it should immediately be purged from the delayed operations
     r2.completable = true
     r2.tryComplete()
-    TestUtils.waitUntilTrue(() => purgatory.watched() == 3,
-      "Purgatory should have 3 watched elements instead of " + 
purgatory.watched(), 1000L)
-    TestUtils.waitUntilTrue(() => purgatory.delayed() == 3,
-      "Purgatory should still have 3 total delayed operations instead of " + 
purgatory.delayed(), 1000L)
+    assertEquals("Purgatory should have 2 total delayed operations instead of 
" + purgatory.delayed(), 2, purgatory.delayed())
 
-    // add two more requests, then the satisfied request should be purged from 
the delayed queue with purge interval 5
-    purgatory.tryCompleteElseWatch(r1, Array("test1"))
-    purgatory.tryCompleteElseWatch(r1, Array("test1"))
+    r3.completable = true
+    r3.tryComplete()
+    assertEquals("Purgatory should have 1 total delayed operations instead of 
" + purgatory.delayed(), 1, purgatory.delayed())
 
-    TestUtils.waitUntilTrue(() => purgatory.watched() == 5,
-      "Purgatory should have 5 watched elements instead of " + 
purgatory.watched(), 1000L)
-    TestUtils.waitUntilTrue(() => purgatory.delayed() == 4,
-      "Purgatory should have 4 total delayed operations instead of " + 
purgatory.delayed(), 1000L)
+    // checking a watch should purge the watch list
+    purgatory.checkAndComplete("test1")
+    assertEquals("Purgatory should have 4 watched elements instead of " + 
purgatory.watched(), 4, purgatory.watched())
+
+    purgatory.checkAndComplete("test2")
+    assertEquals("Purgatory should have 2 watched elements instead of " + 
purgatory.watched(), 2, purgatory.watched())
+
+    purgatory.checkAndComplete("test3")
+    assertEquals("Purgatory should have 1 watched elements instead of " + 
purgatory.watched(), 1, purgatory.watched())
   }
-  
+
   class MockDelayedOperation(delayMs: Long) extends DelayedOperation(delayMs) {
     var completable = false
 
@@ -124,5 +125,5 @@ class DelayedOperationTest extends JUnit3Suite {
       }
     }
   }
-  
-}
\ No newline at end of file
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala 
b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
new file mode 100644
index 0000000..052aecd
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.utils.timer
+
+import junit.framework.Assert._
+import java.util.concurrent.atomic._
+import org.junit.{Test, After, Before}
+
+/**
+ * Unit test for TimerTaskList.
+ *
+ * Three lists share one task counter. The test adds tasks to the first list, re-adds the
+ * same tasks to the other lists, and finally cancels them, checking the shared counter and
+ * the number of elements each list iterates over after every step.
+ */
+class TimerTaskListTest {
+
+  // Task with a fixed expiration whose run() does nothing
+  private class TestTask(val expirationMs: Long) extends TimerTask {
+    def run(): Unit = { }
+  }
+
+  // Counts the elements visited by list.foreach
+  private def size(list: TimerTaskList): Int = {
+    var count = 0
+    list.foreach(_ => count += 1)
+    count
+  }
+
+  @Test
+  def testAll(): Unit = {
+    val sharedCounter = new AtomicInteger(0)
+    val list1 = new TimerTaskList(sharedCounter)
+    val list2 = new TimerTaskList(sharedCounter)
+    val list3 = new TimerTaskList(sharedCounter)
+
+    // each add to list1 should bump the shared counter by one
+    val tasks = (1 to 10).map { i =>
+      val task = new TestTask(10L)
+      list1.add(new TimerTaskEntry(task))
+      assertEquals(i, sharedCounter.get)
+      task
+    }.toSeq
+
+    assertEquals(tasks.size, sharedCounter.get)
+
+    // reinserting the first four tasks into list2 shouldn't change the task count
+    tasks.take(4).foreach { task =>
+      val prevCount = sharedCounter.get
+      // new TimerTaskEntry(task) will remove the existing entry from the list
+      list2.add(new TimerTaskEntry(task))
+      assertEquals(prevCount, sharedCounter.get)
+    }
+    assertEquals(10 - 4, size(list1))
+    assertEquals(4, size(list2))
+
+    assertEquals(tasks.size, sharedCounter.get)
+
+    // reinserting the remaining six tasks into list3 shouldn't change the task count
+    tasks.drop(4).foreach { task =>
+      val prevCount = sharedCounter.get
+      // new TimerTaskEntry(task) will remove the existing entry from the list
+      list3.add(new TimerTaskEntry(task))
+      assertEquals(prevCount, sharedCounter.get)
+    }
+    assertEquals(0, size(list1))
+    assertEquals(4, size(list2))
+    assertEquals(6, size(list3))
+
+    assertEquals(tasks.size, sharedCounter.get)
+
+    // cancel tasks in lists; list1 is already empty, so the other lists must be unaffected
+    list1.foreach { _.cancel() }
+    assertEquals(0, size(list1))
+    assertEquals(4, size(list2))
+    assertEquals(6, size(list3))
+
+    list2.foreach { _.cancel() }
+    assertEquals(0, size(list1))
+    assertEquals(0, size(list2))
+    assertEquals(6, size(list3))
+
+    list3.foreach { _.cancel() }
+    assertEquals(0, size(list1))
+    assertEquals(0, size(list2))
+    assertEquals(0, size(list3))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala 
b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
new file mode 100644
index 0000000..8507592
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.utils.timer
+
+import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, 
TimeUnit}
+
+import junit.framework.Assert._
+import java.util.concurrent.atomic._
+import org.junit.{Test, After, Before}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+/**
+ * Unit test for Timer: checks that tasks whose expiration is already in the past get run,
+ * and that tasks submitted in random order are run in order of their expiration.
+ */
+class TimerTest {
+
+  /**
+   * Task that records its id in output and counts down latch.
+   * Only the first call to run() has an effect; later calls are ignored.
+   */
+  private class TestTask(override val expirationMs: Long,
+                         id: Int,
+                         latch: CountDownLatch,
+                         output: ArrayBuffer[Int]) extends TimerTask {
+    private[this] val completed = new AtomicBoolean(false)
+    def run(): Unit = {
+      if (completed.compareAndSet(false, true)) {
+        // output is shared between tasks, so guard the append
+        output.synchronized { output += id }
+        latch.countDown()
+      }
+    }
+  }
+
+  private[this] var executor: ExecutorService = null
+
+  @Before
+  def setup(): Unit = {
+    executor = Executors.newSingleThreadExecutor()
+  }
+
+  @After
+  def teardown(): Unit = {
+    executor.shutdown()
+    executor = null
+  }
+
+  @Test
+  def testAlreadyExpiredTask(): Unit = {
+    val startTime = System.currentTimeMillis()
+    val timer = new Timer(
+      taskExecutor = executor,
+      tickMs = 1,
+      wheelSize = 3,
+      startMs = startTime)
+    val output = new ArrayBuffer[Int]()
+
+    // five tasks whose expiration lies 5..1 ms before the timer's start time
+    val latches = (-5 until 0).map { i =>
+      val latch = new CountDownLatch(1)
+      timer.add(new TestTask(startTime + i, i, latch, output))
+      latch
+    }
+
+    // the clock is never advanced here, so the tasks must run as a result of add() alone
+    latches.foreach { latch =>
+      assertTrue("already expired tasks should run immediately",
+        latch.await(3, TimeUnit.SECONDS))
+    }
+
+    assertEquals("output of already expired tasks",
+      Set(-5, -4, -3, -2, -1), output.toSet)
+  }
+
+  @Test
+  def testTaskExpiration(): Unit = {
+    val startTime = System.currentTimeMillis()
+    val timer = new Timer(
+      taskExecutor = executor,
+      tickMs = 1,
+      wheelSize = 3,
+      startMs = startTime)
+    val output = new ArrayBuffer[Int]()
+
+    val tasks = new ArrayBuffer[TestTask]()
+    val ids = new ArrayBuffer[Int]()
+
+    // ids 0..4 and 100..499 get one task each; ids 10..99 get two tasks sharing one latch
+    val latches =
+      (0 until 5).map { i =>
+        val latch = new CountDownLatch(1)
+        tasks += new TestTask(startTime + i, i, latch, output)
+        ids += i
+        latch
+      } ++ (10 until 100).map { i =>
+        val latch = new CountDownLatch(2)
+        tasks += new TestTask(startTime + i, i, latch, output)
+        tasks += new TestTask(startTime + i, i, latch, output)
+        ids += i
+        ids += i
+        latch
+      } ++ (100 until 500).map { i =>
+        val latch = new CountDownLatch(1)
+        tasks += new TestTask(startTime + i, i, latch, output)
+        ids += i
+        latch
+      }
+
+    // randomly submit requests
+    Random.shuffle(tasks.toSeq).foreach { task => timer.add(task) }
+
+    // keep advancing the clock until advanceClock reports false
+    // NOTE(review): 1000 is presumably a timeout in ms - confirm against Timer.advanceClock
+    while (timer.advanceClock(1000)) {}
+
+    // wait for every task to have run before inspecting the output
+    latches.foreach { latch => latch.await() }
+
+    assertEquals("output should match", ids.sorted, output.toSeq)
+  }
+}

Reply via email to