Repository: kafka Updated Branches: refs/heads/trunk d3aa99c54 -> 6880f66c9
kafka-1989; New purgatory design; patched by Yasuhiro Matsuda; reviewed by Guozhang Wang and Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6880f66c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6880f66c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6880f66c Branch: refs/heads/trunk Commit: 6880f66c97f63b2d1f3750f1753ec8b6094cb8a5 Parents: d3aa99c Author: Yasuhiro Matsuda <yasuhiro.mats...@gmail.com> Authored: Wed Apr 8 15:19:59 2015 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Wed Apr 8 15:19:59 2015 -0700 ---------------------------------------------------------------------- .../scala/kafka/server/DelayedOperation.scala | 159 +++++++++--------- .../scala/kafka/server/ReplicaManager.scala | 1 - .../main/scala/kafka/utils/timer/Timer.scala | 86 ++++++++++ .../scala/kafka/utils/timer/TimerTask.scala | 43 +++++ .../scala/kafka/utils/timer/TimerTaskList.scala | 132 +++++++++++++++ .../scala/kafka/utils/timer/TimingWheel.scala | 160 +++++++++++++++++++ .../kafka/server/DelayedOperationTest.scala | 45 +++--- .../kafka/utils/timer/TimerTaskListTest.scala | 95 +++++++++++ .../unit/kafka/utils/timer/TimerTest.scala | 111 +++++++++++++ 9 files changed, 723 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/main/scala/kafka/server/DelayedOperation.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index e317676..2ed9b46 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -18,11 +18,15 @@ package kafka.server import kafka.utils._ +import kafka.utils.timer._ import kafka.metrics.KafkaMetricsGroup -import java.util 
+import java.util.LinkedList import java.util.concurrent._ import java.util.concurrent.atomic._ + +import org.apache.kafka.common.utils.Utils + import scala.collection._ import com.yammer.metrics.core.Gauge @@ -41,7 +45,10 @@ import com.yammer.metrics.core.Gauge * * A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete(). */ -abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) { +abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging { + + override val expirationMs = delayMs + System.currentTimeMillis() + private val completed = new AtomicBoolean(false) /* @@ -58,6 +65,8 @@ abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) { */ def forceComplete(): Boolean = { if (completed.compareAndSet(false, true)) { + // cancel the timeout timer + cancel() onComplete() true } else { @@ -71,7 +80,7 @@ abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) { def isCompleted(): Boolean = completed.get() /** - * Call-back to execute when a delayed operation expires, but before completion. + * Call-back to execute when a delayed operation gets expired and hence forced to complete. 
*/ def onExpiration(): Unit @@ -89,6 +98,14 @@ abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) { * This function needs to be defined in subclasses */ def tryComplete(): Boolean + + /* + * run() method defines a task that is executed on timeout + */ + override def run(): Unit = { + if (forceComplete()) + onExpiration() + } } /** @@ -97,11 +114,21 @@ abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) { class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, brokerId: Int = 0, purgeInterval: Int = 1000) extends Logging with KafkaMetricsGroup { + // timeout timer + private[this] val executor = Executors.newFixedThreadPool(1, new ThreadFactory() { + def newThread(runnable: Runnable): Thread = + Utils.newThread("executor-"+purgatoryName, runnable, false) + }) + private[this] val timeoutTimer = new Timer(executor) + /* a list of operation watching keys */ private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) + // the number of estimated total operations in the purgatory + private[this] val estimatedTotalOperations = new AtomicInteger(0) + /* background thread expiring operations that have timed out */ - private val expirationReaper = new ExpiredOperationReaper + private val expirationReaper = new ExpiredOperationReaper() private val metricsTags = Map("delayedOperation" -> purgatoryName) @@ -153,12 +180,18 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br if (isCompletedByMe) return true + var watchCreated = false for(key <- watchKeys) { // If the operation is already completed, stop adding it to the rest of the watcher list. 
if (operation.isCompleted()) return false val watchers = watchersFor(key) watchers.watch(operation) + + if (!watchCreated) { + watchCreated = true + estimatedTotalOperations.incrementAndGet() + } } isCompletedByMe = operation synchronized operation.tryComplete() @@ -166,8 +199,13 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br return true // if it cannot be completed by now and hence is watched, add to the expire queue also - if (! operation.isCompleted()) - expirationReaper.enqueue(operation) + if (! operation.isCompleted()) { + timeoutTimer.add(operation) + if (operation.isCompleted()) { + // cancel the timer task + operation.cancel() + } + } false } @@ -196,7 +234,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br /** * Return the number of delayed operations in the expiry queue */ - def delayed() = expirationReaper.delayed + def delayed() = timeoutTimer.size /* * Return the watch list of the given key @@ -208,52 +246,51 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br */ def shutdown() { expirationReaper.shutdown() + executor.shutdown() } /** * A linked list of watched delayed operations based on some key */ private class Watchers { - private val operations = new util.LinkedList[T] - def watched = operations.size() + private[this] val operations = new LinkedList[T]() + + def watched(): Int = operations synchronized operations.size // add the element to watch def watch(t: T) { - synchronized { - operations.add(t) - } + operations synchronized operations.add(t) } // traverse the list and try to complete some watched elements def tryCompleteWatched(): Int = { - var completed = 0 - synchronized { + + operations synchronized { + var completed = 0 val iter = operations.iterator() - while(iter.hasNext) { - val curr = iter.next - if (curr.isCompleted()) { + while (iter.hasNext) { + val curr = iter.next() + if (curr.isCompleted) { // another thread has completed 
this operation, just remove it iter.remove() - } else { - if(curr synchronized curr.tryComplete()) { - iter.remove() - completed += 1 - } + } else if (curr synchronized curr.tryComplete()) { + completed += 1 + iter.remove() } } + completed } - completed } // traverse the list and purge elements that are already completed by others def purgeCompleted(): Int = { var purged = 0 - synchronized { + operations synchronized { val iter = operations.iterator() while (iter.hasNext) { - val curr = iter.next - if(curr.isCompleted()) { + val curr = iter.next() + if (curr.isCompleted) { iter.remove() purged += 1 } @@ -270,71 +307,21 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br "ExpirationReaper-%d".format(brokerId), false) { - /* The queue storing all delayed operations */ - private val delayedQueue = new DelayQueue[T] - - /* - * Return the number of delayed operations kept by the reaper - */ - def delayed() = delayedQueue.size() - - /* - * Add an operation to be expired - */ - def enqueue(t: T) { - delayedQueue.add(t) - } - - /** - * Try to get the next expired event and force completing it - */ - private def expireNext() { - val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS) - if (curr != null.asInstanceOf[T]) { - // if there is an expired operation, try to force complete it - val completedByMe: Boolean = curr synchronized { - curr.onExpiration() - curr.forceComplete() - } - if (completedByMe) - debug("Force complete expired delayed operation %s".format(curr)) - } - } - - /** - * Delete all satisfied events from the delay queue and the watcher lists - */ - private def purgeCompleted(): Int = { - var purged = 0 - - // purge the delayed queue - val iter = delayedQueue.iterator() - while (iter.hasNext) { - val curr = iter.next() - if (curr.isCompleted()) { - iter.remove() - purged += 1 - } - } - - purged - } - override def doWork() { - // try to get the next expired operation and force completing it - expireNext() - // see if we need 
to purge the watch lists - if (DelayedOperationPurgatory.this.watched() >= purgeInterval) { + timeoutTimer.advanceClock(200L) + + // Trigger a purge if the number of completed but still being watched operations is larger than + // the purge threshold. That number is computed by the difference btw the estimated total number of + // operations and the number of pending delayed operations. + if (estimatedTotalOperations.get - delayed > purgeInterval) { + // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to + // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with + // a little overestimated total number of operations. + estimatedTotalOperations.getAndSet(delayed) debug("Begin purging watch lists") val purged = watchersForKey.values.map(_.purgeCompleted()).sum debug("Purged %d elements from watch lists.".format(purged)) } - // see if we need to purge the delayed operation queue - if (delayed() >= purgeInterval) { - debug("Begin purging delayed queue") - val purged = purgeCompleted() - debug("Purged %d operations from delayed queue.".format(purged)) - } } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b06f00b..8ddd325 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -111,7 +111,6 @@ class ReplicaManager(val config: KafkaConfig, val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]( purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests) - newGauge( "LeaderCount", new Gauge[Int] { 
http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/main/scala/kafka/utils/timer/Timer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala new file mode 100644 index 0000000..b8cde82 --- /dev/null +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.utils.timer + +import java.util.concurrent.{DelayQueue, ExecutorService, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.locks.ReentrantReadWriteLock + +import kafka.utils.threadsafe + +@threadsafe +class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20, startMs: Long = System.currentTimeMillis) { + + private[this] val delayQueue = new DelayQueue[TimerTaskList]() + private[this] val taskCounter = new AtomicInteger(0) + private[this] val timingWheel = new TimingWheel( + tickMs = tickMs, + wheelSize = wheelSize, + startMs = startMs, + taskCounter = taskCounter, + delayQueue + ) + + // Locks used to protect data structures while ticking + private[this] val readWriteLock = new ReentrantReadWriteLock() + private[this] val readLock = readWriteLock.readLock() + private[this] val writeLock = readWriteLock.writeLock() + + def add(timerTask: TimerTask): Unit = { + readLock.lock() + try { + addTimerTaskEntry(new TimerTaskEntry(timerTask)) + } finally { + readLock.unlock() + } + } + + private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { + if (!timingWheel.add(timerTaskEntry)) { + // already expired + taskExecutor.submit(timerTaskEntry.timerTask) + } + } + + private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry) + + /* + * Advances the clock if there is an expired bucket. If there isn't any expired bucket when called, + * waits up to timeoutMs before giving up. 
+ */ + def advanceClock(timeoutMs: Long): Boolean = { + var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) + if (bucket != null) { + writeLock.lock() + try { + while (bucket != null) { + timingWheel.advanceClock(bucket.getExpiration()) + bucket.flush(reinsert) + bucket = delayQueue.poll() + } + } finally { + writeLock.unlock() + } + true + } else { + false + } + } + + def size(): Int = taskCounter.get +} + http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/main/scala/kafka/utils/timer/TimerTask.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/timer/TimerTask.scala b/core/src/main/scala/kafka/utils/timer/TimerTask.scala new file mode 100644 index 0000000..3407138 --- /dev/null +++ b/core/src/main/scala/kafka/utils/timer/TimerTask.scala @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.utils.timer + +trait TimerTask extends Runnable { + + val expirationMs: Long // timestamp in millisecond + + private[this] var timerTaskEntry: TimerTaskEntry = null + + def cancel(): Unit = { + synchronized { + if (timerTaskEntry != null) timerTaskEntry.remove() + timerTaskEntry = null + } + } + + private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit = { + synchronized { + // if this timerTask is already held by an existing timer task entry, + // we will remove such an entry first. + if (timerTaskEntry != null && timerTaskEntry != entry) { + timerTaskEntry.remove() + } + timerTaskEntry = entry + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala new file mode 100644 index 0000000..e7a9657 --- /dev/null +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.utils.timer + +import java.util.concurrent.{TimeUnit, Delayed} +import java.util.concurrent.atomic.{AtomicLong, AtomicInteger} + +import kafka.utils.{SystemTime, threadsafe} + +import scala.math._ + +@threadsafe +private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { + + // TimerTaskList forms a doubly linked cyclic list using a dummy root entry + // root.next points to the head + // root.prev points to the tail + private[this] val root = new TimerTaskEntry(null) + root.next = root + root.prev = root + + private[this] val expiration = new AtomicLong(-1L) + + // Set the bucket's expiration time + // Returns true if the expiration time is changed + def setExpiration(expirationMs: Long): Boolean = { + expiration.getAndSet(expirationMs) != expirationMs + } + + // Get the bucket's expiration time + def getExpiration(): Long = { + expiration.get() + } + + // Apply the supplied function to each of tasks in this list + def foreach(f: (TimerTask)=>Unit): Unit = { + synchronized { + var entry = root.next + while (entry ne root) { + val nextEntry = entry.next + f(entry.timerTask) + entry = nextEntry + } + } + } + + // Add a timer task entry to this list + def add(timerTaskEntry: TimerTaskEntry): Unit = { + synchronized { + // put the timer task entry to the end of the list. 
(root.prev points to the tail entry) + val tail = root.prev + timerTaskEntry.next = root + timerTaskEntry.prev = tail + timerTaskEntry.list = this + tail.next = timerTaskEntry + root.prev = timerTaskEntry + taskCounter.incrementAndGet() + } + } + + // Remove the specified timer task entry from this list + def remove(timerTaskEntry: TimerTaskEntry): Unit = { + synchronized { + if (timerTaskEntry.list != null) { + timerTaskEntry.next.prev = timerTaskEntry.prev + timerTaskEntry.prev.next = timerTaskEntry.next + timerTaskEntry.next = null + timerTaskEntry.prev = null + timerTaskEntry.list = null + taskCounter.decrementAndGet() + } + } + } + + // Remove all task entries and apply the supplied function to each of them + def flush(f: (TimerTaskEntry)=>Unit): Unit = { + synchronized { + var head = root.next + while (head ne root) { + remove(head) + f(head) + head = root.next + } + expiration.set(-1L) + } + } + + def getDelay(unit: TimeUnit): Long = { + unit.convert(max(getExpiration - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS) + } + + def compareTo(d: Delayed): Int = { + + val other = d.asInstanceOf[TimerTaskList] + + if(getExpiration < other.getExpiration) -1 + else if(getExpiration > other.getExpiration) 1 + else 0 + } + +} + +private[timer] class TimerTaskEntry(val timerTask: TimerTask) { + + var list: TimerTaskList = null + var next: TimerTaskEntry = null + var prev: TimerTaskEntry = null + + // if this timerTask is already held by an existing timer task entry, + // setTimerTaskEntry will remove it. 
+ if (timerTask != null) timerTask.setTimerTaskEntry(this) + + def remove(): Unit = { + if (list != null) list.remove(this) + } + +} + http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/main/scala/kafka/utils/timer/TimingWheel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala new file mode 100644 index 0000000..9a36c20 --- /dev/null +++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.utils.timer + +import kafka.utils.nonthreadsafe + +import java.util.concurrent.DelayQueue +import java.util.concurrent.atomic.AtomicInteger + +/* + * Hierarchical Timing Wheels + * + * A simple timing wheel is a circular list of buckets of timer tasks. Let u be the time unit. + * A timing wheel with size n has n buckets and can hold timer tasks in n * u time interval. + * Each bucket holds timer tasks that fall into the corresponding time range. 
At the beginning, + * the first bucket holds tasks for [0, u), the second bucket holds tasks for [u, 2u), …, + * the n-th bucket for [u * (n -1), u * n). Every interval of time unit u, the timer ticks and + * moves to the next bucket, then expires all timer tasks in it. So, the timer never inserts a task + * into the bucket for the current time since it is already expired. The timer immediately runs + * the expired task. The emptied bucket is then available for the next round, so if the current + * bucket is for the time t, it becomes the bucket for [t + u * n, t + (n + 1) * u) after a tick. + * A timing wheel has O(1) cost for insert/delete (start-timer/stop-timer) whereas priority queue + * based timers, such as java.util.concurrent.DelayQueue and java.util.Timer, have O(log n) + * insert/delete cost. + * + * A major drawback of a simple timing wheel is that it assumes that a timer request is within + * the time interval of n * u from the current time. If a timer request is out of this interval, + * it is an overflow. A hierarchical timing wheel deals with such overflows. It is a set of hierarchically + * organized timing wheels. The lowest level has the finest time resolution. As we move up the + * hierarchy, time resolutions become coarser. If the resolution of a wheel at one level is u and + * the size is n, the resolution of the next level should be n * u. At each level overflows are + * delegated to the wheel in one level higher. When the wheel in the higher level ticks, it reinserts + * timer tasks to the lower level. An overflow wheel can be created on-demand. When a bucket in an + * overflow wheel expires, all tasks in it are reinserted into the timer recursively. The tasks + * are then moved to the finer grain wheels or executed. The insert (start-timer) cost is O(m) + * where m is the number of wheels, which is usually very small compared to the number of requests + * in the system, and the delete (stop-timer) cost is still O(1). 
+ * + * Example + * Let's say that u is 1 and n is 3. If the start time is c, + * then the buckets at different levels are: + * + * level buckets + * 1 [c,c] [c+1,c+1] [c+2,c+2] + * 2 [c,c+2] [c+3,c+5] [c+6,c+8] + * 3 [c,c+8] [c+9,c+17] [c+18,c+26] + * + * The bucket expiration is at the time of bucket beginning. + * So at time = c+1, buckets [c,c], [c,c+2] and [c,c+8] are expired. + * Level 1's clock moves to c+1, and [c+3,c+3] is created. + * Level 2 and level 3's clocks stay at c since their clocks move in units of 3 and 9, respectively. + * So, no new buckets are created in level 2 and 3. + * + * Note that bucket [c,c+2] in level 2 won't receive any task since that range is already covered in level 1. + * The same is true for the bucket [c,c+8] in level 3 since its range is covered in level 2. + * This is a bit wasteful, but simplifies the implementation. + * + * 1 [c+1,c+1] [c+2,c+2] [c+3,c+3] + * 2 [c,c+2] [c+3,c+5] [c+6,c+8] + * 3 [c,c+8] [c+9,c+17] [c+18,c+26] + * + * At time = c+2, [c+1,c+1] is newly expired. + * Level 1 moves to c+2, and [c+4,c+4] is created. + * + * 1 [c+2,c+2] [c+3,c+3] [c+4,c+4] + * 2 [c,c+2] [c+3,c+5] [c+6,c+8] + * 3 [c,c+8] [c+9,c+17] [c+18,c+26] + * + * At time = c+3, [c+2,c+2] is newly expired. + * Levels 1 and 2 move to c+3, and [c+5,c+5] and [c+9,c+11] are created. + * Level 3 stays at c. + * + * 1 [c+3,c+3] [c+4,c+4] [c+5,c+5] + * 2 [c+3,c+5] [c+6,c+8] [c+9,c+11] + * 3 [c,c+8] [c+9,c+17] [c+18,c+26] + * + * Hierarchical timing wheels work especially well when operations are completed before they time out. + * Even when everything times out, it is still advantageous when there are many items in the timer. + * Its insert cost (including reinsert) and delete cost are O(m) and O(1), respectively while priority + * queue based timers take O(log N) for both insert and delete where N is the number of items in the queue. + * + * This class is not thread-safe. There should not be any add calls while advanceClock is executing. 
+ * It is caller's responsibility to enforce it. Simultaneous add calls are thread-safe. + */ +@nonthreadsafe +private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) { + + private[this] val interval = tickMs * wheelSize + private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) } + + private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs + private[this] var overflowWheel: TimingWheel = null + + private[this] def addOverflowWheel(): Unit = { + synchronized { + if (overflowWheel == null) { + overflowWheel = new TimingWheel( + tickMs = interval, + wheelSize = wheelSize, + startMs = currentTime, + taskCounter = taskCounter, + queue + ) + } + } + } + + def add(timerTaskEntry: TimerTaskEntry): Boolean = { + val expiration = timerTaskEntry.timerTask.expirationMs + + if (expiration < currentTime + tickMs) { + // Already expired + false + } else if (expiration < currentTime + interval) { + // Put in its own bucket + val virtualId = expiration / tickMs + val bucket = buckets((virtualId % wheelSize.toLong).toInt) + bucket.add(timerTaskEntry) + + // Set the bucket expiration time + if (bucket.setExpiration(virtualId * tickMs)) { + // The bucket needs to be enqueued because it was an expired bucket + // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced + // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle + // will pass in the same value and hence return false, thus the bucket with the same expiration will not + // be enqueued multiple times. + queue.offer(bucket) + } + true + } else { + // Out of the interval. 
Put it into the parent timer + if (overflowWheel == null) addOverflowWheel() + overflowWheel.add(timerTaskEntry) + } + } + + // Try to advance the clock + def advanceClock(timeMs: Long): Unit = { + if (timeMs >= currentTime + tickMs) { + currentTime = timeMs - (timeMs % tickMs) + + // Try to advance the clock of the overflow wheel if present + if (overflowWheel != null) overflowWheel.advanceClock(currentTime) + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index 7a37617..9186c90 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -20,17 +20,16 @@ package kafka.server import org.junit.Test import org.scalatest.junit.JUnit3Suite import junit.framework.Assert._ -import kafka.utils.TestUtils class DelayedOperationTest extends JUnit3Suite { var purgatory: DelayedOperationPurgatory[MockDelayedOperation] = null - + override def setUp() { super.setUp() - purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock", 0, 5) + purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock") } - + override def tearDown() { purgatory.shutdown() super.tearDown() @@ -72,32 +71,34 @@ class DelayedOperationTest extends JUnit3Suite { def testRequestPurge() { val r1 = new MockDelayedOperation(100000L) val r2 = new MockDelayedOperation(100000L) + val r3 = new MockDelayedOperation(100000L) purgatory.tryCompleteElseWatch(r1, Array("test1")) purgatory.tryCompleteElseWatch(r2, Array("test1", "test2")) - purgatory.tryCompleteElseWatch(r1, Array("test2", "test3")) + purgatory.tryCompleteElseWatch(r3, Array("test1", "test2", "test3")) - 
assertEquals("Purgatory should have 5 watched elements", 5, purgatory.watched()) assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.delayed()) + assertEquals("Purgatory should have 6 watched elements", 6, purgatory.watched()) - // complete one of the operations, it should - // eventually be purged from the watch list with purge interval 5 + // complete the operations, it should immediately be purged from the delayed operation r2.completable = true r2.tryComplete() - TestUtils.waitUntilTrue(() => purgatory.watched() == 3, - "Purgatory should have 3 watched elements instead of " + purgatory.watched(), 1000L) - TestUtils.waitUntilTrue(() => purgatory.delayed() == 3, - "Purgatory should still have 3 total delayed operations instead of " + purgatory.delayed(), 1000L) + assertEquals("Purgatory should have 2 total delayed operations instead of " + purgatory.delayed(), 2, purgatory.delayed()) - // add two more requests, then the satisfied request should be purged from the delayed queue with purge interval 5 - purgatory.tryCompleteElseWatch(r1, Array("test1")) - purgatory.tryCompleteElseWatch(r1, Array("test1")) + r3.completable = true + r3.tryComplete() + assertEquals("Purgatory should have 1 total delayed operations instead of " + purgatory.delayed(), 1, purgatory.delayed()) - TestUtils.waitUntilTrue(() => purgatory.watched() == 5, - "Purgatory should have 5 watched elements instead of " + purgatory.watched(), 1000L) - TestUtils.waitUntilTrue(() => purgatory.delayed() == 4, - "Purgatory should have 4 total delayed operations instead of " + purgatory.delayed(), 1000L) + // checking a watch should purge the watch list + purgatory.checkAndComplete("test1") + assertEquals("Purgatory should have 4 watched elements instead of " + purgatory.watched(), 4, purgatory.watched()) + + purgatory.checkAndComplete("test2") + assertEquals("Purgatory should have 2 watched elements instead of " + purgatory.watched(), 2, purgatory.watched()) + + 
purgatory.checkAndComplete("test3") + assertEquals("Purgatory should have 1 watched elements instead of " + purgatory.watched(), 1, purgatory.watched()) } - + class MockDelayedOperation(delayMs: Long) extends DelayedOperation(delayMs) { var completable = false @@ -124,5 +125,5 @@ class DelayedOperationTest extends JUnit3Suite { } } } - -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala new file mode 100644 index 0000000..052aecd --- /dev/null +++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package kafka.utils.timer
+
+import junit.framework.Assert._
+import java.util.concurrent.atomic._
+import org.junit.{Test, After, Before}
+
+/**
+ * Unit test for TimerTaskList. Three lists are created over one shared
+ * AtomicInteger counter; the test checks the per-list sizes and the shared
+ * count while tasks are added and re-inserted into other lists, and the
+ * per-list sizes while tasks are cancelled.
+ */
+class TimerTaskListTest {
+
+  /** Task with a fixed expiration time whose run() does nothing. */
+  private class TestTask(val expirationMs: Long) extends TimerTask {
+    def run(): Unit = { }
+  }
+
+  /** Counts the elements that list.foreach visits. */
+  private def size(list: TimerTaskList): Int = {
+    var count = 0
+    list.foreach(_ => count += 1)
+    count
+  }
+
+  @Test
+  def testAll(): Unit = {
+    val sharedCounter = new AtomicInteger(0)
+    val list1 = new TimerTaskList(sharedCounter)
+    val list2 = new TimerTaskList(sharedCounter)
+    val list3 = new TimerTaskList(sharedCounter)
+
+    // every add to list1 is expected to bump the shared counter by one
+    val tasks = (1 to 10).map { i =>
+      val task = new TestTask(10L)
+      list1.add(new TimerTaskEntry(task))
+      assertEquals(i, sharedCounter.get)
+      task
+    }
+
+    assertEquals(tasks.size, sharedCounter.get)
+
+    // reinserting the first four tasks into list2 shouldn't change the task count
+    tasks.take(4).foreach { task =>
+      val prevCount = sharedCounter.get
+      // new TimerTaskEntry(task) will remove the existing entry from the list
+      list2.add(new TimerTaskEntry(task))
+      assertEquals(prevCount, sharedCounter.get)
+    }
+    assertEquals(10 - 4, size(list1))
+    assertEquals(4, size(list2))
+
+    assertEquals(tasks.size, sharedCounter.get)
+
+    // reinserting the remaining six tasks into list3 shouldn't change the task count either
+    tasks.drop(4).foreach { task =>
+      val prevCount = sharedCounter.get
+      // new TimerTaskEntry(task) will remove the existing entry from the list
+      list3.add(new TimerTaskEntry(task))
+      assertEquals(prevCount, sharedCounter.get)
+    }
+    assertEquals(0, size(list1))
+    assertEquals(4, size(list2))
+    assertEquals(6, size(list3))
+
+    assertEquals(tasks.size, sharedCounter.get)
+
+    // cancel tasks in lists, one list at a time; only that list's size may change
+    list1.foreach { _.cancel() }
+    assertEquals(0, size(list1))
+    assertEquals(4, size(list2))
+    assertEquals(6, size(list3))
+
+    list2.foreach { _.cancel() }
+    assertEquals(0, size(list1))
+    assertEquals(0, size(list2))
+    assertEquals(6, size(list3))
+
+    list3.foreach { _.cancel() }
+    assertEquals(0, size(list1))
+    assertEquals(0, size(list2))
+    assertEquals(0, size(list3))
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6880f66c/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
new file mode 100644
index 0000000..8507592
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.utils.timer
+
+import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
+
+import junit.framework.Assert._
+import java.util.concurrent.atomic._
+import org.junit.{Test, After, Before}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+/**
+ * Unit test for Timer. Each test builds a Timer (tickMs = 1, wheelSize = 3)
+ * whose taskExecutor is a fresh single-threaded executor, adds TestTasks and
+ * compares the ids the tasks recorded with the ids that were submitted.
+ */
+class TimerTest {
+
+  /**
+   * Task that appends its id to `output` and counts down `latch` the first
+   * time run() is invoked; later invocations are no-ops because of the
+   * compareAndSet guard.
+   */
+  private class TestTask(override val expirationMs: Long, id: Int, latch: CountDownLatch, output: ArrayBuffer[Int]) extends TimerTask {
+    private[this] val completed = new AtomicBoolean(false)
+    def run(): Unit = {
+      if (completed.compareAndSet(false, true)) {
+        // appends are serialized on the buffer's own monitor
+        output.synchronized { output += id }
+        latch.countDown()
+      }
+    }
+  }
+
+  // created in setup() and released in teardown() around every test
+  private[this] var executor: ExecutorService = null
+
+  @Before
+  def setup(): Unit = {
+    executor = Executors.newSingleThreadExecutor()
+  }
+
+  @After
+  def teardown(): Unit = {
+    executor.shutdown()
+    executor = null
+  }
+
+  /**
+   * Adds five tasks whose expiration lies 1 to 5 ms before the timer's start
+   * time and expects every one of them to run within 3 seconds, without the
+   * clock ever being advanced.
+   */
+  @Test
+  def testAlreadyExpiredTask(): Unit = {
+    val startTime = System.currentTimeMillis()
+    val timer = new Timer(taskExecutor = executor, tickMs = 1, wheelSize = 3, startMs = startTime)
+    val output = new ArrayBuffer[Int]()
+
+    val latches = (-5 until 0).map { i =>
+      val latch = new CountDownLatch(1)
+      timer.add(new TestTask(startTime + i, i, latch, output))
+      latch
+    }
+
+    latches.foreach { latch =>
+      assertTrue("already expired tasks should run immediately", latch.await(3, TimeUnit.SECONDS))
+    }
+
+    assertEquals("output of already expired tasks", Set(-5, -4, -3, -2, -1), output.toSet)
+  }
+
+  /**
+   * Submits, in random order, one task for each id in 0-4 and 100-499 and two
+   * tasks for each id in 10-99 (expiration = start time + id ms), advances the
+   * clock until advanceClock returns false, and expects the recorded ids to be
+   * in ascending order.
+   */
+  @Test
+  def testTaskExpiration(): Unit = {
+    val startTime = System.currentTimeMillis()
+    val timer = new Timer(taskExecutor = executor, tickMs = 1, wheelSize = 3, startMs = startTime)
+    val output = new ArrayBuffer[Int]()
+
+    val tasks = new ArrayBuffer[TestTask]()
+    val ids = new ArrayBuffer[Int]()
+
+    val latches =
+      (0 until 5).map { i =>
+        val latch = new CountDownLatch(1)
+        tasks += new TestTask(startTime + i, i, latch, output)
+        ids += i
+        latch
+      } ++ (10 until 100).map { i =>
+        // two tasks share one id and one latch, so the latch needs two count-downs
+        val latch = new CountDownLatch(2)
+        tasks += new TestTask(startTime + i, i, latch, output)
+        tasks += new TestTask(startTime + i, i, latch, output)
+        ids += i
+        ids += i
+        latch
+      } ++ (100 until 500).map { i =>
+        val latch = new CountDownLatch(1)
+        tasks += new TestTask(startTime + i, i, latch, output)
+        ids += i
+        latch
+      }
+
+    // randomly submit requests
+    Random.shuffle(tasks.toSeq).foreach { task => timer.add(task) }
+
+    while (timer.advanceClock(1000)) {}
+
+    // bounded wait: a task that never runs must fail the test rather than hang the build
+    latches.foreach { latch =>
+      assertTrue("every submitted task should have run", latch.await(3, TimeUnit.SECONDS))
+    }
+
+    assertEquals("output should match", ids.sorted, output.toSeq)
+  }
+}