Repository: kafka Updated Branches: refs/heads/trunk 53f31432a -> 9c23d9355
http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/kafka/tools/TestLogCleaning.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala new file mode 100755 index 0000000..8445894 --- /dev/null +++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.tools + +import joptsimple.OptionParser +import java.util.Properties +import java.util.Random +import java.io._ +import kafka.consumer._ +import kafka.serializer._ +import kafka.utils._ +import kafka.log.FileMessageSet +import kafka.log.Log +import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig} + +/** + * This is a torture test that runs against an existing broker. Here is how it works: + * + * It produces a series of specially formatted messages to one or more partitions. Each message it produces + * it logs out to a text file. The messages have a limited set of keys, so there is duplication in the key space. + * + * The broker will clean its log as the test runs. + * + * When the specified number of messages have been produced we create a consumer and consume all the messages in the topic + * and write that out to another text file. + * + * Using a stable unix sort we sort both the producer log of what was sent and the consumer log of what was retrieved by the message key. + * Then we compare the final message in both logs for each key. If this final message is not the same for all keys we + * print an error and exit with exit code 1, otherwise we print the size reduction and exit with exit code 0. + */ +object TestLogCleaning { + + def main(args: Array[String]) { + val parser = new OptionParser + val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.") + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Long]) + .defaultsTo(Long.MaxValue) + val numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.") + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(5) + val brokerOpt = parser.accepts("broker", "Url to connect to.") + .withRequiredArg + .describedAs("url") + .ofType(classOf[String]) + val topicsOpt = parser.accepts("topics", "The number of topics to test.") + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + val percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates that are deletes.") + .withRequiredArg + .describedAs("percent") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) + val zkConnectOpt = parser.accepts("zk", "Zk url.") + .withRequiredArg + .describedAs("url") + .ofType(classOf[String]) + val sleepSecsOpt = parser.accepts("sleep", "Time to sleep between production and consumption.") + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) + val dumpOpt = parser.accepts("dump", "Dump the message contents of a topic partition that contains test data from this test to standard out.") + .withRequiredArg + .describedAs("directory") + .ofType(classOf[String]) + + val options = parser.parse(args:_*) + + if(args.length == 0) + CommandLineUtils.printUsageAndDie(parser, "An integration test for log cleaning.") + + if(options.has(dumpOpt)) { + dumpLog(new File(options.valueOf(dumpOpt))) + System.exit(0) + } + + CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt, numMessagesOpt) + + // parse options + val messages = options.valueOf(numMessagesOpt).longValue + val percentDeletes = options.valueOf(percentDeletesOpt).intValue + val dups = options.valueOf(numDupsOpt).intValue + val brokerUrl = options.valueOf(brokerOpt) + val topicCount = options.valueOf(topicsOpt).intValue + val zkUrl = options.valueOf(zkConnectOpt) + val sleepSecs = options.valueOf(sleepSecsOpt).intValue + + val testId = new Random().nextInt(Int.MaxValue) + val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray + + println("Producing %d messages...".format(messages)) + val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, percentDeletes) + println("Sleeping for %d seconds...".format(sleepSecs)) + Thread.sleep(sleepSecs * 1000) + println("Consuming messages...") + val consumedDataFile = consumeMessages(zkUrl, topics) + + val producedLines = lineCount(producedDataFile) + val consumedLines = lineCount(consumedDataFile) + val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble + println("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).".format(producedLines, consumedLines, 100 * reduction)) + + println("De-duplicating and validating output files...") + validateOutput(producedDataFile, consumedDataFile) + producedDataFile.delete() + consumedDataFile.delete() + } + + def dumpLog(dir: File) { + require(dir.exists, "Non-existent directory: " + dir.getAbsolutePath) + for(file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) { + val ms = new FileMessageSet(new File(dir, file)) + for(entry <- ms) { + val key = TestUtils.readString(entry.message.key) + val content = + if(entry.message.isNull) + null + else + TestUtils.readString(entry.message.payload) + println("offset = %s, key = %s, content = %s".format(entry.offset, key, content)) + } + } + } + + def lineCount(file: File): Int = io.Source.fromFile(file).getLines.size + + def validateOutput(producedDataFile: File, consumedDataFile: File) { + val producedReader = externalSort(producedDataFile) + val consumedReader = externalSort(consumedDataFile) + val produced = valuesIterator(producedReader) + val consumed = valuesIterator(consumedReader) + val producedDedupedFile = new File(producedDataFile.getAbsolutePath + ".deduped") + val producedDeduped = new BufferedWriter(new FileWriter(producedDedupedFile), 1024*1024) + val consumedDedupedFile = new File(consumedDataFile.getAbsolutePath + ".deduped") + val consumedDeduped = new BufferedWriter(new FileWriter(consumedDedupedFile), 1024*1024) + var total = 0 + var mismatched = 0 + while(produced.hasNext && consumed.hasNext) { + val p = produced.next() + producedDeduped.write(p.toString) + producedDeduped.newLine() + val c = consumed.next() + consumedDeduped.write(c.toString) + consumedDeduped.newLine() + if(p != c) + mismatched += 1 + total += 1 + } + producedDeduped.close() + consumedDeduped.close() + require(!produced.hasNext, "Additional values produced not found in consumer log.") + require(!consumed.hasNext, "Additional values consumed not found in producer log.") + println("Validated " + total + " values, " + mismatched + " mismatches.") + require(mismatched == 0, "Non-zero number of row mismatches.") + // if all the checks worked out we can delete the deduped files + producedDedupedFile.delete() + consumedDedupedFile.delete() + } + + def valuesIterator(reader: BufferedReader) = { + new IteratorTemplate[TestRecord] { + def makeNext(): TestRecord = { + var next = readNext(reader) + while(next != null && next.delete) + next = readNext(reader) + if(next == null) + allDone() + else + next + } + } + } + + def readNext(reader: BufferedReader): TestRecord = { + var line = reader.readLine() + if(line == null) + return null + var curr = new TestRecord(line) + while(true) { + line = peekLine(reader) + if(line == null) + return curr + val next = new TestRecord(line) + if(next == null || next.topicAndKey != curr.topicAndKey) + return curr + curr = next + reader.readLine() + } + null + } + + def peekLine(reader: BufferedReader) = { + reader.mark(4096) + val line = reader.readLine + reader.reset() + line + } + + def externalSort(file: File): BufferedReader = { + val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", "--temporary-directory=" + System.getProperty("java.io.tmpdir"), file.getAbsolutePath) + val process = builder.start() + new Thread() { + override def run() { + val exitCode = process.waitFor() + if(exitCode != 0) { + System.err.println("Process exited abnormally.") + while(process.getErrorStream.available > 0) { + System.err.write(process.getErrorStream().read()) + } + } + } + }.start() + new BufferedReader(new InputStreamReader(process.getInputStream()), 10*1024*1024) + } + + def produceMessages(brokerUrl: String, + topics: Array[String], + messages: Long, + dups: Int, + percentDeletes: Int): File = { + val producerProps = new Properties + producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") + producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl) + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps) + val rand = new Random(1) + val keyCount = (messages / dups).toInt + val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt") + println("Logging produce requests to " + producedFile.getAbsolutePath) + val producedWriter = new BufferedWriter(new FileWriter(producedFile), 1024*1024) + for(i <- 0L until (messages * topics.length)) { + val topic = topics((i % topics.length).toInt) + val key = rand.nextInt(keyCount) + val delete = i % 100 < percentDeletes + val msg = + if(delete) + new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), null) + else + new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), i.toString.getBytes()) + producer.send(msg) + producedWriter.write(TestRecord(topic, key, i, delete).toString) + producedWriter.newLine() + } + producedWriter.close() + producer.close() + producedFile + } + + def makeConsumer(zkUrl: String, topics: Array[String]): ZookeeperConsumerConnector = { + val consumerProps = new Properties + consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue)) + consumerProps.setProperty("zookeeper.connect", zkUrl) + consumerProps.setProperty("consumer.timeout.ms", (20*1000).toString) + consumerProps.setProperty("auto.offset.reset", "smallest") + new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps)) + } + + def consumeMessages(zkUrl: String, topics: Array[String]): File = { + val connector = makeConsumer(zkUrl, topics) + val streams = connector.createMessageStreams(topics.map(topic => (topic, 1)).toMap, new StringDecoder, new StringDecoder) + val consumedFile = File.createTempFile("kafka-log-cleaner-consumed-", ".txt") + println("Logging consumed messages to " + consumedFile.getAbsolutePath) + val consumedWriter = new BufferedWriter(new FileWriter(consumedFile)) + for(topic <- topics) { + val stream = streams(topic).head + try { + for(item <- stream) { + val delete = item.message == null + val value = if(delete) -1L else item.message.toLong + consumedWriter.write(TestRecord(topic, item.key.toInt, value, delete).toString) + consumedWriter.newLine() + } + } catch { + case e: ConsumerTimeoutException => + } + } + consumedWriter.close() + connector.shutdown() + consumedFile + } + +} + +case class TestRecord(val topic: String, val key: Int, val value: Long, val delete: Boolean) { + def this(pieces: Array[String]) = this(pieces(0), pieces(1).toInt, pieces(2).toLong, pieces(3) == "d") + def this(line: String) = this(line.split("\t")) + override def toString() = topic + "\t" + key + "\t" + value + "\t" + (if(delete) "d" else "u") + def topicAndKey = topic + key +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/other/kafka/DeleteZKPath.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/DeleteZKPath.scala b/core/src/test/scala/other/kafka/DeleteZKPath.scala old mode 100644 new mode 100755 index 2554503..33c3ef8 --- a/core/src/test/scala/other/kafka/DeleteZKPath.scala +++ b/core/src/test/scala/other/kafka/DeleteZKPath.scala @@ -18,8 +18,9 @@ package kafka import consumer.ConsumerConfig -import utils.{ZKStringSerializer, ZkUtils, Utils} +import utils.{ZKStringSerializer, ZkUtils} import org.I0Itec.zkclient.ZkClient +import org.apache.kafka.common.utils.Utils object DeleteZKPath { def main(args: Array[String]) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/other/kafka/StressTestLog.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala old mode 100644 new mode 100755 index e19b8b2..c0e248d --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -50,7 +50,7 @@ object StressTestLog { running.set(false) writer.join() reader.join() - Utils.rm(dir) + CoreUtils.rm(dir) } }) http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/other/kafka/TestCrcPerformance.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestCrcPerformance.scala b/core/src/test/scala/other/kafka/TestCrcPerformance.scala old mode 100644 new mode 100755 index 42e3c62..0c1e1ad --- a/core/src/test/scala/other/kafka/TestCrcPerformance.scala +++ b/core/src/test/scala/other/kafka/TestCrcPerformance.scala @@ -18,7 +18,8 @@ package kafka.log import java.util.Random import kafka.message._ -import kafka.utils.{TestUtils, Utils} +import kafka.utils.TestUtils +import org.apache.kafka.common.utils.Utils object TestCrcPerformance { http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala old mode 100644 new mode 100755 index 7211c25..3034c4f --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -193,7 +193,7 @@ object TestLinearWriteSpeed { } class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: ByteBufferMessageSet) extends Writable { - Utils.rm(dir) + CoreUtils.rm(dir) val log = new Log(dir, config, 0L, scheduler, SystemTime) def write(): Int = { log.append(messages, true) @@ -201,7 +201,7 @@ object TestLinearWriteSpeed { } def close() { log.close() - Utils.rm(log.dir) + CoreUtils.rm(log.dir) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala old mode 100644 new mode 100755 index 76b8b24..ab5d16c --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -22,7 +22,7 @@ import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import junit.framework.Assert._ -import kafka.utils.{ZkUtils, Utils, TestUtils} +import kafka.utils.{ZkUtils, CoreUtils, TestUtils} import kafka.cluster.Broker import kafka.client.ClientUtils import kafka.server.{KafkaConfig, KafkaServer} @@ -56,7 +56,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { override def tearDown() { servers.map(server => server.shutdown()) - servers.map(server => Utils.rm(server.config.logDirs)) + servers.map(server => CoreUtils.rm(server.config.logDirs)) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/admin/AdminTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala old mode 100644 new mode 100755 index 0305f70..cfe38df --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -397,7 +397,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { checkConfig(2*maxMessageSize, 2 * retentionMs) } finally { server.shutdown() - server.config.logDirs.map(Utils.rm(_)) + server.config.logDirs.map(CoreUtils.rm(_)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala old mode 100644 new mode 100755 index bb25467..db5302f --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -81,7 +81,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { assertFalse(iter.hasNext) assertEquals(0, queue.size) // Shutdown command has been consumed. assertEquals(5, receivedMessages.size) - val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => Utils.readString(m.message.payload)) + val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => TestUtils.readString(m.message.payload)) assertEquals(unconsumed, receivedMessages) } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala old mode 100644 new mode 100755 index f95fb62..447e421 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -21,7 +21,7 @@ import java.util.Arrays import scala.collection.mutable.Buffer import kafka.server._ -import kafka.utils.{TestUtils, Utils} +import kafka.utils.{CoreUtils, TestUtils} import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.common.KafkaException @@ -64,7 +64,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness { override def tearDown() { servers.map(server => server.shutdown()) - servers.map(server => server.config.logDirs.map(Utils.rm(_))) + servers.map(server => server.config.logDirs.map(CoreUtils.rm(_))) super.tearDown } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala old mode 100644 new mode 100755 index f601d31..6a758a7 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -27,7 +27,7 @@ import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite import scala.collection._ import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException} -import kafka.utils.{StaticPartitioner, TestUtils, Utils} +import kafka.utils.{StaticPartitioner, TestUtils, CoreUtils} import kafka.serializer.StringEncoder import java.util.Properties @@ -82,7 +82,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with assertTrue(messageSet.iterator.hasNext) val fetchedMessageAndOffset = messageSet.head - assertEquals("test-message", Utils.readString(fetchedMessageAndOffset.message.payload, "UTF-8")) + assertEquals("test-message", TestUtils.readString(fetchedMessageAndOffset.message.payload, "UTF-8")) } def testDefaultEncoderProducerAndFetchWithCompression() { @@ -104,7 +104,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with assertTrue(messageSet.iterator.hasNext) val fetchedMessageAndOffset = messageSet.head - assertEquals("test-message", Utils.readString(fetchedMessageAndOffset.message.payload, "UTF-8")) + assertEquals("test-message", TestUtils.readString(fetchedMessageAndOffset.message.payload, "UTF-8")) } private def produceAndMultiFetch(producer: Producer[String, String]) { @@ -128,7 +128,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with val response = consumer.fetch(request) for((topic, partition) <- topics) { val fetched = response.messageSet(topic, partition) - assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload))) + assertEquals(messages(topic), fetched.map(messageAndOffset => TestUtils.readString(messageAndOffset.message.payload))) } } @@ -193,7 +193,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with val response = consumer.fetch(request) for((topic, partition) <- topics) { val fetched = response.messageSet(topic, partition) - assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload))) + assertEquals(messages(topic), fetched.map(messageAndOffset => TestUtils.readString(messageAndOffset.message.payload))) } } @@ -258,7 +258,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with val response = consumer.fetch(request) for( (topic, partition) <- topics) { val fetched = response.messageSet(topic, partition) - assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload))) + assertEquals(messages(topic), fetched.map(messageAndOffset => TestUtils.readString(messageAndOffset.message.payload))) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala old mode 100644 new mode 100755 index 130b205..1113619 --- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala +++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala @@ -21,7 +21,7 @@ import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import junit.framework.Assert._ -import kafka.utils.{Utils, TestUtils} +import kafka.utils.{CoreUtils, TestUtils} import kafka.server.{KafkaConfig, KafkaServer} class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -41,7 +41,7 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness { override def tearDown() { servers.map(server => server.shutdown()) - servers.map(server => Utils.rm(server.config.logDirs)) + servers.map(server => CoreUtils.rm(server.config.logDirs)) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala old mode 100644 new mode 100755 index 7c87b81..a130089 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -31,7 +31,7 @@ import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException} import kafka.producer.{KeyedMessage, Producer} import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.Utils +import kafka.utils.CoreUtils import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness @@ -79,7 +79,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { override def tearDown() { servers.map(server => shutdownServer(server)) - servers.map(server => Utils.rm(server.config.logDirs)) + servers.map(server => CoreUtils.rm(server.config.logDirs)) // restore log levels kafkaApisLogger.setLevel(Level.ERROR) http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala old mode 100644 new mode 100755 index fa4a8ad..375555f --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -45,7 +45,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin @After def tearDown() { - Utils.rm(logDir) + CoreUtils.rm(logDir) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/log/CleanerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala old mode 100644 new mode 100755 index c20e423..9792ed6 --- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala @@ -27,6 +27,7 @@ import kafka.common._ import kafka.utils._ import kafka.message._ import java.util.concurrent.atomic.AtomicLong +import org.apache.kafka.common.utils.Utils /** * Unit tests for the log cleaning logic @@ -40,7 +41,7 @@ class CleanerTest extends JUnitSuite { @After def teardown() { - Utils.rm(dir) + CoreUtils.rm(dir) } /** @@ -123,7 +124,7 @@ class CleanerTest extends JUnitSuite { /* extract all the keys from a log */ def keysInLog(log: Log): Iterable[Int] = - log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => Utils.readString(m.message.key).toInt)) + log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => TestUtils.readString(m.message.key).toInt)) def unkeyedMessageCountInLog(log: Log) = log.logSegments.map(s => s.log.filter(!_.message.isNull).count(m => !m.message.hasKey)).sum http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala old mode 100644 new mode 100755 index 07acd46..3b5aa9d --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -82,8 +82,8 @@ class LogCleanerIntegrationTest extends JUnitSuite { def readFromLog(log: Log): Iterable[(Int, Int)] = { for(segment <- log.logSegments; message <- segment.log) yield { - val key = Utils.readString(message.message.key).toInt - val value = Utils.readString(message.message.payload).toInt + val key = TestUtils.readString(message.message.key).toInt + val value = TestUtils.readString(message.message.payload).toInt key -> value } } @@ -99,7 +99,7 @@ class LogCleanerIntegrationTest extends JUnitSuite { @After def teardown() { - Utils.rm(logDir) + CoreUtils.rm(logDir) } /* create a cleaner instance and logs with the given parameters */ http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/log/LogManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala old mode 100644 new mode 100755 index 90cd530..0a26f5f --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -47,8 +47,8 @@ class LogManagerTest extends JUnit3Suite { override def tearDown() { if(logManager != null) logManager.shutdown() - Utils.rm(logDir) - logManager.logDirs.map(Utils.rm(_)) + CoreUtils.rm(logDir) + logManager.logDirs.map(CoreUtils.rm(_)) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala old mode 100644 new mode 100755 index bd9a409..069aa02 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -43,7 +43,7 @@ class LogTest extends JUnitSuite { @After def tearDown() { - Utils.rm(logDir) + CoreUtils.rm(logDir) } def createEmptyLogs(dir: File, offsets: Int*) { @@ -714,7 +714,7 @@ class LogTest extends JUnitSuite { log = new Log(logDir, config, recoveryPoint, time.scheduler, time) assertEquals(numMessages, log.logEndOffset) assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList)) - Utils.rm(logDir) + CoreUtils.rm(logDir) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala old mode 100644 new mode 100755 index 18361c1..41366a1 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -19,7 +19,7 @@ package kafka.log4j import kafka.consumer.SimpleConsumer import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.{TestUtils, Utils, Logging} +import kafka.utils.{TestUtils, CoreUtils, Logging} import kafka.api.FetchRequestBuilder import kafka.producer.async.MissingConfigException import kafka.serializer.Encoder @@ -63,7 +63,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with override def tearDown() { simpleConsumerZk.close server.shutdown - Utils.rm(logDirZk) + CoreUtils.rm(logDirZk) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/message/MessageTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala old mode 100644 new mode 100755 index 7b74a0d..11c0f81 --- a/core/src/test/scala/unit/kafka/message/MessageTest.scala +++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala @@ -24,7 +24,8 @@ import junit.framework.Assert._ import org.scalatest.junit.JUnitSuite import org.junit.{Before, Test} import kafka.utils.TestUtils -import kafka.utils.Utils +import kafka.utils.CoreUtils +import org.apache.kafka.common.utils.Utils case class MessageTestVal(val key: Array[Byte], val payload: Array[Byte], http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala old mode 100644 new mode 100755 index 3b82fb3..2169a5c --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -231,11 +231,11 @@ class AsyncProducerTest extends JUnit3Suite { topicPartitionInfos = topicPartitionInfos) val serializedData = handler.serialize(produceData) - val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload))) + val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, TestUtils.readString(d.message.payload))) // Test that the serialize handles seq from a Stream val streamedSerializedData = handler.serialize(Stream(produceData:_*)) - val deserializedStreamData = streamedSerializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload))) + val deserializedStreamData = streamedSerializedData.map(d => new KeyedMessage[String,String](d.topic, TestUtils.readString(d.message.payload))) TestUtils.checkEquals(produceData.iterator, deserializedData.iterator) TestUtils.checkEquals(produceData.iterator, deserializedStreamData.iterator) http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/producer/ProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala old mode 100644 new mode 100755 index a7ca142..4d2536b --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -92,8 +92,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ server1.shutdown server2.shutdown - Utils.rm(server1.config.logDirs) - Utils.rm(server2.config.logDirs) + CoreUtils.rm(server1.config.logDirs) + CoreUtils.rm(server2.config.logDirs) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala old mode 100644 new mode 100755 index 48d3143..e899b02 --- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala @@ -18,7 +18,7 @@ package kafka.server import junit.framework.Assert._ -import kafka.utils.{TestUtils, Utils, ZkUtils} +import kafka.utils.{TestUtils, CoreUtils, ZkUtils} import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.SecurityProtocol import org.scalatest.junit.JUnit3Suite @@ -41,7 +41,7 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness { override def tearDown() { server.shutdown() - Utils.rm(server.config.logDirs) + CoreUtils.rm(server.config.logDirs) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala old mode 100644 new mode 100755 index 142e28e..60cd824 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -25,7 +25,7 @@ import org.junit._ import org.junit.Assert._ import kafka.common._ import kafka.cluster.Replica -import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils} +import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, CoreUtils} import java.util.concurrent.atomic.AtomicBoolean class HighwatermarkPersistenceTest extends JUnit3Suite { @@ -41,7 +41,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { @After def teardown() { for(manager <- logManagers; dir <- manager.logDirs) - Utils.rm(dir) + CoreUtils.rm(dir) } def testHighWatermarkPersistenceSinglePartition() { http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala old mode 100644 new mode 100755 index ca46ba9..b394719 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -21,7 +21,7 @@ import java.util.Properties import junit.framework.Assert._ import kafka.api.{ApiVersion, KAFKA_082} -import kafka.utils.{TestUtils, Utils} +import kafka.utils.{TestUtils, CoreUtils} import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.Test @@ -159,14 +159,14 @@ class KafkaConfigTest extends JUnit3Suite { props.put("port", "1111") val conf = KafkaConfig.fromProps(props) - assertEquals(Utils.listenerListToEndPoints("PLAINTEXT://myhost:1111"), conf.listeners) + assertEquals(CoreUtils.listenerListToEndPoints("PLAINTEXT://myhost:1111"), conf.listeners) // configuration with null host props.remove("host.name") val conf2 = KafkaConfig.fromProps(props) - assertEquals(Utils.listenerListToEndPoints("PLAINTEXT://:1111"), conf2.listeners) - assertEquals(Utils.listenerListToEndPoints("PLAINTEXT://:1111"), conf2.advertisedListeners) + assertEquals(CoreUtils.listenerListToEndPoints("PLAINTEXT://:1111"), conf2.listeners) + assertEquals(CoreUtils.listenerListToEndPoints("PLAINTEXT://:1111"), conf2.advertisedListeners) assertEquals(null, conf2.listeners(SecurityProtocol.PLAINTEXT).host) // configuration with advertised host and port, and no advertised listeners @@ -174,7 +174,7 @@ class KafkaConfigTest extends JUnit3Suite { props.put("advertised.port", "2222") val conf3 = KafkaConfig.fromProps(props) - assertEquals(conf3.advertisedListeners, Utils.listenerListToEndPoints("PLAINTEXT://otherhost:2222")) + assertEquals(conf3.advertisedListeners, CoreUtils.listenerListToEndPoints("PLAINTEXT://otherhost:2222")) } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala old mode 100644 new mode 100755 index d9bdcef..26572f7 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -19,11 +19,11 @@ package kafka.server import junit.framework.Assert._ import kafka.api._ +import kafka.utils.{TestUtils, ZkUtils, CoreUtils} import kafka.cluster.Broker import kafka.common.ErrorMapping import kafka.controller.{ControllerChannelManager, ControllerContext, LeaderIsrAndControllerEpoch} import kafka.utils.TestUtils._ -import kafka.utils.{TestUtils, Utils, ZkUtils} import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.SecurityProtocol import org.scalatest.junit.JUnit3Suite @@ -50,7 +50,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { override def tearDown() { servers.map(server => server.shutdown()) - servers.map(server => Utils.rm(server.config.logDirs)) + servers.map(server => CoreUtils.rm(server.config.logDirs)) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala old mode 100644 new mode 100755 index 496bf0d..e57c1de --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -57,7 +57,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { override def tearDown() { simpleConsumer.close server.shutdown - Utils.rm(logDir) + CoreUtils.rm(logDir) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala old mode 100644 new mode 100755 index 92e49df..7688f26 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -19,7 +19,7 @@ package kafka.server import java.util.Properties import kafka.utils.TestUtils._ -import kafka.utils.{IntEncoder, Utils, TestUtils} +import kafka.utils.{IntEncoder, CoreUtils, TestUtils} import kafka.zk.ZooKeeperTestHarness import kafka.common._ import kafka.producer.{KeyedMessage, Producer} @@ -90,7 +90,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer.close() for(server <- servers) { server.shutdown() - Utils.rm(server.config.logDirs(0)) + CoreUtils.rm(server.config.logDirs(0)) } super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala old mode 100644 new mode 100755 index a6bb690..652208a --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -71,7 +71,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { override def tearDown() { simpleConsumer.close server.shutdown - Utils.rm(logDir) + CoreUtils.rm(logDir) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala old mode 100644 new mode 100755 index 2bfaeb3..12269cd --- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala @@ -19,7 +19,7 @@ package kafka.server import java.util.Properties import kafka.zk.ZooKeeperTestHarness -import kafka.utils.{TestUtils, Utils} +import kafka.utils.{TestUtils, CoreUtils} import org.junit.Test import org.scalatest.junit.JUnit3Suite import junit.framework.Assert._ @@ -51,7 +51,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { server1.startup() assertEquals(server1.config.brokerId, 1001) server1.shutdown() - Utils.rm(server1.config.logDirs) + CoreUtils.rm(server1.config.logDirs) TestUtils.verifyNonDaemonThreadsStatus } @@ -75,9 +75,9 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { assertTrue(verifyBrokerMetadata(server1.config.logDirs,1001)) assertTrue(verifyBrokerMetadata(server2.config.logDirs,0)) assertTrue(verifyBrokerMetadata(server3.config.logDirs,1002)) - Utils.rm(server1.config.logDirs) - Utils.rm(server2.config.logDirs) - Utils.rm(server3.config.logDirs) + CoreUtils.rm(server1.config.logDirs) + CoreUtils.rm(server2.config.logDirs) + CoreUtils.rm(server3.config.logDirs) TestUtils.verifyNonDaemonThreadsStatus } @@ -100,7 +100,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { server1.startup() server1.shutdown() assertTrue(verifyBrokerMetadata(config1.logDirs, 1001)) - Utils.rm(server1.config.logDirs) + CoreUtils.rm(server1.config.logDirs) TestUtils.verifyNonDaemonThreadsStatus } @@ -117,7 +117,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { case e: kafka.common.InconsistentBrokerIdException => //success } server1.shutdown() - Utils.rm(server1.config.logDirs) + CoreUtils.rm(server1.config.logDirs) TestUtils.verifyNonDaemonThreadsStatus } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala old mode 100644 new mode 100755 index a20321f..95534e3 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -19,7 +19,7 @@ package kafka.server import kafka.zk.ZooKeeperTestHarness import kafka.consumer.SimpleConsumer import kafka.producer._ -import kafka.utils.{IntEncoder, TestUtils, Utils} +import kafka.utils.{IntEncoder, TestUtils, CoreUtils} import kafka.utils.TestUtils._ import kafka.api.FetchRequestBuilder import kafka.message.ByteBufferMessageSet @@ -84,7 +84,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build()) fetchedMessage = fetched.messageSet(topic, 0) } - assertEquals(sent1, fetchedMessage.map(m => Utils.readString(m.message.payload))) + assertEquals(sent1, fetchedMessage.map(m => TestUtils.readString(m.message.payload))) val newOffset = fetchedMessage.last.nextOffset // send some more messages @@ -95,12 +95,12 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build()) fetchedMessage = fetched.messageSet(topic, 0) } - assertEquals(sent2, fetchedMessage.map(m => Utils.readString(m.message.payload))) + assertEquals(sent2, fetchedMessage.map(m => TestUtils.readString(m.message.payload))) consumer.close() producer.close() server.shutdown() - Utils.rm(server.config.logDirs) + CoreUtils.rm(server.config.logDirs) verifyNonDaemonThreadsStatus } @@ -113,7 +113,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { server.startup() server.shutdown() server.awaitShutdown() - Utils.rm(server.config.logDirs) + CoreUtils.rm(server.config.logDirs) verifyNonDaemonThreadsStatus } @@ -141,7 +141,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { server.shutdown() server.awaitShutdown() } - Utils.rm(server.config.logDirs) + CoreUtils.rm(server.config.logDirs) verifyNonDaemonThreadsStatus } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala old mode 100644 new mode 100755 index 661ddd5..60e10b3 --- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala @@ -19,7 +19,7 @@ package kafka.server import org.scalatest.junit.JUnit3Suite import kafka.utils.ZkUtils -import kafka.utils.Utils +import kafka.utils.CoreUtils import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness @@ -39,7 +39,7 @@ class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness { assertTrue(pathExists) server.shutdown() - Utils.rm(server.config.logDirs) + CoreUtils.rm(server.config.logDirs) } def testConflictBrokerRegistration { @@ -64,6 +64,6 @@ class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(brokerRegistration, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1) server1.shutdown() - Utils.rm(server1.config.logDirs) + CoreUtils.rm(server1.config.logDirs) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala old mode 100644 new mode 100755 index a8ed142..5a9e84d --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -22,6 +22,7 @@ import java.nio._ import java.nio.channels._ import java.util.Random import java.util.Properties +import charset.Charset import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.utils.Utils._ @@ -82,7 +83,7 @@ object TestUtils extends Logging { Runtime.getRuntime().addShutdownHook(new Thread() { override def run() = { - Utils.rm(f) + CoreUtils.rm(f) } }) @@ -835,6 +836,17 @@ object TestUtils extends Logging { }), "Cleaner offset for deleted partition should have been removed") } + /** + * Translate the given buffer into a string + * @param buffer The buffer to translate + * @param encoding The encoding to use in translating bytes to characters + */ + def readString(buffer: ByteBuffer, encoding: String = Charset.defaultCharset.toString): String = { + val bytes = new Array[Byte](buffer.remaining) + buffer.get(bytes) + new String(bytes, encoding) + } + } class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] { http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/utils/UtilsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala old mode 100644 new mode 100755 index 8c3797a..9e8869c --- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala @@ -24,8 +24,9 @@ import org.apache.log4j.Logger import org.scalatest.junit.JUnitSuite import org.junit.Assert._ import kafka.common.KafkaException -import kafka.utils.Utils.inLock +import kafka.utils.CoreUtils.inLock import org.junit.Test +import org.apache.kafka.common.utils.Utils class UtilsTest extends JUnitSuite { @@ -34,13 +35,13 @@ class UtilsTest extends JUnitSuite { @Test def testSwallow() { - Utils.swallow(logger.info, throw new KafkaException("test")) + CoreUtils.swallow(logger.info, throw new KafkaException("test")) } @Test def testCircularIterator() { val l = List(1, 2) - val itl = Utils.circularIterator(l) + val itl = CoreUtils.circularIterator(l) assertEquals(1, itl.next()) assertEquals(2, itl.next()) assertEquals(1, itl.next()) @@ -48,7 +49,7 @@ class UtilsTest extends JUnitSuite { assertFalse(itl.hasDefiniteSize) val s = Set(1, 2) - val its = Utils.circularIterator(s) + val its = CoreUtils.circularIterator(s) assertEquals(1, its.next()) assertEquals(2, its.next()) assertEquals(1, its.next()) @@ -75,10 +76,10 @@ class UtilsTest extends JUnitSuite { @Test def testReplaceSuffix() { - assertEquals("blah.foo.text", Utils.replaceSuffix("blah.foo.txt", ".txt", ".text")) - assertEquals("blah.foo", Utils.replaceSuffix("blah.foo.txt", ".txt", "")) - assertEquals("txt.txt", Utils.replaceSuffix("txt.txt.txt", ".txt", "")) - assertEquals("foo.txt", Utils.replaceSuffix("foo", "", ".txt")) + assertEquals("blah.foo.text", CoreUtils.replaceSuffix("blah.foo.txt", ".txt", ".text")) + assertEquals("blah.foo", CoreUtils.replaceSuffix("blah.foo.txt", ".txt", "")) + assertEquals("txt.txt", CoreUtils.replaceSuffix("txt.txt.txt", ".txt", "")) + assertEquals("foo.txt", CoreUtils.replaceSuffix("foo", "", ".txt")) } @Test @@ -87,7 +88,7 @@ class UtilsTest extends JUnitSuite { val buffer = ByteBuffer.allocate(4 * values.size) for(i <- 0 until values.length) { buffer.putInt(i*4, values(i)) - assertEquals("Written value should match read value.", values(i), Utils.readInt(buffer.array, i*4)) + assertEquals("Written value should match read value.", values(i), CoreUtils.readInt(buffer.array, i*4)) } } @@ -95,8 +96,8 @@ class UtilsTest extends JUnitSuite { def testCsvList() { val emptyString:String = "" val nullString:String = null - val emptyList = Utils.parseCsvList(emptyString) - val emptyListFromNullString = Utils.parseCsvList(nullString) + val emptyList = CoreUtils.parseCsvList(emptyString) + val emptyListFromNullString = CoreUtils.parseCsvList(nullString) val emptyStringList = Seq.empty[String] assertTrue(emptyList!=null) assertTrue(emptyListFromNullString!=null) @@ -107,32 +108,32 @@ class UtilsTest extends JUnitSuite { @Test def testCsvMap() { val emptyString: String = "" - val emptyMap = Utils.parseCsvMap(emptyString) + val emptyMap = CoreUtils.parseCsvMap(emptyString) val emptyStringMap = Map.empty[String, String] assertTrue(emptyMap != null) assertTrue(emptyStringMap.equals(emptyStringMap)) val kvPairsIpV6: String = "a:b:c:v,a:b:c:v" - val ipv6Map = Utils.parseCsvMap(kvPairsIpV6) + val ipv6Map = CoreUtils.parseCsvMap(kvPairsIpV6) for (m <- ipv6Map) { assertTrue(m._1.equals("a:b:c")) assertTrue(m._2.equals("v")) } val singleEntry:String = "key:value" - val singleMap = Utils.parseCsvMap(singleEntry) + val singleMap = CoreUtils.parseCsvMap(singleEntry) val value = singleMap.getOrElse("key", 0) assertTrue(value.equals("value")) val kvPairsIpV4: String = "192.168.2.1/30:allow, 192.168.2.1/30:allow" - val ipv4Map = Utils.parseCsvMap(kvPairsIpV4) + val ipv4Map = CoreUtils.parseCsvMap(kvPairsIpV4) for (m <- ipv4Map) { assertTrue(m._1.equals("192.168.2.1/30")) assertTrue(m._2.equals("allow")) } val kvPairsSpaces: String = "key:value , key: value" - val spaceMap = Utils.parseCsvMap(kvPairsSpaces) + val spaceMap = CoreUtils.parseCsvMap(kvPairsSpaces) for (m <- spaceMap) { assertTrue(m._1.equals("key")) assertTrue(m._2.equals("value")) http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala old mode 100644 new mode 100755 index 1d87506..2bca2cf --- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala +++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala @@ -21,7 +21,7 @@ import org.apache.zookeeper.server.ZooKeeperServer import org.apache.zookeeper.server.NIOServerCnxnFactory import kafka.utils.TestUtils import java.net.InetSocketAddress -import kafka.utils.Utils +import kafka.utils.CoreUtils import org.apache.kafka.common.utils.Utils.getPort class EmbeddedZookeeper() { @@ -36,10 +36,10 @@ class EmbeddedZookeeper() { val port = zookeeper.getClientPort() def shutdown() { - Utils.swallow(zookeeper.shutdown()) - Utils.swallow(factory.shutdown()) - Utils.rm(logDir) - Utils.rm(snapshotDir) + CoreUtils.swallow(zookeeper.shutdown()) + CoreUtils.swallow(factory.shutdown()) + CoreUtils.rm(logDir) + CoreUtils.rm(snapshotDir) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala old mode 100644 new mode 100755 index fedefb5..86bddea --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -19,7 +19,7 @@ package kafka.zk import org.scalatest.junit.JUnit3Suite import org.I0Itec.zkclient.ZkClient -import kafka.utils.{ZKStringSerializer, Utils} +import kafka.utils.{ZKStringSerializer, CoreUtils} trait ZooKeeperTestHarness extends JUnit3Suite { var zkPort: Int = -1 @@ -38,8 +38,8 @@ trait ZooKeeperTestHarness extends JUnit3Suite { } override def tearDown() { - Utils.swallow(zkClient.close()) - Utils.swallow(zookeeper.shutdown()) + CoreUtils.swallow(zkClient.close()) + CoreUtils.swallow(zookeeper.shutdown()) super.tearDown }