KAFKA-829 Mirror maker needs to share the migration tool request channel; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/66b10389 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/66b10389 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/66b10389 Branch: refs/heads/trunk Commit: 66b103895720700c5ea47e8f75a1105aca6e9342 Parents: be3ce14 Author: Neha Narkhede <neha.narkh...@gmail.com> Authored: Thu Mar 28 09:54:02 2013 -0700 Committer: Neha Narkhede <neha.narkh...@gmail.com> Committed: Thu Mar 28 09:54:02 2013 -0700 ---------------------------------------------------------------------- config/producer.properties | 2 +- .../scala/kafka/javaapi/producer/Producer.scala | 4 +- .../scala/kafka/tools/KafkaMigrationTool.java | 25 +++---- .../main/scala/kafka/tools/MirrorMaker.scala | 79 ++++++++++++-------- 4 files changed, 64 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/66b10389/config/producer.properties ---------------------------------------------------------------------- diff --git a/config/producer.properties b/config/producer.properties index a1c8cb2..cc8f5f6 100644 --- a/config/producer.properties +++ b/config/producer.properties @@ -31,7 +31,7 @@ producer.type=sync compression.codec=none # message encoder -serializer.class=kafka.serializer.StringEncoder +serializer.class=kafka.serializer.DefaultEncoder # allow topic level compression #compressed.topics= http://git-wip-us.apache.org/repos/asf/kafka/blob/66b10389/core/src/main/scala/kafka/javaapi/producer/Producer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala index 424ef39..7265328 100644 --- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala +++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala @@ -26,7 +26,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for /** * Sends the data to a single topic, partitioned by key, using either the * synchronous or the asynchronous producer - * @param producerData the producer data object that encapsulates the topic, key and message data + * @param message the producer data object that encapsulates the topic, key and message data */ def send(message: KeyedMessage[K,V]) { underlying.send(message) @@ -34,7 +34,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for /** * Use this API to send data to multiple topics - * @param producerData list of producer data objects that encapsulate the topic, key and message data + * @param messages list of producer data objects that encapsulate the topic, key and message data */ def send(messages: java.util.List[KeyedMessage[K,V]]) { import collection.JavaConversions._ http://git-wip-us.apache.org/repos/asf/kafka/blob/66b10389/core/src/main/scala/kafka/tools/KafkaMigrationTool.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java index eb63d75..a15b350 100644 --- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java +++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java @@ -220,7 +220,7 @@ public class KafkaMigrationTool { kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); // create a producer channel instead int queueSize = options.valueOf(queueSizeOpt); - ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<String, byte[]>>(queueSize); + ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<byte[], byte[]>>(queueSize); int threadId = 0; Runtime.getRuntime().addShutdownHook(new Thread() { @@ -279,8 +279,7 @@ public class KafkaMigrationTool { } } - - private static class ProducerDataChannel<T> { + static class ProducerDataChannel<T> { private final int producerQueueSize; private final BlockingQueue<T> producerRequestQueue; @@ -300,14 +299,14 @@ public class KafkaMigrationTool { private static class MigrationThread extends Thread { private final Object stream; - private final ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel; + private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel; private final int threadId; private final String threadName; private final org.apache.log4j.Logger logger; private CountDownLatch shutdownComplete = new CountDownLatch(1); private final AtomicBoolean isRunning = new AtomicBoolean(true); - MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<String, byte[]>> _producerDataChannel, int _threadId) { + MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel, int _threadId) { stream = _stream; producerDataChannel = _producerDataChannel; threadId = _threadId; @@ -336,7 +335,7 @@ public class KafkaMigrationTool { ((ByteBuffer)payload_07).get(bytes); if(logger.isDebugEnabled()) logger.debug("Migration thread " + threadId + " sending message of size " + bytes.length + " to topic "+ topic); - KeyedMessage<String, byte[]> producerData = new KeyedMessage((String)topic, null, bytes); + KeyedMessage<byte[], byte[]> producerData = new KeyedMessage((String)topic, null, bytes); producerDataChannel.sendRequest(producerData); } logger.info("Migration thread " + threadName + " finished running"); @@ -362,17 +361,17 @@ public class KafkaMigrationTool { } } - private static class ProducerThread extends Thread { - private final ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel; - private final Producer<String, byte[]> producer; + static class ProducerThread extends Thread { + private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel; + private final Producer<byte[], byte[]> producer; private final int threadId; private String threadName; private org.apache.log4j.Logger logger; private CountDownLatch shutdownComplete = new CountDownLatch(1); - private KeyedMessage<String, byte[]> shutdownMessage = new KeyedMessage("shutdown", null, null); + private KeyedMessage<byte[], byte[]> shutdownMessage = new KeyedMessage("shutdown", null, null); - public ProducerThread(ProducerDataChannel<KeyedMessage<String, byte[]>> _producerDataChannel, - Producer<String, byte[]> _producer, + public ProducerThread(ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel, + Producer<byte[], byte[]> _producer, int _threadId) { producerDataChannel = _producerDataChannel; producer = _producer; @@ -385,7 +384,7 @@ public class KafkaMigrationTool { public void run() { try{ while(true) { - KeyedMessage<String, byte[]> data = producerDataChannel.receiveRequest(); + KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest(); if(!data.equals(shutdownMessage)) producer.send(data); else http://git-wip-us.apache.org/repos/asf/kafka/blob/66b10389/core/src/main/scala/kafka/tools/MirrorMaker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 5c4b3d2..3d22dc7 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -24,6 +24,9 @@ import scala.collection.JavaConversions._ import java.util.concurrent.CountDownLatch import kafka.consumer._ import kafka.serializer._ +import collection.mutable.ListBuffer +import kafka.tools.KafkaMigrationTool.{ProducerThread, ProducerDataChannel} +import kafka.javaapi object MirrorMaker extends Logging { @@ -59,7 +62,13 @@ object MirrorMaker extends Logging { .describedAs("Number of threads") .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - + + val bufferSizeOpt = parser.accepts("queue.size", "Number of messages that are buffered between the consumer and producer") + .withRequiredArg() + .describedAs("Queue size in terms of number of messages") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(10000); + val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to mirror.") .withRequiredArg() @@ -88,6 +97,7 @@ object MirrorMaker extends Logging { } val numStreams = options.valueOf(numStreamsOpt) + val bufferSize = options.valueOf(bufferSizeOpt).intValue() val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => { val config = new ProducerConfig( @@ -95,52 +105,63 @@ object MirrorMaker extends Logging { new Producer[Array[Byte], Array[Byte]](config) }) - val threads = { - val connectors = options.valuesOf(consumerConfigOpt).toList - .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString))) - .map(new ZookeeperConsumerConnector(_)) + val connectors = options.valuesOf(consumerConfigOpt).toList + .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString))) + .map(new ZookeeperConsumerConnector(_)) - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run() { - connectors.foreach(_.shutdown()) - producers.foreach(_.close()) - } - }) + val filterSpec = if (options.has(whitelistOpt)) + new Whitelist(options.valueOf(whitelistOpt)) + else + new Blacklist(options.valueOf(blacklistOpt)) - val filterSpec = if (options.has(whitelistOpt)) - new Whitelist(options.valueOf(whitelistOpt)) - else - new Blacklist(options.valueOf(blacklistOpt)) + val streams = + connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())) - val streams = - connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())) + val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize); - streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2)) - } + val consumerThreads = + streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2)) + + val producerThreads = new ListBuffer[ProducerThread]() + + Runtime.getRuntime.addShutdownHook(new Thread() { + override def run() { + connectors.foreach(_.shutdown) + consumerThreads.foreach(_.awaitShutdown) + producerThreads.foreach(_.shutdown) + producerThreads.foreach(_.awaitShutdown) + logger.info("Kafka migration tool shutdown successfully"); + } + }) - threads.foreach(_.start()) + // create producer threads + var i: Int = 1 + for(producer <- producers) { + val producerThread: KafkaMigrationTool.ProducerThread = new KafkaMigrationTool.ProducerThread(producerDataChannel, + new javaapi.producer.Producer[Array[Byte], Array[Byte]](producer), i) + producerThreads += producerThread + i += 1 + } - threads.foreach(_.awaitShutdown()) + consumerThreads.foreach(_.start) + producerThreads.foreach(_.start) } class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]], - producers: Seq[Producer[Array[Byte], Array[Byte]]], + producerDataChannel: ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]], threadId: Int) extends Thread with Logging { private val shutdownLatch = new CountDownLatch(1) private val threadName = "mirrormaker-" + threadId - private val producerSelector = Utils.circularIterator(producers) this.setName(threadName) override def run() { try { for (msgAndMetadata <- stream) { - val producer = producerSelector.next() - val pd = new KeyedMessage[Array[Byte], Array[Byte]]( - msgAndMetadata.topic, msgAndMetadata.message) - producer.send(pd) + val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message) + producerDataChannel.sendRequest(pd) } } catch { case e => @@ -155,9 +176,7 @@ object MirrorMaker extends Logging { try { shutdownLatch.await() } catch { - case e: InterruptedException => fatal( - "Shutdown of thread %s interrupted. This might leak data!" - .format(threadName)) + case e: InterruptedException => fatal("Shutdown of thread %s interrupted. This might leak data!".format(threadName)) } } }