Repository: kafka Updated Branches: refs/heads/trunk 86eb74d92 -> 300565381
KAFKA-2454; Deadlock between log segment deletion and server shutdown. Author: Jiangjie Qin <becket....@gmail.com> Reviewers: Joel Koshy <jjkosh...@gmail.com> Closes #153 from becketqin/KAFKA-2454 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/30056538 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/30056538 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/30056538 Branch: refs/heads/trunk Commit: 30056538130ae1e2be35398b0ddd2ea04105bafd Parents: 86eb74d Author: Jiangjie Qin <becket....@gmail.com> Authored: Wed Oct 21 13:24:10 2015 -0700 Committer: Joel Koshy <jjko...@gmail.com> Committed: Wed Oct 21 13:24:10 2015 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/utils/KafkaScheduler.scala | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/30056538/core/src/main/scala/kafka/utils/KafkaScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala index 5bab08d..641218e 100755 --- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala +++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala @@ -70,7 +70,7 @@ class KafkaScheduler(val threads: Int, daemon: Boolean = true) extends Scheduler with Logging { private var executor: ScheduledThreadPoolExecutor = null private val schedulerThreadId = new AtomicInteger(0) - + override def startup() { debug("Initializing task scheduler.") this synchronized { @@ -88,12 +88,14 @@ class KafkaScheduler(val threads: Int, override def shutdown() { debug("Shutting down task scheduler.") - this synchronized { - if(isStarted) { - executor.shutdown() - executor.awaitTermination(1, TimeUnit.DAYS) + // We use the local variable to avoid NullPointerException if another thread shuts down scheduler at same time. + val cachedExecutor = this.executor + if (cachedExecutor != null) { + this synchronized { + cachedExecutor.shutdown() this.executor = null } + cachedExecutor.awaitTermination(1, TimeUnit.DAYS) } } @@ -101,10 +103,10 @@ class KafkaScheduler(val threads: Int, debug("Scheduling task %s with initial delay %d ms and period %d ms." .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit))) this synchronized { - ensureStarted + ensureRunning val runnable = CoreUtils.runnable { try { - trace("Begining execution of scheduled task '%s'.".format(name)) + trace("Beginning execution of scheduled task '%s'.".format(name)) fun() } catch { case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t) @@ -125,8 +127,8 @@ class KafkaScheduler(val threads: Int, } } - private def ensureStarted = { + private def ensureRunning = { if(!isStarted) - throw new IllegalStateException("Kafka scheduler has not been started") + throw new IllegalStateException("Kafka scheduler is not running.") } }