KAFKA-1926; Replace kafka.utils.Utils with o.a.k.common.utils.Utils; patched by Tong Li; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9c23d935
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9c23d935
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9c23d935

Branch: refs/heads/trunk
Commit: 9c23d93553a33c5d85231193614d192a9945796e
Parents: 53f3143
Author: Tong Li <liton...@us.ibm.com>
Authored: Sun Apr 5 21:46:11 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Sun Apr 5 21:46:11 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/utils/Utils.java    | 124 +++-
 .../apache/kafka/common/utils/UtilsTest.java    |  33 +
 core/src/main/scala/kafka/Kafka.scala           |   3 +-
 .../kafka/admin/ConsumerGroupCommand.scala      |   1 +
 .../PreferredReplicaLeaderElectionCommand.scala |   4 +-
 .../kafka/admin/ReassignPartitionsCommand.scala |   9 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |   3 +-
 .../main/scala/kafka/client/ClientUtils.scala   |   4 +-
 core/src/main/scala/kafka/cluster/Broker.scala  |  16 +-
 .../main/scala/kafka/cluster/Partition.scala    |   2 +-
 .../kafka/consumer/ConsumerFetcherManager.scala |   2 +-
 .../scala/kafka/consumer/ConsumerIterator.scala |   2 +-
 .../kafka/consumer/PartitionAssignor.scala      |   4 +-
 .../main/scala/kafka/consumer/TopicCount.scala  |   4 +-
 .../consumer/ZookeeperConsumerConnector.scala   |   2 +-
 .../controller/ControllerChannelManager.scala   |   4 +-
 .../kafka/controller/KafkaController.scala      |   2 +-
 .../controller/PartitionStateMachine.scala      |   3 +-
 .../kafka/controller/ReplicaStateMachine.scala  |   2 +-
 .../kafka/controller/TopicDeletionManager.scala |   2 +-
 .../main/scala/kafka/log/FileMessageSet.scala   |   6 +-
 core/src/main/scala/kafka/log/Log.scala         |   6 +-
 .../scala/kafka/log/LogCleanerManager.scala     |   2 +-
 core/src/main/scala/kafka/log/LogConfig.scala   |   2 +-
 core/src/main/scala/kafka/log/LogManager.scala  |   8 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |   8 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala |   8 +-
 core/src/main/scala/kafka/log/OffsetMap.scala   |   3 +-
 core/src/main/scala/kafka/message/Message.scala |   3 +-
 .../kafka/message/MessageAndMetadata.scala      |   2 +-
 .../scala/kafka/message/MessageWriter.scala     |   2 +-
 .../kafka/metrics/KafkaCSVMetricsReporter.scala |   4 +-
 .../kafka/metrics/KafkaMetricsConfig.scala      |   4 +-
 .../kafka/metrics/KafkaMetricsReporter.scala    |   6 +-
 .../network/BoundedByteBufferReceive.scala      |   4 +-
 .../main/scala/kafka/network/SocketServer.scala |   1 +
 .../kafka/producer/ByteArrayPartitioner.scala   |   1 +
 .../kafka/producer/DefaultPartitioner.scala     |   1 +
 .../main/scala/kafka/producer/Producer.scala    |   6 +-
 .../scala/kafka/producer/ProducerConfig.scala   |   6 +-
 .../producer/async/DefaultEventHandler.scala    |   9 +-
 .../kafka/server/AbstractFetcherManager.scala   |   5 +-
 .../kafka/server/AbstractFetcherThread.scala    |   2 +-
 .../kafka/server/BrokerMetadataCheckpoint.scala |   2 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  18 +-
 .../kafka/server/KafkaRequestHandler.scala      |   1 +
 .../main/scala/kafka/server/KafkaServer.scala   |  20 +-
 .../main/scala/kafka/server/MetadataCache.scala |   2 +-
 .../main/scala/kafka/server/OffsetManager.scala |   1 +
 .../kafka/server/ZookeeperLeaderElector.scala   |   2 +-
 .../scala/kafka/tools/ConsoleConsumer.scala     |   1 +
 .../scala/kafka/tools/DumpLogSegments.scala     |   5 +-
 .../scala/kafka/tools/KafkaMigrationTool.java   |   2 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    |  14 +-
 .../scala/kafka/tools/SimpleConsumerShell.scala |   1 +
 .../kafka/tools/StateChangeLogMerger.scala      |   4 +-
 .../scala/kafka/tools/TestEndToEndLatency.scala |  92 ---
 .../scala/kafka/tools/TestLogCleaning.scala     | 311 ---------
 .../scala/kafka/tools/UpdateOffsetsInZK.scala   |   4 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala | 347 ++++++++++
 core/src/main/scala/kafka/utils/Crc32.java      | 637 -------------------
 .../main/scala/kafka/utils/KafkaScheduler.scala |   4 +-
 .../scala/kafka/utils/Log4jController.scala     |   2 +-
 core/src/main/scala/kafka/utils/Logging.scala   |  10 +-
 core/src/main/scala/kafka/utils/Utils.scala     | 619 ------------------
 .../kafka/utils/VerifiableProperties.scala      |   2 +-
 .../kafka/api/ProducerCompressionTest.scala     |   4 +-
 .../scala/kafka/tools/TestEndToEndLatency.scala |  91 +++
 .../scala/kafka/tools/TestLogCleaning.scala     | 311 +++++++++
 .../test/scala/other/kafka/DeleteZKPath.scala   |   3 +-
 .../test/scala/other/kafka/StressTestLog.scala  |   2 +-
 .../scala/other/kafka/TestCrcPerformance.scala  |   3 +-
 .../other/kafka/TestLinearWriteSpeed.scala      |   4 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    |   4 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |   2 +-
 .../kafka/consumer/ConsumerIteratorTest.scala   |   2 +-
 .../integration/KafkaServerTestHarness.scala    |   4 +-
 .../kafka/integration/PrimitiveApiTest.scala    |  12 +-
 .../kafka/integration/RollingBounceTest.scala   |   4 +-
 .../integration/UncleanLeaderElectionTest.scala |   4 +-
 .../unit/kafka/log/BrokerCompressionTest.scala  |   2 +-
 .../test/scala/unit/kafka/log/CleanerTest.scala |   5 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |   6 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |   4 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |   4 +-
 .../kafka/log4j/KafkaLog4jAppenderTest.scala    |   4 +-
 .../scala/unit/kafka/message/MessageTest.scala  |   3 +-
 .../unit/kafka/producer/AsyncProducerTest.scala |   4 +-
 .../unit/kafka/producer/ProducerTest.scala      |   4 +-
 .../unit/kafka/server/AdvertiseBrokerTest.scala |   4 +-
 .../server/HighwatermarkPersistenceTest.scala   |   4 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |  10 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |   4 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |   2 +-
 .../unit/kafka/server/LogRecoveryTest.scala     |   4 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |   2 +-
 .../server/ServerGenerateBrokerIdTest.scala     |  14 +-
 .../unit/kafka/server/ServerShutdownTest.scala  |  12 +-
 .../unit/kafka/server/ServerStartupTest.scala   |   6 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  14 +-
 .../test/scala/unit/kafka/utils/UtilsTest.scala |  33 +-
 .../scala/unit/kafka/zk/EmbeddedZookeeper.scala |  10 +-
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |   6 +-
 103 files changed, 1154 insertions(+), 1883 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
old mode 100644
new mode 100755
index 39e8d7c..f73eedb
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -14,15 +14,26 @@ package org.apache.kafka.common.utils;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
+import java.io.FileNotFoundException;
+import java.io.StringWriter;
+import java.io.PrintWriter;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.Properties;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.kafka.common.KafkaException;
 
 public class Utils {
@@ -33,6 +44,8 @@ public class Utils {
 
     public static final String NL = System.getProperty("line.separator");
 
+    private static final Logger log = LoggerFactory.getLogger(Utils.class);
+
     /**
      * Turn the given UTF8 byte array into a string
      * 
@@ -330,7 +343,7 @@ public class Utils {
                 ? "[" + host + "]:" + port // IPv6
                 : host + ":" + port;
     }
-    
+
     /**
      * Create a string representation of an array joined by the given separator
      * @param strs The array of items
@@ -357,4 +370,113 @@ public class Utils {
         }
         return sb.toString();
     }
+
+    /**
+     * Read a properties file from the given path
+     * @param filename The path of the file to read
+     */
+    public static Properties loadProps(String filename) throws IOException, FileNotFoundException {
+        Properties props = new Properties();
+        InputStream propStream = null;
+        try {
+            propStream = new FileInputStream(filename);
+            props.load(propStream);
+        } finally {
+            if (propStream != null)
+                propStream.close();
+        }
+        return props;
+    }
+
+    /**
+     * Get the stack trace from an exception as a string
+     */
+    public static String stackTrace(Throwable e) {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+        e.printStackTrace(pw);
+        return sw.toString();
+    }
+
+    /**
+     * Create a new thread
+     * @param name The name of the thread
+     * @param runnable The work for the thread to do
+     * @param daemon Should the thread block JVM shutdown?
+     * @return The unstarted thread
+     */
+    public static Thread newThread(String name, Runnable runnable, Boolean daemon) {
+        Thread thread = new Thread(runnable, name);
+        thread.setDaemon(daemon);
+        thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            public void uncaughtException(Thread t, Throwable e) {
+                log.error("Uncaught exception in thread '" + t.getName() + "':", e);
+            }
+        });
+        return thread;
+    }
+
+    /**
+     * Create a daemon thread
+     * @param name The name of the thread
+     * @param runnable The runnable to execute in the background
+     * @return The unstarted thread
+     */
+    public static Thread daemonThread(String name, Runnable runnable) {
+        return newThread(name, runnable, true);
+    }
+
+    /**
+     * Print an error message and shutdown the JVM
+     * @param message The error message
+     */
+    public static void croak(String message) {
+        System.err.println(message);
+        System.exit(1);
+    }
+
+    /**
+     * Read a buffer into a Byte array for the given offset and length
+     */
+    public static byte[] readBytes(ByteBuffer buffer, int offset, int length) {
+        byte[] dest = new byte[length];
+        if (buffer.hasArray()) {
+            System.arraycopy(buffer.array(), buffer.arrayOffset() + offset, dest, 0, length);
+        } else {
+            buffer.mark();
+            buffer.position(offset);
+            buffer.get(dest, 0, length);
+            buffer.reset();
+        }
+        return dest;
+    }
+
+    /**
+     * Read the given byte buffer into a Byte array
+     */
+    public static byte[] readBytes(ByteBuffer buffer) {
+        return Utils.readBytes(buffer, 0, buffer.limit());
+    }
+
+    /**
+     * Attempt to read a file as a string
+     * @throws IOException 
+     */
+    public static String readFileAsString(String path, Charset charset) throws IOException {
+        if (charset == null) charset = Charset.defaultCharset();
+        FileInputStream stream = new FileInputStream(new File(path));
+        String result = new String();
+        try {
+            FileChannel fc = stream.getChannel();
+            MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
+            result = charset.decode(bb).toString();
+        } finally {
+            stream.close();
+        }
+        return result;
+    }
+
+    public static String readFileAsString(String path) throws IOException {
+        return Utils.readFileAsString(path, Charset.defaultCharset());
+    }
 }
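
For orientation, a minimal usage sketch (not part of the patch) of the helpers added above; the properties path and the thread body are invented for illustration:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Properties;

import org.apache.kafka.common.utils.Utils;

public class UtilsUsageSketch {
    public static void main(String[] args) throws IOException {
        // Load settings from a properties file (hypothetical path).
        final Properties props = Utils.loadProps("/tmp/client.properties");

        // Background work on a daemon thread; Utils.newThread installs a
        // handler that logs uncaught exceptions.
        Thread worker = Utils.daemonThread("sketch-worker", new Runnable() {
            public void run() {
                System.out.println("loaded " + props.size() + " settings");
            }
        });
        worker.start();

        // Copy a byte range out of a buffer without moving its position.
        ByteBuffer buf = ByteBuffer.wrap("hello".getBytes());
        byte[] firstTwo = Utils.readBytes(buf, 0, 2); // {'h', 'e'}
        System.out.println(new String(firstTwo));
    }
}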

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
old mode 100644
new mode 100755
index 4b706d7..2ebe3c2
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.utils;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.nio.ByteBuffer;
 
 import org.junit.Test;
 
@@ -67,4 +68,36 @@ public class UtilsTest {
         assertEquals(0, Utils.abs(0));
         assertEquals(1, Utils.abs(-1));
     }
+
+    private void subTest(ByteBuffer buffer) {
+        // The first byte should be 'A'
+        assertEquals('A', (Utils.readBytes(buffer, 0, 1))[0]);
+
+        // The offset is 2, so the first 2 bytes should be skipped.
+        byte[] results = Utils.readBytes(buffer, 2, 3);
+        assertEquals('y', results[0]);
+        assertEquals(' ', results[1]);
+        assertEquals('S', results[2]);
+        assertEquals(3, results.length);
+
+        // test readBytes without offset and length specified.
+        results = Utils.readBytes(buffer);
+        assertEquals('A', results[0]);
+        assertEquals('t', results[buffer.limit() - 1]);
+        assertEquals(buffer.limit(), results.length);
+    }
+
+    @Test
+    public void testReadBytes() {
+        byte[] myvar = "Any String you want".getBytes();
+        ByteBuffer buffer = ByteBuffer.allocate(myvar.length);
+        buffer.put(myvar);
+        buffer.rewind();
+
+        this.subTest(buffer);
+
+        // test readonly buffer, different path
+        buffer = ByteBuffer.wrap(myvar).asReadOnlyBuffer();
+        this.subTest(buffer);
+    }
 }
\ No newline at end of file
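
The read-only buffer case above exercises the second branch of Utils.readBytes: heap-backed buffers report hasArray() true and take the System.arraycopy path, while read-only (and direct) buffers expose no backing array and fall back to mark/position/get/reset. A standalone check of that distinction, for illustration only:

import java.nio.ByteBuffer;

public class ReadBytesPathCheck {
    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.wrap("abc".getBytes());
        // Heap buffer exposes its backing array: readBytes can arraycopy.
        System.out.println(heap.hasArray()); // true
        // A read-only view hides the array: readBytes must mark/get/reset.
        System.out.println(heap.asReadOnlyBuffer().hasArray()); // false
    }
}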

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
old mode 100644
new mode 100755
index 37de7df..fb860e7
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -21,7 +21,8 @@ import scala.collection.JavaConversions._
 import joptsimple.OptionParser
 import metrics.KafkaMetricsReporter
 import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
-import kafka.utils.{VerifiableProperties, CommandLineUtils, Utils, Logging}
+import kafka.utils.{VerifiableProperties, CommandLineUtils, Logging}
+import org.apache.kafka.common.utils.Utils
 
 object Kafka extends Logging {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
old mode 100644
new mode 100755
index 89fa29a..1c3b380
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -30,6 +30,7 @@ import joptsimple.{OptionSpec, OptionParser}
 import scala.collection.{Set, mutable}
 import kafka.consumer.SimpleConsumer
 import collection.JavaConversions._
+import org.apache.kafka.common.utils.Utils
 
 
 object ConsumerGroupCommand {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
old mode 100644
new mode 100755
index 79b5e0a..3b3cd67
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -22,7 +22,7 @@ import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import kafka.common.{TopicAndPartition, AdminCommandFailedException}
 import collection._
-import mutable.ListBuffer
+import org.apache.kafka.common.utils.Utils
 
 object PreferredReplicaLeaderElectionCommand extends Logging {
 
@@ -84,7 +84,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
               val partition = p.get("partition").get.asInstanceOf[Int]
               TopicAndPartition(topic, partition)
             }
-            val duplicatePartitions = Utils.duplicates(partitions)
+            val duplicatePartitions = CoreUtils.duplicates(partitions)
             val partitionsSet = partitions.toSet
             if (duplicatePartitions.nonEmpty)
              throw new AdminOperationException("Preferred replica election data contains duplicate partitions: %s".format(duplicatePartitions.mkString(",")))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
old mode 100644
new mode 100755
index 979992b..bbe3362
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -22,6 +22,7 @@ import collection._
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import kafka.common.{TopicAndPartition, AdminCommandFailedException}
+import org.apache.kafka.common.utils.Utils
 
 object ReassignPartitionsCommand extends Logging {
 
@@ -81,12 +82,12 @@ object ReassignPartitionsCommand extends Logging {
       CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options")
     val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
     val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt)
-    val duplicateReassignments = Utils.duplicates(brokerListToReassign)
+    val duplicateReassignments = CoreUtils.duplicates(brokerListToReassign)
     if (duplicateReassignments.nonEmpty)
       throw new AdminCommandFailedException("Broker list contains duplicate 
entries: %s".format(duplicateReassignments.mkString(",")))
     val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
     val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
-    val duplicateTopicsToReassign = Utils.duplicates(topicsToReassign)
+    val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
     if (duplicateTopicsToReassign.nonEmpty)
       throw new AdminCommandFailedException("List of topics to reassign 
contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
     val topicPartitionsToReassign = 
ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
@@ -112,11 +113,11 @@ object ReassignPartitionsCommand extends Logging {
     val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
     if (partitionsToBeReassigned.isEmpty)
       throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile))
-    val duplicateReassignedPartitions = Utils.duplicates(partitionsToBeReassigned.map{ case(tp,replicas) => tp})
+    val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map{ case(tp,replicas) => tp})
     if (duplicateReassignedPartitions.nonEmpty)
       throw new AdminCommandFailedException("Partition reassignment contains duplicate topic partitions: %s".format(duplicateReassignedPartitions.mkString(",")))
     val duplicateEntries= partitionsToBeReassigned
-      .map{ case(tp,replicas) => (tp, Utils.duplicates(replicas))}
+      .map{ case(tp,replicas) => (tp, CoreUtils.duplicates(replicas))}
       .filter{ case (tp,duplicatedReplicas) => duplicatedReplicas.nonEmpty }
     if (duplicateEntries.nonEmpty) {
       val duplicatesMsg = duplicateEntries
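
CoreUtils.duplicates, substituted for Utils.duplicates throughout this command, returns the elements that occur more than once in a collection. A rough Java analogue, written here only to illustrate the validation the command performs (not the actual Scala implementation):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class DuplicatesSketch {
    // Collect every element that appears more than once in the input.
    static <T> Set<T> duplicates(Iterable<T> items) {
        Set<T> seen = new HashSet<T>();
        Set<T> dups = new HashSet<T>();
        for (T item : items)
            if (!seen.add(item)) // add() returns false if already present
                dups.add(item);
        return dups;
    }

    public static void main(String[] args) {
        // A --broker-list like "1,2,2,3" would fail the duplicate check.
        System.out.println(duplicates(Arrays.asList(1, 2, 2, 3))); // [2]
    }
}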

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
old mode 100644
new mode 100755
index e36a9d1..60f0228
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -28,6 +28,7 @@ import scala.collection.JavaConversions._
 import kafka.log.LogConfig
 import kafka.consumer.Whitelist
 import kafka.server.OffsetManager
+import org.apache.kafka.common.utils.Utils
 
 
 object TopicCommand {
@@ -228,7 +229,7 @@ object TopicCommand {
     val ret = new mutable.HashMap[Int, List[Int]]()
     for (i <- 0 until partitionList.size) {
       val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
-      val duplicateBrokers = Utils.duplicates(brokerList)
+      val duplicateBrokers = CoreUtils.duplicates(brokerList)
       if (duplicateBrokers.nonEmpty)
         throw new AdminCommandFailedException("Partition replica lists may not 
contain duplicate entries: %s".format(duplicateBrokers.mkString(",")))
       ret.put(i, brokerList.toList)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
old mode 100644
new mode 100755
index ad4c9d2..f08aaf2
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -23,7 +23,7 @@ import kafka.cluster._
 import kafka.api._
 import kafka.producer._
 import kafka.common.{ErrorMapping, KafkaException}
-import kafka.utils.{Utils, Logging}
+import kafka.utils.{CoreUtils, Logging}
 import java.util.Properties
 import util.Random
 import kafka.network.BlockingChannel
@@ -98,7 +98,7 @@ object ClientUtils extends Logging{
    * Parse a list of broker urls in the form host1:port1, host2:port2, ... 
    */
   def parseBrokerList(brokerListStr: String): Seq[BrokerEndpoint] = {
-    val brokersStr = Utils.parseCsvList(brokerListStr)
+    val brokersStr = CoreUtils.parseCsvList(brokerListStr)
 
     brokersStr.zipWithIndex.map { case (address, brokerId) =>
       BrokerEndpoint.createBrokerEndPoint(brokerId, address)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
old mode 100644
new mode 100755
index 3933bb3..8e603b6
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -17,11 +17,13 @@
 
 package kafka.cluster
 
+import kafka.utils.CoreUtils._
+import kafka.utils.Json
+import kafka.api.ApiUtils._
 import java.nio.ByteBuffer
 
 import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException}
 import kafka.utils.Json
-import kafka.utils.Utils._
 import org.apache.kafka.common.protocol.SecurityProtocol
 
 /**
@@ -139,14 +141,4 @@ case class Broker(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) {
     }
   }
 
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case null => false
-      // Yes, Scala compares lists element by element
-      case n: Broker => id == n.id && endPoints == n.endPoints
-      case _ => false
-    }
-  }
-
-  override def hashCode(): Int = hashcode(id, endPoints)
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
old mode 100644
new mode 100755
index 6d142d6..3fb549c
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,7 +18,7 @@ package kafka.cluster
 
 import kafka.common._
 import kafka.utils._
-import kafka.utils.Utils.{inReadLock,inWriteLock}
+import kafka.utils.CoreUtils.{inReadLock,inWriteLock}
 import kafka.admin.AdminUtils
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
old mode 100644
new mode 100755
index 484a57f..6bb0d56
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -25,7 +25,7 @@ import scala.collection.immutable
 import collection.mutable.HashMap
 import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
-import kafka.utils.Utils.inLock
+import kafka.utils.CoreUtils.inLock
 import kafka.utils.ZkUtils._
 import kafka.utils.{ShutdownableThread, SystemTime}
 import kafka.common.TopicAndPartition

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
old mode 100644
new mode 100755
index b00a4dc..0c5c451
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -17,7 +17,7 @@
 
 package kafka.consumer
 
-import kafka.utils.{IteratorTemplate, Logging, Utils}
+import kafka.utils.{IteratorTemplate, Logging, CoreUtils}
 import java.util.concurrent.{TimeUnit, BlockingQueue}
 import kafka.serializer.Decoder
 import java.util.concurrent.atomic.AtomicReference

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
old mode 100644
new mode 100755
index bc2e5b4..4afda8b
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -19,7 +19,7 @@ package kafka.consumer
 
 import org.I0Itec.zkclient.ZkClient
 import kafka.common.TopicAndPartition
-import kafka.utils.{Pool, Utils, ZkUtils, Logging}
+import kafka.utils.{Pool, CoreUtils, ZkUtils, Logging}
 
 import scala.collection.mutable
 
@@ -88,7 +88,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
             "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet))
       }
 
-      val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted)
+      val threadAssignor = CoreUtils.circularIterator(headThreadIdSet.toSeq.sorted)
 
       info("Starting round-robin assignment with consumers " + ctx.consumers)
       val allTopicPartitions = ctx.partitionsForTopic.flatMap { case (topic, partitions) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
old mode 100644
new mode 100755
index 0954b3c..6994c8e
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -19,7 +19,7 @@ package kafka.consumer
 
 import scala.collection._
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, Utils}
+import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, CoreUtils}
 import kafka.common.KafkaException
 
 private[kafka] trait TopicCount {
@@ -136,7 +136,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient,
     TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
   }
 
-  def getTopicCountMap = Map(Utils.JSONEscapeString(topicFilter.regex) -> numStreams)
+  def getTopicCountMap = Map(CoreUtils.JSONEscapeString(topicFilter.regex) -> numStreams)
 
   def pattern: String = {
     topicFilter match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
old mode 100644
new mode 100755
index 3e8a75b..e250b94
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -32,7 +32,7 @@ import kafka.javaapi.consumer.ConsumerRebalanceListener
 import kafka.metrics._
 import kafka.network.BlockingChannel
 import kafka.serializer._
-import kafka.utils.Utils.inLock
+import kafka.utils.CoreUtils.inLock
 import kafka.utils.ZkUtils._
 import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
old mode 100644
new mode 100755
index fb596ba..97acdb2
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -17,7 +17,7 @@
 package kafka.controller
 
 import kafka.network.{Receive, BlockingChannel}
-import kafka.utils.{Utils, Logging, ShutdownableThread}
+import kafka.utils.{CoreUtils, Logging, ShutdownableThread}
 import collection.mutable.HashMap
 import kafka.cluster.Broker
 import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
@@ -140,7 +140,7 @@ class RequestSendThread(val controllerId: Int,
               connectToBroker(toBroker, channel)
               isSendSuccessful = false
               // backoff before retrying the connection and send
-              Utils.swallowTrace(Thread.sleep(300))
+              CoreUtils.swallowTrace(Thread.sleep(300))
           }
         }
         if (receive != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
old mode 100644
new mode 100755
index f2e62f5..3a09377
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -29,7 +29,7 @@ import kafka.log.LogConfig
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import kafka.utils.ZkUtils._
 import kafka.utils._
-import kafka.utils.Utils._
+import kafka.utils.CoreUtils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
old mode 100644
new mode 100755
index 2f0694b..92fd92d
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -25,9 +25,8 @@ import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateC
 import kafka.utils.{Logging, ZkUtils, ReplicationUtils}
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import org.apache.log4j.Logger
 import kafka.controller.Callbacks.CallbackBuilder
-import kafka.utils.Utils._
+import kafka.utils.CoreUtils._
 
 /**
  * This class represents the state machine for partitions. It defines the states that a partition can be in, and

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
old mode 100644
new mode 100755
index 3e87e1d..e5c56e0
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -24,7 +24,7 @@ import kafka.utils.{ZkUtils, ReplicationUtils, Logging}
 import org.I0Itec.zkclient.IZkChildListener
 import org.apache.log4j.Logger
 import kafka.controller.Callbacks._
-import kafka.utils.Utils._
+import kafka.utils.CoreUtils._
 
 /**
  * This class represents the state machine for replicas. It defines the states that a replica can be in, and

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
old mode 100644
new mode 100755
index e56f22d..64ecb49
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -18,7 +18,7 @@ package kafka.controller
 
 import collection.mutable
 import kafka.utils.{ShutdownableThread, Logging, ZkUtils}
-import kafka.utils.Utils._
+import kafka.utils.CoreUtils._
 import collection.Set
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.api.{StopReplicaResponse, RequestOrResponse}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
old mode 100644
new mode 100755
index b2652dd..2522604
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -66,12 +66,12 @@ class FileMessageSet private[kafka](@volatile var file: File,
    * Create a file message set with no slicing
    */
   def this(file: File) = 
-    this(file, Utils.openChannel(file, mutable = true))
+    this(file, CoreUtils.openChannel(file, mutable = true))
 
   /**
    * Create a file message set with mutable option
    */
-  def this(file: File, mutable: Boolean) = this(file, Utils.openChannel(file, mutable))
+  def this(file: File, mutable: Boolean) = this(file, CoreUtils.openChannel(file, mutable))
   
   /**
    * Create a slice view of the file message set that begins and ends at the 
given byte offsets
@@ -231,7 +231,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
    * @return True iff this message set was deleted.
    */
   def delete(): Boolean = {
-    Utils.swallow(channel.close())
+    CoreUtils.swallow(channel.close())
     file.delete()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
old mode 100644
new mode 100755
index a0745be..5563f2d
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -136,12 +136,12 @@ class Log(val dir: File,
         // we crashed in the middle of a swap operation, to recover:
         // if a log, swap it in and delete the .index file
         // if an index just delete it, it will be rebuilt
-        val baseName = new File(Utils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
+        val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
         if(baseName.getPath.endsWith(IndexFileSuffix)) {
           file.delete()
         } else if(baseName.getPath.endsWith(LogFileSuffix)){
           // delete the index
-          val index = new File(Utils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
+          val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
           index.delete()
           // complete the swap operation
           val renamed = file.renameTo(baseName)
@@ -627,7 +627,7 @@ class Log(val dir: File,
       removeLogMetrics()
       logSegments.foreach(_.delete())
       segments.clear()
-      Utils.rm(dir)
+      CoreUtils.rm(dir)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
old mode 100644
new mode 100755
index 351824b..f6795d3
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -24,7 +24,7 @@ import kafka.utils.{Logging, Pool}
 import kafka.server.OffsetCheckpoint
 import collection.mutable
 import java.util.concurrent.locks.ReentrantLock
-import kafka.utils.Utils._
+import kafka.utils.CoreUtils._
 import java.util.concurrent.TimeUnit
 import kafka.common.{LogCleaningAbortedException, TopicAndPartition}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
old mode 100644
new mode 100755
index 8b67aee..558c703
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -197,7 +197,7 @@ object LogConfig {
    * Parse the given properties instance into a LogConfig object
    */
   def fromProps(props: Properties): LogConfig = {
-    import kafka.utils.Utils.evaluateDefaults
+    import kafka.utils.CoreUtils.evaluateDefaults
     val parsed = configDef.parse(evaluateDefaults(props))
     new LogConfig(segmentSize = parsed.get(SegmentBytesProp).asInstanceOf[Int],
                   segmentMs = parsed.get(SegmentMsProp).asInstanceOf[Long],

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
old mode 100644
new mode 100755
index 47d250a..a7a9b85
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -132,7 +132,7 @@ class LogManager(val logDirs: Array[File],
         dirContent <- Option(dir.listFiles).toList
         logDir <- dirContent if logDir.isDirectory
       } yield {
-        Utils.runnable {
+        CoreUtils.runnable {
           debug("Loading log '" + logDir.getName + "'")
 
           val topicPartition = Log.parseTopicPartitionName(logDir)
@@ -210,7 +210,7 @@ class LogManager(val logDirs: Array[File],
 
     // stop the cleaner first
     if (cleaner != null) {
-      Utils.swallow(cleaner.shutdown())
+      CoreUtils.swallow(cleaner.shutdown())
     }
 
     // close logs in each dir
@@ -223,7 +223,7 @@ class LogManager(val logDirs: Array[File],
       val logsInDir = logsByDir.getOrElse(dir.toString, Map()).values
 
       val jobsForDir = logsInDir map { log =>
-        Utils.runnable {
+        CoreUtils.runnable {
           // flush the log to ensure latest possible recovery point
           log.flush()
           log.close()
@@ -244,7 +244,7 @@ class LogManager(val logDirs: Array[File],
 
         // mark that the shutdown was clean by creating marker file
         debug("Writing clean shutdown marker at " + dir)
-        Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile())
+        CoreUtils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile())
       }
     } catch {
       case e: ExecutionException => {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
old mode 100644
new mode 100755
index 0256764..ed03953
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -254,10 +254,10 @@ class LogSegment(val log: FileMessageSet,
    * Change the suffix for the index and log file for this log segment
    */
   def changeFileSuffixes(oldSuffix: String, newSuffix: String) {
-    val logRenamed = log.renameTo(new File(Utils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
+    val logRenamed = log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
     if(!logRenamed)
       throw new KafkaStorageException("Failed to change the log file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset))
-    val indexRenamed = index.renameTo(new File(Utils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
+    val indexRenamed = index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
     if(!indexRenamed)
       throw new KafkaStorageException("Failed to change the index file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset))
   }
@@ -266,8 +266,8 @@ class LogSegment(val log: FileMessageSet,
    * Close this log segment
    */
   def close() {
-    Utils.swallow(index.close)
-    Utils.swallow(log.close)
+    CoreUtils.swallow(index.close)
+    CoreUtils.swallow(log.close)
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
old mode 100644
new mode 100755
index ca82c04..4ab22de
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -24,7 +24,7 @@ import java.nio.channels._
 import java.util.concurrent.locks._
 import java.util.concurrent.atomic._
 import kafka.utils._
-import kafka.utils.Utils.inLock
+import kafka.utils.CoreUtils.inLock
 import kafka.common.InvalidOffsetException
 
 /**
@@ -81,7 +81,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
           idx.position(roundToExactMultiple(idx.limit, 8))
         idx
       } finally {
-        Utils.swallow(raf.close())
+        CoreUtils.swallow(raf.close())
       }
     }
   
@@ -287,7 +287,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
         this.maxEntries = this.mmap.limit / 8
         this.mmap.position(position)
       } finally {
-        Utils.swallow(raf.close())
+        CoreUtils.swallow(raf.close())
       }
     }
   }
@@ -319,7 +319,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   def delete(): Boolean = {
     info("Deleting index " + this.file.getAbsolutePath)
     if(Os.isWindows)
-      Utils.swallow(forceUnmap(this.mmap))
+      CoreUtils.swallow(forceUnmap(this.mmap))
     this.file.delete()
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/log/OffsetMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala
old mode 100644
new mode 100755
index 42cdfbb..2940e47
--- a/core/src/main/scala/kafka/log/OffsetMap.scala
+++ b/core/src/main/scala/kafka/log/OffsetMap.scala
@@ -21,6 +21,7 @@ import java.util.Arrays
 import java.security.MessageDigest
 import java.nio.ByteBuffer
 import kafka.utils._
+import org.apache.kafka.common.utils.Utils
 
 trait OffsetMap {
   def slots: Int
@@ -158,7 +159,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
    * @return The byte offset in the buffer at which the ith probing for the given hash would reside
    */
   private def positionOf(hash: Array[Byte], attempt: Int): Int = {
-    val probe = Utils.readInt(hash, math.min(attempt, hashSize - 4)) + math.max(0, attempt - hashSize + 4)
+    val probe = CoreUtils.readInt(hash, math.min(attempt, hashSize - 4)) + math.max(0, attempt - hashSize + 4)
     val slot = Utils.abs(probe) % slots
     this.probes += 1
     slot * bytesPerEntry

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
old mode 100644
new mode 100755
index 7ba280f..999b115
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -20,6 +20,7 @@ package kafka.message
 import java.nio._
 import scala.math._
 import kafka.utils._
+import org.apache.kafka.common.utils.Utils
 
 /**
  * Constants related to messages
@@ -146,7 +147,7 @@ class Message(val buffer: ByteBuffer) {
    * Compute the checksum of the message from the message contents
    */
   def computeChecksum(): Long = 
-    Utils.crc32(buffer.array, buffer.arrayOffset + MagicOffset,  buffer.limit - MagicOffset)
+    CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset,  buffer.limit - MagicOffset)
   
   /**
    * Retrieve the previously computed CRC for this message
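
computeChecksum above CRCs the message bytes from the magic byte onward. The JDK's java.util.zip.CRC32 computes the same style of range checksum; a sketch for illustration only (the offset here is invented, not Kafka's MagicOffset):

import java.util.zip.CRC32;

public class ChecksumSketch {
    public static void main(String[] args) {
        byte[] message = "header+payload".getBytes();
        CRC32 crc = new CRC32();
        // Checksum a sub-range of the array, as computeChecksum does.
        crc.update(message, 2, message.length - 2);
        System.out.println(Long.toHexString(crc.getValue()));
    }
}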

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/message/MessageAndMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageAndMetadata.scala b/core/src/main/scala/kafka/message/MessageAndMetadata.scala
old mode 100644
new mode 100755
index d693abc..26b75c8
--- a/core/src/main/scala/kafka/message/MessageAndMetadata.scala
+++ b/core/src/main/scala/kafka/message/MessageAndMetadata.scala
@@ -18,7 +18,7 @@
 package kafka.message
 
 import kafka.serializer.Decoder
-import kafka.utils.Utils
+import org.apache.kafka.common.utils.Utils
 
 case class MessageAndMetadata[K, V](topic: String, partition: Int,
                                     private val rawMessage: Message, offset: Long,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/message/MessageWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageWriter.scala b/core/src/main/scala/kafka/message/MessageWriter.scala
old mode 100644
new mode 100755
index 7eb72cb..0c6040e
--- a/core/src/main/scala/kafka/message/MessageWriter.scala
+++ b/core/src/main/scala/kafka/message/MessageWriter.scala
@@ -20,7 +20,7 @@ package kafka.message
 import java.io.{InputStream, OutputStream}
 import java.nio.ByteBuffer
 
-import kafka.utils.Crc32
+import org.apache.kafka.common.utils.Crc32
 
 class MessageWriter(segmentSize: Int) extends BufferingOutputStream(segmentSize) {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
old mode 100644
new mode 100755
index ea9559f..cc0da9f
--- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
@@ -24,7 +24,7 @@ import com.yammer.metrics.Metrics
 import java.io.File
 import com.yammer.metrics.reporting.CsvReporter
 import java.util.concurrent.TimeUnit
-import kafka.utils.{Utils, VerifiableProperties, Logging}
+import kafka.utils.{CoreUtils, VerifiableProperties, Logging}
 
 
 private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean
@@ -48,7 +48,7 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
       if (!initialized) {
         val metricsConfig = new KafkaMetricsConfig(props)
         csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics"))
-        Utils.rm(csvDir)
+        CoreUtils.rm(csvDir)
         csvDir.mkdirs()
         underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
         if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = 
false)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
old mode 100644
new mode 100755
index 84f6208..ad9eb20
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
@@ -20,7 +20,7 @@
 
 package kafka.metrics
 
-import kafka.utils.{VerifiableProperties, Utils}
+import kafka.utils.{VerifiableProperties, CoreUtils}
 
 class KafkaMetricsConfig(props: VerifiableProperties) {
 
@@ -28,7 +28,7 @@ class KafkaMetricsConfig(props: VerifiableProperties) {
    * Comma-separated list of reporter types. These classes should be on the
    * classpath and will be instantiated at run-time.
    */
-  val reporters = Utils.parseCsvList(props.getString("kafka.metrics.reporters", ""))
+  val reporters = CoreUtils.parseCsvList(props.getString("kafka.metrics.reporters", ""))
 
   /**
    * The metrics polling interval (in seconds).

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
old mode 100644
new mode 100755
index 14e4624..30fd0ea
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -20,7 +20,7 @@
 
 package kafka.metrics
 
-import kafka.utils.{Utils, VerifiableProperties}
+import kafka.utils.{CoreUtils, VerifiableProperties}
 import java.util.concurrent.atomic.AtomicBoolean
 
 
@@ -56,10 +56,10 @@ object KafkaMetricsReporter {
         val metricsConfig = new KafkaMetricsConfig(verifiableProps)
         if(metricsConfig.reporters.size > 0) {
           metricsConfig.reporters.foreach(reporterType => {
-            val reporter = Utils.createObject[KafkaMetricsReporter](reporterType)
+            val reporter = CoreUtils.createObject[KafkaMetricsReporter](reporterType)
             reporter.init(verifiableProps)
             if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
-              Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
+              CoreUtils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
           })
           ReporterStarted.set(true)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
old mode 100644
new mode 100755
index a442545..c0d7726
--- a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
+++ b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
@@ -51,7 +51,7 @@ private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive
     var read = 0
     // have we read the request size yet?
     if(sizeBuffer.remaining > 0)
-      read += Utils.read(channel, sizeBuffer)
+      read += CoreUtils.read(channel, sizeBuffer)
     // have we allocated the request buffer yet?
     if(contentBuffer == null && !sizeBuffer.hasRemaining) {
       sizeBuffer.rewind()
@@ -64,7 +64,7 @@ private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive
     }
     // if we have a buffer read some stuff into it
     if(contentBuffer != null) {
-      read = Utils.read(channel, contentBuffer)
+      read = CoreUtils.read(channel, contentBuffer)
       // did we get everything?
       if(!contentBuffer.hasRemaining) {
         contentBuffer.rewind()

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
old mode 100644
new mode 100755
index 0ad9057..8fbea7b
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -33,6 +33,7 @@ import kafka.common.KafkaException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import com.yammer.metrics.core.{Gauge, Meter}
+import org.apache.kafka.common.utils.Utils
 
 /**
  * An NIO socket server. The threading model is

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
old mode 100644
new mode 100755
index 6a3b02e..e6b100e
--- a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
@@ -19,6 +19,7 @@ package kafka.producer
 
 
 import kafka.utils._
+import org.apache.kafka.common.utils.Utils
 
 class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner {
   def partition(key: Any, numPartitions: Int): Int = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
old mode 100644
new mode 100755
index 3afb22e..1141ed1
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
@@ -19,6 +19,7 @@ package kafka.producer
 
 
 import kafka.utils._
+import org.apache.kafka.common.utils.Utils
 
 class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
   private val random = new java.util.Random
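
In both partitioner files the only visible change is the added org.apache.kafka.common.utils.Utils import, which suggests the hash-modulo computation in the (unshown) method bodies now goes through the common Utils.abs. A sketch under that assumption, with an illustrative class name:

    import kafka.producer.Partitioner
    import kafka.utils.VerifiableProperties
    import org.apache.kafka.common.utils.Utils

    // Utils.abs masks the sign bit, so even Int.MinValue (where plain
    // math.abs stays negative) maps to a valid partition index.
    class SketchHashPartitioner(props: VerifiableProperties = null) extends Partitioner {
      def partition(key: Any, numPartitions: Int): Int =
        Utils.abs(key.hashCode) % numPartitions
    }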

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala 
b/core/src/main/scala/kafka/producer/Producer.scala
old mode 100644
new mode 100755
index e38d2fa..4be06c8
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -58,9 +58,9 @@ class Producer[K,V](val config: ProducerConfig,
   def this(config: ProducerConfig) =
     this(config,
          new DefaultEventHandler[K,V](config,
-                                      Utils.createObject[Partitioner](config.partitionerClass, config.props),
-                                      Utils.createObject[Encoder[V]](config.serializerClass, config.props),
-                                      Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
+                                      CoreUtils.createObject[Partitioner](config.partitionerClass, config.props),
+                                      CoreUtils.createObject[Encoder[V]](config.serializerClass, config.props),
+                                      CoreUtils.createObject[Encoder[K]](config.keySerializerClass, config.props),
                                       new ProducerPool(config)))
 
   /**
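
createObject[T](className, args*) is how pluggable partitioners and encoders get instantiated from configuration strings. A sketch of the reflective idea; matching constructors by arity is a simplification of whatever the real helper does:

    // Sketch: load `className`, pick a constructor whose parameter count
    // matches the supplied args, invoke it, and cast to the expected type.
    def createObject[T <: AnyRef](className: String, args: AnyRef*): T = {
      val constructor = Class.forName(className).getConstructors
        .find(_.getParameterTypes.length == args.length)
        .getOrElse(throw new IllegalArgumentException(
          "No " + args.length + "-arg constructor on " + className))
      constructor.newInstance(args: _*).asInstanceOf[T]
    }

    // e.g. createObject[Partitioner](config.partitionerClass, config.props)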

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/producer/ProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala 
b/core/src/main/scala/kafka/producer/ProducerConfig.scala
old mode 100644
new mode 100755
index 3cdf23d..08a4e51
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala
@@ -19,8 +19,8 @@ package kafka.producer
 
 import async.AsyncProducerConfig
 import java.util.Properties
-import kafka.utils.{Utils, VerifiableProperties}
-import kafka.message.{CompressionCodec, NoCompressionCodec}
+import kafka.utils.{CoreUtils, VerifiableProperties}
+import kafka.message.NoCompressionCodec
 import kafka.common.{InvalidConfigException, Config}
 
 object ProducerConfig extends Config {
@@ -90,7 +90,7 @@ class ProducerConfig private (val props: VerifiableProperties)
    *
    *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
    */
-  val compressedTopics = Utils.parseCsvList(props.getString("compressed.topics", null))
+  val compressedTopics = CoreUtils.parseCsvList(props.getString("compressed.topics", null))
 
   /** The leader may be unavailable transiently, which can fail the sending of a message.
     *  This property specifies the number of retries when such failures occur.
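
compressed.topics is a comma-separated list, so parseCsvList presumably normalizes the raw property value. A sketch of the assumed behavior (null-safe, trims whitespace, drops empty entries):

    // Sketch: parseCsvList("a, b,,c") == Seq("a", "b", "c");
    // a null or empty input yields Seq.empty.
    def parseCsvList(csvList: String): Seq[String] =
      if (csvList == null || csvList.isEmpty) Seq.empty
      else csvList.split(",").map(_.trim).filter(_.nonEmpty).toSeq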

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala 
b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
old mode 100644
new mode 100755
index 821901e..a6179a9
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -21,12 +21,13 @@ import kafka.common._
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
 import kafka.producer._
 import kafka.serializer.Encoder
-import kafka.utils.{Utils, Logging, SystemTime}
+import kafka.utils.{CoreUtils, Logging, SystemTime}
 import scala.util.Random
 import scala.collection.{Seq, Map}
 import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
 import java.util.concurrent.atomic._
 import kafka.api.{TopicMetadata, ProducerRequest}
+import org.apache.kafka.common.utils.Utils
 
 class DefaultEventHandler[K,V](config: ProducerConfig,
                                private val partitioner: Partitioner,
@@ -64,7 +65,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
       if (topicMetadataRefreshInterval >= 0 &&
           SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
-        Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
+        CoreUtils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
         sendPartitionPerTopicCache.clear()
         topicMetadataToRefresh.clear
         lastTopicMetadataRefreshTime = SystemTime.milliseconds
@@ -75,7 +76,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         // back off and update the topic metadata cache before attempting another send operation
         Thread.sleep(config.retryBackoffMs)
         // get topics of the outstanding produce requests and refresh metadata for those
-        Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
+        CoreUtils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
         sendPartitionPerTopicCache.clear()
         remainingRetries -= 1
         producerStats.resendRate.mark()
@@ -262,7 +263,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           if (logger.isTraceEnabled) {
             val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
             successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
-              trace("Successfully sent message: %s".format(if(message.message.isNull) null else Utils.readString(message.message.payload)))))
+              trace("Successfully sent message: %s".format(if(message.message.isNull) null else message.message.toString()))))
           }
           val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
           failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
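
The metadata refreshes above are best-effort, which is why they sit inside swallowError: the action runs, and any throwable is logged rather than allowed to abort the send/retry loop. A self-contained sketch (the slf4j wiring is illustrative; in Kafka this lives behind the Logging trait):

    import org.slf4j.LoggerFactory

    object SwallowSketch {
      private val log = LoggerFactory.getLogger(getClass)

      // Evaluate the by-name `action`; log any throwable instead of
      // propagating it, so a failed best-effort step cannot kill the caller.
      def swallowError(action: => Unit): Unit =
        try action
        catch { case e: Throwable => log.error("Swallowed exception", e) }
    }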

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
old mode 100644
new mode 100755
index 94aa952..f8f9331
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -20,11 +20,12 @@ package kafka.server
 import scala.collection.mutable
 import scala.collection.Set
 import scala.collection.Map
-import kafka.utils.{Utils, Logging}
+import kafka.utils.Logging
 import kafka.cluster.BrokerEndpoint
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
 import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.utils.Utils
 
 abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1)
   extends Logging with KafkaMetricsGroup {
@@ -128,4 +129,4 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
 
 case class BrokerAndFetcherId(broker: BrokerEndpoint, fetcherId: Int)
 
-case class BrokerAndInitialOffset(broker: BrokerEndpoint, initOffset: Long)
\ No newline at end of file
+case class BrokerAndInitialOffset(broker: BrokerEndpoint, initOffset: Long)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
old mode 100644
new mode 100755
index 93f67d5..1e26de2
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -22,7 +22,7 @@ import kafka.utils.{Pool, ShutdownableThread}
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
-import kafka.utils.Utils.inLock
+import kafka.utils.CoreUtils.inLock
 import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
 import kafka.metrics.KafkaMetricsGroup
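
inLock is a small control abstraction over java.util.concurrent locks: acquire, evaluate the by-name body, and release in a finally block so an exception cannot leak a held lock. The shape is:

    import java.util.concurrent.locks.Lock

    def inLock[T](lock: Lock)(fun: => T): T = {
      lock.lock()
      try fun
      finally lock.unlock()  // released even if `fun` throws
    }

The same pattern presumably backs the read/write-lock variants that MetadataCache picks up further down via import kafka.utils.CoreUtils._, applied to the two sides of its ReentrantReadWriteLock.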
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala 
b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
old mode 100644
new mode 100755
index 0e542ff..01e8f72
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -20,7 +20,7 @@ package kafka.server
 import java.io._
 import java.util.Properties
 import kafka.utils._
-
+import org.apache.kafka.common.utils.Utils
 
 case class BrokerMetadata(brokerId: Int)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
old mode 100644
new mode 100755
index cf1a5a6..69b772c
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -23,7 +23,7 @@ import kafka.api.ApiVersion
 import kafka.cluster.EndPoint
 import kafka.consumer.ConsumerConfig
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
-import kafka.utils.Utils
+import kafka.utils.CoreUtils
 import org.apache.kafka.common.config.ConfigDef
 import org.apache.kafka.common.protocol.SecurityProtocol
 import scala.collection.{immutable, JavaConversions, Map}
@@ -492,7 +492,7 @@ object KafkaConfig {
    * Parse the given properties instance into a KafkaConfig object
    */
   def fromProps(props: Properties): KafkaConfig = {
-    import kafka.utils.Utils.evaluateDefaults
+    import kafka.utils.CoreUtils.evaluateDefaults
     val parsed = configDef.parse(evaluateDefaults(props))
     new KafkaConfig(
       /** ********* Zookeeper Configuration ***********/
@@ -755,7 +755,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
   val advertisedHostName: String = _advertisedHostName.getOrElse(hostName)
   val advertisedPort: Int = _advertisedPort.getOrElse(port)
   val advertisedListeners = getAdvertisedListeners()
-  val logDirs = Utils.parseCsvList(_logDirs.getOrElse(_logDir))
+  val logDirs = CoreUtils.parseCsvList(_logDirs.getOrElse(_logDir))
 
   val logRollTimeMillis = _logRollTimeMillis.getOrElse(60 * 60 * 1000L * logRollTimeHours)
   val logRollTimeJitterMillis = _logRollTimeJitterMillis.getOrElse(60 * 60 * 1000L * logRollTimeJitterHours)
@@ -780,7 +780,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
 
   private def getMap(propName: String, propValue: String): Map[String, String] = {
     try {
-      Utils.parseCsvMap(propValue)
+      CoreUtils.parseCsvMap(propValue)
     } catch {
       case e: Exception => throw new IllegalArgumentException("Error parsing configuration property '%s': %s".format(propName, e.getMessage))
     }
     }
@@ -789,7 +789,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
   private def validateUniquePortAndProtocol(listeners: String) {
 
     val endpoints = try {
-      val listenerList = Utils.parseCsvList(listeners)
+      val listenerList = CoreUtils.parseCsvList(listeners)
       listenerList.map(listener => EndPoint.createEndPoint(listener))
     } catch {
       case e: Exception => throw new IllegalArgumentException("Error creating broker listeners from '%s': %s".format(listeners, e.getMessage))
@@ -806,9 +806,9 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
   private def getListeners(): immutable.Map[SecurityProtocol, EndPoint] = {
     if (_listeners.isDefined) {
       validateUniquePortAndProtocol(_listeners.get)
-      Utils.listenerListToEndPoints(_listeners.get)
+      CoreUtils.listenerListToEndPoints(_listeners.get)
     } else {
-      Utils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port)
+      CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port)
     }
   }
 
@@ -818,9 +818,9 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
   private def getAdvertisedListeners(): immutable.Map[SecurityProtocol, EndPoint] = {
     if (_advertisedListeners.isDefined) {
       validateUniquePortAndProtocol(_advertisedListeners.get)
-      Utils.listenerListToEndPoints(_advertisedListeners.get)
+      CoreUtils.listenerListToEndPoints(_advertisedListeners.get)
     } else if (_advertisedHostName.isDefined || _advertisedPort.isDefined ) {
-      Utils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName  + ":" + advertisedPort)
+      CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName  + ":" + advertisedPort)
     } else {
       getListeners()
     }
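
The listener strings here look like "PLAINTEXT://host:port", and listenerListToEndPoints evidently parses the CSV into a map keyed by security protocol. A deliberately simplified sketch; the real kafka.cluster.EndPoint parser also copes with cases like IPv6 literals and blank hosts:

    import org.apache.kafka.common.protocol.SecurityProtocol

    // Illustrative stand-in for kafka.cluster.EndPoint.
    case class SketchEndPoint(host: String, port: Int, protocol: SecurityProtocol)

    def listenerListToEndPoints(listeners: String): Map[SecurityProtocol, SketchEndPoint] =
      listeners.split(",").map(_.trim).filter(_.nonEmpty).map { listener =>
        val Array(proto, hostPort) = listener.split("://", 2)
        val idx = hostPort.lastIndexOf(':')
        val protocol = SecurityProtocol.valueOf(proto) // e.g. PLAINTEXT
        protocol -> SketchEndPoint(hostPort.take(idx), hostPort.drop(idx + 1).toInt, protocol)
      }.toMap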

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala 
b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
old mode 100644
new mode 100755
index 4d86bdf..a1558af
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -22,6 +22,7 @@ import kafka.utils._
 import kafka.metrics.KafkaMetricsGroup
 import java.util.concurrent.TimeUnit
 import com.yammer.metrics.core.Meter
+import org.apache.kafka.common.utils.Utils
 
 /**
  * A thread that answers kafka requests.

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
old mode 100644
new mode 100755
index 9df2cf4..c63f4ba
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -319,27 +319,27 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
       val canShutdown = isShuttingDown.compareAndSet(false, true)
       if (canShutdown && shutdownLatch.getCount > 0) {
-        Utils.swallow(controlledShutdown())
+        CoreUtils.swallow(controlledShutdown())
         brokerState.newState(BrokerShuttingDown)
         if(socketServer != null)
-          Utils.swallow(socketServer.shutdown())
+          CoreUtils.swallow(socketServer.shutdown())
         if(requestHandlerPool != null)
-          Utils.swallow(requestHandlerPool.shutdown())
+          CoreUtils.swallow(requestHandlerPool.shutdown())
         if(offsetManager != null)
           offsetManager.shutdown()
-        Utils.swallow(kafkaScheduler.shutdown())
+        CoreUtils.swallow(kafkaScheduler.shutdown())
         if(apis != null)
-          Utils.swallow(apis.close())
+          CoreUtils.swallow(apis.close())
         if(replicaManager != null)
-          Utils.swallow(replicaManager.shutdown())
+          CoreUtils.swallow(replicaManager.shutdown())
         if(logManager != null)
-          Utils.swallow(logManager.shutdown())
+          CoreUtils.swallow(logManager.shutdown())
         if(consumerCoordinator != null)
-          Utils.swallow(consumerCoordinator.shutdown())
+          CoreUtils.swallow(consumerCoordinator.shutdown())
         if(kafkaController != null)
-          Utils.swallow(kafkaController.shutdown())
+          CoreUtils.swallow(kafkaController.shutdown())
         if(zkClient != null)
-          Utils.swallow(zkClient.close())
+          CoreUtils.swallow(zkClient.close())
 
         brokerState.newState(NotRunning)
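
Two details of this shutdown sequence carry the design: the compareAndSet guard (taken earlier in the method) means concurrent shutdown() calls run the steps at most once, and wrapping every step in swallow keeps one failing component from blocking the rest. The pattern in isolation, with illustrative components:

    import java.util.concurrent.atomic.AtomicBoolean

    class ShutdownSketch(steps: Seq[() => Unit]) {
      private val isShuttingDown = new AtomicBoolean(false)

      private def swallow(action: => Unit): Unit =
        try action catch { case e: Throwable => System.err.println("Swallowed: " + e) }

      // Only the first caller wins the CAS; a step that throws is logged
      // and the remaining steps still run.
      def shutdown(): Unit =
        if (isShuttingDown.compareAndSet(false, true))
          steps.foreach(step => swallow(step()))
    }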
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala 
b/core/src/main/scala/kafka/server/MetadataCache.scala
old mode 100644
new mode 100755
index 008f02b..4460b42
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -26,7 +26,7 @@ import kafka.controller.KafkaController.StateChangeLogger
 import org.apache.kafka.common.protocol.SecurityProtocol
 import scala.collection.{Seq, Set, mutable}
 import kafka.utils.Logging
-import kafka.utils.Utils._
+import kafka.utils.CoreUtils._
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala 
b/core/src/main/scala/kafka/server/OffsetManager.scala
old mode 100644
new mode 100755
index 395b1db..420e2c3
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -21,6 +21,7 @@ import org.apache.kafka.common.protocol.types.{Struct, Schema, Field}
 import org.apache.kafka.common.protocol.types.Type.STRING
 import org.apache.kafka.common.protocol.types.Type.INT32
 import org.apache.kafka.common.protocol.types.Type.INT64
+import org.apache.kafka.common.utils.Utils
 
 import kafka.utils._
 import kafka.common._

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala 
b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
old mode 100644
new mode 100755
index a75818a..a5c5fb3
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,7 +17,7 @@
 package kafka.server
 
 import kafka.utils.ZkUtils._
-import kafka.utils.Utils._
+import kafka.utils.CoreUtils._
 import kafka.utils.{Json, SystemTime, Logging}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.IZkDataListener

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
old mode 100644
new mode 100755
index 910691e..9d9b781
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -28,6 +28,7 @@ import kafka.serializer._
 import kafka.utils._
 import kafka.metrics.KafkaMetricsReporter
 import kafka.consumer.{Blacklist,Whitelist,ConsumerConfig,Consumer}
+import org.apache.kafka.common.utils.Utils
 
 /**
  * Consumer that dumps messages out to standard out.

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala 
b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
old mode 100644
new mode 100755
index b7a3630..fc11a2a
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -25,6 +25,7 @@ import collection.mutable
 import joptsimple.OptionParser
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties
+import org.apache.kafka.common.utils.Utils
 
 object DumpLogSegments {
 
@@ -64,8 +65,8 @@ object DumpLogSegments {
     val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue()
     val isDeepIteration = if(options.has(deepIterationOpt)) true else false
   
-    val valueDecoder: Decoder[_] = Utils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties)
-    val keyDecoder: Decoder[_] = Utils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties)
+    val valueDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties)
+    val keyDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties)
 
     val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
     val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Long, Long)]]

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java 
b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
old mode 100644
new mode 100755
index 026d819..f19df0c
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -21,7 +21,6 @@ import joptsimple.*;
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
-import kafka.utils.Utils;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -39,6 +38,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.common.utils.Utils;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala 
b/core/src/main/scala/kafka/tools/MirrorMaker.scala
old mode 100644
new mode 100755
index ec07743..9548521
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -24,17 +24,19 @@ import java.util.{Collections, Properties}
 
 import com.yammer.metrics.core.Gauge
 import joptsimple.OptionParser
-import kafka.consumer.{KafkaStream, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
+import kafka.consumer.{Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
 import kafka.javaapi.consumer.ConsumerRebalanceListener
 import kafka.message.MessageAndMetadata
 import kafka.metrics.KafkaMetricsGroup
 import kafka.serializer.DefaultDecoder
-import kafka.utils.{CommandLineUtils, Logging, Utils}
+import kafka.utils.{CommandLineUtils, Logging, CoreUtils}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConversions._
 
+
 /**
  * The mirror maker has the following architecture:
  * - There are N mirror maker thread shares one ZookeeperConsumerConnector and each owns a Kafka stream.
@@ -207,9 +209,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     val customRebalanceListener = {
       if (customRebalanceListenerClass != null) {
         if (rebalanceListenerArgs != null)
-          Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs))
+          Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs))
         else
-          Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass))
+          Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass))
       } else {
         None
       }
@@ -237,9 +239,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     messageHandler = {
       if (customMessageHandlerClass != null) {
         if (messageHandlerArgs != null)
-          Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs)
+          CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs)
         else
-          Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
+          CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
       } else {
         defaultMirrorMakerMessageHandler
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala 
b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
old mode 100644
new mode 100755
index 7379fe3..9a6804c
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -25,6 +25,7 @@ import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
 import kafka.cluster.BrokerEndpoint
 import scala.collection.JavaConversions._
 import kafka.common.TopicAndPartition
+import org.apache.kafka.common.utils.Utils
 
 /**
  * Command line program to dump out messages to standard out using the simple consumer

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala 
b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
old mode 100644
new mode 100755
index b34b8c7..8b523e7
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -22,7 +22,7 @@ import scala.util.matching.Regex
 import collection.mutable
 import java.util.Date
 import java.text.SimpleDateFormat
-import kafka.utils.{Utils, Logging, CommandLineUtils}
+import kafka.utils.{CoreUtils, Logging, CommandLineUtils}
 import kafka.common.Topic
 import java.io.{BufferedOutputStream, OutputStream}
 
@@ -115,7 +115,7 @@ object StateChangeLogMerger extends Logging {
     }
     if (options.has(partitionsOpt)) {
       partitions = options.valueOf(partitionsOpt).split(",").toList.map(_.toInt)
-      val duplicatePartitions = Utils.duplicates(partitions)
+      val duplicatePartitions = CoreUtils.duplicates(partitions)
       if (duplicatePartitions.nonEmpty) {
         System.err.println("The list of partitions contains repeated entries: 
%s".format(duplicatePartitions.mkString(",")))
         System.exit(1)
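
duplicates reports which values occur more than once in a collection; a plausible one-liner for the behavior relied on here:

    // duplicates(List(1, 2, 2, 3, 3, 3)) yields 2 and 3.
    def duplicates[T](s: Iterable[T]): Iterable[T] =
      s.groupBy(identity).collect { case (v, group) if group.size > 1 => v }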

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 
b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
deleted file mode 100644
index 48cff20..0000000
--- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
-
-import kafka.consumer._
-
-import java.util.Properties
-import java.util.Arrays
-
-object TestEndToEndLatency {
-  def main(args: Array[String]) {
-    if (args.length != 6) {
-      System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks")
-      System.exit(1)
-    }
-
-    val brokerList = args(0)
-    val zkConnect = args(1)
-    val topic = args(2)
-    val numMessages = args(3).toInt
-    val consumerFetchMaxWait = args(4).toInt
-    val producerAcks = args(5).toInt
-
-    val consumerProps = new Properties()
-    consumerProps.put("group.id", topic)
-    consumerProps.put("auto.commit.enable", "false")
-    consumerProps.put("auto.offset.reset", "largest")
-    consumerProps.put("zookeeper.connect", zkConnect)
-    consumerProps.put("fetch.wait.max.ms", consumerFetchMaxWait.toString)
-    consumerProps.put("socket.timeout.ms", 1201000.toString)
-
-    val config = new ConsumerConfig(consumerProps)
-    val connector = Consumer.create(config)
-    val stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
-    val iter = stream.iterator
-
-    val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
-    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
-    producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
-    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
-
-    // make sure the consumer fetcher has started before sending data since otherwise
-    // the consumption from the tail will skip the first message and hence be blocked
-    Thread.sleep(5000)
-
-    val message = "hello there beautiful".getBytes
-    var totalTime = 0.0
-    val latencies = new Array[Long](numMessages)
-    for (i <- 0 until numMessages) {
-      val begin = System.nanoTime
-      producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message))
-      val received = iter.next
-      val elapsed = System.nanoTime - begin
-      // poor man's progress bar
-      if (i % 1000 == 0)
-        println(i + "\t" + elapsed / 1000.0 / 1000.0)
-      totalTime += elapsed
-      latencies(i) = (elapsed / 1000 / 1000)
-    }
-    println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 
1000.0))
-    Arrays.sort(latencies)
-    val p50 = latencies((latencies.length * 0.5).toInt)
-    val p99 = latencies((latencies.length * 0.99).toInt) 
-    val p999 = latencies((latencies.length * 0.999).toInt)
-    println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, 
p999))
-    producer.close()
-    connector.commitOffsets(true)
-    connector.shutdown()
-    System.exit(0)
-  }
-}
\ No newline at end of file
