kafka-1926; Replace kafka.utils.Utils with o.a.k.common.utils.Utils; patched by Tong Li; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9c23d935 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9c23d935 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9c23d935 Branch: refs/heads/trunk Commit: 9c23d93553a33c5d85231193614d192a9945796e Parents: 53f3143 Author: Tong Li <liton...@us.ibm.com> Authored: Sun Apr 5 21:46:11 2015 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Sun Apr 5 21:46:11 2015 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/common/utils/Utils.java | 124 +++- .../apache/kafka/common/utils/UtilsTest.java | 33 + core/src/main/scala/kafka/Kafka.scala | 3 +- .../kafka/admin/ConsumerGroupCommand.scala | 1 + .../PreferredReplicaLeaderElectionCommand.scala | 4 +- .../kafka/admin/ReassignPartitionsCommand.scala | 9 +- .../main/scala/kafka/admin/TopicCommand.scala | 3 +- .../main/scala/kafka/client/ClientUtils.scala | 4 +- core/src/main/scala/kafka/cluster/Broker.scala | 16 +- .../main/scala/kafka/cluster/Partition.scala | 2 +- .../kafka/consumer/ConsumerFetcherManager.scala | 2 +- .../scala/kafka/consumer/ConsumerIterator.scala | 2 +- .../kafka/consumer/PartitionAssignor.scala | 4 +- .../main/scala/kafka/consumer/TopicCount.scala | 4 +- .../consumer/ZookeeperConsumerConnector.scala | 2 +- .../controller/ControllerChannelManager.scala | 4 +- .../kafka/controller/KafkaController.scala | 2 +- .../controller/PartitionStateMachine.scala | 3 +- .../kafka/controller/ReplicaStateMachine.scala | 2 +- .../kafka/controller/TopicDeletionManager.scala | 2 +- .../main/scala/kafka/log/FileMessageSet.scala | 6 +- core/src/main/scala/kafka/log/Log.scala | 6 +- .../scala/kafka/log/LogCleanerManager.scala | 2 +- core/src/main/scala/kafka/log/LogConfig.scala | 2 +- core/src/main/scala/kafka/log/LogManager.scala | 8 +- core/src/main/scala/kafka/log/LogSegment.scala | 8 +- core/src/main/scala/kafka/log/OffsetIndex.scala | 8 +- 
core/src/main/scala/kafka/log/OffsetMap.scala | 3 +- core/src/main/scala/kafka/message/Message.scala | 3 +- .../kafka/message/MessageAndMetadata.scala | 2 +- .../scala/kafka/message/MessageWriter.scala | 2 +- .../kafka/metrics/KafkaCSVMetricsReporter.scala | 4 +- .../kafka/metrics/KafkaMetricsConfig.scala | 4 +- .../kafka/metrics/KafkaMetricsReporter.scala | 6 +- .../network/BoundedByteBufferReceive.scala | 4 +- .../main/scala/kafka/network/SocketServer.scala | 1 + .../kafka/producer/ByteArrayPartitioner.scala | 1 + .../kafka/producer/DefaultPartitioner.scala | 1 + .../main/scala/kafka/producer/Producer.scala | 6 +- .../scala/kafka/producer/ProducerConfig.scala | 6 +- .../producer/async/DefaultEventHandler.scala | 9 +- .../kafka/server/AbstractFetcherManager.scala | 5 +- .../kafka/server/AbstractFetcherThread.scala | 2 +- .../kafka/server/BrokerMetadataCheckpoint.scala | 2 +- .../main/scala/kafka/server/KafkaConfig.scala | 18 +- .../kafka/server/KafkaRequestHandler.scala | 1 + .../main/scala/kafka/server/KafkaServer.scala | 20 +- .../main/scala/kafka/server/MetadataCache.scala | 2 +- .../main/scala/kafka/server/OffsetManager.scala | 1 + .../kafka/server/ZookeeperLeaderElector.scala | 2 +- .../scala/kafka/tools/ConsoleConsumer.scala | 1 + .../scala/kafka/tools/DumpLogSegments.scala | 5 +- .../scala/kafka/tools/KafkaMigrationTool.java | 2 +- .../main/scala/kafka/tools/MirrorMaker.scala | 14 +- .../scala/kafka/tools/SimpleConsumerShell.scala | 1 + .../kafka/tools/StateChangeLogMerger.scala | 4 +- .../scala/kafka/tools/TestEndToEndLatency.scala | 92 --- .../scala/kafka/tools/TestLogCleaning.scala | 311 --------- .../scala/kafka/tools/UpdateOffsetsInZK.scala | 4 +- core/src/main/scala/kafka/utils/CoreUtils.scala | 347 ++++++++++ core/src/main/scala/kafka/utils/Crc32.java | 637 ------------------- .../main/scala/kafka/utils/KafkaScheduler.scala | 4 +- .../scala/kafka/utils/Log4jController.scala | 2 +- core/src/main/scala/kafka/utils/Logging.scala | 10 +- 
core/src/main/scala/kafka/utils/Utils.scala | 619 ------------------ .../kafka/utils/VerifiableProperties.scala | 2 +- .../kafka/api/ProducerCompressionTest.scala | 4 +- .../scala/kafka/tools/TestEndToEndLatency.scala | 91 +++ .../scala/kafka/tools/TestLogCleaning.scala | 311 +++++++++ .../test/scala/other/kafka/DeleteZKPath.scala | 3 +- .../test/scala/other/kafka/StressTestLog.scala | 2 +- .../scala/other/kafka/TestCrcPerformance.scala | 3 +- .../other/kafka/TestLinearWriteSpeed.scala | 4 +- .../unit/kafka/admin/AddPartitionsTest.scala | 4 +- .../test/scala/unit/kafka/admin/AdminTest.scala | 2 +- .../kafka/consumer/ConsumerIteratorTest.scala | 2 +- .../integration/KafkaServerTestHarness.scala | 4 +- .../kafka/integration/PrimitiveApiTest.scala | 12 +- .../kafka/integration/RollingBounceTest.scala | 4 +- .../integration/UncleanLeaderElectionTest.scala | 4 +- .../unit/kafka/log/BrokerCompressionTest.scala | 2 +- .../test/scala/unit/kafka/log/CleanerTest.scala | 5 +- .../kafka/log/LogCleanerIntegrationTest.scala | 6 +- .../scala/unit/kafka/log/LogManagerTest.scala | 4 +- .../src/test/scala/unit/kafka/log/LogTest.scala | 4 +- .../kafka/log4j/KafkaLog4jAppenderTest.scala | 4 +- .../scala/unit/kafka/message/MessageTest.scala | 3 +- .../unit/kafka/producer/AsyncProducerTest.scala | 4 +- .../unit/kafka/producer/ProducerTest.scala | 4 +- .../unit/kafka/server/AdvertiseBrokerTest.scala | 4 +- .../server/HighwatermarkPersistenceTest.scala | 4 +- .../unit/kafka/server/KafkaConfigTest.scala | 10 +- .../unit/kafka/server/LeaderElectionTest.scala | 4 +- .../scala/unit/kafka/server/LogOffsetTest.scala | 2 +- .../unit/kafka/server/LogRecoveryTest.scala | 4 +- .../unit/kafka/server/OffsetCommitTest.scala | 2 +- .../server/ServerGenerateBrokerIdTest.scala | 14 +- .../unit/kafka/server/ServerShutdownTest.scala | 12 +- .../unit/kafka/server/ServerStartupTest.scala | 6 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 14 +- .../test/scala/unit/kafka/utils/UtilsTest.scala | 33 +- 
.../scala/unit/kafka/zk/EmbeddedZookeeper.scala | 10 +- .../unit/kafka/zk/ZooKeeperTestHarness.scala | 6 +- 103 files changed, 1154 insertions(+), 1883 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java old mode 100644 new mode 100755 index 39e8d7c..f73eedb --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -14,15 +14,26 @@ package org.apache.kafka.common.utils; import java.io.IOException; import java.io.InputStream; +import java.io.File; +import java.io.FileInputStream; import java.io.OutputStream; import java.io.UnsupportedEncodingException; +import java.io.FileNotFoundException; +import java.io.StringWriter; +import java.io.PrintWriter; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.Properties; +import java.nio.channels.FileChannel; +import java.nio.charset.Charset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.kafka.common.KafkaException; public class Utils { @@ -33,6 +44,8 @@ public class Utils { public static final String NL = System.getProperty("line.separator"); + private static final Logger log = LoggerFactory.getLogger(Utils.class); + /** * Turn the given UTF8 byte array into a string * @@ -330,7 +343,7 @@ public class Utils { ? 
"[" + host + "]:" + port // IPv6 : host + ":" + port; } - + /** * Create a string representation of an array joined by the given separator * @param strs The array of items @@ -357,4 +370,113 @@ public class Utils { } return sb.toString(); } + + /** + * Read a properties file from the given path + * @param filename The path of the file to read + */ + public static Properties loadProps(String filename) throws IOException, FileNotFoundException { + Properties props = new Properties(); + InputStream propStream = null; + try { + propStream = new FileInputStream(filename); + props.load(propStream); + } finally { + if (propStream != null) + propStream.close(); + } + return props; + } + + /** + * Get the stack trace from an exception as a string + */ + public static String stackTrace(Throwable e) { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + e.printStackTrace(pw); + return sw.toString(); + } + + /** + * Create a new thread + * @param name The name of the thread + * @param runnable The work for the thread to do + * @param daemon Should the thread be a daemon thread (one that does not block JVM shutdown)? 
+ * @return The unstarted thread + */ + public static Thread newThread(String name, Runnable runnable, Boolean daemon) { + Thread thread = new Thread(runnable, name); + thread.setDaemon(daemon); + thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread t, Throwable e) { + log.error("Uncaught exception in thread '" + t.getName() + "':", e); + } + }); + return thread; + } + + /** + * Create a daemon thread + * @param name The name of the thread + * @param runnable The runnable to execute in the background + * @return The unstarted thread + */ + public static Thread daemonThread(String name, Runnable runnable) { + return newThread(name, runnable, true); + } + + /** + * Print an error message and shutdown the JVM + * @param message The error message + */ + public static void croak(String message) { + System.err.println(message); + System.exit(1); + } + + /** + * Read a buffer into a Byte array for the given offset and length + */ + public static byte[] readBytes(ByteBuffer buffer, int offset, int length) { + byte[] dest = new byte[length]; + if (buffer.hasArray()) { + System.arraycopy(buffer.array(), buffer.arrayOffset() + offset, dest, 0, length); + } else { + buffer.mark(); + buffer.position(offset); + buffer.get(dest, 0, length); + buffer.reset(); + } + return dest; + } + + /** + * Read the given byte buffer into a Byte array + */ + public static byte[] readBytes(ByteBuffer buffer) { + return Utils.readBytes(buffer, 0, buffer.limit()); + } + + /** + * Attempt to read a file as a string + * @throws IOException + */ + public static String readFileAsString(String path, Charset charset) throws IOException { + if (charset == null) charset = Charset.defaultCharset(); + FileInputStream stream = new FileInputStream(new File(path)); + String result = new String(); + try { + FileChannel fc = stream.getChannel(); + MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size()); + result = 
charset.decode(bb).toString(); + } finally { + stream.close(); + } + return result; + } + + public static String readFileAsString(String path) throws IOException { + return Utils.readFileAsString(path, Charset.defaultCharset()); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java old mode 100644 new mode 100755 index 4b706d7..2ebe3c2 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.utils; import java.util.Arrays; import java.util.Collections; +import java.nio.ByteBuffer; import org.junit.Test; @@ -67,4 +68,36 @@ public class UtilsTest { assertEquals(0, Utils.abs(0)); assertEquals(1, Utils.abs(-1)); } + + private void subTest(ByteBuffer buffer) { + // The first byte should be 'A' + assertEquals('A', (Utils.readBytes(buffer, 0, 1))[0]); + + // The offset is 2, so the first 2 bytes should be skipped. + byte[] results = Utils.readBytes(buffer, 2, 3); + assertEquals('y', results[0]); + assertEquals(' ', results[1]); + assertEquals('S', results[2]); + assertEquals(3, results.length); + + // test readBytes without offset and length specified. 
+ results = Utils.readBytes(buffer); + assertEquals('A', results[0]); + assertEquals('t', results[buffer.limit() - 1]); + assertEquals(buffer.limit(), results.length); + } + + @Test + public void testReadBytes() { + byte[] myvar = "Any String you want".getBytes(); + ByteBuffer buffer = ByteBuffer.allocate(myvar.length); + buffer.put(myvar); + buffer.rewind(); + + this.subTest(buffer); + + // test readonly buffer, different path + buffer = ByteBuffer.wrap(myvar).asReadOnlyBuffer(); + this.subTest(buffer); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/Kafka.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala old mode 100644 new mode 100755 index 37de7df..fb860e7 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -21,7 +21,8 @@ import scala.collection.JavaConversions._ import joptsimple.OptionParser import metrics.KafkaMetricsReporter import server.{KafkaConfig, KafkaServerStartable, KafkaServer} -import kafka.utils.{VerifiableProperties, CommandLineUtils, Utils, Logging} +import kafka.utils.{VerifiableProperties, CommandLineUtils, Logging} +import org.apache.kafka.common.utils.Utils object Kafka extends Logging { http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala old mode 100644 new mode 100755 index 89fa29a..1c3b380 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -30,6 +30,7 @@ import joptsimple.{OptionSpec, OptionParser} import scala.collection.{Set, mutable} import 
kafka.consumer.SimpleConsumer import collection.JavaConversions._ +import org.apache.kafka.common.utils.Utils object ConsumerGroupCommand { http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala old mode 100644 new mode 100755 index 79b5e0a..3b3cd67 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -22,7 +22,7 @@ import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException import kafka.common.{TopicAndPartition, AdminCommandFailedException} import collection._ -import mutable.ListBuffer +import org.apache.kafka.common.utils.Utils object PreferredReplicaLeaderElectionCommand extends Logging { @@ -84,7 +84,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { val partition = p.get("partition").get.asInstanceOf[Int] TopicAndPartition(topic, partition) } - val duplicatePartitions = Utils.duplicates(partitions) + val duplicatePartitions = CoreUtils.duplicates(partitions) val partitionsSet = partitions.toSet if (duplicatePartitions.nonEmpty) throw new AdminOperationException("Preferred replica election data contains duplicate partitions: %s".format(duplicatePartitions.mkString(","))) http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala old mode 100644 new mode 100755 index 979992b..bbe3362 --- 
a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -22,6 +22,7 @@ import collection._ import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException import kafka.common.{TopicAndPartition, AdminCommandFailedException} +import org.apache.kafka.common.utils.Utils object ReassignPartitionsCommand extends Logging { @@ -81,12 +82,12 @@ object ReassignPartitionsCommand extends Logging { CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options") val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt) val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt) - val duplicateReassignments = Utils.duplicates(brokerListToReassign) + val duplicateReassignments = CoreUtils.duplicates(brokerListToReassign) if (duplicateReassignments.nonEmpty) throw new AdminCommandFailedException("Broker list contains duplicate entries: %s".format(duplicateReassignments.mkString(","))) val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString) - val duplicateTopicsToReassign = Utils.duplicates(topicsToReassign) + val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign) if (duplicateTopicsToReassign.nonEmpty) throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(","))) val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign) @@ -112,11 +113,11 @@ object ReassignPartitionsCommand extends Logging { val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString) if (partitionsToBeReassigned.isEmpty) throw new AdminCommandFailedException("Partition reassignment data 
file %s is empty".format(reassignmentJsonFile)) - val duplicateReassignedPartitions = Utils.duplicates(partitionsToBeReassigned.map{ case(tp,replicas) => tp}) + val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map{ case(tp,replicas) => tp}) if (duplicateReassignedPartitions.nonEmpty) throw new AdminCommandFailedException("Partition reassignment contains duplicate topic partitions: %s".format(duplicateReassignedPartitions.mkString(","))) val duplicateEntries= partitionsToBeReassigned - .map{ case(tp,replicas) => (tp, Utils.duplicates(replicas))} + .map{ case(tp,replicas) => (tp, CoreUtils.duplicates(replicas))} .filter{ case (tp,duplicatedReplicas) => duplicatedReplicas.nonEmpty } if (duplicateEntries.nonEmpty) { val duplicatesMsg = duplicateEntries http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/admin/TopicCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala old mode 100644 new mode 100755 index e36a9d1..60f0228 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -28,6 +28,7 @@ import scala.collection.JavaConversions._ import kafka.log.LogConfig import kafka.consumer.Whitelist import kafka.server.OffsetManager +import org.apache.kafka.common.utils.Utils object TopicCommand { @@ -228,7 +229,7 @@ object TopicCommand { val ret = new mutable.HashMap[Int, List[Int]]() for (i <- 0 until partitionList.size) { val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) - val duplicateBrokers = Utils.duplicates(brokerList) + val duplicateBrokers = CoreUtils.duplicates(brokerList) if (duplicateBrokers.nonEmpty) throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicateBrokers.mkString(","))) ret.put(i, brokerList.toList) 
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/client/ClientUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala old mode 100644 new mode 100755 index ad4c9d2..f08aaf2 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -23,7 +23,7 @@ import kafka.cluster._ import kafka.api._ import kafka.producer._ import kafka.common.{ErrorMapping, KafkaException} -import kafka.utils.{Utils, Logging} +import kafka.utils.{CoreUtils, Logging} import java.util.Properties import util.Random import kafka.network.BlockingChannel @@ -98,7 +98,7 @@ object ClientUtils extends Logging{ * Parse a list of broker urls in the form host1:port1, host2:port2, ... */ def parseBrokerList(brokerListStr: String): Seq[BrokerEndpoint] = { - val brokersStr = Utils.parseCsvList(brokerListStr) + val brokersStr = CoreUtils.parseCsvList(brokerListStr) brokersStr.zipWithIndex.map { case (address, brokerId) => BrokerEndpoint.createBrokerEndPoint(brokerId, address) http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/cluster/Broker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala old mode 100644 new mode 100755 index 3933bb3..8e603b6 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -17,11 +17,13 @@ package kafka.cluster +import kafka.utils.CoreUtils._ +import kafka.utils.Json +import kafka.api.ApiUtils._ import java.nio.ByteBuffer import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException} import kafka.utils.Json -import kafka.utils.Utils._ import org.apache.kafka.common.protocol.SecurityProtocol 
/** @@ -139,14 +141,4 @@ case class Broker(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) { } } - override def equals(obj: Any): Boolean = { - obj match { - case null => false - // Yes, Scala compares lists element by element - case n: Broker => id == n.id && endPoints == n.endPoints - case _ => false - } - } - - override def hashCode(): Int = hashcode(id, endPoints) -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala old mode 100644 new mode 100755 index 6d142d6..3fb549c --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -18,7 +18,7 @@ package kafka.cluster import kafka.common._ import kafka.utils._ -import kafka.utils.Utils.{inReadLock,inWriteLock} +import kafka.utils.CoreUtils.{inReadLock,inWriteLock} import kafka.admin.AdminUtils import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala old mode 100644 new mode 100755 index 484a57f..6bb0d56 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -25,7 +25,7 @@ import scala.collection.immutable import collection.mutable.HashMap import scala.collection.mutable import java.util.concurrent.locks.ReentrantLock -import kafka.utils.Utils.inLock +import kafka.utils.CoreUtils.inLock import kafka.utils.ZkUtils._ import 
kafka.utils.{ShutdownableThread, SystemTime} import kafka.common.TopicAndPartition http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/consumer/ConsumerIterator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala old mode 100644 new mode 100755 index b00a4dc..0c5c451 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -17,7 +17,7 @@ package kafka.consumer -import kafka.utils.{IteratorTemplate, Logging, Utils} +import kafka.utils.{IteratorTemplate, Logging, CoreUtils} import java.util.concurrent.{TimeUnit, BlockingQueue} import kafka.serializer.Decoder import java.util.concurrent.atomic.AtomicReference http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/consumer/PartitionAssignor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala old mode 100644 new mode 100755 index bc2e5b4..4afda8b --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -19,7 +19,7 @@ package kafka.consumer import org.I0Itec.zkclient.ZkClient import kafka.common.TopicAndPartition -import kafka.utils.{Pool, Utils, ZkUtils, Logging} +import kafka.utils.{Pool, CoreUtils, ZkUtils, Logging} import scala.collection.mutable @@ -88,7 +88,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet)) } - val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted) + val threadAssignor = CoreUtils.circularIterator(headThreadIdSet.toSeq.sorted) info("Starting 
round-robin assignment with consumers " + ctx.consumers) val allTopicPartitions = ctx.partitionsForTopic.flatMap { case (topic, partitions) => http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/consumer/TopicCount.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala old mode 100644 new mode 100755 index 0954b3c..6994c8e --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -19,7 +19,7 @@ package kafka.consumer import scala.collection._ import org.I0Itec.zkclient.ZkClient -import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, Utils} +import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, CoreUtils} import kafka.common.KafkaException private[kafka] trait TopicCount { @@ -136,7 +136,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient, TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*)) } - def getTopicCountMap = Map(Utils.JSONEscapeString(topicFilter.regex) -> numStreams) + def getTopicCountMap = Map(CoreUtils.JSONEscapeString(topicFilter.regex) -> numStreams) def pattern: String = { topicFilter match { http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala old mode 100644 new mode 100755 index 3e8a75b..e250b94 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -32,7 +32,7 @@ import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.metrics._ import 
kafka.network.BlockingChannel import kafka.serializer._ -import kafka.utils.Utils.inLock +import kafka.utils.CoreUtils.inLock import kafka.utils.ZkUtils._ import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNodeExistsException http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala old mode 100644 new mode 100755 index fb596ba..97acdb2 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -17,7 +17,7 @@ package kafka.controller import kafka.network.{Receive, BlockingChannel} -import kafka.utils.{Utils, Logging, ShutdownableThread} +import kafka.utils.{CoreUtils, Logging, ShutdownableThread} import collection.mutable.HashMap import kafka.cluster.Broker import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue} @@ -140,7 +140,7 @@ class RequestSendThread(val controllerId: Int, connectToBroker(toBroker, channel) isSendSuccessful = false // backoff before retrying the connection and send - Utils.swallowTrace(Thread.sleep(300)) + CoreUtils.swallowTrace(Thread.sleep(300)) } } if (receive != null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala old mode 100644 new mode 100755 index f2e62f5..3a09377 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -29,7 +29,7 @@ import kafka.log.LogConfig import kafka.metrics.{KafkaTimer, 
KafkaMetricsGroup} import kafka.utils.ZkUtils._ import kafka.utils._ -import kafka.utils.Utils._ +import kafka.utils.CoreUtils._ import org.apache.zookeeper.Watcher.Event.KeeperState import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/controller/PartitionStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala old mode 100644 new mode 100755 index 2f0694b..92fd92d --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -25,9 +25,8 @@ import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateC import kafka.utils.{Logging, ZkUtils, ReplicationUtils} import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener} import org.I0Itec.zkclient.exception.ZkNodeExistsException -import org.apache.log4j.Logger import kafka.controller.Callbacks.CallbackBuilder -import kafka.utils.Utils._ +import kafka.utils.CoreUtils._ /** * This class represents the state machine for partitions. 
It defines the states that a partition can be in, and http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala old mode 100644 new mode 100755 index 3e87e1d..e5c56e0 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -24,7 +24,7 @@ import kafka.utils.{ZkUtils, ReplicationUtils, Logging} import org.I0Itec.zkclient.IZkChildListener import org.apache.log4j.Logger import kafka.controller.Callbacks._ -import kafka.utils.Utils._ +import kafka.utils.CoreUtils._ /** * This class represents the state machine for replicas. It defines the states that a replica can be in, and http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/controller/TopicDeletionManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala old mode 100644 new mode 100755 index e56f22d..64ecb49 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -18,7 +18,7 @@ package kafka.controller import collection.mutable import kafka.utils.{ShutdownableThread, Logging, ZkUtils} -import kafka.utils.Utils._ +import kafka.utils.CoreUtils._ import collection.Set import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.api.{StopReplicaResponse, RequestOrResponse} http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/log/FileMessageSet.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala old mode 100644 new mode 100755 index b2652dd..2522604 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -66,12 +66,12 @@ class FileMessageSet private[kafka](@volatile var file: File, * Create a file message set with no slicing */ def this(file: File) = - this(file, Utils.openChannel(file, mutable = true)) + this(file, CoreUtils.openChannel(file, mutable = true)) /** * Create a file message set with mutable option */ - def this(file: File, mutable: Boolean) = this(file, Utils.openChannel(file, mutable)) + def this(file: File, mutable: Boolean) = this(file, CoreUtils.openChannel(file, mutable)) /** * Create a slice view of the file message set that begins and ends at the given byte offsets @@ -231,7 +231,7 @@ class FileMessageSet private[kafka](@volatile var file: File, * @return True iff this message set was deleted. 
*/ def delete(): Boolean = { - Utils.swallow(channel.close()) + CoreUtils.swallow(channel.close()) file.delete() } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala old mode 100644 new mode 100755 index a0745be..5563f2d --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -136,12 +136,12 @@ class Log(val dir: File, // we crashed in the middle of a swap operation, to recover: // if a log, swap it in and delete the .index file // if an index just delete it, it will be rebuilt - val baseName = new File(Utils.replaceSuffix(file.getPath, SwapFileSuffix, "")) + val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) if(baseName.getPath.endsWith(IndexFileSuffix)) { file.delete() } else if(baseName.getPath.endsWith(LogFileSuffix)){ // delete the index - val index = new File(Utils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix)) + val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix)) index.delete() // complete the swap operation val renamed = file.renameTo(baseName) @@ -627,7 +627,7 @@ class Log(val dir: File, removeLogMetrics() logSegments.foreach(_.delete()) segments.clear() - Utils.rm(dir) + CoreUtils.rm(dir) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/log/LogCleanerManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala old mode 100644 new mode 100755 index 351824b..f6795d3 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -24,7 +24,7 @@ import kafka.utils.{Logging, 
Pool} import kafka.server.OffsetCheckpoint import collection.mutable import java.util.concurrent.locks.ReentrantLock -import kafka.utils.Utils._ +import kafka.utils.CoreUtils._ import java.util.concurrent.TimeUnit import kafka.common.{LogCleaningAbortedException, TopicAndPartition} http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/log/LogConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala old mode 100644 new mode 100755 index 8b67aee..558c703 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -197,7 +197,7 @@ object LogConfig { * Parse the given properties instance into a LogConfig object */ def fromProps(props: Properties): LogConfig = { - import kafka.utils.Utils.evaluateDefaults + import kafka.utils.CoreUtils.evaluateDefaults val parsed = configDef.parse(evaluateDefaults(props)) new LogConfig(segmentSize = parsed.get(SegmentBytesProp).asInstanceOf[Int], segmentMs = parsed.get(SegmentMsProp).asInstanceOf[Long], http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala old mode 100644 new mode 100755 index 47d250a..a7a9b85 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -132,7 +132,7 @@ class LogManager(val logDirs: Array[File], dirContent <- Option(dir.listFiles).toList logDir <- dirContent if logDir.isDirectory } yield { - Utils.runnable { + CoreUtils.runnable { debug("Loading log '" + logDir.getName + "'") val topicPartition = Log.parseTopicPartitionName(logDir) @@ -210,7 +210,7 @@ class LogManager(val logDirs: Array[File], // stop the cleaner first if 
(cleaner != null) { - Utils.swallow(cleaner.shutdown()) + CoreUtils.swallow(cleaner.shutdown()) } // close logs in each dir @@ -223,7 +223,7 @@ class LogManager(val logDirs: Array[File], val logsInDir = logsByDir.getOrElse(dir.toString, Map()).values val jobsForDir = logsInDir map { log => - Utils.runnable { + CoreUtils.runnable { // flush the log to ensure latest possible recovery point log.flush() log.close() @@ -244,7 +244,7 @@ class LogManager(val logDirs: Array[File], // mark that the shutdown was clean by creating marker file debug("Writing clean shutdown marker at " + dir) - Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile()) + CoreUtils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile()) } } catch { case e: ExecutionException => { http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/log/LogSegment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala old mode 100644 new mode 100755 index 0256764..ed03953 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -254,10 +254,10 @@ class LogSegment(val log: FileMessageSet, * Change the suffix for the index and log file for this log segment */ def changeFileSuffixes(oldSuffix: String, newSuffix: String) { - val logRenamed = log.renameTo(new File(Utils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix))) + val logRenamed = log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix))) if(!logRenamed) throw new KafkaStorageException("Failed to change the log file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset)) - val indexRenamed = index.renameTo(new File(Utils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix))) + val indexRenamed = index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, 
oldSuffix, newSuffix))) if(!indexRenamed) throw new KafkaStorageException("Failed to change the index file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset)) } @@ -266,8 +266,8 @@ class LogSegment(val log: FileMessageSet, * Close this log segment */ def close() { - Utils.swallow(index.close) - Utils.swallow(log.close) + CoreUtils.swallow(index.close) + CoreUtils.swallow(log.close) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/log/OffsetIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala old mode 100644 new mode 100755 index ca82c04..4ab22de --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -24,7 +24,7 @@ import java.nio.channels._ import java.util.concurrent.locks._ import java.util.concurrent.atomic._ import kafka.utils._ -import kafka.utils.Utils.inLock +import kafka.utils.CoreUtils.inLock import kafka.common.InvalidOffsetException /** @@ -81,7 +81,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi idx.position(roundToExactMultiple(idx.limit, 8)) idx } finally { - Utils.swallow(raf.close()) + CoreUtils.swallow(raf.close()) } } @@ -287,7 +287,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi this.maxEntries = this.mmap.limit / 8 this.mmap.position(position) } finally { - Utils.swallow(raf.close()) + CoreUtils.swallow(raf.close()) } } } @@ -319,7 +319,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi def delete(): Boolean = { info("Deleting index " + this.file.getAbsolutePath) if(Os.isWindows) - Utils.swallow(forceUnmap(this.mmap)) + CoreUtils.swallow(forceUnmap(this.mmap)) this.file.delete() } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/log/OffsetMap.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala old mode 100644 new mode 100755 index 42cdfbb..2940e47 --- a/core/src/main/scala/kafka/log/OffsetMap.scala +++ b/core/src/main/scala/kafka/log/OffsetMap.scala @@ -21,6 +21,7 @@ import java.util.Arrays import java.security.MessageDigest import java.nio.ByteBuffer import kafka.utils._ +import org.apache.kafka.common.utils.Utils trait OffsetMap { def slots: Int @@ -158,7 +159,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend * @return The byte offset in the buffer at which the ith probing for the given hash would reside */ private def positionOf(hash: Array[Byte], attempt: Int): Int = { - val probe = Utils.readInt(hash, math.min(attempt, hashSize - 4)) + math.max(0, attempt - hashSize + 4) + val probe = CoreUtils.readInt(hash, math.min(attempt, hashSize - 4)) + math.max(0, attempt - hashSize + 4) val slot = Utils.abs(probe) % slots this.probes += 1 slot * bytesPerEntry http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/message/Message.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala old mode 100644 new mode 100755 index 7ba280f..999b115 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -20,6 +20,7 @@ package kafka.message import java.nio._ import scala.math._ import kafka.utils._ +import org.apache.kafka.common.utils.Utils /** * Constants related to messages @@ -146,7 +147,7 @@ class Message(val buffer: ByteBuffer) { * Compute the checksum of the message from the message contents */ def computeChecksum(): Long = - 
Utils.crc32(buffer.array, buffer.arrayOffset + MagicOffset, buffer.limit - MagicOffset) + CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset, buffer.limit - MagicOffset) /** * Retrieve the previously computed CRC for this message http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/message/MessageAndMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/MessageAndMetadata.scala b/core/src/main/scala/kafka/message/MessageAndMetadata.scala old mode 100644 new mode 100755 index d693abc..26b75c8 --- a/core/src/main/scala/kafka/message/MessageAndMetadata.scala +++ b/core/src/main/scala/kafka/message/MessageAndMetadata.scala @@ -18,7 +18,7 @@ package kafka.message import kafka.serializer.Decoder -import kafka.utils.Utils +import org.apache.kafka.common.utils.Utils case class MessageAndMetadata[K, V](topic: String, partition: Int, private val rawMessage: Message, offset: Long, http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/message/MessageWriter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/MessageWriter.scala b/core/src/main/scala/kafka/message/MessageWriter.scala old mode 100644 new mode 100755 index 7eb72cb..0c6040e --- a/core/src/main/scala/kafka/message/MessageWriter.scala +++ b/core/src/main/scala/kafka/message/MessageWriter.scala @@ -20,7 +20,7 @@ package kafka.message import java.io.{InputStream, OutputStream} import java.nio.ByteBuffer -import kafka.utils.Crc32 +import org.apache.kafka.common.utils.Crc32 class MessageWriter(segmentSize: Int) extends BufferingOutputStream(segmentSize) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala old mode 100644 new mode 100755 index ea9559f..cc0da9f --- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala +++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala @@ -24,7 +24,7 @@ import com.yammer.metrics.Metrics import java.io.File import com.yammer.metrics.reporting.CsvReporter import java.util.concurrent.TimeUnit -import kafka.utils.{Utils, VerifiableProperties, Logging} +import kafka.utils.{CoreUtils, VerifiableProperties, Logging} private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean @@ -48,7 +48,7 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter if (!initialized) { val metricsConfig = new KafkaMetricsConfig(props) csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics")) - Utils.rm(csvDir) + CoreUtils.rm(csvDir) csvDir.mkdirs() underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir) if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = false)) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala old mode 100644 new mode 100755 index 84f6208..ad9eb20 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala @@ -20,7 +20,7 @@ package kafka.metrics -import kafka.utils.{VerifiableProperties, Utils} +import kafka.utils.{VerifiableProperties, CoreUtils} class KafkaMetricsConfig(props: VerifiableProperties) { @@ -28,7 +28,7 @@ class KafkaMetricsConfig(props: VerifiableProperties) { * Comma-separated list of reporter types. 
These classes should be on the * classpath and will be instantiated at run-time. */ - val reporters = Utils.parseCsvList(props.getString("kafka.metrics.reporters", "")) + val reporters = CoreUtils.parseCsvList(props.getString("kafka.metrics.reporters", "")) /** * The metrics polling interval (in seconds). http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala old mode 100644 new mode 100755 index 14e4624..30fd0ea --- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala @@ -20,7 +20,7 @@ package kafka.metrics -import kafka.utils.{Utils, VerifiableProperties} +import kafka.utils.{CoreUtils, VerifiableProperties} import java.util.concurrent.atomic.AtomicBoolean @@ -56,10 +56,10 @@ object KafkaMetricsReporter { val metricsConfig = new KafkaMetricsConfig(verifiableProps) if(metricsConfig.reporters.size > 0) { metricsConfig.reporters.foreach(reporterType => { - val reporter = Utils.createObject[KafkaMetricsReporter](reporterType) + val reporter = CoreUtils.createObject[KafkaMetricsReporter](reporterType) reporter.init(verifiableProps) if (reporter.isInstanceOf[KafkaMetricsReporterMBean]) - Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName) + CoreUtils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName) }) ReporterStarted.set(true) } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala 
b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala old mode 100644 new mode 100755 index a442545..c0d7726 --- a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala +++ b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala @@ -51,7 +51,7 @@ private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive var read = 0 // have we read the request size yet? if(sizeBuffer.remaining > 0) - read += Utils.read(channel, sizeBuffer) + read += CoreUtils.read(channel, sizeBuffer) // have we allocated the request buffer yet? if(contentBuffer == null && !sizeBuffer.hasRemaining) { sizeBuffer.rewind() @@ -64,7 +64,7 @@ private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive } // if we have a buffer read some stuff into it if(contentBuffer != null) { - read = Utils.read(channel, contentBuffer) + read = CoreUtils.read(channel, contentBuffer) // did we get everything? if(!contentBuffer.hasRemaining) { contentBuffer.rewind() http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala old mode 100644 new mode 100755 index 0ad9057..8fbea7b --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -33,6 +33,7 @@ import kafka.common.KafkaException import kafka.metrics.KafkaMetricsGroup import kafka.utils._ import com.yammer.metrics.core.{Gauge, Meter} +import org.apache.kafka.common.utils.Utils /** * An NIO socket server. 
The threading model is http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala old mode 100644 new mode 100755 index 6a3b02e..e6b100e --- a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala +++ b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala @@ -19,6 +19,7 @@ package kafka.producer import kafka.utils._ +import org.apache.kafka.common.utils.Utils class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner { def partition(key: Any, numPartitions: Int): Int = { http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/producer/DefaultPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala old mode 100644 new mode 100755 index 3afb22e..1141ed1 --- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala +++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala @@ -19,6 +19,7 @@ package kafka.producer import kafka.utils._ +import org.apache.kafka.common.utils.Utils class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner { private val random = new java.util.Random http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/producer/Producer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala old mode 100644 new mode 100755 index e38d2fa..4be06c8 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ 
-58,9 +58,9 @@ class Producer[K,V](val config: ProducerConfig, def this(config: ProducerConfig) = this(config, new DefaultEventHandler[K,V](config, - Utils.createObject[Partitioner](config.partitionerClass, config.props), - Utils.createObject[Encoder[V]](config.serializerClass, config.props), - Utils.createObject[Encoder[K]](config.keySerializerClass, config.props), + CoreUtils.createObject[Partitioner](config.partitionerClass, config.props), + CoreUtils.createObject[Encoder[V]](config.serializerClass, config.props), + CoreUtils.createObject[Encoder[K]](config.keySerializerClass, config.props), new ProducerPool(config))) /** http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/producer/ProducerConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala old mode 100644 new mode 100755 index 3cdf23d..08a4e51 --- a/core/src/main/scala/kafka/producer/ProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -19,8 +19,8 @@ package kafka.producer import async.AsyncProducerConfig import java.util.Properties -import kafka.utils.{Utils, VerifiableProperties} -import kafka.message.{CompressionCodec, NoCompressionCodec} +import kafka.utils.{CoreUtils, VerifiableProperties} +import kafka.message.NoCompressionCodec import kafka.common.{InvalidConfigException, Config} object ProducerConfig extends Config { @@ -90,7 +90,7 @@ class ProducerConfig private (val props: VerifiableProperties) * * If the compression codec is NoCompressionCodec, compression is disabled for all topics */ - val compressedTopics = Utils.parseCsvList(props.getString("compressed.topics", null)) + val compressedTopics = CoreUtils.parseCsvList(props.getString("compressed.topics", null)) /** The leader may be unavailable transiently, which can fail the sending of a message. 
* This property specifies the number of retries when such failures occur. http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala old mode 100644 new mode 100755 index 821901e..a6179a9 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -21,12 +21,13 @@ import kafka.common._ import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} import kafka.producer._ import kafka.serializer.Encoder -import kafka.utils.{Utils, Logging, SystemTime} +import kafka.utils.{CoreUtils, Logging, SystemTime} import scala.util.Random import scala.collection.{Seq, Map} import scala.collection.mutable.{ArrayBuffer, HashMap, Set} import java.util.concurrent.atomic._ import kafka.api.{TopicMetadata, ProducerRequest} +import org.apache.kafka.common.utils.Utils class DefaultEventHandler[K,V](config: ProducerConfig, private val partitioner: Partitioner, @@ -64,7 +65,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic) if (topicMetadataRefreshInterval >= 0 && SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) { - Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)) + CoreUtils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)) sendPartitionPerTopicCache.clear() topicMetadataToRefresh.clear lastTopicMetadataRefreshTime = SystemTime.milliseconds @@ -75,7 +76,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, // back off and update the topic metadata cache before attempting 
another send operation Thread.sleep(config.retryBackoffMs) // get topics of the outstanding produce requests and refresh metadata for those - Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement)) + CoreUtils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement)) sendPartitionPerTopicCache.clear() remainingRetries -= 1 producerStats.resendRate.mark() @@ -262,7 +263,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, if (logger.isTraceEnabled) { val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError) successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message => - trace("Successfully sent message: %s".format(if(message.message.isNull) null else Utils.readString(message.message.payload))))) + trace("Successfully sent message: %s".format(if(message.message.isNull) null else message.message.toString())))) } val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1) http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/AbstractFetcherManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala old mode 100644 new mode 100755 index 94aa952..f8f9331 --- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -20,11 +20,12 @@ package kafka.server import scala.collection.mutable import scala.collection.Set import scala.collection.Map -import kafka.utils.{Utils, Logging} +import kafka.utils.Logging import kafka.cluster.BrokerEndpoint import kafka.metrics.KafkaMetricsGroup import 
kafka.common.TopicAndPartition import com.yammer.metrics.core.Gauge +import org.apache.kafka.common.utils.Utils abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1) extends Logging with KafkaMetricsGroup { @@ -128,4 +129,4 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri case class BrokerAndFetcherId(broker: BrokerEndpoint, fetcherId: Int) -case class BrokerAndInitialOffset(broker: BrokerEndpoint, initOffset: Long) \ No newline at end of file +case class BrokerAndInitialOffset(broker: BrokerEndpoint, initOffset: Long) http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/AbstractFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala old mode 100644 new mode 100755 index 93f67d5..1e26de2 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -22,7 +22,7 @@ import kafka.utils.{Pool, ShutdownableThread} import kafka.consumer.{PartitionTopicInfo, SimpleConsumer} import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder} import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping} -import kafka.utils.Utils.inLock +import kafka.utils.CoreUtils.inLock import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset} import kafka.metrics.KafkaMetricsGroup http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala old mode 100644 new mode 
100755 index 0e542ff..01e8f72 --- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala +++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala @@ -20,7 +20,7 @@ package kafka.server import java.io._ import java.util.Properties import kafka.utils._ - +import org.apache.kafka.common.utils.Utils case class BrokerMetadata(brokerId: Int) http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala old mode 100644 new mode 100755 index cf1a5a6..69b772c --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -23,7 +23,7 @@ import kafka.api.ApiVersion import kafka.cluster.EndPoint import kafka.consumer.ConsumerConfig import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet} -import kafka.utils.Utils +import kafka.utils.CoreUtils import org.apache.kafka.common.config.ConfigDef import org.apache.kafka.common.protocol.SecurityProtocol import scala.collection.{immutable, JavaConversions, Map} @@ -492,7 +492,7 @@ object KafkaConfig { * Parse the given properties instance into a KafkaConfig object */ def fromProps(props: Properties): KafkaConfig = { - import kafka.utils.Utils.evaluateDefaults + import kafka.utils.CoreUtils.evaluateDefaults val parsed = configDef.parse(evaluateDefaults(props)) new KafkaConfig( /** ********* Zookeeper Configuration ***********/ @@ -755,7 +755,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val advertisedHostName: String = _advertisedHostName.getOrElse(hostName) val advertisedPort: Int = _advertisedPort.getOrElse(port) val advertisedListeners = getAdvertisedListeners() - val logDirs = Utils.parseCsvList(_logDirs.getOrElse(_logDir)) + val logDirs = 
CoreUtils.parseCsvList(_logDirs.getOrElse(_logDir)) val logRollTimeMillis = _logRollTimeMillis.getOrElse(60 * 60 * 1000L * logRollTimeHours) val logRollTimeJitterMillis = _logRollTimeJitterMillis.getOrElse(60 * 60 * 1000L * logRollTimeJitterHours) @@ -780,7 +780,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ private def getMap(propName: String, propValue: String): Map[String, String] = { try { - Utils.parseCsvMap(propValue) + CoreUtils.parseCsvMap(propValue) } catch { case e: Exception => throw new IllegalArgumentException("Error parsing configuration property '%s': %s".format(propName, e.getMessage)) } @@ -789,7 +789,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ private def validateUniquePortAndProtocol(listeners: String) { val endpoints = try { - val listenerList = Utils.parseCsvList(listeners) + val listenerList = CoreUtils.parseCsvList(listeners) listenerList.map(listener => EndPoint.createEndPoint(listener)) } catch { case e: Exception => throw new IllegalArgumentException("Error creating broker listeners from '%s': %s".format(listeners, e.getMessage)) @@ -806,9 +806,9 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ private def getListeners(): immutable.Map[SecurityProtocol, EndPoint] = { if (_listeners.isDefined) { validateUniquePortAndProtocol(_listeners.get) - Utils.listenerListToEndPoints(_listeners.get) + CoreUtils.listenerListToEndPoints(_listeners.get) } else { - Utils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port) + CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port) } } @@ -818,9 +818,9 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ private def getAdvertisedListeners(): immutable.Map[SecurityProtocol, EndPoint] = { if (_advertisedListeners.isDefined) { validateUniquePortAndProtocol(_advertisedListeners.get) - Utils.listenerListToEndPoints(_advertisedListeners.get) + 
CoreUtils.listenerListToEndPoints(_advertisedListeners.get) } else if (_advertisedHostName.isDefined || _advertisedPort.isDefined ) { - Utils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort) + CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort) } else { getListeners() } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/KafkaRequestHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala old mode 100644 new mode 100755 index 4d86bdf..a1558af --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -22,6 +22,7 @@ import kafka.utils._ import kafka.metrics.KafkaMetricsGroup import java.util.concurrent.TimeUnit import com.yammer.metrics.core.Meter +import org.apache.kafka.common.utils.Utils /** * A thread that answers kafka requests. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala old mode 100644 new mode 100755 index 9df2cf4..c63f4ba --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -319,27 +319,27 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val canShutdown = isShuttingDown.compareAndSet(false, true) if (canShutdown && shutdownLatch.getCount > 0) { - Utils.swallow(controlledShutdown()) + CoreUtils.swallow(controlledShutdown()) brokerState.newState(BrokerShuttingDown) if(socketServer != null) - Utils.swallow(socketServer.shutdown()) + CoreUtils.swallow(socketServer.shutdown()) if(requestHandlerPool != null) - Utils.swallow(requestHandlerPool.shutdown()) + CoreUtils.swallow(requestHandlerPool.shutdown()) if(offsetManager != null) offsetManager.shutdown() - Utils.swallow(kafkaScheduler.shutdown()) + CoreUtils.swallow(kafkaScheduler.shutdown()) if(apis != null) - Utils.swallow(apis.close()) + CoreUtils.swallow(apis.close()) if(replicaManager != null) - Utils.swallow(replicaManager.shutdown()) + CoreUtils.swallow(replicaManager.shutdown()) if(logManager != null) - Utils.swallow(logManager.shutdown()) + CoreUtils.swallow(logManager.shutdown()) if(consumerCoordinator != null) - Utils.swallow(consumerCoordinator.shutdown()) + CoreUtils.swallow(consumerCoordinator.shutdown()) if(kafkaController != null) - Utils.swallow(kafkaController.shutdown()) + CoreUtils.swallow(kafkaController.shutdown()) if(zkClient != null) - Utils.swallow(zkClient.close()) + CoreUtils.swallow(zkClient.close()) brokerState.newState(NotRunning) http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/MetadataCache.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala old mode 100644 new mode 100755 index 008f02b..4460b42 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -26,7 +26,7 @@ import kafka.controller.KafkaController.StateChangeLogger import org.apache.kafka.common.protocol.SecurityProtocol import scala.collection.{Seq, Set, mutable} import kafka.utils.Logging -import kafka.utils.Utils._ +import kafka.utils.CoreUtils._ import java.util.concurrent.locks.ReentrantReadWriteLock http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala old mode 100644 new mode 100755 index 395b1db..420e2c3 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -21,6 +21,7 @@ import org.apache.kafka.common.protocol.types.{Struct, Schema, Field} import org.apache.kafka.common.protocol.types.Type.STRING import org.apache.kafka.common.protocol.types.Type.INT32 import org.apache.kafka.common.protocol.types.Type.INT64 +import org.apache.kafka.common.utils.Utils import kafka.utils._ import kafka.common._ http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala old mode 100644 new mode 100755 index a75818a..a5c5fb3 --- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala +++ 
b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala @@ -17,7 +17,7 @@ package kafka.server import kafka.utils.ZkUtils._ -import kafka.utils.Utils._ +import kafka.utils.CoreUtils._ import kafka.utils.{Json, SystemTime, Logging} import org.I0Itec.zkclient.exception.ZkNodeExistsException import org.I0Itec.zkclient.IZkDataListener http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/tools/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala old mode 100644 new mode 100755 index 910691e..9d9b781 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -28,6 +28,7 @@ import kafka.serializer._ import kafka.utils._ import kafka.metrics.KafkaMetricsReporter import kafka.consumer.{Blacklist,Whitelist,ConsumerConfig,Consumer} +import org.apache.kafka.common.utils.Utils /** * Consumer that dumps messages out to standard out. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/tools/DumpLogSegments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala old mode 100644 new mode 100755 index b7a3630..fc11a2a --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -25,6 +25,7 @@ import collection.mutable import joptsimple.OptionParser import kafka.serializer.Decoder import kafka.utils.VerifiableProperties +import org.apache.kafka.common.utils.Utils object DumpLogSegments { @@ -64,8 +65,8 @@ object DumpLogSegments { val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue() val isDeepIteration = if(options.has(deepIterationOpt)) true else false - val valueDecoder: Decoder[_] = Utils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties) - val keyDecoder: Decoder[_] = Utils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties) + val valueDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties) + val keyDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties) val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Long, Long)]] val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Long, Long)]] http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/tools/KafkaMigrationTool.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java old mode 100644 new mode 100755 index 026d819..f19df0c --- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java +++ 
b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java @@ -21,7 +21,6 @@ import joptsimple.*; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; -import kafka.utils.Utils; import java.io.File; import java.io.FileInputStream; @@ -39,6 +38,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.kafka.common.utils.Utils; /** http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/tools/MirrorMaker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala old mode 100644 new mode 100755 index ec07743..9548521 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -24,17 +24,19 @@ import java.util.{Collections, Properties} import com.yammer.metrics.core.Gauge import joptsimple.OptionParser -import kafka.consumer.{KafkaStream, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector} +import kafka.consumer.{Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector} import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.message.MessageAndMetadata import kafka.metrics.KafkaMetricsGroup import kafka.serializer.DefaultDecoder -import kafka.utils.{CommandLineUtils, Logging, Utils} +import kafka.utils.{CommandLineUtils, Logging, CoreUtils} import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} +import org.apache.kafka.common.utils.Utils import scala.collection.JavaConversions._ + /** 
* The mirror maker has the following architecture: * - There are N mirror maker thread shares one ZookeeperConsumerConnector and each owns a Kafka stream. @@ -207,9 +209,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val customRebalanceListener = { if (customRebalanceListenerClass != null) { if (rebalanceListenerArgs != null) - Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs)) + Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs)) else - Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass)) + Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass)) } else { None } @@ -237,9 +239,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { messageHandler = { if (customMessageHandlerClass != null) { if (messageHandlerArgs != null) - Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs) + CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs) else - Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass) + CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass) } else { defaultMirrorMakerMessageHandler } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala old mode 100644 new mode 100755 index 7379fe3..9a6804c --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -25,6 +25,7 @@ import kafka.api.{OffsetRequest, FetchRequestBuilder, Request} import kafka.cluster.BrokerEndpoint import scala.collection.JavaConversions._ import 
kafka.common.TopicAndPartition +import org.apache.kafka.common.utils.Utils /** * Command line program to dump out messages to standard out using the simple consumer http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala old mode 100644 new mode 100755 index b34b8c7..8b523e7 --- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala +++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala @@ -22,7 +22,7 @@ import scala.util.matching.Regex import collection.mutable import java.util.Date import java.text.SimpleDateFormat -import kafka.utils.{Utils, Logging, CommandLineUtils} +import kafka.utils.{CoreUtils, Logging, CommandLineUtils} import kafka.common.Topic import java.io.{BufferedOutputStream, OutputStream} @@ -115,7 +115,7 @@ object StateChangeLogMerger extends Logging { } if (options.has(partitionsOpt)) { partitions = options.valueOf(partitionsOpt).split(",").toList.map(_.toInt) - val duplicatePartitions = Utils.duplicates(partitions) + val duplicatePartitions = CoreUtils.duplicates(partitions) if (duplicatePartitions.nonEmpty) { System.err.println("The list of partitions contains repeated entries: %s".format(duplicatePartitions.mkString(","))) System.exit(1) http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala deleted file mode 100644 index 48cff20..0000000 --- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - 
* contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - -import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer} - -import kafka.consumer._ - -import java.util.Properties -import java.util.Arrays - -object TestEndToEndLatency { - def main(args: Array[String]) { - if (args.length != 6) { - System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks") - System.exit(1) - } - - val brokerList = args(0) - val zkConnect = args(1) - val topic = args(2) - val numMessages = args(3).toInt - val consumerFetchMaxWait = args(4).toInt - val producerAcks = args(5).toInt - - val consumerProps = new Properties() - consumerProps.put("group.id", topic) - consumerProps.put("auto.commit.enable", "false") - consumerProps.put("auto.offset.reset", "largest") - consumerProps.put("zookeeper.connect", zkConnect) - consumerProps.put("fetch.wait.max.ms", consumerFetchMaxWait.toString) - consumerProps.put("socket.timeout.ms", 1201000.toString) - - val config = new ConsumerConfig(consumerProps) - val connector = Consumer.create(config) - val stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head - val iter = stream.iterator - - val producerProps = new Properties() - 
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") - producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") - producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString) - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") - val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps) - - // make sure the consumer fetcher has started before sending data since otherwise - // the consumption from the tail will skip the first message and hence be blocked - Thread.sleep(5000) - - val message = "hello there beautiful".getBytes - var totalTime = 0.0 - val latencies = new Array[Long](numMessages) - for (i <- 0 until numMessages) { - val begin = System.nanoTime - producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message)) - val received = iter.next - val elapsed = System.nanoTime - begin - // poor man's progress bar - if (i % 1000 == 0) - println(i + "\t" + elapsed / 1000.0 / 1000.0) - totalTime += elapsed - latencies(i) = (elapsed / 1000 / 1000) - } - println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0)) - Arrays.sort(latencies) - val p50 = latencies((latencies.length * 0.5).toInt) - val p99 = latencies((latencies.length * 0.99).toInt) - val p999 = latencies((latencies.length * 0.999).toInt) - println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999)) - producer.close() - connector.commitOffsets(true) - connector.shutdown() - System.exit(0) - } -} \ No newline at end of file