Repository: kafka Updated Branches: refs/heads/trunk 2c7fae0a4 -> 79aaf19f2
http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index d37de76..40ad0f3 100755 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -55,8 +55,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness { val message = "hello" var producer: KafkaProducer[Integer, String] = null - def hwFile1 = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename)) - def hwFile2 = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename)) + def hwFile1 = new OffsetCheckpoint(new File(configProps1.logDirs.head, ReplicaManager.HighWatermarkFilename)) + def hwFile2 = new OffsetCheckpoint(new File(configProps2.logDirs.head, ReplicaManager.HighWatermarkFilename)) var servers = Seq.empty[KafkaServer] // Some tests restart the brokers then produce more data. But since test brokers use random ports, we need @@ -95,7 +95,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { producer.close() for (server <- servers) { server.shutdown() - Utils.delete(new File(server.config.logDirs(0))) + Utils.delete(new File(server.config.logDirs.head)) } super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala index 312edd4..e0b6db4 100755 --- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala @@ -177,8 +177,8 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = { for(logDir <- logDirs) { - val brokerMetadataOpt = (new BrokerMetadataCheckpoint( - new File(logDir + File.separator + brokerMetaPropsFile))).read() + val brokerMetadataOpt = new BrokerMetadataCheckpoint( + new File(logDir + File.separator + brokerMetaPropsFile)).read() brokerMetadataOpt match { case Some(brokerMetadata: BrokerMetadata) => if (brokerMetadata.brokerId != brokerId) return false http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 689b70b..7741698 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -105,7 +105,7 @@ class SimpleFetchTest { val partition = replicaManager.getOrCreatePartition(topic, partitionId) // create the leader replica with the local log - val leaderReplica = new Replica(configs(0).brokerId, partition, time, 0, Some(log)) + val leaderReplica = new Replica(configs.head.brokerId, partition, time, 0, Some(log)) leaderReplica.highWatermark = new LogOffsetMetadata(partitionHW) partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId) @@ -144,15 +144,15 @@ class SimpleFetchTest { */ @Test def testReadFromLog() { - val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count(); - val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count(); + val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count() + val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count() assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW, replicaManager.readFromLocalLog(true, true, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message) assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO, replicaManager.readFromLocalLog(true, false, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message) - assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count()); - assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()); + assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count()) + assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala index 37d334b..741eec9 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala @@ -69,10 +69,10 @@ class ConsoleProducerTest { @Test def testParseKeyProp(): Unit = { val config = new ConsoleProducer.ProducerConfig(validArgs) - val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[LineMessageReader]; + val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[LineMessageReader] reader.init(System.in,ConsoleProducer.getReaderProps(config)) assert(reader.keySeparator == "#") - assert(reader.parseKey == true) + assert(reader.parseKey) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala index 56f5905..6a40510 100644 --- a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala +++ b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala @@ -22,7 +22,7 @@ import org.junit.{Test, After, Before} class IteratorTemplateTest extends Assertions { - val lst = (0 until 10) + val lst = 0 until 10 val iterator = new IteratorTemplate[Int]() { var i = 0 override def makeNext() = { http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala index 7c4b951..f39fa6b 100644 --- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala @@ -51,7 +51,7 @@ object JaasTestUtils { entries = Map( "username" -> username, "password" -> password - ) ++ validUsers.map { case (user, pass) => (s"user_$user"-> pass)} + ) ++ validUsers.map { case (user, pass) => s"user_$user" -> pass } ) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/utils/MockScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala index 434c22a..e9dbbb1 100644 --- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala +++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala @@ -56,7 +56,7 @@ class MockScheduler(val time: Time) extends Scheduler { def tick() { this synchronized { val now = time.milliseconds - while(!tasks.isEmpty && tasks.head.nextExecution <= now) { + while(tasks.nonEmpty && tasks.head.nextExecution <= now) { /* pop and execute the task with the lowest next execution time */ val curr = tasks.dequeue curr.fun() @@ -78,7 +78,7 @@ class MockScheduler(val time: Time) extends Scheduler { } -case class MockTask(val name: String, val fun: () => Unit, var nextExecution: Long, val period: Long) extends Ordered[MockTask] { +case class MockTask(name: String, fun: () => Unit, var nextExecution: Long, period: Long) extends Ordered[MockTask] { def periodic = period >= 0 def compare(t: MockTask): Int = { if(t.nextExecution == nextExecution) http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/utils/MockTime.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/MockTime.scala b/core/src/test/scala/unit/kafka/utils/MockTime.scala index 0858e04..21fb4d9 100644 --- a/core/src/test/scala/unit/kafka/utils/MockTime.scala +++ b/core/src/test/scala/unit/kafka/utils/MockTime.scala @@ -46,7 +46,7 @@ class MockTime(@volatile private var currentMs: Long) extends Time { scheduler.tick() } - override def toString() = "MockTime(%d)".format(milliseconds) + override def toString = "MockTime(%d)".format(milliseconds) } http://git-wip-us.apache.org/repos/asf/kafka/blob/79aaf19f/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 7df87fc..b42a6ba 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -94,7 +94,7 @@ object TestUtils extends Logging { val parentFile = new File(parent) parentFile.mkdirs() - org.apache.kafka.test.TestUtils.tempDirectory(parentFile.toPath, "kafka-"); + org.apache.kafka.test.TestUtils.tempDirectory(parentFile.toPath, "kafka-") } /** @@ -335,12 +335,12 @@ object TestUtils extends Logging { // check if the expected iterator is longer if (expected.hasNext) { - var length1 = length; + var length1 = length while (expected.hasNext) { expected.next length1 += 1 } - assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true); + assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true) } // check if the actual iterator was longer @@ -350,7 +350,7 @@ object TestUtils extends Logging { actual.next length2 += 1 } - assertFalse("Iterators have uneven length-- second has more: "+length2 + " > " + length, true); + assertFalse("Iterators have uneven length-- second has more: "+length2 + " > " + length, true) } } @@ -671,7 +671,7 @@ object TestUtils extends Logging { try{ val currentLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition) var newLeaderAndIsr: LeaderAndIsr = null - if(currentLeaderAndIsrOpt == None) + if(currentLeaderAndIsrOpt.isEmpty) newLeaderAndIsr = new LeaderAndIsr(leader, List(leader)) else{ newLeaderAndIsr = currentLeaderAndIsrOpt.get @@ -716,7 +716,7 @@ object TestUtils extends Logging { } else if (oldLeaderOpt.isDefined && oldLeaderOpt.get != l) { trace("Leader for partition [%s,%d] is changed from %d to %d".format(topic, partition, oldLeaderOpt.get, l)) isLeaderElectedOrChanged = true - } else if (!oldLeaderOpt.isDefined) { + } else if (oldLeaderOpt.isEmpty) { trace("Leader %d is elected for partition [%s,%d]".format(l, topic, partition)) isLeaderElectedOrChanged = true } else { @@ -856,7 +856,7 @@ object TestUtils extends Logging { // in sync replicas should not have any replica that is not in the new assigned replicas val phantomInSyncReplicas = inSyncReplicas.toSet -- assignedReplicas.toSet assertTrue("All in sync replicas %s must be in the assigned replica list %s".format(inSyncReplicas, assignedReplicas), - phantomInSyncReplicas.size == 0) + phantomInSyncReplicas.isEmpty) } def ensureNoUnderReplicatedPartitions(zkUtils: ZkUtils, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int], @@ -1031,7 +1031,7 @@ object TestUtils extends Logging { "Topic path /brokers/topics/%s not deleted after /admin/delete_topic/%s path is deleted".format(topic, topic)) // ensure that the topic-partition has been deleted from all brokers' replica managers TestUtils.waitUntilTrue(() => - servers.forall(server => topicAndPartitions.forall(tp => server.replicaManager.getPartition(tp.topic, tp.partition) == None)), + servers.forall(server => topicAndPartitions.forall(tp => server.replicaManager.getPartition(tp.topic, tp.partition).isEmpty)), "Replica manager's should have deleted all of this topic's partitions") // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper assertTrue("Replica logs not deleted after delete topic is complete", @@ -1146,7 +1146,7 @@ class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] { @deprecated("This class is deprecated and it will be removed in a future release.", "0.10.0.0") class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner { def partition(data: Any, numPartitions: Int): Int = { - (data.asInstanceOf[String].length % numPartitions) + data.asInstanceOf[String].length % numPartitions } }
