KAFKA-983; Expose MirrorMaker cleanShutdown method. Patched by Swapnil Ghike.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc5620cb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc5620cb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc5620cb

Branch: refs/heads/trunk
Commit: bc5620cbd270cd8ab443f567fee5203c66db253f
Parents: 5cf6a54
Author: Joel Koshy <[email protected]>
Authored: Wed Jul 24 18:03:52 2013 -0700
Committer: Joel Koshy <[email protected]>
Committed: Wed Jul 24 18:05:26 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/MirrorMaker.scala | 29 ++++++++++++--------
 1 file changed, 17 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc5620cb/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index a85bfa2..c747bfb 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -31,6 +31,10 @@ import kafka.javaapi
 
 object MirrorMaker extends Logging {
 
+  private var connectors: Seq[ZookeeperConsumerConnector] = null
+  private var consumerThreads: Seq[MirrorMakerThread] = null
+  private var producerThreads: ListBuffer[ProducerThread] = null
+
   def main(args: Array[String]) {
 
     info ("Starting mirror maker")
@@ -112,7 +116,7 @@ object MirrorMaker extends Logging {
       new Producer[Array[Byte], Array[Byte]](config)
     })
 
-    val connectors = options.valuesOf(consumerConfigOpt).toList
+    connectors = options.valuesOf(consumerConfigOpt).toList
             .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString)))
             .map(new ZookeeperConsumerConnector(_))
 
@@ -132,18 +136,9 @@ object MirrorMaker extends Logging {
 
     val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize);
 
-    val consumerThreads =
-      streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, producers, streamAndIndex._2))
-
-    val producerThreads = new ListBuffer[ProducerThread]()
+    consumerThreads = streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, producers, streamAndIndex._2))
 
-    def cleanShutdown() {
-      connectors.foreach(_.shutdown)
-      consumerThreads.foreach(_.awaitShutdown)
-      producerThreads.foreach(_.shutdown)
-      producerThreads.foreach(_.awaitShutdown)
-      info("Kafka mirror maker shutdown successfully")
-    }
+    producerThreads = new ListBuffer[ProducerThread]()
 
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
@@ -168,6 +163,16 @@ object MirrorMaker extends Logging {
     cleanShutdown()
   }
 
+  def cleanShutdown() {
+    if (connectors != null) connectors.foreach(_.shutdown)
+    if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown)
+    if (producerThreads != null) {
+      producerThreads.foreach(_.shutdown)
+      producerThreads.foreach(_.awaitShutdown)
+    }
+    info("Kafka mirror maker shutdown successfully")
+  }
+
   class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
                           producerDataChannel: ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]],
                           producers: Seq[Producer[Array[Byte], Array[Byte]]],
