Repository: kafka Updated Branches: refs/heads/0.10.1 f46032f64 -> abfee8549
KAFKA-3994: Fix deadlock in Watchers by calling tryComplete without any locks Author: Jason Gustafson <[email protected]> Reviewers: Ismael Juma, Jun Rao, Jiangjie Qin, Guozhang Wang Closes #2195 from hachikuji/KAFKA-3994-linked-queue Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/02e75a27 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/02e75a27 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/02e75a27 Branch: refs/heads/0.10.1 Commit: 02e75a27ce1b2a34a54142cfd3c6bf0931c94c10 Parents: f46032f Author: Jason Gustafson <[email protected]> Authored: Mon Dec 5 11:12:03 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Tue Dec 6 11:56:19 2016 -0800 ---------------------------------------------------------------------- .../kafka/coordinator/DelayedHeartbeat.scala | 5 ++ .../scala/kafka/coordinator/DelayedJoin.scala | 8 +- .../scala/kafka/server/DelayedOperation.scala | 93 +++++++++++--------- .../other/kafka/TestPurgatoryPerformance.scala | 1 - .../kafka/server/DelayedOperationTest.scala | 4 +- 5 files changed, 62 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/02e75a27/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala index 8e250c3..b05186c 100644 --- a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala +++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala @@ -29,6 +29,11 @@ private[coordinator] class DelayedHeartbeat(coordinator: GroupCoordinator, heartbeatDeadline: Long, sessionTimeout: Long) extends DelayedOperation(sessionTimeout) { + + // overridden since tryComplete already synchronizes on the group. This makes it safe to + // call purgatory operations while holding the group lock. + override def safeTryComplete(): Boolean = tryComplete() + override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete) override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline) override def onComplete() = coordinator.onCompleteHeartbeat() http://git-wip-us.apache.org/repos/asf/kafka/blob/02e75a27/core/src/main/scala/kafka/coordinator/DelayedJoin.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala index a62884a..8744f16 100644 --- a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala +++ b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala @@ -31,8 +31,12 @@ import kafka.server.DelayedOperation */ private[coordinator] class DelayedJoin(coordinator: GroupCoordinator, group: GroupMetadata, - sessionTimeout: Long) - extends DelayedOperation(sessionTimeout) { + rebalanceTimeout: Long) + extends DelayedOperation(rebalanceTimeout) { + + // overridden since tryComplete already synchronizes on the group. This makes it safe to + // call purgatory operations while holding the group lock. + override def safeTryComplete(): Boolean = tryComplete() override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete) override def onExpiration() = coordinator.onExpireJoin() http://git-wip-us.apache.org/repos/asf/kafka/blob/02e75a27/core/src/main/scala/kafka/server/DelayedOperation.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 5248edf..553c27a 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -17,22 +17,18 @@ package kafka.server -import kafka.utils._ -import kafka.utils.timer._ -import kafka.utils.CoreUtils.{inReadLock, inWriteLock} -import kafka.metrics.KafkaMetricsGroup - -import java.util.LinkedList import java.util.concurrent._ import java.util.concurrent.atomic._ import java.util.concurrent.locks.ReentrantReadWriteLock -import org.apache.kafka.common.utils.Utils - -import scala.collection._ - import com.yammer.metrics.core.Gauge +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.CoreUtils.{inReadLock, inWriteLock} +import kafka.utils._ +import kafka.utils.timer._ + +import scala.collection._ /** * An operation whose processing needs to be delayed for at most the given delayMs. For example @@ -77,7 +73,7 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask wi /** * Check if the delayed operation is already completed */ - def isCompleted(): Boolean = completed.get() + def isCompleted: Boolean = completed.get() /** * Call-back to execute when a delayed operation gets expired and hence forced to complete. @@ -90,7 +86,7 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask wi */ def onComplete(): Unit - /* + /** * Try to complete the delayed operation by first checking if the operation * can be completed by now. If yes execute the completion logic by calling * forceComplete() and return true iff forceComplete returns true; otherwise return false @@ -99,6 +95,16 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask wi */ def tryComplete(): Boolean + /** + * Thread-safe variant of tryComplete(). This can be overridden if the operation provides its + * own synchronization. + */ + def safeTryComplete(): Boolean = { + synchronized { + tryComplete() + } + } + /* * run() method defines a task that is executed on timeout */ @@ -187,14 +193,14 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, // operation is unnecessarily added for watch. However, this is a less severe issue since the // expire reaper will clean it up periodically. - var isCompletedByMe = operation synchronized operation.tryComplete() + var isCompletedByMe = operation.safeTryComplete() if (isCompletedByMe) return true var watchCreated = false for(key <- watchKeys) { // If the operation is already completed, stop adding it to the rest of the watcher list. - if (operation.isCompleted()) + if (operation.isCompleted) return false watchForOperation(key, operation) @@ -204,14 +210,14 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, } } - isCompletedByMe = operation synchronized operation.tryComplete() + isCompletedByMe = operation.safeTryComplete() if (isCompletedByMe) return true // if it cannot be completed by now and hence is watched, add to the expire queue also - if (! operation.isCompleted()) { + if (!operation.isCompleted) { timeoutTimer.add(operation) - if (operation.isCompleted()) { + if (operation.isCompleted) { // cancel the timer task operation.cancel() } @@ -239,7 +245,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, * on multiple lists, and some of its watched entries may still be in the watch lists * even when it has been completed, this number may be larger than the number of real operations watched */ - def watched() = allWatchers.map(_.watched).sum + def watched() = allWatchers.map(_.countWatched).sum /** * Return the number of delayed operations in the expiry queue @@ -272,7 +278,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, if (watchersForKey.get(key) != watchers) return - if (watchers != null && watchers.watched == 0) { + if (watchers != null && watchers.isEmpty) { watchersForKey.remove(key) } } @@ -291,35 +297,35 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, * A linked list of watched delayed operations based on some key */ private class Watchers(val key: Any) { + private[this] val operations = new ConcurrentLinkedQueue[T]() - private[this] val operations = new LinkedList[T]() + // count the current number of watched operations. This is O(n), so use isEmpty() if possible + def countWatched: Int = operations.size - def watched: Int = operations synchronized operations.size + def isEmpty: Boolean = operations.isEmpty // add the element to watch def watch(t: T) { - operations synchronized operations.add(t) + operations.add(t) } // traverse the list and try to complete some watched elements def tryCompleteWatched(): Int = { - var completed = 0 - operations synchronized { - val iter = operations.iterator() - while (iter.hasNext) { - val curr = iter.next() - if (curr.isCompleted) { - // another thread has completed this operation, just remove it - iter.remove() - } else if (curr synchronized curr.tryComplete()) { - completed += 1 - iter.remove() - } + + val iter = operations.iterator() + while (iter.hasNext) { + val curr = iter.next() + if (curr.isCompleted) { + // another thread has completed this operation, just remove it + iter.remove() + } else if (curr.safeTryComplete()) { + iter.remove() + completed += 1 } } - if (operations.size == 0) + if (operations.isEmpty) removeKeyIfEmpty(key, this) completed @@ -328,18 +334,17 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, // traverse the list and purge elements that are already completed by others def purgeCompleted(): Int = { var purged = 0 - operations synchronized { - val iter = operations.iterator() - while (iter.hasNext) { - val curr = iter.next() - if (curr.isCompleted) { - iter.remove() - purged += 1 - } + + val iter = operations.iterator() + while (iter.hasNext) { + val curr = iter.next() + if (curr.isCompleted) { + iter.remove() + purged += 1 } } - if (operations.size == 0) + if (operations.isEmpty) removeKeyIfEmpty(key, this) purged http://git-wip-us.apache.org/repos/asf/kafka/blob/02e75a27/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala index ba89fc8..addd232 100644 --- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala +++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala @@ -237,7 +237,6 @@ object TestPurgatoryPerformance { } private class FakeOperation(delayMs: Long, size: Int, val latencyMs: Long, latch: CountDownLatch) extends DelayedOperation(delayMs) { - private[this] val data = new Array[Byte](size) val completesAt = System.currentTimeMillis + latencyMs def onExpiration(): Unit = {} http://git-wip-us.apache.org/repos/asf/kafka/blob/02e75a27/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index ae0d12f..dccde2f 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -62,8 +62,8 @@ class DelayedOperationTest { assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2"))) r1.awaitExpiration() val elapsed = SystemTime.hiResClockMs - start - assertTrue("r1 completed due to expiration", r1.isCompleted()) - assertFalse("r2 hasn't completed", r2.isCompleted()) + assertTrue("r1 completed due to expiration", r1.isCompleted) + assertFalse("r2 hasn't completed", r2.isCompleted) assertTrue(s"Time for expiration $elapsed should at least $expiration", elapsed >= expiration) }
