KAFKA-734 Migration tool needs a revamp; it was poorly written and has many performance bugs; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d925b157
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d925b157
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d925b157

Branch: refs/heads/trunk
Commit: d925b157f42b13cb410a86e8850c23a11de3d2f1
Parents: ccfdabc
Author: Neha Narkhede <neha.narkh...@gmail.com>
Authored: Sun Feb 24 14:27:21 2013 -0800
Committer: Neha Narkhede <neha.narkh...@gmail.com>
Committed: Sun Feb 24 14:27:21 2013 -0800

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala      |    1 +
 core/src/main/scala/kafka/producer/Producer.scala  |    3 +-
 .../main/scala/kafka/tools/KafkaMigrationTool.java |  364 ++++++++++-----
 core/src/main/scala/kafka/utils/Utils.scala        |    2 +-
 .../test/scala/unit/kafka/utils/UtilsTest.scala    |    5 +-
 .../scala/kafka/perf/ProducerPerformance.scala     |    2 +-
 6 files changed, 244 insertions(+), 133 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 9db9a8b..c9e4127 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -646,6 +646,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val topicThreadId = e._1
       val q = e._2._1
       topicThreadIdAndQueues.put(topicThreadId, q)
+      debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
       newGauge(
         config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
         new Gauge[Int] {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index f7d85b9..c837091 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -40,8 +40,7 @@ class Producer[K,V](val config: ProducerConfig,
     case "sync" =>
     case "async" =>
       sync = false
-      val asyncProducerID = random.nextInt(Int.MaxValue)
-      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID,
+      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                        queue,
                                                        eventHandler,
                                                        config.queueBufferingMaxMs,
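One small fix worth flagging in the Producer.scala hunk above: the async send thread is now named after config.clientId instead of a random int, so a thread dump or log line can be traced back to a specific producer instance. The idea in isolation (the clientId value and the runnable are stand-ins, not Kafka's API):

    // A stable, identity-based thread name: "ProducerSendThread-checkout-service"
    // tells you which producer is stuck; "ProducerSendThread-1697431557" does not.
    String clientId = "checkout-service";  // hypothetical client.id value
    Runnable sendLoop = new Runnable() { public void run() { /* drain the queue */ } };
    new Thread(sendLoop, "ProducerSendThread-" + clientId).start();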
http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index dbbddae..f3a5095 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -22,13 +22,12 @@ import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 import kafka.utils.Utils;
-import scala.collection.Iterator;
-import scala.collection.JavaConversions;

 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -36,27 +35,28 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;

 /**
- * The kafka 07 to 08 online migration tool, it's used for migrating data from 07 to 08 cluster. Internally,
- * it's composed of a kafka 07 consumer and kafka 08 producer. The kafka 07 consumer consumes data from the
- * 07 cluster, and the kafka 08 producer produces data to the 08 cluster.
+ * This is a kafka 0.7 to 0.8 online migration tool used for migrating data from a 0.7 to a 0.8 cluster. Internally,
+ * it's composed of a kafka 0.7 consumer and a kafka 0.8 producer. The kafka 0.7 consumer consumes data from the
+ * 0.7 cluster, and the kafka 0.8 producer produces data to the 0.8 cluster.
  *
- * The 07 consumer is loaded from kafka 07 jar using a "parent last, child first" java class loader.
- * Ordinary class loader is "parent first, child last", and kafka 08 and 07 both have classes for a lot of
- * class names like "kafka.consumer.Consumer", etc., so ordinary java URLClassLoader with kafka 07 jar will
- * will still load the 08 version class.
+ * The 0.7 consumer is loaded from the kafka 0.7 jar using a "parent last, child first" java class loader.
+ * An ordinary class loader is "parent first, child last", and kafka 0.8 and 0.7 both have classes for a lot of
+ * class names like "kafka.consumer.Consumer", etc., so an ordinary java URLClassLoader with the kafka 0.7 jar
+ * will still load the 0.8 version of the class.
  *
- * As kafka 07 and kafka 08 used different version of zkClient, the zkClient jar used by kafka 07 should
+ * As kafka 0.7 and kafka 0.8 use different versions of zkClient, the zkClient jar used by kafka 0.7 should
  * also be used by the class loader.
  *
- * The user need to provide the configuration file for 07 consumer and 08 producer. For 08 producer,
- * the "serializer.class" filed is set to "kafka.serializer.DefaultEncode" by the code.
+ * The user needs to provide the configuration files for the 0.7 consumer and the 0.8 producer. For the 0.8 producer,
+ * the "serializer.class" config is set to "kafka.serializer.DefaultEncoder" by the code.
  */
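To see why the stock loader fails here, recall that a plain URLClassLoader delegates parent-first: if the 0.8 jar is on the application classpath, any shared class name resolves to the parent's 0.8 copy and the 0.7 jar is never consulted. A minimal sketch of the failure mode (the jar path is hypothetical):

    import java.io.File;
    import java.net.URL;
    import java.net.URLClassLoader;

    public class ParentFirstDemo {
      public static void main(String[] args) throws Exception {
        // Default delegation asks the parent (application classpath) first,
        // so the 0.7 jar below is shadowed by the 0.8 classes already there.
        URLClassLoader parentFirst = new URLClassLoader(
            new URL[]{ new File("kafka-0.7.2.jar").toURI().toURL() });
        Class<?> c = parentFirst.loadClass("kafka.consumer.Consumer");
        // Prints the 0.8 jar's location, not kafka-0.7.2.jar:
        System.out.println(c.getProtectionDomain().getCodeSource());
      }
    }

This is the behavior the ParentLastURLClassLoader at the bottom of this file inverts.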
" + "You man specify multiple of these.") - .withRequiredArg() - .describedAs("config file") - .ofType(String.class); + = parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. " + "You man specify multiple of these.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); ArgumentAcceptingOptionSpec<String> producerConfigOpt - = parser.accepts("producer.config", "Embedded producer config.") - .withRequiredArg() - .describedAs("config file") - .ofType(String.class); + = parser.accepts("producer.config", "Producer config.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); ArgumentAcceptingOptionSpec<Integer> numProducersOpt - = parser.accepts("num.producers", "Number of producer instances") - .withRequiredArg() - .describedAs("Number of producers") - .ofType(Integer.class) - .defaultsTo(1); + = parser.accepts("num.producers", "Number of producer instances") + .withRequiredArg() + .describedAs("Number of producers") + .ofType(Integer.class) + .defaultsTo(1); ArgumentAcceptingOptionSpec<String> zkClient01JarOpt - = parser.accepts("zkclient.01.jar", "zkClient 0.1 jar file") - .withRequiredArg() - .describedAs("zkClient 0.1 jar file required by Kafka 0.7") - .ofType(String.class); + = parser.accepts("zkclient.01.jar", "zkClient 0.1 jar file") + .withRequiredArg() + .describedAs("zkClient 0.1 jar file required by Kafka 0.7") + .ofType(String.class); ArgumentAcceptingOptionSpec<String> kafka07JarOpt - = parser.accepts("kafka.07.jar", "Kafka 0.7 jar file") - .withRequiredArg() - .describedAs("kafka 0.7 jar") - .ofType(String.class); + = parser.accepts("kafka.07.jar", "Kafka 0.7 jar file") + .withRequiredArg() + .describedAs("kafka 0.7 jar") + .ofType(String.class); ArgumentAcceptingOptionSpec<Integer> numStreamsOpt - = parser.accepts("num.streams", "Number of consumption streams.") - .withRequiredArg() - .describedAs("Number of threads") - .ofType(Integer.class) - .defaultsTo(1); + = parser.accepts("num.streams", "Number of consumer streams") + .withRequiredArg() + .describedAs("Number of consumer threads") + .ofType(Integer.class) + .defaultsTo(1); ArgumentAcceptingOptionSpec<String> whitelistOpt - = parser.accepts("whitelist", "Whitelist of topics to mirror.") - .withRequiredArg() - .describedAs("Java regex (String)") - .ofType(String.class); + = parser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster") + .withRequiredArg() + .describedAs("Java regex (String)") + .ofType(String.class); ArgumentAcceptingOptionSpec<String> blacklistOpt - = parser.accepts("blacklist", "Blacklist of topics to mirror.") - .withRequiredArg() - .describedAs("Java regex (String)") - .ofType(String.class); + = parser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster") + .withRequiredArg() + .describedAs("Java regex (String)") + .ofType(String.class); OptionSpecBuilder helpOpt - = parser.accepts("help", "Print this message."); + = parser.accepts("help", "Print this message."); OptionSet options = parser.parse(args); - try{ - if (options.has(helpOpt)){ - parser.printHelpOn(System.out); - System.exit(0); - } - - checkRequiredArgs(parser, options, new OptionSpec[]{consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt}); - int whiteListCount = options.has(whitelistOpt) ? 1 : 0; - int blackListCount = options.has(blacklistOpt) ? 
-      String kafkaJarFile_07 = options.valueOf(kafka07JarOpt);
-      String zkClientJarFile = options.valueOf(zkClient01JarOpt);
-      String consumerConfigFile_07 = options.valueOf(consumerConfigOpt);
-      int numStreams = options.valueOf(numStreamsOpt);
-      String producerConfigFile_08 = options.valueOf(producerConfigOpt);
-      int numProducers = options.valueOf(numProducersOpt);
+    checkRequiredArgs(parser, options, new OptionSpec[]{consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt});
+    int whiteListCount = options.has(whitelistOpt) ? 1 : 0;
+    int blackListCount = options.has(blacklistOpt) ? 1 : 0;
+    if(whiteListCount + blackListCount != 1) {
+      System.err.println("Exactly one of whitelist or blacklist is required.");
+      System.exit(1);
+    }
+    String kafkaJarFile_07 = options.valueOf(kafka07JarOpt);
+    String zkClientJarFile = options.valueOf(zkClient01JarOpt);
+    String consumerConfigFile_07 = options.valueOf(consumerConfigOpt);
+    int numConsumers = options.valueOf(numStreamsOpt);
+    String producerConfigFile_08 = options.valueOf(producerConfigOpt);
+    int numProducers = options.valueOf(numProducersOpt);
+    final List<MigrationThread> migrationThreads = new ArrayList<MigrationThread>(numConsumers);
+    final List<ProducerThread> producerThreads = new ArrayList<ProducerThread>(numProducers);

+    try {
       File kafkaJar_07 = new File(kafkaJarFile_07);
       File zkClientJar = new File(zkClientJarFile);
-      ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[]{
-        kafkaJar_07.toURI().toURL(),
-        zkClientJar.toURI().toURL()
+      ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[] {
+        kafkaJar_07.toURI().toURL(),
+        zkClientJar.toURI().toURL()
       });

       /** Construct the 07 consumer config **/
@@ -182,7 +183,7 @@ public class KafkaMigrationTool
       Properties kafkaConsumerProperties_07 = new Properties();
       kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07));
       /** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
-      if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")){
+      if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")) {
         logger.warn("Shallow iterator should not be used in the migration tool");
         kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false");
       }
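For reference, the 0.7 consumer config loaded here might carry something like the following (property names follow Kafka 0.7 conventions; hosts and group are placeholders):

    // Hypothetical sketch of what ends up in kafkaConsumerProperties_07:
    Properties props = new Properties();
    props.setProperty("zk.connect", "source-zk:2181");     // 0.7 source cluster ZK
    props.setProperty("groupid", "kafka-migration");       // 0.7 spelling, no dot
    props.setProperty("shallow.iterator.enable", "false"); // forced off above anyway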
@@ -190,51 +191,73 @@ public class KafkaMigrationTool

       /** Construct the 07 consumer connector **/
       Method ConsumerConnectorCreationMethod_07 = KafkaStaticConsumer_07.getMethod("createJavaConsumerConnector", ConsumerConfig_07);
-
-      Object consumerConnector_07 = ConsumerConnectorCreationMethod_07.invoke(null, consumerConfig_07);
-
+      final Object consumerConnector_07 = ConsumerConnectorCreationMethod_07.invoke(null, consumerConfig_07);
       Method ConsumerConnectorCreateMessageStreamsMethod_07 = ConsumerConnector_07.getMethod(
-        "createMessageStreamsByFilter",
-        TopicFilter_07, int.class);
-
-
+        "createMessageStreamsByFilter",
+        TopicFilter_07, int.class);
+      final Method ConsumerConnectorShutdownMethod_07 = ConsumerConnector_07.getMethod("shutdown");

       Constructor WhiteListConstructor_07 = WhiteList_07.getConstructor(String.class);
       Constructor BlackListConstructor_07 = BlackList_07.getConstructor(String.class);
       Object filterSpec = null;
-
       if(options.has(whitelistOpt))
         filterSpec = WhiteListConstructor_07.newInstance(options.valueOf(whitelistOpt));
       else
         filterSpec = BlackListConstructor_07.newInstance(options.valueOf(blacklistOpt));

-      Object retKafkaStreams = ConsumerConnectorCreateMessageStreamsMethod_07.invoke(consumerConnector_07, filterSpec, numStreams);
+      Object retKafkaStreams = ConsumerConnectorCreateMessageStreamsMethod_07.invoke(consumerConnector_07, filterSpec, numConsumers);

       Properties kafkaProducerProperties_08 = new Properties();
       kafkaProducerProperties_08.load(new FileInputStream(producerConfigFile_08));
       kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
-      ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08);
+      // create a producer channel instead
+      ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<String, byte[]>>(numProducers);
+      int threadId = 0;

-      List<Producer> producers = new ArrayList<Producer>();
-      for (int i = 0; i < numProducers; i++){
-        producers.add(new Producer(producerConfig_08));
-      }
+      Runtime.getRuntime().addShutdownHook(new Thread() {
+        @Override
+        public void run() {
+          try {
+            ConsumerConnectorShutdownMethod_07.invoke(consumerConnector_07);
+          } catch(Exception e) {
+            logger.error("Error while shutting down Kafka consumer", e);
+          }
+          for(MigrationThread migrationThread : migrationThreads) {
+            migrationThread.shutdown();
+          }
+          for(ProducerThread producerThread : producerThreads) {
+            producerThread.shutdown();
+          }
+          for(ProducerThread producerThread : producerThreads) {
+            producerThread.awaitShutdown();
+          }
+          logger.info("Kafka migration tool shutdown successfully");
+        }
+      });

-      int threadId = 0;
-      for(Object stream: (List)retKafkaStreams){
-        MigrationThread thread = new MigrationThread(stream, producers, threadId);
+      // start consumer threads
+      for(Object stream: (List)retKafkaStreams) {
+        MigrationThread thread = new MigrationThread(stream, producerDataChannel, threadId);
         threadId ++;
         thread.start();
+        migrationThreads.add(thread);
+      }
+
+      // start producer threads
+      for (int i = 0; i < numProducers; i++) {
+        kafkaProducerProperties_08.put("client.id", String.valueOf(i) + "-" + i);
+        ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08);
+        Producer producer = new Producer(producerConfig_08);
+        ProducerThread producerThread = new ProducerThread(producerDataChannel, producer, i);
+        producerThread.start();
+        producerThreads.add(producerThread);
       }
     }
     catch (Throwable e){
-      System.out.println("Kafka migration tool failed because of " + e);
-      e.printStackTrace(System.out);
+      System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(e));
       logger.error("Kafka migration tool failed: ", e);
     }
   }

-  private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException
-  {
+  private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException {
     for(OptionSpec arg : required) {
       if(!options.has(arg)) {
         System.err.println("Missing required argument \"" + arg + "\"");
@@ -245,37 +268,53 @@ public class KafkaMigrationTool
     }
   }
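The central change in main() is visible above: consumer threads no longer call producer.send() directly. They hand messages to a ProducerDataChannel (defined below) and a fixed pool of producer threads drains it. Since the channel is just a bounded BlockingQueue, it doubles as backpressure; a sketch of the semantics (not the tool's exact class):

    BlockingQueue<KeyedMessage<String, byte[]>> queue =
        new ArrayBlockingQueue<KeyedMessage<String, byte[]>>(numProducers);
    queue.put(msg);    // consumer side: blocks when full, throttling 0.7
                       // consumption to the 0.8 cluster's produce throughput
    KeyedMessage<String, byte[]> next = queue.take();  // producer side: blocks when empty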
-  private static class MigrationThread extends Thread{
-    private Object stream;
-    private List<Producer> producers;
-    private int threadId;
-    private String threadName;
-    private org.apache.log4j.Logger logger;
+  private static class ProducerDataChannel<T> {
+    private final int producerQueueSize;
+    private final BlockingQueue<T> producerRequestQueue;
+
+    public ProducerDataChannel(int queueSize) {
+      producerQueueSize = queueSize;
+      producerRequestQueue = new ArrayBlockingQueue<T>(producerQueueSize);
+    }
+
+    public void sendRequest(T data) throws InterruptedException {
+      producerRequestQueue.put(data);
+    }
+
+    public T receiveRequest() throws InterruptedException {
+      return producerRequestQueue.take();
+    }
+  }
+
+  private static class MigrationThread extends Thread {
+    private final Object stream;
+    private final ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel;
+    private final int threadId;
+    private final String threadName;
+    private final org.apache.log4j.Logger logger;
+    private CountDownLatch shutdownComplete = new CountDownLatch(1);
+    private final AtomicBoolean isRunning = new AtomicBoolean(true);

-    MigrationThread(Object _stream, List<Producer> _producers, int _threadId){
+    MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<String, byte[]>> _producerDataChannel, int _threadId) {
       stream = _stream;
-      producers = _producers;
+      producerDataChannel = _producerDataChannel;
       threadId = _threadId;
       threadName = "MigrationThread-" + threadId;
-      logger = org.apache.log4j.Logger.getLogger(threadName);
+      logger = org.apache.log4j.Logger.getLogger(MigrationThread.class.getName());
       this.setName(threadName);
     }

-    public void run(){
-      try{
+    public void run() {
+      try {
         Method MessageGetPayloadMethod_07 = KafkaMessageClass_07.getMethod("payload");
         Method KafkaGetMessageMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("message");
         Method KafkaGetTopicMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("topic");
-
         Method ConsumerIteratorMethod = KafkaStream_07.getMethod("iterator");
         Method KafkaStreamHasNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("hasNext");
         Method KafkaStreamNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("next");
-
         Object iterator = ConsumerIteratorMethod.invoke(stream);
-        Iterator<Producer> producerCircularIterator = Utils.circularIterator(JavaConversions.asBuffer(producers));
-
-        while (((Boolean) KafkaStreamHasNextMethod_07.invoke(iterator)).booleanValue()){
+        while (((Boolean) KafkaStreamHasNextMethod_07.invoke(iterator)).booleanValue()) {
           Object messageAndMetaData_07 = KafkaStreamNextMethod_07.invoke(iterator);
           Object message_07 = KafkaGetMessageMethod_07.invoke(messageAndMetaData_07);
           Object topic = KafkaGetTopicMethod_07.invoke(messageAndMetaData_07);
@@ -283,36 +322,106 @@ public class KafkaMigrationTool
           int size = ((ByteBuffer)payload_07).remaining();
           byte[] bytes = new byte[size];
           ((ByteBuffer)payload_07).get(bytes);
-          logger.debug(String.format("Send kafka 08 message of size %d to topic %s", bytes.length, topic));
+          if(logger.isDebugEnabled())
+            logger.debug("Migration thread " + threadId + " sending message of size " + bytes.length + " to topic "+ topic);
           KeyedMessage<String, byte[]> producerData = new KeyedMessage((String)topic, null, bytes);
-          Producer nextProducer = producerCircularIterator.next();
-          nextProducer.send(producerData);
+          producerDataChannel.sendRequest(producerData);
         }
-        logger.info(String.format("Migration thread %s finishes running", threadName));
+        logger.info("Migration thread " + threadName + " finished running");
+      } catch (InvocationTargetException t){
+        logger.fatal("Migration thread failure due to root cause ", t.getCause());
       } catch (Throwable t){
         logger.fatal("Migration thread failure due to ", t);
-        t.printStackTrace(System.out);
+      } finally {
+        shutdownComplete.countDown();
       }
     }
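The logging change inside the loop above is presumably one of the performance bugs the commit title refers to: the old code paid for a String.format on every migrated message whether or not DEBUG was enabled. The guarded idiom, in isolation:

    // Unconditional: the message string is built even when DEBUG is off.
    logger.debug(String.format("sent %d bytes to %s", bytes.length, topic));

    // Guarded: the concatenation only runs when DEBUG is actually enabled.
    if (logger.isDebugEnabled())
      logger.debug("sent " + bytes.length + " bytes to " + topic);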
+    public void shutdown() {
+      logger.info("Migration thread " + threadName + " shutting down");
+      isRunning.set(false);
+      interrupt();
+      try {
+        shutdownComplete.await();
+      } catch(InterruptedException ie) {
+        logger.warn("Interrupt during shutdown of MigrationThread", ie);
+      }
+      logger.info("Migration thread " + threadName + " shutdown complete");
+    }
   }

+  private static class ProducerThread extends Thread {
+    private final ProducerDataChannel<KeyedMessage<String, byte[]>> producerDataChannel;
+    private final Producer<String, byte[]> producer;
+    private final int threadId;
+    private String threadName;
+    private org.apache.log4j.Logger logger;
+    private CountDownLatch shutdownComplete = new CountDownLatch(1);
+    private KeyedMessage<String, byte[]> shutdownMessage = new KeyedMessage("shutdown", null, null);
+
+    public ProducerThread(ProducerDataChannel<KeyedMessage<String, byte[]>> _producerDataChannel,
+                          Producer<String, byte[]> _producer,
+                          int _threadId) {
+      producerDataChannel = _producerDataChannel;
+      producer = _producer;
+      threadId = _threadId;
+      threadName = "ProducerThread-" + threadId;
+      logger = org.apache.log4j.Logger.getLogger(ProducerThread.class.getName());
+      this.setName(threadName);
+    }
+
+    public void run() {
+      try{
+        while(true) {
+          KeyedMessage<String, byte[]> data = producerDataChannel.receiveRequest();
+          if(!data.equals(shutdownMessage))
+            producer.send(data);
+          else
+            break;
+        }
+        logger.info("Producer thread " + threadName + " finished running");
+      } catch (Throwable t){
+        logger.fatal("Producer thread failure due to ", t);
+      } finally {
+        shutdownComplete.countDown();
+      }
+    }
+
+    public void shutdown() {
+      try {
+        logger.info("Producer thread " + threadName + " shutting down");
+        producerDataChannel.sendRequest(shutdownMessage);
+      } catch(InterruptedException ie) {
+        logger.warn("Interrupt during shutdown of ProducerThread", ie);
+      }
+    }
+
+    public void awaitShutdown() {
+      try {
+        shutdownComplete.await();
+        logger.info("Producer thread " + threadName + " shutdown complete");
+      } catch(InterruptedException ie) {
+        logger.warn("Interrupt during shutdown of ProducerThread", ie);
+      }
+    }
+  }
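ProducerThread shuts down via a poison pill: shutdown() enqueues a sentinel message rather than interrupting the thread, so everything already queued is sent before the loop exits, and the shutdown hook in main() enqueues one pill per producer thread. A runnable sketch of the pattern (the tool compares with equals() on a KeyedMessage sentinel; identity comparison, as below, avoids a legitimate message matching by accident):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class PoisonPillSketch {
      private static final byte[] POISON = new byte[0];  // dedicated sentinel instance

      public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<byte[]> queue = new ArrayBlockingQueue<byte[]>(8);
        Thread worker = new Thread(new Runnable() {
          public void run() {
            try {
              while (true) {
                byte[] msg = queue.take();
                if (msg == POISON) break;  // everything queued before it drains first
                // producer.send(msg) would go here
              }
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
          }
        });
        worker.start();
        queue.put("last message".getBytes());
        queue.put(POISON);   // graceful stop, no interrupt needed
        worker.join();
      }
    }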
   /**
-   * A parent-last classloader that will try the child classloader first and then the parent.
+   * A parent-last class loader that will try the child class loader first and then the parent.
    * This takes a fair bit of doing because java really prefers parent-first.
    */
-  private static class ParentLastURLClassLoader extends ClassLoader{
+  private static class ParentLastURLClassLoader extends ClassLoader {
     private ChildURLClassLoader childClassLoader;

     /**
-     * This class allows me to call findClass on a classloader
+     * This class allows me to call findClass on a class loader
      */
-    private static class FindClassClassLoader extends ClassLoader{
-      public FindClassClassLoader(ClassLoader parent){
+    private static class FindClassClassLoader extends ClassLoader {
+      public FindClassClassLoader(ClassLoader parent) {
         super(parent);
       }

       @Override
-      public Class<?> findClass(String name) throws ClassNotFoundException{
+      public Class<?> findClass(String name) throws ClassNotFoundException {
         return super.findClass(name);
       }
     }
@@ -327,14 +436,15 @@ public class KafkaMigrationTool
       super(urls, null);
       this.realParent = realParent;
     }
+
     @Override
-    public Class<?> findClass(String name) throws ClassNotFoundException{
+    public Class<?> findClass(String name) throws ClassNotFoundException {
       try{
         // first try to use the URLClassLoader findClass
         return super.findClass(name);
       }
-      catch( ClassNotFoundException e ){
-        // if that fails, we ask our real parent classloader to load the class (we give up)
+      catch( ClassNotFoundException e ) {
+        // if that fails, we ask our real parent class loader to load the class (we give up)
         return realParent.loadClass(name);
       }
     }
@@ -347,11 +457,11 @@ public class KafkaMigrationTool
     @Override
     protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
-      try{
-        // first we try to find a class inside the child classloader
+      try {
+        // first we try to find a class inside the child class loader
        return childClassLoader.findClass(name);
       }
-      catch( ClassNotFoundException e ){
+      catch( ClassNotFoundException e ) {
         // didn't find it, try the parent
         return super.loadClass(name, resolve);
       }
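The trick is the two-level structure: ChildURLClassLoader is constructed with a null parent, so its findClass tries the 0.7/zkClient jars first and only falls back to the real parent on ClassNotFoundException. Inside the tool it is used as in main() above (jar paths are placeholders):

    ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[]{
        new File("kafka-0.7.2.jar").toURI().toURL(),
        new File("zkclient-0.1.jar").toURI().toURL()
    });
    // Child-first: resolves to the 0.7 class from the jar, not the 0.8 copy
    // on the application classpath.
    Class<?> consumer07 = c1.loadClass("kafka.consumer.Consumer");
    Class<?> configClass07 = c1.loadClass("kafka.consumer.ConsumerConfig");
    // This is also why every 0.7 call in this file goes through reflection: the
    // tool cannot mention 0.7 types statically without the 0.8 versions winning.
    Method create = consumer07.getMethod("createJavaConsumerConnector", configClass07);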
http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 942d6c3..fe4c925 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -25,6 +25,7 @@ import java.lang.management._
 import java.util.zip.CRC32
 import javax.management._
 import scala.collection._
+import mutable.ListBuffer
 import scala.collection.mutable
 import java.util.Properties
 import kafka.common.KafkaException
@@ -572,5 +573,4 @@ object Utils extends Logging {
    * This is different from java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
    */
   def abs(n: Int) = n & 0x7fffffff
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index cce6c8e..0b6244f 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -21,9 +21,10 @@ import java.util.Arrays
 import java.nio.ByteBuffer
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
-import org.junit.Test
 import org.junit.Assert._
 import kafka.common.KafkaException
+import org.junit.{Test}
+import kafka.tools.KafkaMigrationTool

 class UtilsTest extends JUnitSuite {
@@ -53,7 +54,7 @@ class UtilsTest extends JUnitSuite {
     assertEquals(2, its.next())
     assertEquals(1, its.next())
   }
-
+
   @Test
   def testReadBytes() {
     for(testCase <- List("", "a", "abcd")) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d925b157/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 507743e..851a99e 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -246,7 +246,7 @@ object ProducerPerformance extends Logging {
       while(j < messagesPerThread) {
         try {
           config.topics.foreach(
-            topic =>{
+            topic => {
               val (producerData, bytesSent_) = generateProducerData(topic, j)
               bytesSent += bytesSent_
               producer.send(producerData)
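A quick check of the masking trick behind Utils.abs in the Utils.scala hunk above: Math.abs(Integer.MIN_VALUE) overflows and returns Integer.MIN_VALUE, while clearing the sign bit always yields a non-negative int. Note it is not the arithmetic absolute value, just a hash-friendly non-negative mapping:

    public class AbsCheck {
      // Mirrors kafka.utils.Utils.abs: clears the sign bit instead of negating.
      static int abs(int n) { return n & 0x7fffffff; }

      public static void main(String[] args) {
        System.out.println(Math.abs(Integer.MIN_VALUE)); // -2147483648 (overflow!)
        System.out.println(abs(Integer.MIN_VALUE));      // 0
        System.out.println(abs(-5));                     // 2147483643, not 5
      }
    }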